问题描述
我是Flink的新手,我想将Kafka流数据存储到Cassandra中。我已经将String转换为POJO。我的POJO如下,
@Table(keyspace = "sample",name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
我的转换发生在下面,
stream.flatMap(new FlatMapFunction<String,Person>() {
public void flatMap(String value,Collector<Person> out) {
try {
out.collect(objectMapper.readValue(value,Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).print(); // I need to use proper method to convert to Datastream.
env.execute();
我阅读了以下链接上的文档以供参考,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
Cassandra Sink接受DataStream实例。我需要转换我的转换并将其存储到Kafka中。
Cant create Cassandra Pojo Sink也给我一些想法。
有方法.forward()
返回 DataStream<Reading> forward
,并将实例传递给时,
CassandraSink.addSink(forward)
.setHost("localhost")
.build();
cannot access org.apache.flink.streaming.api.scala.DataStream
如何将POJO转换为存储在Cassandra中?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)