EOFException after reading the first event of an Avro file

Problem description

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

import com.quotes.Quotes;

public class BinaryDecoderApp {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("src/main/resources/lpquotes.avsc"));

        File avroFile = new File("src/main/resources/FlumeData.1619451750874");

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new FileInputStream(avroFile),null);
        SpecificDatumReader<Quotes> datumReader = new SpecificDatumReader<>(schema);

        while (!decoder.isEnd()) {

            Quotes record = datumReader.read(null,decoder);
            System.out.println(record);
        }
    }
}

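The code above is what I use to decode Avro events that I ingested into HDFS with Flume. The problem is that after reading the first event in the file (which prints correctly), I get this exception: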

Exception in thread "main" java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.readDouble(BinaryDecoder.java:272)
    at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:197)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:201)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at com.quotes.reader.BinaryDecoderApp.main(BinaryDecoderApp.java:27)

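Apparently, Flume may be putting an extra character at the end of each Avro event, and that produces this exception.

However, I cannot find a way to read the file correctly.

Here is the Flume configuration: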
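
One way to narrow down the trailing-character hypothesis is to check how the file actually starts. An Avro object container file begins with the four magic bytes 'O', 'b', 'j', 0x01; a file without that header holds raw bytes written by whatever serializer the sink used. The sketch below only performs that check. The class name AvroMagicCheck and the method startsWithAvroMagic are made up for illustration and are not part of Avro or Flume.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class AvroMagicCheck {
    // Magic header of an Avro object container file: 'O', 'b', 'j', 0x01.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the stream starts with the Avro container-file magic bytes. */
    public static boolean startsWithAvroMagic(InputStream in) throws IOException {
        byte[] head = new byte[MAGIC.length];
        int off = 0;
        // read() may return fewer bytes than requested, so loop until the buffer is full.
        while (off < head.length) {
            int n = in.read(head, off, head.length - off);
            if (n < 0) {
                return false; // stream is shorter than the magic header
            }
            off += n;
        }
        return Arrays.equals(head, MAGIC);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java AvroMagicCheck <file>");
            return;
        }
        try (InputStream in = new FileInputStream(args[0])) {
            if (startsWithAvroMagic(in)) {
                System.out.println("Container file header found");
            } else {
                System.out.println("No container header: raw stream");
            }
        }
    }
}
```

If the header is present, the file is a container file and a raw BinaryDecoder loop is probably the wrong tool; Avro's DataFileReader is the usual reader for that format. If it is absent, the bytes between records come from the sink's serializer, which is where an extra delimiter would originate.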

tier1.channels = c1
tier1.sources = r1
tier1.sinks = k1

# AvroSource r1
tier1.sources.r1.channels = c1

tier1.sources.r1.type = avro
tier1.sources.r1.bind = 0.0.0.0
tier1.sources.r1.port = 60001

# Channel c1
tier1.channels.c1.type = file

# HdfsSink k1
tier1.sinks.k1.channel=c1
tier1.sinks.k1.type=hdfs

## HdfsSink k1 sinking properties
tier1.sinks.k1.hdfs.path=/
tier1.sinks.k1.hdfs.fileType = DataStream
tier1.sinks.k1.hdfs.batchSize = 100000
tier1.sinks.k1.hdfs.rollSize = 0
tier1.sinks.k1.hdfs.rollCount = 0
tier1.sinks.k1.hdfs.rollInterval = 60
tier1.sinks.k1.hdfs.threadPoolSize = 500
tier1.sinks.k1.hdfs.callTimeout = 180000
tier1.sinks.k1.hdfs.serializer = avro_event
tier1.sinks.k1.hdfs.writeFormat = Text
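
A possible cause, offered as an assumption rather than a confirmed diagnosis: the Flume User Guide lists the HDFS sink's serializer property as serializer, without the hdfs. prefix, and its default is TEXT. The text serializer appends a newline after each event body unless serializer.appendNewline is set to false. If hdfs.serializer is ignored for that reason, the sink would fall back to TEXT, which would match a stray byte after every record. A sketch of the two variants that could be tried on the same sink:

```properties
# Variant A (assumption): write an Avro container file of Flume events.
# Each record is then a Flume Event (headers + body), not a Quotes record,
# so the reader has to unwrap the body bytes before decoding them.
tier1.sinks.k1.serializer = avro_event

# Variant B (assumption): keep the raw event bodies but drop the newline
# that the text serializer appends after each event by default.
# tier1.sinks.k1.serializer = text
# tier1.sinks.k1.serializer.appendNewline = false
```

Only one variant should be active at a time. Files already written with the old settings would still contain the extra bytes.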

Solution

No confirmed solution to this problem has been found yet.
