Problem Description
I have set up a single-broker Kafka instance along with Zookeeper, kafka-tools, Schema Registry, and Control Center. The setup is done with Docker Compose using the images provided by Confluent. This is what the docker-compose file looks like:
zookeeper:
  image: confluentinc/cp-zookeeper:latest
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
  ports:
    - 2181:2181

broker:
  image: confluentinc/cp-server:latest
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: "true"
    CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

kafka-tools:
  image: confluentinc/cp-kafka:latest
  hostname: kafka-tools
  container_name: kafka-tools
  command: ["tail", "-f", "/dev/null"]
  network_mode: "host"

schema-registry:
  image: confluentinc/cp-schema-registry:5.5.0
  hostname: schema-registry
  container_name: schema-registry
  depends_on:
    - zookeeper
    - broker
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"

control-center:
  image: confluentinc/cp-enterprise-control-center:latest
  hostname: control-center
  container_name: control-center
  depends_on:
    - zookeeper
    - broker
    - schema-registry
  ports:
    - "9021:9021"
  environment:
    CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
    CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONTROL_CENTER_REPLICATION_FACTOR: 1
    CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
    CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
    CONFLUENT_METRICS_TOPIC_REPLICATION: 1
    PORT: 9021
I am trying to create producer and consumer applications using the confluent-kafka Python client. Here is the code for my Kafka producer:
from datetime import datetime
from uuid import uuid4

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer


class BotExecutionProducer(object):
    """
    Represents the bot execution stats.
    """

    def __init__(self, ticketId, accountId, executionTime, status):
        self.ticketId = ticketId
        self.accountId = accountId
        self.executionTime = executionTime
        self.timestamp = str(datetime.now())  # was datetime.Now(), which does not exist
        self.status = status

    def botexecution_to_dict(self, botexecution, ctx):
        """
        Returns a dict representation of the
        BotExecution instance.
        botexecution: BotExecution instance
        ctx: SerializationContext
        """
        return dict(
            ticketId=self.ticketId,
            accountId=self.accountId,
            executionTime=self.executionTime,
            timestamp=self.timestamp,
            status=self.status,
        )

    def delivery_report(self, err, msg):
        """
        Reports the failure or success of a message delivery.
        """
        if err is not None:
            print("Delivery failed for record {}: {}".format(msg.key(), err))
            return
        print("Record {} successfully produced to {} [{}] at offset {}".format(
            msg.key(), msg.topic(), msg.partition(), msg.offset()))

    def send(self):
        """
        Connects to the Kafka broker,
        validates the message, and sends it.
        """
        topic = "bots.execution"
        schema_str = """
        {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "title": "BotExecutions",
          "description": "BotExecution Stats",
          "type": "object",
          "properties": {
            "ticketId": {
              "description": "Ticket ID",
              "type": "string"
            },
            "accountId": {
              "description": "Customer's AccountID",
              "type": "string"
            },
            "executionTime": {
              "description": "Bot Execution time in seconds",
              "type": "number"
            },
            "timestamp": {
              "description": "Timestamp",
              "type": "string"
            },
            "status": {
              "description": "Execution Status",
              "type": "string"
            }
          },
          "required": ["ticketId", "accountId", "executionTime", "timestamp", "status"]
        }
        """
        schema_registry_conf = {'url': 'http://localhost:8081'}
        schema_registry_client = SchemaRegistryClient(schema_registry_conf)
        json_serializer = JSONSerializer(schema_str, schema_registry_client,
                                         self.botexecution_to_dict)
        producer_conf = {
            'bootstrap.servers': "localhost:9092",
            'key.serializer': StringSerializer('utf_8'),
            'value.serializer': json_serializer,
            'acks': 0,
        }
        producer = SerializingProducer(producer_conf)
        print(f'Producing records to topic {topic}')
        producer.poll(0.0)
        try:
            print(self)
            # partition=1 removed: an auto-created topic defaults to a single
            # partition, so the only valid explicit partition would be 0.
            producer.produce(topic=topic, key=str(uuid4()), value=self,
                             on_delivery=self.delivery_report)
        except ValueError:
            print("Invalid input, discarding record...")
        producer.flush()
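As a quick stand-alone sanity check (standard library only, with a hypothetical sample record), the dict shape returned by botexecution_to_dict can be checked against the schema's required fields before wiring up the broker. Note the schema here is the corrected form, with "timestamp" and "status" as separate top-level properties:

```python
import json

# The producer's JSON schema, corrected so that "timestamp" and "status"
# are separate top-level properties, each with its own "type".
schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "BotExecutions",
  "type": "object",
  "properties": {
    "ticketId": {"type": "string"},
    "accountId": {"type": "string"},
    "executionTime": {"type": "number"},
    "timestamp": {"type": "string"},
    "status": {"type": "string"}
  },
  "required": ["ticketId", "accountId", "executionTime", "timestamp", "status"]
}
"""
schema = json.loads(schema_str)

# A hypothetical record, shaped like botexecution_to_dict's output.
record = {
    "ticketId": "T-1001",
    "accountId": "ACC-42",
    "executionTime": 12.5,
    "timestamp": "2021-06-01 10:00:00",
    "status": "SUCCESS",
}

# Every required field must be present in the record.
missing = [field for field in schema["required"] if field not in record]
print(missing)  # -> []
```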
Now, when I run the code, it should create a Kafka topic and push the JSON data to it, but it doesn't work: it always fails with an error saying the replication factor was specified as 3 when there is only one broker. Is it possible to define the replication factor in the code above? Producing to the broker with the Kafka CLI works perfectly. Is there something I'm missing?
Solution
I'm not sure of the full difference between the cp-server and cp-kafka images, but you can add a variable for the default replication factor of auto-created topics:
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
If that doesn't work, import and use the AdminClient to create the topic with an explicit replication factor before producing.
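A minimal sketch of the AdminClient approach, assuming the compose stack above is running so the broker is reachable on localhost:9092, and reusing the topic name from the producer code (this cannot run without a live broker):

```python
from confluent_kafka.admin import AdminClient, NewTopic

# Assumes the broker from the compose file is reachable on localhost:9092.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Create the topic explicitly with replication factor 1, so producing
# never triggers auto-creation with a default replication factor of 3.
futures = admin.create_topics(
    [NewTopic("bots.execution", num_partitions=1, replication_factor=1)]
)

# create_topics() is asynchronous; each future raises on failure
# (for example, if the topic already exists).
for topic, future in futures.items():
    try:
        future.result()
        print(f"Topic {topic} created")
    except Exception as exc:
        print(f"Failed to create topic {topic}: {exc}")
```

Run this once before starting the producer; the producer then writes to an existing topic and the broker's auto-creation defaults no longer matter.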