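Problem description
Question
What is the best way to run a memory-intensive pipeline in Apache Beam?
Background
I wrote a pipeline that takes the Naemura Bird dataset and converts the images and annotations into TFRecords containing TF Examples in the required format for the TF Object Detection API.
I tested the pipeline with the DirectRunner on a small subset of the images (4 or 5), and it ran fine.
Problem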
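When I run the pipeline on a larger dataset (day 1 of 3, about 21 GB), it crashes after a while with a non-descriptive SIGKILL.
I did see a memory spike just before the crash, and I assume the process was killed because of excessive memory usage.
I ran the pipeline under strace. These are the last lines of the trace: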
[pid 53702] 10:00:09.105069 poll([{fd=10,events=POLLIN},{fd=11,{fd=12,{fd=13,{fd=14,{fd=15,{fd=16,{fd=17,{fd=18,{fd=19,{fd=20,events=POLLIN}],11,100) = 0 (Timeout)
[pid 53702] 10:00:09.205826 poll([{fd=10,100 <unfinished ...>
[pid 53534] 10:00:09.259806 mmap(NULL,63082496,PROT_READ|PROT_WRITE,MAP_PRIVATE|MAP_ANONYMOUS,-1,0) = 0x7f3aa43d7000
[pid 53694] 10:00:09.297140 <... clock_nanosleep resumed>NULL) = 0
[pid 53694] 10:00:09.297273 clock_nanosleep(CLOCK_REALTIME,{tv_sec=0,tv_nsec=200000000},<unfinished ...>
[pid 53702] 10:00:09.306409 <... poll resumed>) = 0 (Timeout)
[pid 53702] 10:00:09.306478 poll([{fd=10,100) = 0 (Timeout)
[pid 53702] 10:00:09.406866 poll([{fd=10,100 <unfinished ...>
[pid 53710] 10:03:55.844910 <... futex resumed>) = ?
[pid 53709] 10:03:57.797618 <... futex resumed>) = ?
[pid 53708] 10:03:57.797737 <... futex resumed>) = ?
[pid 53707] 10:03:57.797793 <... futex resumed>) = ?
[pid 53706] 10:03:57.797847 <... futex resumed>) = ?
[pid 53705] 10:03:57.797896 <... futex resumed>) = ?
[pid 53704] 10:03:57.797983 <... futex resumed>) = ?
[pid 53703] 10:03:57.798035 <... futex resumed>) = ?
[pid 53702] 10:03:57.798085 +++ killed by SIGKILL +++
[pid 53701] 10:03:57.798124 <... futex resumed>) = ?
[pid 53700] 10:03:57.798173 <... futex resumed>) = ?
[pid 53699] 10:03:57.798224 <... futex resumed>) = ?
[pid 53698] 10:03:57.798272 <... futex resumed>) = ?
[pid 53697] 10:03:57.798321 <... accept4 resumed> <unfinished ...>) = ?
[pid 53694] 10:03:57.798372 <... clock_nanosleep resumed> <unfinished ...>) = ?
[pid 53693] 10:03:57.798426 <... futex resumed>) = ?
[pid 53660] 10:03:57.798475 <... futex resumed>) = ?
[pid 53641] 10:03:57.798523 <... futex resumed>) = ?
[pid 53640] 10:03:57.798572 <... futex resumed>) = ?
[pid 53639] 10:03:57.798620 <... futex resumed>) = ?
[pid 53710] 10:03:57.798755 +++ killed by SIGKILL +++
[pid 53709] 10:03:57.798792 +++ killed by SIGKILL +++
[pid 53708] 10:03:57.798828 +++ killed by SIGKILL +++
[pid 53707] 10:03:57.798864 +++ killed by SIGKILL +++
[pid 53706] 10:03:57.798900 +++ killed by SIGKILL +++
[pid 53705] 10:03:57.798937 +++ killed by SIGKILL +++
[pid 53704] 10:03:57.798973 +++ killed by SIGKILL +++
[pid 53703] 10:03:57.799008 +++ killed by SIGKILL +++
[pid 53701] 10:03:57.799044 +++ killed by SIGKILL +++
[pid 53700] 10:03:57.799079 +++ killed by SIGKILL +++
[pid 53699] 10:03:57.799116 +++ killed by SIGKILL +++
[pid 53698] 10:03:57.799152 +++ killed by SIGKILL +++
[pid 53697] 10:03:57.799187 +++ killed by SIGKILL +++
[pid 53694] 10:03:57.799245 +++ killed by SIGKILL +++
[pid 53693] 10:03:57.799282 +++ killed by SIGKILL +++
[pid 53660] 10:03:57.799318 +++ killed by SIGKILL +++
[pid 53641] 10:03:57.799354 +++ killed by SIGKILL +++
[pid 53640] 10:03:57.799390 +++ killed by SIGKILL +++
[pid 53639] 10:03:57.910349 +++ killed by SIGKILL +++
10:03:57.910381 +++ killed by SIGKILL +++
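Solution
Several things could cause this behavior. Since the pipeline runs fine with less data, analyzing what changed between the two runs may lead us to a solution.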
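Option 1: Clean the input data
The third line of the log you posted, the mmap(NULL, ...) call, may indicate that the larger run is processing unclean data. It could mean that | "Get Content" >> beam.Map(lambda x: x.read_utf8()) is trying to read a null value. Keep in mind, though, that NULL there is only the address hint passed to mmap, and the call itself is an anonymous allocation of roughly 60 MiB, so the line may also simply reflect memory growth.
Are there empty files somewhere? Are your files UTF-8 encoded?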
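One way to check for empty or badly encoded files is to decode defensively instead of calling read_utf8() directly. The following is a minimal sketch in plain Python; the helper name decode_or_none is hypothetical and is not part of Apache Beam.

```python
from typing import Optional


def decode_or_none(data: bytes) -> Optional[str]:
    """Return the UTF-8 text of `data`, or None if it is empty or not valid UTF-8.

    `decode_or_none` is a hypothetical helper name used only for this sketch.
    """
    if not data:
        # Empty file: nothing useful to pass downstream.
        return None
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError:
        # Not UTF-8 (for example, a binary file matched by mistake).
        return None
```

In the pipeline this could replace the "Get Content" step, assuming the elements are ReadableFile objects: map each file with lambda f: decode_or_none(f.read()) and then drop the None results with beam.Filter(lambda text: text is not None). Logging the paths of the dropped files would show whether unclean input is actually present.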
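Option 2: Use smaller files as input
I suspect that using fileio.ReadMatches() tries to load each file into memory in full. If your files are larger than your available memory, that could cause the error. Can you split your data into smaller files?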
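If splitting the files is not practical, a related alternative (not the same thing as Option 2) is to stream each file in fixed-size chunks rather than reading it in one call. This is a sketch under assumptions: iter_chunks is a hypothetical helper, and the 1 MiB default chunk size is arbitrary.

```python
from typing import BinaryIO, Iterator


def iter_chunks(stream: BinaryIO, chunk_size: int = 1024 * 1024) -> Iterator[bytes]:
    """Yield successive chunks of at most `chunk_size` bytes from `stream`.

    Hypothetical helper for this sketch; only one chunk is held in memory
    at a time, instead of the whole file.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            # An empty read signals end of file.
            break
        yield chunk
```

With Beam, the stream could come from ReadableFile.open() inside a DoFn, so each element is processed piece by piece. This only helps if the downstream step can work on partial data; a step that needs the complete image in memory still pays the full cost.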
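Option 3: Use larger infrastructure
If the files are too large for the machine you currently run the DirectRunner on, you could try another runner in the cloud (for example, the DataflowRunner) to get on-demand infrastructure.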
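Switching runners is mostly a matter of pipeline options. The sketch below only builds the command-line flags; the function name build_dataflow_flags, the default machine type, and every project, region, and bucket value passed to it are placeholders you would replace with your own.

```python
from typing import List


def build_dataflow_flags(
    project: str,
    region: str,
    temp_location: str,
    worker_machine_type: str = "n1-highmem-4",  # assumption: a high-memory worker type
) -> List[str]:
    """Return Beam command-line flags that select the DataflowRunner.

    Hypothetical helper for this sketch; all argument values are placeholders.
    """
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location={temp_location}",
        f"--worker_machine_type={worker_machine_type}",
    ]
```

With apache_beam installed, the resulting list can be passed to PipelineOptions (from apache_beam.options.pipeline_options) and that object handed to beam.Pipeline(options=...). The input and output paths would also need to point at storage the workers can reach, such as a GCS bucket.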