有没有办法将 Kafka Connect 与 REST 代理一起使用?

问题描述

Kafka Connect 源和接收器连接器为配置数据管道提供了几乎理想的功能集,而无需编写任何代码。就我而言,我想用它来集成来自公共 Internet 上的多个数据库服务器(生产者)的数据。

然而,一些生产者不能直接访问 Kafka 代理,因为他们的网络/防火墙配置只允许流量到特定主机(端口 443)。不幸的是,我无法真正更改这些设置。

我的想法是使用 Confluent REST 代理,但我了解到 Kafka Connect 使用 KafkaProducer API,因此它需要直接访问代理。

我找到了几个可能的解决方法,但没有一个是完美的:

  1. SSH 隧道 - 如:Consume from a Kafka Cluster through SSH Tunnel
  2. 中所述
  3. 使用 REST 代理,但将 Kafka Connect 替换为自定义生产者,在 How can we configure kafka producer behind a firewall/proxy?
  4. 中提到
  5. 使用SSHL解复用器将流量路由到代理(但只有一个代理)

有没有人遇到过类似的挑战?你是怎么解决的?

解决方法

正如@OneCricketeer 推荐的那样,我尝试了使用 REST 代理方法的 HTTP 接收器连接器。 我设法配置了 Confluent HTTP Sink 连接器以及替代连接器 (github.com/llofberg/kafka-connect-rest) 以使用 Confluent REST 代理。

我正在添加连接器配置,以防它为尝试这种方法的任何人节省一些时间。

Confluent HTTP Sink 连接器

cars

Kafka Connect REST 连接器

    {
    "name": "connector-sink-rest","config": {
        "topics": "test","tasks.max": "1","connector.class": "io.confluent.connect.http.HttpSinkConnector","headers": "Content-Type:application/vnd.kafka.json.v2+json","http.api.url": "http://rest:8082/topics/test","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter.schemas.enable": "false","batch.prefix": "{\"records\":[","batch.suffix": "]}","batch.max.size": "1","regex.patterns":"^~$","regex.replacements":"{\"value\":~}","regex.separator":"~","confluent.topic.bootstrap.servers": "localhost:9092","confluent.topic.replication.factor": "1"
    }
}
,

接收器连接器(写入外部系统的连接器)不使用生产者 API。

话虽如此,您可以使用一些 HTTP Sink 连接器向 REST 代理端点发出 POST 请求。这并不理想,但可以解决问题。注意:这意味着您有两个集群 - 您正在使用其中一个集群以通过 Connect 发出 HTTP 请求,另一个在代理后面。


总的来说,我不明白这个问题对 Connect 有何独特之处,因为任何其他尝试通过唯一开放的 HTTPS 端口将数据写入 Kafka 都会遇到类似问题。