问题描述
我有一个Databricks Kafka Producer,需要将62M条记录写入Kafka主题。如果同时写入62M记录会不会有问题?还是我需要迭代20次并在每次迭代中写入3M记录。
这是代码。
error_clear_last();
$isDir = @is_dir('/');
if (error_get_last() !== null) {
$isDir = 'cannot-detect';
}
我的问题是-如果strDf.count是62M,我可以直接将其写入Kafka还是需要迭代cmd#3。
解决方法
使用针对Kafka的Spark结构化流将数据存储到Kafka中没有限制。您将在下面看到,流查询将创建KafkaProducer
(的池),该池用于迭代Dataframe
中的行。 Kafka可以处理如此多的消息,并且没有限制。
可能有趣的是,在这批消息实际写入代理之前,Kafka会将一些消息缓冲到一批中。这是通过KafkaProducer Configs linger.ms
,batch.size
和max.request.size
的配置来控制的,因此将这些设置调整为您的整体设置可能会有用。
这是spark-kafka-sql
库的代码:
在内部,Spark将在InternalKafkaProducerPool.scala中创建一个KafkaProducers池:
private def createKafkaProducer(paramsSeq: Seq[(String,Object)]): Producer = {
val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
if (log.isDebugEnabled()) {
val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
}
kafkaProducer
}
您的查询随后将转换为RDD,并针对每个分区遍历KafkaWriter.scala中的元素:
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters,schema,topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
}
数据的实际产生将在KafkaWriteTask中发生:
def execute(iterator: Iterator[InternalRow]): Unit = {
producer = Some(InternalKafkaProducerPool.acquire(producerConfiguration))
val internalProducer = producer.get.producer
while (iterator.hasNext && failedWrite == null) {
val currentRow = iterator.next()
sendRow(currentRow,internalProducer)
}
}