如何将 KAFKA 的属性外部化为不同的类并将其调用到主类?

问题描述

我一直在尝试使用抽象 OOP 来外部化属性代码,这样我就可以将它调用到生产者类,但我似乎无法调用它。这是代码,任何帮助将不胜感激。

public class Producer{

    private static final Logger logger = LogManager.getLogger(Producer.class);
    public static void main(String[] args) {
        logger.info("Creating Kafka Producer...");

        KafkaProducer<Integer,String> producer = new KafkaProducer<>(PropConfigs().prodProps());

        logger.info("Start sending messages...");

        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<>(AppConfigs.topicName,i,"Message " + i + " Test"),new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata,Exception e) {
                    if(e == null){
                        logger.info("\nReceived Metadata" + " Topic:" + recordMetadata.topic() + " Partition: " + recordMetadata.partition() + " Offset: " + recordMetadata.offset() + " Time: " + recordMetadata.timestamp() + "\n");
                    } else {
                        logger.error("Error",e);
                    }
                }
            });
        }

        logger.info("Finished - Closing Kafka Producer.");
        producer.flush();
        producer.close();

    }
}

我想将所有 props.setProperty 转移到不同的类,然后将其调用到生产者类。这就是我想做的:

package org.timothy.producer.common;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PropConfigs {

    public static Properties prodProps(){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.CLIENT_ID_CONfig,AppConfigs.applicationID);
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,AppConfigs.bootstrapServers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,IntegerSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONfig,"all");
        props.setProperty(ProducerConfig.RETRIES_CONfig,"3");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");

        return new Properties(props);
    }

}

如何将其调用到主类?或者也许将其应用于: Properties props = new Properties(); KafkaProducer producer = new KafkaProducer(props);

解决方法

 KafkaProducer<Integer,String> producer = new KafkaProducer<>((new PropConfigs()).prodProps()); 

或者将它们设为静态...

 KafkaProducer<Integer,String> producer = new KafkaProducer<>(PropConfigs.prodProps()); 

https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java