Apache Kafka Spout SSL

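Problem description

We are building a Storm topology with a Kafka spout. The data in Kafka is encrypted with SSL by the producer. We read the data from the Kafka spout through a direct Storm bolt integration, as shown below: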

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class Topology {

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();
        Config conf = new Config();

        // Load settings from the topology.properties file passed as the first argument.
        Properties prop = new Properties();
        try (InputStream input = new FileInputStream(args[0])) {
            prop.load(input);
        }
        String spoutId = prop.getProperty("spoutId");
        String topologyName = prop.getProperty("topologyName");
        String zkhost = prop.getProperty("zkhost");

        // Executor and task counts for the spout and the converter bolt, read from properties.
        int numspout = Integer.parseInt(prop.getProperty("numspout"));
        int numConverter = Integer.parseInt(prop.getProperty("numConverter"));
        int numTaskspout = Integer.parseInt(prop.getProperty("numTaskspout"));
        int numTaskConverter = Integer.parseInt(prop.getProperty("numTaskConverter"));

        // hosts, inputTopic, Kafkabroker, consumerGroup, socketTimeoutMs, fetchSizeBytes,
        // offsetCommitPeriodMs, maxUncommittedOffsets and pollTimeoutMs are defined
        // elsewhere in the program; their declarations are not shown in the question.
        SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopic, "/" + Kafkabroker, consumerGroup);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Start from the latest offset and ignore offsets stored in ZooKeeper.
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
        spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
        spoutConfig.ignoreZkOffsets = true;
        spoutConfig.zkPort = 2181;
        spoutConfig.socketTimeoutMs = socketTimeoutMs;
        spoutConfig.fetchSizeBytes = fetchSizeBytes;

        KafkaSpout kafkaspout = new KafkaSpout(spoutConfig);
        builder.setSpout(spoutId, kafkaspout, numspout).setNumTasks(numTaskspout)
                .addConfiguration("offset.commit.period.ms", offsetCommitPeriodMs)
                .addConfiguration("max.uncommitted.offsets", maxUncommittedOffsets)
                .addConfiguration("poll.timeout.ms", pollTimeoutMs);

        // The Converter bolt consumes tuples emitted by the Kafka spout.
        builder.setBolt("Converter", new Converter(), numConverter).shuffleGrouping(spoutId)
                .setNumTasks(numTaskConverter);

        // ... (the rest of the topology, including submission, is not shown in the question)
    }
}

How can we configure the Kafka spout to access the encrypted data?

Solution

No confirmed solution to this problem has been found yet.
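
One direction that may be worth exploring, offered as an untested sketch rather than a confirmed fix: the spout in the question is built from SpoutConfig and ZkHosts, which belong to the older storm-kafka module. As far as I know, that module reads through Kafka's legacy SimpleConsumer, which has no SSL support. Kafka's SSL support lives in the newer Java consumer, which the separate storm-kafka-client module (org.apache.storm.kafka.spout.KafkaSpout) wraps. With that module, the usual Kafka consumer SSL properties would need to reach the consumer. The helper below only assembles those properties. The class name KafkaSslProps, the method name, and the example paths and password are hypothetical; the property keys are the standard Kafka client settings.

```java
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of Storm or Kafka.
final class KafkaSslProps {

    private KafkaSslProps() {}

    /**
     * Builds the Kafka consumer properties needed to talk to an SSL listener.
     * The keystore arguments may be null when the broker does not require
     * client authentication; in that case only the truststore is configured.
     */
    static Properties sslConsumerProps(String truststorePath, String truststorePassword,
                                       String keystorePath, String keystorePassword) {
        Properties props = new Properties();
        // Standard Kafka client setting that switches the connection to SSL.
        props.setProperty("security.protocol", "SSL");
        // Truststore holding the certificate (or CA) that signed the broker certificate.
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        // Keystore is only needed when the broker requests client certificates.
        if (keystorePath != null && keystorePassword != null) {
            props.setProperty("ssl.keystore.location", keystorePath);
            props.setProperty("ssl.keystore.password", keystorePassword);
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder path and password; replace with real values.
        Properties props = sslConsumerProps("/path/to/client.truststore.jks", "changeit", null, null);
        // Print only the keys so that no password ends up in the logs.
        props.stringPropertyNames().forEach(System.out::println);
    }
}
```

I believe recent storm-kafka-client releases let you pass such properties through a setProp method on KafkaSpoutConfig.Builder, but check the API of the Storm version you run. The truststore and keystore files would also have to exist at the same path on every worker node, since the consumer is created inside the worker process.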
