WARN org.apache.kafka.clients.NetworkClient-[生产者clientId = producer-1]引导代理127.0.0.1:9092标识:-1机架:空已断开连接

问题描述

我正在尝试使用Java程序使Kafka制作人。但是当我运行程序时,我得到一些警告,没有任何错误,但是生产者没有发送数据,并且警告如下。

sympy

POM.XML文件

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) Could not be established. broker may not be available.

First_producer.java文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>KafkaProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>

    </dependencies>

</project>

Shell中的消费者命令

package Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class First_Producer {
    public static void main(String[] args) {


        String bootstrapServer = "127.0.0.1:9092";

        //create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class.getName());

        //create the producer
        KafkaProducer<String,String> producer= new KafkaProducer<String,String>(properties);

        //create a producer record
        ProducerRecord<String,String>  record = new ProducerRecord<String,String>("first_topic","hello_world");

        //send data
        producer.send(record);
        producer.flush();
        producer.close();

    }
}


任何人都可以帮助我解决这个问题吗?

解决方法

您的日志显示:

经纪人可能不可用。

使用属性侦听器的正确IP /主机名更新config/server.properties文件。

listeners=PLAINTEXT://X.X.X.X:9092

并使用与您的生产者配置相同的IP和端口配置

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG

如果使用iptable / firewall配置在服务器外部运行生产者,请确保将端口号从服务器外部访问。