在单个Produce命令中可以为Kafka主题生成的记录数量是否有限制 这是spark-kafka-sql库的代码:

问题描述

我有一个Databricks Kafka Producer,需要将62M条记录写入Kafka主题。如果同时写入62M记录会不会有问题?还是我需要迭代20次并在每次迭代中写入3M记录。

这是代码

error_clear_last();
$isDir = @is_dir('/');
if (error_get_last() !== null) {
   $isDir = 'cannot-detect';
}

我的问题是-如果strDf.count是62M,我可以直接将其写入Kafka还是需要迭代cmd#3。

解决方法

使用针对Kafka的Spark结构化流将数据存储到Kafka中没有限制。您将在下面看到,流查询将创建KafkaProducer(的池),该池用于迭代Dataframe中的行。 Kafka可以处理如此多的消息,并且没有限制。

可能有趣的是,在这批消息实际写入代理之前,Kafka会将一些消息缓冲到一批中。这是通过KafkaProducer Configs linger.msbatch.sizemax.request.size的配置来控制的,因此将这些设置调整为您的整体设置可能会有用。

这是spark-kafka-sql库的代码:

在内部,Spark将在InternalKafkaProducerPool.scala中创建一个KafkaProducers池:

  private def createKafkaProducer(paramsSeq: Seq[(String,Object)]): Producer = {
    val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
    if (log.isDebugEnabled()) {
      val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
      logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
    }
    kafkaProducer
  }

您的查询随后将转换为RDD,并针对每个分区遍历KafkaWriter.scala中的元素:

  queryExecution.toRdd.foreachPartition { iter =>
      val writeTask = new KafkaWriteTask(kafkaParameters,schema,topic)
      Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
        finallyBlock = writeTask.close())
    }
  }

数据的实际产生将在KafkaWriteTask中发生:

  def execute(iterator: Iterator[InternalRow]): Unit = {
    producer = Some(InternalKafkaProducerPool.acquire(producerConfiguration))
    val internalProducer = producer.get.producer
    while (iterator.hasNext && failedWrite == null) {
      val currentRow = iterator.next()
      sendRow(currentRow,internalProducer)
    }
  }