问题描述
我正在尝试编写一个简单的 Apache Beam 管道(将在 Dataflow 运行器上运行)来执行以下操作:
- 从订阅中读取包含 GCS 上文件路径的 PubSub 消息。
- 对于每条消息,读取与消息关联的文件中包含的数据(文件可以是各种格式(csv、jsonl、json、xml 等))。
- 对每条记录进行一些处理。
- 在 GCS 上写回结果。
我在消息上使用了 10 秒的固定窗口。由于传入的文件已经分块(最大大小为 10MB),我决定不使用可拆分的 do 函数来读取文件,以避免增加无用的复杂性(特别是对于不能简单拆分的文件)。
package skytv.ingester
import java.io.{BufferedReader,InputStreamReader}
import java.nio.charset.StandardCharsets
import kantan.csv.rfc
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Compression,FileIO,FileSystems,TextIO,WriteFilesResult}
import org.apache.beam.sdk.io.gcp.pubsub.{PubsubIO,PubsubMessage}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow,FixedWindows,PaneInfo,Window}
import org.apache.beam.sdk.transforms.{Contextful,DoFn,MapElements,PTransform,ParDo,SerializableFunction,SimpleFunction,WithTimestamps}
import org.apache.beam.sdk.values.{KV,PCollection}
import org.joda.time.{Duration,Instant}
import skytv.cloudstorage.CloudStorageClient
import skytv.common.Closeable
import kantan.csv.ops._
import org.apache.beam.sdk.io.FileIO.{Sink,Write}
class FileReader extends DoFn[String,List[String]] {
private def getFileReader(filePath: String) = {
val cloudStorageClient = new CloudStorageClient()
val inputStream = cloudStorageClient.getInputStream(filePath)
val isr = new InputStreamReader(inputStream,StandardCharsets.UTF_8)
new BufferedReader(isr)
}
private def getRowsIterator(fileReader: BufferedReader) = {
fileReader
.asUnsafeCsvReader[Seq[String]](rfc
.withCellSeparator(',')
.withoutHeader
.withQuote('"'))
.toIterator
}
@ProcessElement
def processElement(c: ProcessContext): Unit = {
val filePath = c.element()
Closeable.tryWithResources(
getFileReader(filePath)
) {
fileReader => {
getRowsIterator(fileReader)
.foreach(record => c.output(record.toList))
}
}
}
}
class DataWriter(tempFolder: String) extends PTransform[PCollection[List[String]],WriteFilesResult[String]] {
private val convertRecord = Contextful.fn[List[String],String]((dr: List[String]) => {
dr.mkString(",")
})
private val getSink = Contextful.fn[String,Sink[String]]((destinationKey: String) => {
TextIO.sink()
})
private val getPartitioningKey = new SerializableFunction[List[String],String] {
override def apply(input: List[String]): String = {
input.head
}
}
private val getNaming = Contextful.fn[String,Write.FileNaming]((destinationKey: String) => {
new Write.FileNaming {
override def getFilename(
window: BoundedWindow,pane: PaneInfo,numShards: Int,shardindex: Int,compression: Compression
): String = {
s"$destinationKey-${window.maxTimestamp()}-${pane.getIndex}.csv"
}
}
})
override def expand(input: PCollection[List[String]]): WriteFilesResult[String] = {
val fileWritingTransform = FileIO
.writeDynamic[String,List[String]]()
.by(getPartitioningKey)
.withDestinationCoder(input.getPipeline.getCoderRegistry.getCoder(classOf[String]))
.withTempDirectory(tempFolder)
.via(convertRecord,getSink)
.withNaming(getNaming)
.withNumShards(1)
input
.apply("WritetoAvro",fileWritingTransform)
}
}
object EnhancedIngesterScalaSimplified {
private val SUBSCRIPTION_NAME = "projects/<project>/subscriptions/<subscription>"
private val TMP_LOCATION = "gs://<path>"
private val WINDOW_SIZE = Duration.standardSeconds(10)
def main(args: Array[String]): Unit = {
val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create()
FileSystems.setDefaultPipelineOptions(options)
val p = Pipeline.create(options)
val messages = p
.apply("ReadMessages",PubsubIO.readMessagesWithAttributes.fromSubscription(SUBSCRIPTION_NAME))
// .apply("AddProcessingTimeTimestamp",WithTimestamps.of(new SerializableFunction[PubsubMessage,Instant] {
// override def apply(input: PubsubMessage): Instant = Instant.Now()
// }))
val parsedMessages = messages
.apply("ApplyWindow",Window.into[PubsubMessage](FixedWindows.of(WINDOW_SIZE)))
.apply("ParseMessages",MapElements.via(new SimpleFunction[PubsubMessage,String]() {
override def apply(msg: PubsubMessage): String = new String(msg.getPayload,StandardCharsets.UTF_8)
}))
val dataReadResult = parsedMessages
.apply("ReadData",ParDo.of(new FileReader))
val writerResult = dataReadResult.apply(
"WriteData",new DataWriter(TMP_LOCATION)
)
writerResult.getPerDestinationOutputFilenames.apply(
"FilesWritten",MapElements.via(new SimpleFunction[KV[String,String],String]() {
override def apply(input: KV[String,String]): String = {
println(s"Written ${input.getKey},${input.getValue}")
input.getValue
}
}))
p.run.waitUntilFinish()
}
}
问题是在处理了一些消息(大约 1000 条)后,作业停止处理新消息,并且它们永远留在 PubSub 订阅中未被确认。
是否可能有一些消息停留在填充它们的数据流队列中,从而无法添加新消息?
我认为PubsubIO如何计算时间戳有问题,所以我试图强制时间戳等于每条消息的处理时间,但没有成功。
如果我将数据流作业保留在这种状态,它似乎会不断地重新处理相同的消息,而不会将任何数据写入存储。
你知道如何解决这个问题吗?
谢谢!
解决方法
很可能管道在处理管道中的一个(或多个)元素时遇到错误(并且它不应该与 PubsubIO 计算时间戳的方式有任何关系),这会阻止水印自失败的工作将在数据流上一次又一次地重试。
您可以从日志中检查是否有任何故障,特别是来自工作程序或线束组件的故障。如果存在未处理的运行时异常,例如解析错误等,很可能是流管道卡住的根本原因。
如果没有 UserCodeException,则可能是由 Dataflow 后端引起的其他问题,您可以联系 Dataflow 客户支持,以便工程师可以调查您管道的后端问题。