Reading data saved by spark-redis using Java

Problem description

I use spark-redis to save a Dataset to Redis. Then I try to read this data back using Spring Data Redis.

The object I save to Redis:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;

    @Indexed
    private int user;

    @Indexed
    private String product;

    private double productN;
    private double rating;
    private float prediction;

    public static RatingResult convert(Row row) {
        int user = row.getAs("user");
        String product = row.getAs("product");
        double productN = row.getAs("productN");
        double rating = row.getAs("rating");
        float prediction = row.getAs("prediction");
        String id = user + product;

        return RatingResult.builder().id(id).user(user).product(product).productN(productN).rating(rating)
                .prediction(prediction).build();
    }

}

Saving the object with spark-redis:

JavaRDD<RatingResult> result = ...
...
sparkSession.createDataFrame(result,RatingResult.class).write().format("org.apache.spark.sql.redis")
            .option("table","collaborative_filtering").mode(SaveMode.Overwrite).save();

Repository:

@Repository
public interface RatingResultRepository extends JpaRepository<RatingResult,String> {

}

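I cannot read the data saved in Redis using Spring Data Redis, because spark-redis and Spring Data Redis store the data in different structures. I checked this by comparing the keys and values each library creates, using the following commands:

    redis-cli -p 6379 keys \*
    redis-cli hgetall $key

So how can I read the data that was saved this way, using Java or any Java library?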

Solution

The following works for me.

Writing the data from spark-redis.

I use Scala here, but it is essentially the same in Java. The only thing I changed is adding .option("key.column","id") so that the id column is used as the hash key.

    val ratingResult = new RatingResult("1",1,"product1",2.0,3.0,4)

    val result: JavaRDD[RatingResult] = spark.sparkContext.parallelize(Seq(ratingResult)).toJavaRDD()
    spark
      .createDataFrame(result,classOf[RatingResult])
      .write
      .format("org.apache.spark.sql.redis")
      .option("key.column","id")
      .option("table","collaborative_filtering")
      .mode(SaveMode.Overwrite)
      .save()

In spring-data-redis, I have the following:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;

    @Indexed
    private int user;

    @Indexed
    private String product;

    private double productN;
    private double rating;
    private float prediction;

    @Override
    public String toString() {
        return "RatingResult{" +
                "id='" + id + '\'' +
                ",user=" + user +
                ",product='" + product + '\'' +
                ",productN=" + productN +
                ",rating=" + rating +
                ",prediction=" + prediction +
                '}';
    }
}

I use CrudRepository instead of JpaRepository:

@Repository
public interface RatingResultRepository extends CrudRepository<RatingResult,String> {

}

Query:

     RatingResult found = ratingResultRepository.findById("1").get();
     System.out.println("found = " + found);

Output:

found = RatingResult{id='null',user=1,product='product1',productN=2.0,rating=3.0,prediction=4.0}

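You may notice that the id field is not populated. This is because spark-redis stores the id as part of the hash key, not as an attribute inside the hash.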
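If the id is needed on the Java side, one option is to recover it from the key itself. The following is only a minimal sketch, not part of the original answer. It assumes the key has the form collaborative_filtering:<id> (for example collaborative_filtering:1) and that the hash fields arrive as strings, as HGETALL returns them. The class RatingResultKeyDemo and the helpers idFromKey and describe are hypothetical names used for illustration, and the map in main stands in for a real Redis call.

```java
import java.util.HashMap;
import java.util.Map;

class RatingResultKeyDemo {

    // Assumption: keys look like "<table>:<id>", e.g. "collaborative_filtering:1".
    static final String TABLE_PREFIX = "collaborative_filtering:";

    // Hypothetical helper: strips the table prefix to recover the id.
    static String idFromKey(String redisKey) {
        if (!redisKey.startsWith(TABLE_PREFIX)) {
            throw new IllegalArgumentException("Unexpected key: " + redisKey);
        }
        return redisKey.substring(TABLE_PREFIX.length());
    }

    // Hypothetical helper: combines the id taken from the key with the
    // hash fields, which are parsed from their string form.
    static String describe(String redisKey, Map<String, String> hash) {
        String id = idFromKey(redisKey);
        int user = Integer.parseInt(hash.get("user"));
        String product = hash.get("product");
        double productN = Double.parseDouble(hash.get("productN"));
        double rating = Double.parseDouble(hash.get("rating"));
        float prediction = Float.parseFloat(hash.get("prediction"));
        return "RatingResult{id='" + id + "',user=" + user
                + ",product='" + product + "',productN=" + productN
                + ",rating=" + rating + ",prediction=" + prediction + "}";
    }

    public static void main(String[] args) {
        // Stand-in for the result of HGETALL on the key below.
        Map<String, String> hash = new HashMap<>();
        hash.put("user", "1");
        hash.put("product", "product1");
        hash.put("productN", "2.0");
        hash.put("rating", "3.0");
        hash.put("prediction", "4.0");

        System.out.println("found = " + describe("collaborative_filtering:1", hash));
    }
}
```

With the repository approach above, a simpler alternative is to set the id on the returned object yourself, since the caller already knows the id it passed to findById.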
