问题描述
node 用于我的 nodejs 代码。我有一个用于请求/响应的 api。
首先,我发出一个请求 http://localhost:3000/number1
,在我启动一个消费者从一个 kafka 主题和一个分区“接收”消费消息之后,我尝试找到 id = number1
的消息。在我想使用此值向用户返回响应之后。所以我创建了一个像下面这样的消费者:
options = {
kafkaHost: 'kafka:9092'
}
const client_node = new kafka_node.KafkaClient(options);
var Consumer = kafka_node.Consumer
var consumer_node = new Consumer(
client_node,[
{ topic: 'receive.kafka.entities',partition: 0,offset: 0}
],{
autoCommit: false,fetchMaxWaitMs: 100,fromOffset: 'earliest',groupId: 'kafka-node-group',asyncpush: false,}
);
const read = (callback)=>{
let ret = "1"
consumer_node.on('message',async function (message) {
var parse1 = JSON.parse(message.value)
var parse2 = JSON.parse(parse1.payload)
var id = parse2.fullDocument.id
var lastOffset = message.highWaterOffset - 1
//check if there is a query
if(lastOffset <= message.offset || ret !== "1"){
return callback(ret)
}
else if(id === back2){
ret = parse2.fullDocument
}
});
}
let error = {
id: "The entity " + back2 + " not found "
}
read((data)=>{
consumer_node.close(true,function(message){
if(data != "1"){
res.status(200).send(data)
}
else{
res.status(404).send(error)
}
})
})
如果我尝试发出一个连续的请求,在第一个请求之后我会得到响应:
{
"message": "broker not available (loadMetadataForTopics)"
}
我的 Docker-compose file1 如下:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.1
container_name: stellio-zookeeper
ports:
- 2181:2181
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- default
- localnet
kafka:
image: confluentinc/cp-enterprise-kafka:latest
container_name: kafka
ports:
- 9092:9092
- 9101:9101
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_Security_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_broKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_broKER_ID: 1
KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
depends_on:
- zookeeper
networks:
- default
- localnet
- my-proxy-net-kafka
networks:
default: # this network (app2)
driver: bridge
my-proxy-net-kafka:
external:
name: kafka_network
Docker-compose file2
app:
container_name: docker-node
hostname: docker-node
restart: always
build: .
command: nodemon /usr/src/app/index.js
networks:
- default
- proxynet-kafka
ports:
- '3000:3000'
volumes:
- .:/usr/src/app
networks:
default:
driver: bridge
proxynet-kafka:
name: kafka_network
为什么会这样?你能帮我解决这个问题吗?
[如果您想了解更多信息,请随时问我:)]
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)