如何在 Amazon MSK 上运行 Kafka Camel 连接器

问题描述

背景: 我按照此链接 on setting up AWS MSK and testing a producer and consumer 进行了设置并正常工作。我能够通过 2 个单独的 EC2 实例发送和接收消息,这些实例都使用相同的 Kafka 集群(我的 MSK 集群)。现在,我想建立一个从 Eventhubs 到 AWS Firehose 的数据管道,其格式如下:

Azure Eventhub -> Eventhub-to-Kafka Camel 连接器 -> AWS MSK -> Kafka-to-Kinesis-Firehose Camel 连接器 -> AWS Kinesis Firehose

我能够在不使用 MSK(通过普通的旧 Kafka)的情况下成功地做到这一点,但出于未说明的原因,现在需要使用 MSK,但我无法让它工作。

问题: 在尝试启动 AWS MSK 和我正在使用的两个 Camel 连接器之间的连接器时,我收到以下错误

Bug

这是有问题的两个连接器:

  1. AWS Kinesis Firehose to Kafka Connector (Kafka -> Consumer)
  2. Azure Eventhubs to Kafka Connector (Producer -> Kafka)

目标:让这些连接器与 MSK 一起工作,就像他们直接与 Kafka 一起工作时那样。

以下是 Firehose 的问题:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

这是 Azure 的一个

[2021-05-04 14:09:56,848] WARN Load balancing for event processor Failed - If you are using a StorageSharedKeyCredential,and the server returned an error message that says 'Signature did not match',you can compare the string to sign with the one generated by the SDK. To log the string to sign,pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token,pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403,"<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)

解决方法

MSK 不提供 Kafka Connect 作为服务。您需要在自己的计算机或其他 AWS 计算资源上安装它。从那里,您需要安装 Camel 连接器插件

,

Kafka Connect 是一个与 Kafka(MSK、开源或任何其他 kafka 发行版)配合使用的框架。但是,它不带有任何连接器。 Kafka Connect 与开源 kafka 捆绑在一起。

作为最佳实践,永远不要在与代理节点相同的服务器上运行 kafka connect。因为他们共享二进制文件。调整代理可能会导致 kafka 代理出现意外问题。此外,Kafka Connect 应用程序是应用程序,您不会在同一节点上运行 kafka 消费者或生产者应用程序。因此,创建一个 EC2 实例并在那里部署 kafka 连接。

使用 TLS - 如果您启用客户端 TLS 身份验证 - 您需要查找 boostrap_broker_tls。