问题描述
我想使用融合的 Kafka 库“github.com/confluentinc/confluent-kafka-go/kafka”在 Golang 中配置具有 Kerberos 身份验证的 Kafka 消费者。还需要使用 Avro 类型的 Kafka 消息读取消息。 在此处获取 Java 代码。
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
public class KerberosConsumer {
static final Logger logger = Logger.getLogger(AvroConsumer.class);
public static void main(String[] args) throws InterruptedException,FileNotFoundException,IOException {
PropertyConfigurator.configure(args[0]);
Properties prop = new Properties();
prop.load(new FileInputStream(args[1]));
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"BOOTSTRAP_SERVER_1:9093,BOOTSTRAP_SERVER_2:9093");
properties.setProperty(ConsumerConfig.CLIENT_ID_CONfig,"ConsumerClient");
properties.setProperty(ConsumerConfig.GROUP_ID_CONfig,"<CONSUMER_GROUP_ID>");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class.getName());
properties.setProperty(CommonClientConfigs.Security_PROTOCOL_CONfig,"SASL_SSL");
properties.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONfig,"<PATH>/kafka-client.truststore.jks");
properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONfig,"password");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
properties.setProperty("schema.registry.url","<SCHEMA_REGISTRY_URL>");
System.setProperty("java.security.krb5.conf","<PATH>/krb5.conf");
System.setProperty("java.security.auth.login.config","<PATH>/kafka_client_jaas.conf");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);
consumer.subscribe(Collections.singletonList("<TOPIC_NAME>"));
consumer.poll(0); // without this the below statement never got any records
ObjectMapper mapper = new ObjectMapper();
try {
while (true) {
ConsumerRecords<String,String> records = consumer.poll(0);
for (ConsumerRecord<String,String> record : records)
logger.info("Message received successfully - Partition:" + record.partition() + " - Offset:" + record.offset() + " - Message:" + mapper.writeValueAsstring(record.value().toString()));
}
} finally {
consumer.close();
}
}
}
这在golang中应该怎么做?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)