Kafka consumer fails to load SSL keystore for any keystore type and path (Logstash ArcSight module)

Problem description

I need to provide the Kafka consumer with a certificate for client authentication, but it always fails with the following exception ("Failed to load SSL keystore"):

    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /etc/logstash/truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2020-10-13T10:20:40,578][ERROR][logstash.inputs.kafka    ][module-arcsight][47e090c366f9d0ce03be089496421cdb989d3de7cc9fe63aa9bf4f6109a239b2] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts of type JKS}

[2020-10-13T10:20:40,596][ERROR][logstash.javapipeline    ][module-arcsight][47e090c366f9d0ce03be089496421cdb989d3de7cc9fe63aa9bf4f6109a239b2] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:module-arcsight
  Plugin: <LogStash::Inputs::Kafka ssl_keystore_location=>"/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts",topics=>["th-cef"],ssl_truststore_location=>"/etc/logstash/truststore.jks",ssl_truststore_password=><password>,ssl_truststore_type=>"JKS",type=>"_doc",bootstrap_servers=>"server:9093",codec=><LogStash::Codecs::CEF id=>"cef_51af8920-59eb-4309-9e7f-4ebb1f774df6",enable_metric=>true,vendor=>"Elasticsearch",product=>"Logstash",version=>"1.0",signature=>"Logstash",name=>"Logstash",severity=>"6",reverse_mapping=>false>,ssl_keystore_password=><password>,security_protocol=>"SSL",id=>"47e090c366f9d0ce03be089496421cdb989d3de7cc9fe63aa9bf4f6109a239b2",ssl_keystore_type=>"JKS",auto_commit_interval_ms=>5000,check_crcs=>true,client_dns_lookup=>"default",client_id=>"logstash",connections_max_idle_ms=>540000,consumer_threads=>1,enable_auto_commit=>true,fetch_max_bytes=>52428800,fetch_max_wait_ms=>500,group_id=>"logstash",heartbeat_interval_ms=>3000,isolation_level=>"read_uncommitted",key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer",max_poll_interval_ms=>300000,max_partition_fetch_bytes=>1048576,max_poll_records=>500,Metadata_max_age_ms=>300000,receive_buffer_bytes=>32768,reconnect_backoff_ms=>50,request_timeout_ms=>40000,retry_backoff_ms=>100,send_buffer_bytes=>131072,session_timeout_ms=>10000,value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer",poll_timeout_ms=>100,ssl_endpoint_identification_algorithm=>"https",decorate_events=>false>
  Error: Failed to construct kafka consumer
  Exception: Java::OrgApacheKafkaCommon::KafkaException
  Stack: org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:820)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:666)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:646)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:253)
org.jruby.RubyClass.newInstance(org/jruby/RubyClass.java:939)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.create_consumer(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb:346)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.RUBY$method$create_consumer$0$__VaraRGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb:243)
org.jruby.RubyEnumerable$22.call(org/jruby/RubyEnumerable.java:902)
org.jruby.RubyEnumerator$2.call(org/jruby/RubyEnumerator.java:404)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:291)
org.jruby.RubyInteger$INVOKER$i$0$0$times.call(org/jruby/RubyInteger$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:514)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:393)
org.jruby.RubyEnumerator.__each__(org/jruby/RubyEnumerator.java:400)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:396)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:497)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:487)
org.jruby.RubyEnumerable.callEach19(org/jruby/RubyEnumerable.java:119)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:894)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:886)
org.jruby.RubyEnumerable$INVOKER$s$0$0$map.call(org/jruby/RubyEnumerable$INVOKER$s$0$0$map.gen)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb:243)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java.lib.logstash.inputs.kafka.RUBY$method$run$0$__VaraRGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_integration_minus_kafka_minus_10_dot_4_dot_0_minus_java/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.4.0-java/lib/logstash/inputs/kafka.rb)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:378)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$inputworker$0$__VaraRGS__(usr/share/logstash/logstash_minus_core/lib/logstash//usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_input(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:369)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)
java.lang.Thread.run(java/lang/Thread.java:748)

I have already tried every option I could think of:

  • JKS and PKCS12 keystores
  • The default Java keystore (/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts) as well as my own keystore (e.g. /etc/logstash/keystore.jks)
  • Checking all permissions, running as the logstash user, etc.
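To rule out Logstash entirely, the keystore can be opened the same way the Kafka client does: `KeyStore.getInstance(type)` followed by `load(stream, password)`. The sketch below is a self-contained diagnostic, not part of the original setup; it builds a throwaway `demo.jks`, so the file name and the `changeit` password are placeholders. Running `canLoad` against the real keystore path would show whether the file, password, or type is at fault independently of Kafka and the ArcSight module:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.security.KeyStore;

// Diagnostic: open a keystore exactly the way the Kafka client does
// (KeyStore.getInstance(type) + load(stream, password)). If this throws,
// the file/password/type combination is the problem, not Logstash or Kafka.
public class KeystoreCheck {
    static boolean canLoad(String path, String type, char[] password) {
        try (FileInputStream in = new FileInputStream(path)) {
            KeyStore ks = KeyStore.getInstance(type);
            ks.load(in, password); // throws on a bad password or corrupt/mismatched file
            return true;
        } catch (Exception e) {
            System.err.println("Failed to load " + path + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Create a throwaway JKS keystore so the example is self-contained.
        char[] goodPass = "changeit".toCharArray();
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(null, goodPass); // initialize an empty keystore
        try (FileOutputStream out = new FileOutputStream("demo.jks")) {
            ks.store(out, goodPass);
        }

        System.out.println(canLoad("demo.jks", "JKS", goodPass));              // true
        System.out.println(canLoad("demo.jks", "JKS", "wrong".toCharArray())); // false
    }
}
```

A bad password surfaces as an `IOException` from `load`, which is the same failure Kafka wraps into "Failed to load SSL keystore ... of type JKS".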

In the Logstash ArcSight module, the keystore settings are specified as follows (in logstash.yml):

modules:
  - name: arcsight
    var.input.eventbroker.bootstrap_servers: "server:9093"
    var.input.eventbroker.security_protocol: "SSL"
    var.input.eventbroker.topics: "th-cef"
    var.input.eventbroker.ssl_keystore_location: "/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts"
    var.input.eventbroker.ssl_keystore_password: "changeit"
    var.input.eventbroker.ssl_keystore_type: "JKS"
    var.input.eventbroker.ssl_truststore_location: "/etc/logstash/truststore.jks"
    var.input.eventbroker.ssl_truststore_password: "changeit"
    var.input.eventbroker.ssl_truststore_type: "JKS"
    var.elasticsearch.hosts: "https://host1:9200,https://host2:9200"
    var.kibana.host: "kibana:5601"
    var.elasticsearch.username: "user"
    var.elasticsearch.password: "pw"
    var.kibana.username: "k"
    var.kibana.password: "p"
    var.elasticsearch.ssl.enabled: "true"
    var.cacert: "/etc/logstash/CA.crt"
    var.kibana.ssl.enabled: "true"
    var.manage_template: "false"

Do you have any idea why the KafkaConsumer refuses to load any keystore?

Solution

No effective solution for this problem has been found yet.
