问题描述
我有一个简单的akka流媒体应用程序(使用alpakka hdfs连接器),可将数据写入HDFS。 持久存储纯数据(json是主要格式)时,一切都很好,但是当我切换为以压缩格式(Gzip)写入数据时,我会遇到下一个错误。
Exception in thread "main" java.lang.IllegalArgumentException: requirement Failed: Compressor cannot be null
at scala.Predef$.require(Predef.scala:340)
at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter.<init>(CompressedDataWriter.scala:32)
at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter$.apply(CompressedDataWriter.scala:73)
at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressedWithPassthrough(HdfsFlow.scala:112)
at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressed(HdfsFlow.scala:82)
at HdfsPersistance$.main(HdfsPersistance.scala:59)
at HdfsPersistance.main(HdfsPersistance.scala)
此特定错误仅在尝试使用Gzip时出现,在尝试使用DefaultCodec时没有此错误。 这是到目前为止我正在使用的代码片段:
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.alpakka.hdfs._
import akka.stream.alpakka.hdfs.scaladsl.HdfsFlow
import akka.stream.scaladsl.{Sink,Source}
import akka.util.ByteString
import org.apache.hadoop.io.compress.GzipCodec
object StackOverflow {
def main(args: Array[String]): Unit = {
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
implicit val system = ActorSystem("QuickStart")
val conf = new Configuration()
conf.set("fs.defaultFS","hdfs://192.168.0.8:9000/")
val pathGenerator = FilePathGenerator( (rotationCount: Long,timestamp: Long) => s"/data/$rotationCount-$timestamp")
val settings =
HdfsWritingSettings()
.withOverwrite(true)
.withNewLine(true)
.withLineseparator(System.getProperty("line.separator"))
.withPathGenerator(pathGenerator)
val fs: FileSystem = FileSystem.get(conf)
val elements = (0 to 100000).map(s => s"$s\n")
val source = Source(elements)
val codec = new GzipCodec()
codec.setConf(fs.getConf)
val result = source
.map { json =>
HdfsWriteMessage(ByteString(json))
}
.via(
HdfsFlow.compressed(
fs,SyncStrategy.count(100),RotationStrategy.count(500),codec,settings
)
)
.runWith(Sink.ignore)
implicit val ec = system.dispatcher
result.onComplete {
case _ => system.terminate()
}
}
}
所以,我的问题是,我应该尝试配置客户端应用程序吗?解决此错误?还是特别需要在HDFS端启用此功能?我确实尝试在HDFS上启用压缩,但是看起来没有成功。 还是我想我可能会缺少某种依赖性? FY-> val AkkaVersion =“ 2.6.8”
谢谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)