Kafka 消费者 Kerberos 身份验证(Golang)

问题描述

我想使用 Confluent 的 Kafka 库 "github.com/confluentinc/confluent-kafka-go/kafka",在 Golang 中配置带有 Kerberos 身份验证的 Kafka 消费者,并且需要读取 Avro 类型的 Kafka 消息。下面是我已有的 Java 代码:

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
 
/**
 * Command-line Kafka consumer that connects over {@code SASL_SSL} and logs
 * every record it receives from a single topic.
 *
 * <p>The JVM-wide system properties {@code java.security.krb5.conf} and
 * {@code java.security.auth.login.config} are set before the consumer is
 * created; the placeholder paths, broker addresses, group id, topic name and
 * schema registry URL in this class must be replaced with real values.
 *
 * <p>Keys and values are both read with {@link StringDeserializer}.
 */
public class KerberosConsumer {

    static final Logger logger = Logger.getLogger(KerberosConsumer.class);

    /** Upper bound, in milliseconds, that each poll in the main loop waits for records. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /**
     * Configures logging, builds the consumer and logs records until the
     * process is terminated or an exception escapes the poll loop.
     *
     * @param args {@code args[0]} is handed to
     *             {@link PropertyConfigurator#configure(String)};
     *             {@code args[1]} is the path of a properties file that is loaded
     *             at start-up
     * @throws InterruptedException declared for callers; kept for interface compatibility
     * @throws FileNotFoundException if the file named by {@code args[1]} cannot be opened
     * @throws IOException if reading {@code args[1]} fails or a record value
     *                     cannot be serialized for logging
     */
    public static void main(String[] args) throws InterruptedException,FileNotFoundException,IOException {
        PropertyConfigurator.configure(args[0]);

        // NOTE(review): the loaded properties are not consulted anywhere below;
        // the load is kept so a missing/unreadable file still fails fast.
        Properties prop = new Properties();
        try (FileInputStream in = new FileInputStream(args[1])) {
            prop.load(in);
        }

        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"BOOTSTRAP_SERVER_1:9093,BOOTSTRAP_SERVER_2:9093");
        properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,"ConsumerClient");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"<CONSUMER_GROUP_ID>");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_SSL");
        properties.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"<PATH>/kafka-client.truststore.jks");
        properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"password");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        properties.setProperty("schema.registry.url","<SCHEMA_REGISTRY_URL>");

        // Kerberos and JAAS settings are JVM-wide, so they must be in place
        // before the consumer is constructed.
        System.setProperty("java.security.krb5.conf","<PATH>/krb5.conf");
        System.setProperty("java.security.auth.login.config","<PATH>/kafka_client_jaas.conf");

        ObjectMapper mapper = new ObjectMapper();

        // try-with-resources closes the consumer on every exit path,
        // including a failure inside subscribe().
        try (KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties)) {
            consumer.subscribe(Collections.singletonList("<TOPIC_NAME>"));
            consumer.poll(0); // without this the below statement never got any records

            while (true) {
                // A non-zero timeout lets the call block while the topic is idle
                // instead of spinning the CPU with back-to-back empty polls.
                ConsumerRecords<String,String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String,String> record : records) {
                    // Pass the value straight through: it is already a String, and a
                    // null value is rendered as JSON null rather than throwing.
                    logger.info("Message received successfully - Partition:" + record.partition() + " - Offset:" + record.offset() + " - Message:" + mapper.writeValueAsString(record.value()));
                }
            }
        }
    }
}

这在golang中应该怎么做?

解决方法

暂未找到可以解决该问题的有效方法,小编正在努力寻找和整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)