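Motivation:
Why write this up? Deploying Kafka on a local virtual machine had worked without any problems, but after migrating to an Alibaba Cloud server, where the public IP and the private IP are different addresses,
a number of latent configuration problems surfaced. The main thing to watch is server.properties in the config directory.
Key points: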
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
In a cluster, each broker must have a different id.
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://xx.xxx.xxx.xxx:9092
If this is set to the public IP of the server you bought, Kafka fails to start outright:
org.apache.kafka.common.KafkaException: Socket server failed to bind to xx.xxx.xxx.xxx:9092: Cannot assign requested address.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:450)
at kafka.network.Acceptor.<init>(SocketServer.scala:340)
at kafka.network.SocketServer$$anonfun$createAcceptorAndProcessors$1.apply(SocketServer.scala:146)
at kafka.network.SocketServer$$anonfun$createAcceptorAndProcessors$1.apply(SocketServer.scala:142)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.SocketServer.createAcceptorAndProcessors(SocketServer.scala:142)
at kafka.network.SocketServer.startup(SocketServer.scala:91)
at kafka.server.KafkaServer.startup(KafkaServer.scala:250)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:75)
at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:446)
... 11 more
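On a virtual machine, where you simply write the IP you assigned to the VM, this problem does not occur. The error means the address is not assigned to any local network interface, which is typically the case for a cloud server's public IP.
The line above needs to be changed to listeners=PLAINTEXT://:9092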
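To see why the bind fails, it can help to list the addresses the machine actually owns. The following is a minimal sketch using only Node's built-in os module; the helper names localAddresses and canBind are hypothetical, and the check is a simplification that ignores OS-level settings which allow non-local binds.

```javascript
const os = require('os');

// Collect every IP address assigned to a local network interface.
// `interfaces` defaults to the live system view; passing a map makes it testable.
function localAddresses(interfaces = os.networkInterfaces()) {
  return Object.values(interfaces)
    .flat()
    .filter(Boolean)
    .map((info) => info.address);
}

// Simplified rule: a server socket can bind to the wildcard address
// or to an address that is assigned to one of the local interfaces.
function canBind(ip, interfaces = os.networkInterfaces()) {
  return ip === '0.0.0.0' || localAddresses(interfaces).includes(ip);
}

// Print the addresses this host owns; on a cloud server the public IP
// is often missing from this list.
console.log(localAddresses());
```

If the public IP does not appear in the printed list, a listeners entry that names it cannot be bound, which matches the "Cannot assign requested address" error above.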
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
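However, fixing only the listeners setting above is not enough: the consumer cannot start, because it cannot connect to the broker and fails with a timeout error (see the client code and error output below).
advertised.listeners must be configured as well: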
advertised.listeners=PLAINTEXT://xx.xxx.xxx.xxx:9092
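The fallback order described in the config comments can be sketched as a small function. This is an illustration only, not Kafka's actual code: parseProperties and advertisedEndpoint are hypothetical helpers, the default port 9092 is an assumption, and so is the rule that an empty host is filled in with the canonical host name.

```javascript
// Parse the key=value lines of a properties file, skipping comments and blanks.
function parseProperties(text) {
  const props = {};
  for (const raw of text.split(/\r?\n/)) {
    const line = raw.trim();
    if (line === '' || line.startsWith('#')) continue;
    const eq = line.indexOf('=');
    if (eq === -1) continue;
    props[line.slice(0, eq).trim()] = line.slice(eq + 1).trim();
  }
  return props;
}

// Fallback order from the server.properties comments:
// advertised.listeners, then listeners, then the canonical host name.
function advertisedEndpoint(props, canonicalHostName) {
  const value =
    props['advertised.listeners'] ||
    props['listeners'] ||
    'PLAINTEXT://:9092'; // assumed default when nothing is configured
  // Assumption: an empty host part is replaced by the canonical host name.
  return value.replace(/:\/\/:/, `://${canonicalHostName}:`);
}

const onlyListeners = parseProperties('listeners=PLAINTEXT://:9092');
console.log(advertisedEndpoint(onlyListeners, 'internal-host-name'));
```

With only listeners=PLAINTEXT://:9092 set, the sketch yields the machine's own host name, which a client outside the cloud network usually cannot resolve or reach. Setting advertised.listeners to the public address gives clients an endpoint they can actually connect to.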
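Client code:
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;

// Connect through the broker's public address (placeholder value).
var client = new kafka.KafkaClient({ kafkaHost: 'xx.xxx.xxx.xxx:9092' });

// Consume partition 0 of the 'Processing' topic without auto-committing offsets.
var consumer = new Consumer(
    client,
    [{ topic: 'Processing', partition: 0 }],
    { autoCommit: false }
);

// Print every message received.
consumer.on('message', function (message) {
    console.log(message);
});

// Print connection and request errors, such as the timeout below.
consumer.on('error', function (err) {
    console.log(err);
});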
Timeout error:
Request timed out after 30000ms
TimeoutError: Request timed out after 30000ms at new TimeoutError
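When chasing a timeout like this, one quick check is whether the address the client ends up using is a private one, since a client outside the cloud network cannot route to it. The sketch below tests an IPv4 address against the RFC 1918 private ranges; isPrivateIPv4 is a hypothetical helper, and it deliberately ignores IPv6 and host names.

```javascript
// Return true when `ip` falls in one of the RFC 1918 private ranges:
// 10.0.0.0/8, 172.16.0.0/12 or 192.168.0.0/16.
function isPrivateIPv4(ip) {
  const parts = ip.split('.');
  const valid =
    parts.length === 4 &&
    parts.every((p) => /^\d{1,3}$/.test(p) && Number(p) <= 255);
  if (!valid) {
    throw new Error(`not an IPv4 address: ${ip}`);
  }
  const [a, b] = parts.map(Number);
  return (
    a === 10 ||
    (a === 172 && b >= 16 && b <= 31) ||
    (a === 192 && b === 168)
  );
}

// Example: a typical private address versus a documentation-range public one.
console.log(isPrivateIPv4('172.16.0.10'), isPrivateIPv4('203.0.113.5'));
```

If the broker ends up advertising an address for which this returns true, clients on the public internet will hang until the request timeout expires.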