通过 Fabric8io kubernetes-client 加载 yaml Kafka 配置时出现 JsonMappingException

问题描述

我在使用 fabric8io kubernetes-client 时遇到问题。

我想要的是:在 Kubernetes 中使用 Stramzi 运算符创建 Kafka 集群。 如果我使用 CLI 和 kubectl 执行 Strimzi quickstart guide 中的所有步骤,一切都很好。

但是当我从 Java 代码中使用 kubernetes-client:5.2.1 库加载 yaml 资源时,发生了异常:

io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:53)
  at io.fabric8.kubernetes.client.utils.Serialization.unmarshal(Serialization.java:140)
  at io.fabric8.kubernetes.client.utils.Serialization.unmarshal(Serialization.java:101)
  at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.<init>(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:272)
  at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.<init>(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:252)
  at io.fabric8.kubernetes.client.DefaultKubernetesClient$1.<init>(DefaultKubernetesClient.java:175)
  at io.fabric8.kubernetes.client.DefaultKubernetesClient.load(DefaultKubernetesClient.java:175)
  at app.test.main(test.java:38)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No resource type found for:kafka.strimzi.io/v1#Kafka
 at [Source: (String)"{"apiVersion":"kafka.strimzi.io/v1","kind":"Kafka","Metadata":{"name":"my-cluster"},"spec":{"kafka":{"replicas":1,"listeners":[{"name":"plain","port":9092,"type":"internal","tls":false},{"name":"tls","port":9093,"tls":true,"authentication":{"type":"tls"}},{"name":"external","port":9094,"type":"nodeport","tls":false}],"storage":{"type":"jbod","volumes":[{"id":0,"type":"persistent-claim","size":"1Gi","deleteClaim":false}]},"config":{"offsets.topic.replication.factor":1,"transacti"[truncated 226 chars]; line: 1,column: 726]

加载此文件时出现问题:yaml config

我加载资源如下:

 KubernetesClient client = new DefaultKubernetesClient();
InputStream is = ...;
client.load(is).inNamespace("my_namespace").createOrReplace();

请帮帮我!

P.S:对不起我的英语。

解决方法

我来自 Fabric8 团队。 Kafka 是自定义资源,这意味着它的模型未在 KubernetesClient 中注册,因此这就是您面临来自 KubenetesClient 的 No resource type found for:kafka.strimzi.io/v1#Kafka 错误的原因。 KubernetesClient 提供了两种处理自定义资源的方法:

  1. 无类型 API - 使用 CustomResources 作为原始 Hashmaps
  2. Typed API - 为 CustomResource 类型提供 POJO

我将提供使用这两种 API 加载 Kafka yaml 片段的示例。

无类型 API:

对于 Typeless API,您需要提供一个 CustomResourceDefinitionContext,一个包含 CustomResource 组、版本、种类、复数等详细信息的对象。它的外观如下: KafkaLoadTypeless.java

try (KubernetesClient client = new DefaultKubernetesClient()) {
    CustomResourceDefinitionContext context = new CustomResourceDefinitionContext.Builder()
            .withScope("Namespaced")
            .withVersion("v1beta2")
            .withGroup("kafka.strimzi.io")
            .withPlural("kafkas")
            .build();

    InputStream is = KafkaLoadTypeles.class.getResourceAsStream("/test-kafka.yml");

    Map<String,Object> createdKafka = client.customResource(context).inNamespace("default").createOrReplace(is);
} catch (IOException ioException) {
    ioException.printStackTrace();
}

类型化 API:

对于类型化 API,您需要为 Kafka 资源提供类型。您可以在 Strimzi API 依赖项中找到这些类型。我必须在我的项目中添加这个依赖项才能使用 Kafka 类:

        <dependency>
            <groupId>io.strimzi</groupId>
            <artifactId>api</artifactId>
            <version>0.22.0</version>
            <exclusions>
                <exclusion>
                    <groupId>io.fabric8</groupId>
                    <artifactId>kubernetes-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

添加后,我可以像这样使用 Typed API: KafkaLoadTyped.java

try (KubernetesClient client = new DefaultKubernetesClient()) {
    MixedOperation<Kafka,KafkaList,Resource<Kafka>> kafkaClient = client.customResources(Kafka.class,KafkaList.class);

    InputStream is = KafkaLoadTyped.class.getResourceAsStream("/test-kafka.yml");
    Kafka myClusterkafka = kafkaClient.load(is).get();
    kafkaClient.inNamespace("default").createOrReplace(myClusterkafka);
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...