2016-04-18 18 views
7

देता है मैं काफ़का 0.0.8V के साथ अपाचे एवरो स्कीमा का उपयोग करता हूं। मैं निर्माता/उपभोक्ता सिरों पर एक ही स्कीमा का उपयोग करता हूं। स्कीमा में कोई भी परिवर्तन नहीं है। लेकिन जब मैं संदेशों का उपभोग करने की कोशिश करता हूं, तो मुझे उपभोक्ता में कुछ अपवाद मिलता है। मुझे यह त्रुटि क्यों मिलती है?एवरो डिकोडिंग java.io.EOFException

निर्माता

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException { 
    BinaryEncoder encoder = null; 
    ByteArrayOutputStream out = null; 
    try { 
     DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema); 
     out = new ByteArrayOutputStream(); 
     encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload, encoder); 
     encoder.flush(); 

     byte[] serializedBytes = out.toByteArray(); 

     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes); 

      producer.send(message); 
     } 

उपभोक्ता

public void run() { 
     try { 
      ConsumerIterator<byte[], byte[]> itr = stream.iterator(); 
      while (itr.hasNext()) { 

       byte[] data = itr.next().message(); 

       Schema schema = new Schema.Parser() 
         .parse(new File("/Users/xx/avro_schemas/file.avsc")); 

       DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); 

       GenericRecord payload = reader.read(null, decoder); 
       System.out.println("Message received --: " + payload); 

लेकिन मैं अपवाद निम्न हो जब पाठक डिकोडर से संदेश पढ़ने की कोशिश .;

java.io.EOFException 
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473) 
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128) 
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259) 
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363) 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157) 
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

उपभोक्ता गुण

enable.auto.commit=true 
auto.commit.interval.ms=101 
session.timeout.ms=7000 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
zookeeper.connect=zookeeper.xx.com\:2181 
heartbeat.interval.ms=1000 
auto.offset.reset=smallest 
serializer.class=kafka.serializer.DefaultEncoder 
bootstrap.servers=kafka.xx.com\:9092 
group.id=test 
consumer.timeout.ms=-1 
fetch.min.bytes=1 
receive.buffer.bytes=262144 

उत्तर

1

समस्या अपने एवरो निर्माता द्वारा निर्मित है।

sendFile() विधि में, आप एन्कोडर को फ्लश नहीं कर रहे हैं, और ByteArrayOutputStream() को बंद नहीं कर रहे हैं, जिससे EOFException उत्पन्न हो रहा है।

public class TestSerializer<T> { 



    final private Class<T> avroType; 

    public TestSerializer(Class<T> avroType) { 
     this.avroType = avroType; 
    } 

    public byte[] serialize(T object) 
    { 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     DatumWriter<T> writer = new SpecificDatumWriter<T>(avroType); 
     try 
     { 
      writer.write(object, encoder); 
      out.close(); 
     } catch (IOException e) 
     { 
      throw new RuntimeException(e); 
     } finally 
     { 
      //Here is the flushing and closing 
      try 
      { 
       if (encoder != null) 
       { 
        encoder.flush(); 
       } 
       if (out != null) 
       { 
        out.close(); 
       } 
      } catch (IOException e) 
      { 
       throw new RuntimeException(e); 
      } 
     } 

     return out.toByteArray(); 

    } 

} 
:

यहाँ आप एक सामान्य क्रमबद्धता वर्ग का एक उदाहरण है

संबंधित मुद्दे