Flink CEP 不匹配任何东西

问题描述

我有一个使用 Flink 的非常简单的应用程序,它从 Twitter 创建一个数据流并尝试匹配一个简单的模式。我尝试了不同的条件,但没有匹配。问题是无论那个模式是什么,都没有匹配的。这是我的代码片段:

final DataStream<Tweet> notDeletedTweets = originalTweets
        .filter(tweet -> !tweet.contains("delete"))
        .map(Tweet::parse);

final Pattern<Tweet,Tweet> pattern = Pattern.<Tweet>begin("FirstTweet").where(new SimpleCondition<Tweet>() {

    @Override
    public boolean filter(final Tweet pTweet) throws Exception {
        return pTweet.getRetweeted() > 0;
    }
}).times(2).within(Time.seconds(15));

final KeyedStream<Tweet,String> keyed = notDeletedTweets.keyBy(Tweet::getLang);
//        keyed.print();
PatternStream<Tweet> patternStream = CEP.pattern(keyed,pattern);
patternStream.process(new PatternProcessFunction<Tweet,Tweet>() {

    @Override
    public void processMatch(final Map<String,List<Tweet>> match,final Context ctx,final Collector<Tweet> out) throws Exception {
         out.collect(match.get("FirstTweet").get(0));
    }
}).print();
DataStream<String> manyMentions = patternStream.select((PatternSelectFunction<Tweet,String>) map -> {
    LoggerFactory.getLogger(TweetsstreamingJob.class).info("Current map is {} ",map);
    return map.get("FirstTweet").get(0).toString();
});
manyMentions.print();
env.execute("Flink-Tweet-Streaming-Api");

这是 Tweet 对象:

public class Tweet implements Serializable {

    private Long id;

    private String user;

    private String lang;

    private String tweet;

    private Long timestamp;

    private int retweeted;

    private static ObjectMapper jsonParser;

    private String originalTweet;
    
    private List<String> hashtags;
  public Tweet() {

}



public static Tweet parse(final String value) {
    if (jsonParser == null) {
        jsonParser = new ObjectMapper();
    }
    JsonNode jsonNode;
    final Tweet tweetobj = new Tweet();
    try {
        jsonNode = jsonParser.readValue(value,JsonNode.class);
        if (jsonNode.has("id")) {
            tweetobj.id = jsonNode.get("id").asLong();
        }
        if (jsonNode.has("timestamp_ms")) {
            tweetobj.timestamp = jsonNode.get("timestamp_ms").asLong();
        }
        if (jsonNode.has("text")) {
            tweetobj.tweet = jsonNode.get("text").asText();
        }
        if (jsonNode.has("user")) {
            tweetobj.user = jsonNode.get("user").get("name").asText();
        }
        if (jsonNode.has("lang")) {
            tweetobj.lang = jsonNode.get("lang").asText();
        }
        if (jsonNode.has("retweeted_status")) {
            tweetobj.retweeted = jsonNode.get("retweeted_status").get("retweet_count").asInt();
        }
        if (jsonNode.has("entities")) {
            final JsonNode tags = jsonNode.get("entities").get("hashtags");
            tags
                    .elements()
                    .forEachRemaining(tagNode -> tweetobj.getHashtags().add(tagNode.get("text").textValue()));
        }
    }
    catch (IOException pE) {
        // ignore
    }
    return tweetobj;
} ... getters and setters

我错过了什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...