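Problem description
I have been trying to use OOP abstraction to move my properties code into a separate class so that I can call it from the Producer class, but I can't seem to call it. Here is the code; any help would be appreciated.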
public class Producer {
    private static final Logger logger = LogManager.getLogger(Producer.class);

    public static void main(String[] args) {
        logger.info("Creating Kafka Producer...");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(PropConfigs().prodProps());
        logger.info("Start sending messages...");
        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<>(AppConfigs.topicName, i, "Message " + i + " Test"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        logger.info("\nReceived Metadata" + " Topic:" + recordMetadata.topic() + " Partition: " + recordMetadata.partition() + " Offset: " + recordMetadata.offset() + " Time: " + recordMetadata.timestamp() + "\n");
                    } else {
                        logger.error("Error", e);
                    }
                }
            });
        }
        logger.info("Finished - Closing Kafka Producer.");
        producer.flush();
        producer.close();
    }
}
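I want to move all of the props.setProperty calls into a different class and then call that class from the Producer class. This is what I am trying to do: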
package org.timothy.producer.common;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class PropConfigs {
    public static Properties prodProps() {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        return new Properties(props);
    }
}
How do I call this from the main class? Or perhaps apply it to:
Properties props = new Properties();
KafkaProducer
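Solution
PropConfigs().prodProps() is missing the new keyword, so the compiler looks for a method named PropConfigs instead of constructing the class. Either create an instance first: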
KafkaProducer<Integer,String> producer = new KafkaProducer<>((new PropConfigs()).prodProps());
Or make the method static (as prodProps() already is in the question) and call it on the class itself:
KafkaProducer<Integer,String> producer = new KafkaProducer<>(PropConfigs.prodProps());
https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java
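The static call form suggested above can be tried without a Kafka dependency. The following is only a minimal sketch: the class name PropConfigsSketch, the constants APPLICATION_ID and BOOTSTRAP_SERVERS (stand-ins for the AppConfigs fields), and the wrappedProps() helper are all hypothetical, and the keys are written as plain strings rather than ProducerConfig constants. It also illustrates one thing worth checking in the question's code: new Properties(props) stores props as defaults only, so the returned object has no entries of its own. Code that copies entries with entrySet() or putAll() would likely not see them, so returning props directly is probably the safer choice.

```java
import java.util.Properties;

public class PropConfigsSketch {

    // Hypothetical stand-ins for AppConfigs.applicationID and AppConfigs.bootstrapServers.
    static final String APPLICATION_ID = "demo-producer";
    static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Static factory: callable as PropConfigsSketch.prodProps(), no instance needed.
    public static Properties prodProps() {
        Properties props = new Properties();
        props.setProperty("client.id", APPLICATION_ID);
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("acks", "all");
        return props; // return the populated object itself
    }

    // Mirrors the question's "return new Properties(props)":
    // the settings are kept as defaults only, not as entries.
    public static Properties wrappedProps() {
        return new Properties(prodProps());
    }

    public static void main(String[] args) {
        Properties direct = PropConfigsSketch.prodProps();
        Properties wrapped = PropConfigsSketch.wrappedProps();

        System.out.println("direct entries:  " + direct.size());            // 3
        System.out.println("wrapped entries: " + wrapped.size());           // 0
        System.out.println("wrapped lookup:  " + wrapped.getProperty("acks")); // all
    }
}
```

In the real Producer class the equivalent call would be new KafkaProducer<>(PropConfigs.prodProps()), with PropConfigs imported from org.timothy.producer.common if Producer lives in a different package.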