Apache Beam 在一段时间后停止处理 PubSub 消息

问题描述

我正在尝试编写一个简单的 Apache Beam 管道(将在 Dataflow 运行器上运行)来执行以下操作:

  • 订阅中读取包含 GCS 上文件路径的 PubSub 消息。
  • 对于每条消息,读取与消息关联的文件中包含的数据(文件可以是各种格式(csv、jsonl、json、xml 等))。
  • 对每条记录进行一些处理。
  • 在 GCS 上写回结果。

我在消息上使用了 10 秒的固定窗口。由于传入的文件已经分块(最大大小为 10MB),我决定不使用可拆分的 do 函数来读取文件,以避免增加无用的复杂性(特别是对于不能简单拆分的文件)。

这是一个简化的代码示例,它给出了与完整代码完全相同的问题:

package skytv.ingester

import java.io.{BufferedReader,InputStreamReader}
import java.nio.charset.StandardCharsets

import kantan.csv.rfc
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Compression,FileIO,FileSystems,TextIO,WriteFilesResult}
import org.apache.beam.sdk.io.gcp.pubsub.{PubsubIO,PubsubMessage}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow,FixedWindows,PaneInfo,Window}
import org.apache.beam.sdk.transforms.{Contextful,DoFn,MapElements,PTransform,ParDo,SerializableFunction,SimpleFunction,WithTimestamps}
import org.apache.beam.sdk.values.{KV,PCollection}
import org.joda.time.{Duration,Instant}
import skytv.cloudstorage.CloudStorageClient
import skytv.common.Closeable
import kantan.csv.ops._
import org.apache.beam.sdk.io.FileIO.{Sink,Write}

class FileReader extends DoFn[String,List[String]] {

  private def getFileReader(filePath: String) = {
    val cloudStorageClient = new CloudStorageClient()
    val inputStream = cloudStorageClient.getInputStream(filePath)
    val isr = new InputStreamReader(inputStream,StandardCharsets.UTF_8)
    new BufferedReader(isr)
  }

  private def getRowsIterator(fileReader: BufferedReader) = {
    fileReader
      .asUnsafeCsvReader[Seq[String]](rfc
        .withCellSeparator(',')
        .withoutHeader
        .withQuote('"'))
      .toIterator
  }

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val filePath = c.element()

    Closeable.tryWithResources(
      getFileReader(filePath)
    ) {
      fileReader => {

        getRowsIterator(fileReader)
          .foreach(record => c.output(record.toList))

      }
    }


  }
}

class DataWriter(tempFolder: String) extends PTransform[PCollection[List[String]],WriteFilesResult[String]] {

  private val convertRecord = Contextful.fn[List[String],String]((dr: List[String]) => {
    dr.mkString(",")
  })

  private val getSink = Contextful.fn[String,Sink[String]]((destinationKey: String) => {
    TextIO.sink()
  })

  private val getPartitioningKey = new SerializableFunction[List[String],String] {
    override def apply(input: List[String]): String = {
      input.head
    }
  }

  private val getNaming = Contextful.fn[String,Write.FileNaming]((destinationKey: String) => {
    new Write.FileNaming {
      override def getFilename(
                                window: BoundedWindow,pane: PaneInfo,numShards: Int,shardindex: Int,compression: Compression
                              ): String = {
        s"$destinationKey-${window.maxTimestamp()}-${pane.getIndex}.csv"
      }
    }
  })

  override def expand(input: PCollection[List[String]]): WriteFilesResult[String] = {
    val fileWritingTransform = FileIO
      .writeDynamic[String,List[String]]()
      .by(getPartitioningKey)
      .withDestinationCoder(input.getPipeline.getCoderRegistry.getCoder(classOf[String]))
      .withTempDirectory(tempFolder)
      .via(convertRecord,getSink)
      .withNaming(getNaming)
      .withNumShards(1)

    input
      .apply("WritetoAvro",fileWritingTransform)
  }

}

object EnhancedIngesterScalaSimplified {

  private val SUBSCRIPTION_NAME = "projects/<project>/subscriptions/<subscription>"
  private val TMP_LOCATION = "gs://<path>"

  private val WINDOW_SIZE = Duration.standardSeconds(10)

  def main(args: Array[String]): Unit = {

    val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create()

    FileSystems.setDefaultPipelineOptions(options)

    val p = Pipeline.create(options)

    val messages = p
      .apply("ReadMessages",PubsubIO.readMessagesWithAttributes.fromSubscription(SUBSCRIPTION_NAME))
//      .apply("AddProcessingTimeTimestamp",WithTimestamps.of(new SerializableFunction[PubsubMessage,Instant] {
//        override def apply(input: PubsubMessage): Instant = Instant.Now()
//      }))

    val parsedMessages = messages
      .apply("ApplyWindow",Window.into[PubsubMessage](FixedWindows.of(WINDOW_SIZE)))
      .apply("ParseMessages",MapElements.via(new SimpleFunction[PubsubMessage,String]() {
        override def apply(msg: PubsubMessage): String = new String(msg.getPayload,StandardCharsets.UTF_8)
      }))

    val dataReadResult = parsedMessages
      .apply("ReadData",ParDo.of(new FileReader))

    val writerResult = dataReadResult.apply(
      "WriteData",new DataWriter(TMP_LOCATION)
    )

    writerResult.getPerDestinationOutputFilenames.apply(
      "FilesWritten",MapElements.via(new SimpleFunction[KV[String,String],String]() {
      override def apply(input: KV[String,String]): String = {
        println(s"Written ${input.getKey},${input.getValue}")
        input.getValue
      }
    }))

    p.run.waitUntilFinish()
  }
}

问题是在处理了一些消息(大约 1000 条)后,作业停止处理新消息,并且它们永远留在 PubSub 订阅中未被确认。

我看到在这种情况下,水印停止前进,数据新鲜度无限线性增加

这是数据流的屏幕截图:

The dataflow behavior on the read step

这里是 PubSub 队列的情况:

The PubSub message queue state

是否可能有一些消息停留在填充它们的数据流队列中,从而无法添加新消息?

我认为PubsubIO如何计算时间戳有问题,所以我试图强制时间戳等于每条消息的处理时间,但没有成功。

如果我将数据流作业保留在这种状态,它似乎会不断地重新处理相同的消息,而不会将任何数据写入存储。

你知道如何解决这个问题吗?

谢谢!

解决方法

很可能管道在处理管道中的一个(或多个)元素时遇到错误(并且它不应该与 PubsubIO 计算时​​间戳的方式有任何关系),这会阻止水印自失败的工作将在数据流上一次又一次地重试。

您可以从日志中检查是否有任何故障,特别是来自工作程序或线束组件的故障。如果存在未处理的运行时异常,例如解析错误等,很可能是流管道卡住的根本原因。

如果没有 UserCodeException,则可能是由 Dataflow 后端引起的其他问题,您可以联系 Dataflow 客户支持,以便工程师可以调查您管道的后端问题。