Akk / Alpakka:将压缩数据写入HDFS时出错

问题描述

我有一个简单的akka​​流媒体应用程序(使用alpakka hdfs连接器),可将数据写入HDFS。 持久存储纯数据(json是主要格式)时,一切都很好,但是当我切换为以压缩格式(Gzip)写入数据时,我会遇到下一个错误

Exception in thread "main" java.lang.IllegalArgumentException: requirement Failed: Compressor cannot be null
    at scala.Predef$.require(Predef.scala:340)
    at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter.<init>(CompressedDataWriter.scala:32)
    at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter$.apply(CompressedDataWriter.scala:73)
    at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressedWithPassthrough(HdfsFlow.scala:112)
    at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressed(HdfsFlow.scala:82)
    at HdfsPersistance$.main(HdfsPersistance.scala:59)
    at HdfsPersistance.main(HdfsPersistance.scala)

此特定错误仅在尝试使用Gzip时出现,在尝试使用DefaultCodec时没有此错误。 这是到目前为止我正在使用的代码片段:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.alpakka.hdfs._
import akka.stream.alpakka.hdfs.scaladsl.HdfsFlow
import akka.stream.scaladsl.{Sink,Source}
import akka.util.ByteString
import org.apache.hadoop.io.compress.GzipCodec

object StackOverflow {
  def main(args: Array[String]): Unit = {

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    implicit val system = ActorSystem("QuickStart")

    val conf = new Configuration()
    conf.set("fs.defaultFS","hdfs://192.168.0.8:9000/")

    val pathGenerator = FilePathGenerator( (rotationCount: Long,timestamp: Long) => s"/data/$rotationCount-$timestamp")
    val settings =
      HdfsWritingSettings()
        .withOverwrite(true)
        .withNewLine(true)
        .withLineseparator(System.getProperty("line.separator"))
        .withPathGenerator(pathGenerator)

    val fs: FileSystem = FileSystem.get(conf)

    val elements = (0 to 100000).map(s => s"$s\n")
    val source = Source(elements)

    val codec = new GzipCodec()
    codec.setConf(fs.getConf)

    val result = source
      .map { json =>
        HdfsWriteMessage(ByteString(json))
      }
      .via(
        HdfsFlow.compressed(
          fs,SyncStrategy.count(100),RotationStrategy.count(500),codec,settings
        )
      )
      .runWith(Sink.ignore)

    implicit val ec = system.dispatcher
    result.onComplete {
      case _ => system.terminate()
    }
  }
}

所以,我的问题是,我应该尝试配置客户端应用程序吗?解决错误?还是特别需要在HDFS端启用此功能?我确实尝试在HDFS上启用压缩,但是看起来没有成功。 还是我想我可能会缺少某种依赖性? FY-> val AkkaVersion =“ 2.6.8”

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)