无法使用Apache flink编译加入流代码Kinesis数据分析

问题描述

我无法使用Kinesis Data Analytics在Apache Flink中编译和运行联接代码。遵循文档并进行一些研究后,我的代码如下所示:

stream_1.join(stream_2)
      .where((x: String) => x.orderId)
        .equalTo((x: String) => x.orderId)
          .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
          .apply{ (e1,e2) => e1 + "," + e2 }.addSink(createSinkFromStaticConfig())

此操作失败,并显示错误:value orderId is not a member of String

因为我的原始数据是json,所以我也尝试过类似的操作:

    stream_1.join(stream_2)
      .where((x: String) => jsonParser.readValue(x,classOf[JsonNode]).get("orderId").asText)
        .equalTo((x: String) => jsonParser.readValue(x,classOf[JsonNode]).get("orderId").asText)
          .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
          .apply{ (e1," + e2 }.addSink(createSinkFromStaticConfig())

失败并出现错误:

 error: overloaded method value where with alternatives:
[INFO]   [KEY](x$1: org.apache.flink.api.java.functions.KeySelector[String,KEY],x$2: org.apache.flink.api.common.typeinfo.TypeInformation[KEY])org.apache.flink.streaming.api.datastream.JoinedStreams[String,String]#Where[KEY] <and>
[INFO]   [KEY](x$1: org.apache.flink.api.java.functions.KeySelector[String,KEY])org.apache.flink.streaming.api.datastream.JoinedStreams[String,String]#Where[KEY]
[INFO]  cannot be applied to (String => String)
[INFO]       .where((x: String) => jsonParser.readValue(x,classOf[JsonNode]).get("orderId").asText)

我不确定按select键输入什么文档:

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1," + e2 }

有人可以帮助我如何从我的信息流中选择加入密钥吗?

更多详细信息:

  • 来源:Kinesis数据流
  • 流类型:DataStream [String]

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...