无法使用Spark结构的流将数据发送到MongoDB

问题描述

我遵循了Unable to send data to MongoDB using Kafka-Spark Structured Streaming,将数据从Spark结构化流传输到mongoDB,并成功实现了它,但存在一个问题。 就像when函数

override def process(record: Row): Unit = {

    val doc: Document = Document(record.prettyJson.trim)
    // lazy opening of MongoDB connection


    ensureMongoDBConnection()
    val result = collection.insertOne(doc)
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

代码正在执行,没有任何问题,但是没有数据发送到MongoDB

但是如果我添加这样的打印语句

override def process(record: Row): Unit = {
    val doc: Document = Document(record.prettyJson.trim)

    // lazy opening of MongoDB connection


    ensureMongoDBConnection()
    val result = collection.insertOne(doc)
    result.foreach(println) //print statement
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

数据正在插入MongoDB

我不知道为什么????

解决方法

foreach 初始化写入器接收器。如果没有 foreach,您的数据框永远不会被计算。

Try this :

val df = // your df here
df.map(r => process(r))
df.count()

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...