问题描述
我正在尝试使用 Java 连接到 SASL 中的 Kafka 服务器,但它不起作用。我没有托管服务器,这是我拥有的唯一凭据:
- KAFKA_broKER URL / PORT
- KAFKA_SCHEMA_REGISTRY
- KAFKA_USER
- KAFKA_PASSWORD
- KAFKA_TOPIC_NAME
- The Kafka certificate (ca.pem)
我能够像这样用 Python 连接到服务器:
'bootstrap.servers': 'kafka-************.com:****','ssl.ca.location':'/home/******/Downloads/ca.pem','security.protocol':'sasl_ssl','group.id': '*****','sasl.mechanism':'SCRAM-SHA-256','sasl.username':'******','sasl.password':'******'
这工作正常,问题是“ssl.ca.location”参数在 Java 中不可用,我看到人们正在使用 KeyStore / TrustStore 信息,但我没有这些信息。
你知道我如何使用 Java 提供证书信息吗?
Here is my actual Java code:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"kafka-*********.com:****");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONfig,"*****");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
properties.put("security.protocol","SASL_SSL");
properties.put("sasl.mechanism","SCRAM-SHA-256");
properties.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"******\" password=\"********\";");
System.out.println("conf== " + properties.get("sasl.jaas.config"));
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,true);
Consumer<String,String> consumer = new KafkaConsumer<>(properties);
这是我得到的错误:
(sun.security.validator.ValidatorException) PKIX path building Failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
解决方法
这是我的解决方案(抱歉,我不是 Kafka 专家)。多亏了证书文件,我生成了一个 Truststore 文件(而不是密钥库,我仍然不清楚它们之间的区别)。以下是我添加的参数以使我工作:
properties.put("ssl.endpoint.identification.algorithm",""); properties.put("ssl.truststore.location","C:\\Users\\**\\Downloads\\keystore.jks");
properties.put("ssl.truststore.password","****");
谢谢各位:)