在 Spark 流任务中实现多线程

问题描述

我有一个 Spark 流应用程序,它从 Kafka 主题中读取数据,对每条记录的内容运行一系列操作,并将结果写入数据库

Spark 流应用程序实现如下:

  • 使用 mapPartitions 将 Java 函数应用于每个分区
  • 函数中,依次定义所有操作

操作包括 IO 密集型和计算密集型操作。

Spark 驱动程序在执行器上的每个内核最多运行一项任务。这导致执行器上 cpu 的利用率降低。我希望一定有一种方法可以通过在我的计算密集型操作中实现多线程来提高吞吐量。有没有人这样做过?您可以在此帮助提及的任何该做和不该做的事?

我关心的一个特殊问题是如何控制执行程序中线程的创建/重用。以下步骤是否可以执行此操作:

  • 围绕控制最大线程数和其他配置的 Java 线程池实现(例如 java.util.concurrent.Executor)编写单例
  • mapPartitions中使用的Java函数中引用这个单例来提交子任务并并行运行

上面的实现是否可以限制为在每个执行器中运行mapPartitions函数而创建的线程数?有没有更好的方法来实现这一点?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)