问题描述
我有以下POJO课,
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
@Table(keyspace = "testKey",name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
}
Mapper代码是
DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String,Person>() {
@Override
public void flatMap(String value,Collector<Person> out) throws Exception {
try {
out.collect(objectMapper.readValue(value,Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).getSideOutput(new OutputTag<>("contact",TypeInformation.of(Person.class)));
env.execute();
CassandraSink.addSink(sideOutput)
.setHost("localhost")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
没有.getSideOutput(new OutputTag<>("contact",TypeInformation.of(Person.class)));
也无法正常工作。
sideOutput
不会散发出要存储在Cassandra中的值。知道我做错了什么吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)