问题描述
场景:Storm 从 Kafka 接收文本消息(例如新闻、推文)并将每条消息拆分为句子。然后,每个句子将被不同的 NLP 模块标记,例如 NER、POS 和情感分析。最后,这些句子将根据它们的 id 合并成一条消息。使用 Trident 或 Spark 可以实现这些场景,但我只需要使用 Apache Storm 来实现。这是我的错误代码。这是我的句子分割器 Bolt:
import opennlp.tools.sentdetect.SentenceDetectorME;
import opennlp.tools.sentdetect.SentenceModel;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.util.Map;
public class SentenceBoltSplitter extends BaseRichBolt {
private OutputCollector collector;
private SentenceDetectorME detector;
String ModelPath;
public SentenceBoltSplitter(String path)
{
ModelPath=path;
}
public SentenceBoltSplitter()
{
ModelPath="fail";
}
@Override
public void prepare(Map<String,Object> conf,TopologyContext context,OutputCollector collector) {
this.collector = collector;
Constructor<?> constructor;
InputStream inputStream;
try {
inputStream = new FileInputStream(ModelPath);
SentenceModel model = new SentenceModel(inputStream);
detector = new SentenceDetectorME(model);
} catch ( IOException e) {
// Todo Auto-generated catch block
e.printstacktrace();
}
}
@Override
public void execute(Tuple tuple) {
JSONObject json = null;
try {
json = new JSONObject(tuple.getValue(4).toString());
}
catch (Exception e)
{
System.out.println("json error catched");
}
String sentences[] = detector.sentDetect(json.get("content").toString());
JSONArray sentencesArr = new JSONArray();
for(String temp:sentences) {
JSONObject sent=new JSONObject();
sent.put("sentence",temp);
sent.put("id",json.get("id"));
collector.emit(tuple.getSourceStreamId(),new Values( json.get("id"),sent.toString()));
}
collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","doc"));
declarer.declareStream("StreamEnglish",new Fields("id","doc"));
}
}
这里是我的句子合并 Bolt:
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.*;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.BasicOutputCollector;
public class Output_merger extends BaseBasicBolt {
JSONObject docs = new JSONObject();
@Override
public void execute(Tuple tuple,BasicOutputCollector collector) {
String id="";
id= tuple.getValue(0).toString();
JSONArray sentences=docs.getJSONArray("senttences");
sentences.put(new JSONObject(tuple.getValue(1).toString()));
docs.put("sentences",sentences);
collector.emit( new Values(id,docs));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields("id","message"));
}
}
这是我的 Java 拓扑:
import SentenceBoltSplitter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupletoKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.Kafkaspout;
import org.apache.storm.kafka.spout.KafkaspoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class PipelinetopologySplitter {
public static void main(String args[]) throws Exception
{
Properties props = new Properties();
props.put("Metadata.broker.list","10.XX.XX.XX:9092");
props.put("bootstrap.servers","10.XX.XX.XX:9092");
props.put("request.required.acks","1");
props.put("serializer.class","kafka.serializer.StringEncoder");
Config conf = new Config();
props.put("bootstrap.servers","kafka.serializer.StringEncoder");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
conf.put("kafka.broker.config",props);
conf.put(KafkaBolt.TOPIC,"test-output");
int fourGB = 1 * 1000;
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB,fourGB);
conf.setMaxTaskParallelism(300);
conf.setNumWorkers(2);
conf.put("topology.subprocess.timeout.secs",1000);
KafkaBolt<String,String> kafkabolt = new KafkaBolt<String,String>()
.withTopicSelector(new DefaultTopicSelector("test-output"))
.withTupletoKafkaMapper(new FieldNameBasedTupletoKafkaMapper("id","message"))
.withProducerProperties(props);
final TopologyBuilder tp = new TopologyBuilder();
String path= "StormProject/python/";
String path_resources="StormProject/resources/";
tp.setspout("kafka_spout",new Kafkaspout<>(KafkaspoutConfig.builder("10.XX.XX.XX:9092","test_08May").build()),2);
tp.setBolt("LanguageBoltJavaFastext",new MyBolt("lid.176.bin","LanguageBoltFasttext"),2).shuffleGrouping("kafka_spout");
tp.setBolt("SentenceBoltEnglish",new SentenceBoltSplitter(path_resources+"en-sent.bin"),2).shuffleGrouping("LanguageBoltJavaFastext","StreamEnglish");
tp.setBolt("Output",new Output_merger(),1).fieldsGrouping("SentenceBoltEnglish","StreamEnglish",new Fields("id"));
tp.setBolt("forwardToKafka",kafkabolt,1).shuffleGrouping("Output");
LocalCluster localCluster = new LocalCluster();
try {
localCluster.submitTopology("nlp-pipeline-topology-local-debuggig",conf,tp.createtopology());
Thread.sleep(100000000);
} catch (InterruptedException e)
{
e.printstacktrace(); localCluster.shutdown();
}
}
}
这是接收错误:
Exception in thread "main" java.lang.IllegalStateException: Bolt 'Output' contains a non-serializable field of type org.json.JSONObject,which was instantiated prior to topology creation. org.json.JSONObject should be instantiated within the prepare method of 'Output at the earliest.
at org.apache.storm.topology.TopologyBuilder.createtopology(TopologyBuilder.java:122)
at PipelinetopologySplitter.main(PipelinetopologySplitter.java:56)
我试图在我的代码中模仿字数统计示例的逻辑,但欢迎任何其他解决方案,除非使用 Trident 或其他库/框架。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)