DataStax LocalDateTime timestamp linkage problem

Problem description

I am reading from Kafka and writing the data to Cassandra with Flink. I ran into a LocalDateTime codec problem, so I added @Column(name = "timeStamp", codec = LocalDateTimeCodec.class) to my POJO class for it, as shown below:

import com.datastax.driver.extras.codecs.jdk8.LocalDateTimeCodec;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;
@Data
@NoArgsConstructor
@Table(keyspace = "personKey", name = "Person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "id")
    private UUID id;

    @Column(name = "name")
    private String name;

   
    @Column(name = "timeStamp", codec = LocalDateTimeCodec.class) // if I remove the codec, there is no linkage error
    private LocalDateTime timeStamp;

}
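For context, a CQL timestamp is an absolute point on the time line, while java.time.LocalDateTime carries no zone or offset, so any codec between the two has to fix a zone when converting. A minimal pure-JDK sketch of that kind of conversion (the UTC choice here is an assumption for illustration, not necessarily what LocalDateTimeCodec does internally):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampConversion {

    // Convert a LocalDateTime to epoch millis, assuming UTC (the zone choice is arbitrary
    // but must be consistent on the read and write paths)
    static long toEpochMillis(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2020, 7, 1, 12, 0, 0);
        long ms = toEpochMillis(t);
        System.out.println(ms);                            // 1593604800000
        System.out.println(fromEpochMillis(ms).equals(t)); // true: round-trip is lossless
    }
}
```

As long as both directions use the same fixed offset, the round-trip is lossless; mixing zones between writer and reader silently shifts every timestamp.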

Part of the code is:

FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), prop);
        myConsumer.setStartFromEarliest();

        try (CqlSession session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("localhost", 9042))
                .withLocalDatacenter("datacenter1")
                .build()) {

            CreateKeyspace createKs = SchemaBuilder.createKeyspace("personKey")
                    .withSimpleStrategy(1);
            session.execute(createKs.build());

            CreateTable createTable = SchemaBuilder.createTable("personKey", "Person")
                    .ifNotExists()
                    .withPartitionKey("id", DataTypes.UUID)
                    .withColumn("name", DataTypes.TEXT)
                    .withColumn("timeStamp", DataTypes.TIMESTAMP);

            session.execute(createTable.build());
        }
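For reference, the two builder calls above should correspond roughly to the following CQL (a sketch; note that in CQL, unquoted identifiers such as Person and timeStamp are case-insensitive and treated as lower-case):

```sql
CREATE KEYSPACE personKey
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS personKey.Person (
    id uuid PRIMARY KEY,
    name text,
    timeStamp timestamp
);
```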


        DataStream<String> stream = env.addSource(myConsumer);
        DataStream<Person> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        });

        CassandraSink.addSink(sideOutput)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();

        env.execute();

The Maven dependencies are:

 <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-parameter-names</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>4.7.2</version>
        </dependency>
        <dependency>
            <groupId>com.codahale.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>3.0.2</version>
        </dependency>

        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-query-builder</artifactId>
            <version>4.7.2</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-mapper-runtime</artifactId>
            <version>4.7.2</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-mapping</artifactId>
            <version>3.10.1</version>
        </dependency>

        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-extras</artifactId>
            <version>3.10.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

I am getting this linkage error:

Caused by: java.lang.IncompatibleClassChangeError:
Class com.datastax.driver.core.DefaultResultSetFuture does not implement the requested interface org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.ListenableFuture

I also tried the following configuration, and the same error pops up. I can save data to Cassandra as long as I don't use LocalDateTime:

cluster.getConfiguration().getCodecRegistry()
    .register(LocalDateTimeCodec.instance);
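Since writes succeed whenever the field is not a LocalDateTime, one untested direction is to sidestep the codec entirely: declare the POJO field as java.util.Date, which the 3.x driver maps to CQL timestamp out of the box, and convert at the application boundary. The conversion itself is pure JDK (the UTC offset below is an assumed convention, not something the question's code specifies):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;

public class DateFallback {

    // Hypothetical boundary conversion: keep LocalDateTime in application code,
    // but hand the driver a java.util.Date it already knows how to encode.
    static Date toDate(LocalDateTime ldt) {
        return Date.from(ldt.toInstant(ZoneOffset.UTC)); // UTC is an assumed convention
    }

    public static void main(String[] args) {
        Date d = toDate(LocalDateTime.of(2020, 1, 1, 0, 0));
        System.out.println(d.getTime()); // epoch millis, offset already applied
    }
}
```

This trades the custom codec for an explicit conversion step, so no extra classes need to be registered with the driver.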

Workaround

No effective solution to this problem has been found yet.

