Is there a way to split Apache Storm tuples and merge them back together?

Problem description

Scenario: Storm receives text messages (e.g., news articles, tweets) from Kafka and splits each message into sentences. Each sentence is then tagged by different NLP modules, such as NER, POS tagging, and sentiment analysis. Finally, the sentences are merged back into a single message based on their id. These scenarios can be implemented with Trident or Spark, but I need to implement them using plain Apache Storm only. Here is my broken code. This is my sentence-splitter bolt:

import opennlp.tools.sentdetect.SentenceDetectorME;
import opennlp.tools.sentdetect.SentenceModel;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.JSONObject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
public class SentenceBoltSplitter extends BaseRichBolt {
    private OutputCollector collector;
    private SentenceDetectorME detector;
    private String modelPath;

    public SentenceBoltSplitter(String path) {
        modelPath = path;
    }

    public SentenceBoltSplitter() {
        modelPath = "fail";
    }
    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            InputStream inputStream = new FileInputStream(modelPath);
            SentenceModel model = new SentenceModel(inputStream);
            detector = new SentenceDetectorME(model);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    @Override
    public void execute(Tuple tuple) {
        JSONObject json = null;
        try {
            json = new JSONObject(tuple.getValue(4).toString());
        } catch (Exception e) {
            System.out.println("json error caught");
        }
        String[] sentences = detector.sentDetect(json.get("content").toString());
        for (String temp : sentences) {
            JSONObject sent = new JSONObject();
            sent.put("sentence", temp);
            sent.put("id", json.get("id"));
            // one tuple per sentence, re-emitted on the stream the input arrived on
            collector.emit(tuple.getSourceStreamId(), new Values(json.get("id"), sent.toString()));
        }
        collector.ack(tuple);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "doc"));
        declarer.declareStream("StreamEnglish", new Fields("id", "doc"));
    }
}
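
One thing the split side has to give the merge side: once a message is fanned out into one tuple per sentence, the downstream merger needs some way to tell when a document is complete. A common count-based pattern, shown below as a minimal untested sketch (the "total" key and the explicit emit on "StreamEnglish" instead of tuple.getSourceStreamId() are assumptions, not code from the question), is to attach the total sentence count to every sentence:

for (String temp : sentences) {
    JSONObject sent = new JSONObject();
    sent.put("sentence", temp);
    sent.put("id", json.get("id"));
    // assumed "total" key: lets a downstream merger detect completeness
    sent.put("total", sentences.length);
    collector.emit("StreamEnglish", new Values(json.get("id"), sent.toString()));
}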

And here is my sentence-merger bolt:

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.*;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.BasicOutputCollector;
public class Output_merger extends BaseBasicBolt {
    // Instance field created when the bolt object is constructed; this is the
    // non-serializable field the exception below complains about.
    JSONObject docs = new JSONObject();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String id = tuple.getValue(0).toString();
        JSONArray sentences = docs.getJSONArray("senttences");
        sentences.put(new JSONObject(tuple.getValue(1).toString()));
        docs.put("sentences", sentences);
        collector.emit(new Values(id, docs));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "message"));
    }
}
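
Aside from the serialization error reported below, this bolt has a latent bug: docs.getJSONArray("senttences") throws a JSONException on the very first tuple, since that key is never present in a fresh JSONObject (note also the spelling, which differs from the "sentences" key written two lines later). A small sketch using org.json's optJSONArray, which returns null instead of throwing, avoids this:

JSONArray sentences = docs.optJSONArray("sentences");
if (sentences == null) {
    // first sentence seen for this document: create the array lazily
    sentences = new JSONArray();
}
sentences.put(new JSONObject(tuple.getValue(1).toString()));
docs.put("sentences", sentences);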

This is my Java topology:

import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class PipelinetopologySplitter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "10.XX.XX.XX:9092");
        props.put("bootstrap.servers", "10.XX.XX.XX:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Config conf = new Config();
        // note: this overwrites the bootstrap.servers value set above with a serializer class name
        props.put("bootstrap.servers", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        conf.put("kafka.broker.config", props);
        conf.put(KafkaBolt.TOPIC, "test-output");
        int fourGB = 1 * 1000; // note: despite the name, this sets 1000 MB
        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, fourGB);
        conf.setMaxTaskParallelism(300);
        conf.setNumWorkers(2);
        conf.put("topology.subprocess.timeout.secs", 1000);
        KafkaBolt<String, String> kafkabolt = new KafkaBolt<String, String>()
                .withTopicSelector(new DefaultTopicSelector("test-output"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("id", "message"))
                .withProducerProperties(props);
        final TopologyBuilder tp = new TopologyBuilder();
        String path = "StormProject/python/";
        String pathResources = "StormProject/resources/";
        tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("10.XX.XX.XX:9092", "test_08May").build()), 2);
        tp.setBolt("LanguageBoltJavaFastext", new MyBolt("lid.176.bin", "LanguageBoltFasttext"), 2).shuffleGrouping("kafka_spout");
        tp.setBolt("SentenceBoltEnglish", new SentenceBoltSplitter(pathResources + "en-sent.bin"), 2).shuffleGrouping("LanguageBoltJavaFastext", "StreamEnglish");
        tp.setBolt("Output", new Output_merger(), 1).fieldsGrouping("SentenceBoltEnglish", "StreamEnglish", new Fields("id"));
        tp.setBolt("forwardToKafka", kafkabolt, 1).shuffleGrouping("Output");
        LocalCluster localCluster = new LocalCluster();
        try {
            localCluster.submitTopology("nlp-pipeline-topology-local-debugging", conf, tp.createTopology());
            Thread.sleep(100000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            localCluster.shutdown();
        }
    }
}

This is the error I receive:

Exception in thread "main" java.lang.IllegalStateException: Bolt 'Output' contains a non-serializable field of type org.json.JSONObject, which was instantiated prior to topology creation. org.json.JSONObject should be instantiated within the prepare method of 'Output at the earliest.
    at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122)
    at PipelinetopologySplitter.main(PipelinetopologySplitter.java:56)

I tried to mimic the logic of the word-count example in my code, but any other solution is welcome, as long as it does not use Trident or another library/framework.

Solution

No effective solution to this problem has been found yet.
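
That said, the exception message itself points at a concrete direction: the JSONObject field is captured when the bolt instance is Java-serialized at topology submission time, so any non-serializable state has to be created inside prepare instead. Below is a minimal, untested sketch of a merger along those lines, extending BaseRichBolt to get a prepare hook and buffering sentences per document id; the class name OutputMergerSketch, the per-id HashMap, and the "total" completeness check (which pairs with the count-emitting splitter sketch above) are assumptions, not code from the question:

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.JSONArray;
import org.json.JSONObject;

public class OutputMergerSketch extends BaseRichBolt {
    private OutputCollector collector;
    // Per-document buffers, created in prepare() rather than at construction
    // time, so no non-serializable state is captured at submission.
    private Map<String, JSONArray> pending;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String id = tuple.getValueByField("id").toString();
        JSONObject sent = new JSONObject(tuple.getValueByField("doc").toString());
        JSONArray sentences = pending.computeIfAbsent(id, k -> new JSONArray());
        sentences.put(sent);
        // Assumed "total" key, paired with the count-emitting splitter sketch
        // above: once all sentences have arrived, emit the merged document.
        if (sentences.length() == sent.getInt("total")) {
            JSONObject doc = new JSONObject();
            doc.put("sentences", pending.remove(id));
            collector.emit(new Values(id, doc.toString()));
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "message"));
    }
}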

