问题描述
我正在尝试将 Confluent kafka 代理 api 包装在一个可以处理生产和消费的类中。
按照此链接:https://docs.confluent.io/platform/current/kafka-rest/api.html 我尝试按如下方式实现它:
def send(self,topic,data):
try:
r = requests.post(self._url('/topics/' + topic),json=data,headers=headers_v2)
if not r.ok:
raise Exception("Error: ",r.reason)
except Exception as e:
print(" ")
print('Event streams send request Failed')
print(Exception,e)
print(" ")
return e
但我最终使用了 2 个版本的 api (v2/v3),因为我没有在一个实现中找到一些 api,反之亦然...
比如我在v2中没有找到如何创建topic,就用v3实现了。
我现在的问题是 send
方法,我得到 Internal server error
,但我找不到原因!
也许是因为创建主题是使用 v3 完成的,而我正在尝试使用 v2 生成消息。
解决方法
我将发送的数据负载更改为:data = {"records": [{"value": data}]}
并发送通过,
使用时轮询通过:r = requests.get(self._url('/consumers/' + self.consumer_group + '/instances/' + self.consumer + '/records'),headers={'Accept': 'application/vnd.kafka.json.v2+json'})