问题描述
我有一个使用 Flink 的非常简单的应用程序,它从 Twitter 创建一个数据流并尝试匹配一个简单的模式。我尝试了不同的条件,但没有匹配。问题是无论那个模式是什么,都没有匹配的。这是我的代码片段:
final DataStream<Tweet> notDeletedTweets = originalTweets
.filter(tweet -> !tweet.contains("delete"))
.map(Tweet::parse);
final Pattern<Tweet,Tweet> pattern = Pattern.<Tweet>begin("FirstTweet").where(new SimpleCondition<Tweet>() {
@Override
public boolean filter(final Tweet pTweet) throws Exception {
return pTweet.getRetweeted() > 0;
}
}).times(2).within(Time.seconds(15));
final KeyedStream<Tweet,String> keyed = notDeletedTweets.keyBy(Tweet::getLang);
// keyed.print();
PatternStream<Tweet> patternStream = CEP.pattern(keyed,pattern);
patternStream.process(new PatternProcessFunction<Tweet,Tweet>() {
@Override
public void processMatch(final Map<String,List<Tweet>> match,final Context ctx,final Collector<Tweet> out) throws Exception {
out.collect(match.get("FirstTweet").get(0));
}
}).print();
DataStream<String> manyMentions = patternStream.select((PatternSelectFunction<Tweet,String>) map -> {
LoggerFactory.getLogger(TweetsstreamingJob.class).info("Current map is {} ",map);
return map.get("FirstTweet").get(0).toString();
});
manyMentions.print();
env.execute("Flink-Tweet-Streaming-Api");
这是 Tweet 对象:
public class Tweet implements Serializable {
private Long id;
private String user;
private String lang;
private String tweet;
private Long timestamp;
private int retweeted;
private static ObjectMapper jsonParser;
private String originalTweet;
private List<String> hashtags;
public Tweet() {
}
public static Tweet parse(final String value) {
if (jsonParser == null) {
jsonParser = new ObjectMapper();
}
JsonNode jsonNode;
final Tweet tweetobj = new Tweet();
try {
jsonNode = jsonParser.readValue(value,JsonNode.class);
if (jsonNode.has("id")) {
tweetobj.id = jsonNode.get("id").asLong();
}
if (jsonNode.has("timestamp_ms")) {
tweetobj.timestamp = jsonNode.get("timestamp_ms").asLong();
}
if (jsonNode.has("text")) {
tweetobj.tweet = jsonNode.get("text").asText();
}
if (jsonNode.has("user")) {
tweetobj.user = jsonNode.get("user").get("name").asText();
}
if (jsonNode.has("lang")) {
tweetobj.lang = jsonNode.get("lang").asText();
}
if (jsonNode.has("retweeted_status")) {
tweetobj.retweeted = jsonNode.get("retweeted_status").get("retweet_count").asInt();
}
if (jsonNode.has("entities")) {
final JsonNode tags = jsonNode.get("entities").get("hashtags");
tags
.elements()
.forEachRemaining(tagNode -> tweetobj.getHashtags().add(tagNode.get("text").textValue()));
}
}
catch (IOException pE) {
// ignore
}
return tweetobj;
} ... getters and setters
我错过了什么?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)