देता है मैं काफ़का 0.0.8V के साथ अपाचे एवरो स्कीमा का उपयोग करता हूं। मैं निर्माता/उपभोक्ता सिरों पर एक ही स्कीमा का उपयोग करता हूं। स्कीमा में कोई भी परिवर्तन नहीं है। लेकिन जब मैं संदेशों का उपभोग करने की कोशिश करता हूं, तो मुझे उपभोक्ता में कुछ अपवाद मिलता है। मुझे यह त्रुटि क्यों मिलती है?एवरो डिकोडिंग java.io.EOFException
निर्माता
public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
BinaryEncoder encoder = null;
ByteArrayOutputStream out = null;
try {
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
out = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(payload, encoder);
encoder.flush();
byte[] serializedBytes = out.toByteArray();
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
producer.send(message);
}
उपभोक्ता
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = stream.iterator();
while (itr.hasNext()) {
byte[] data = itr.next().message();
Schema schema = new Schema.Parser()
.parse(new File("/Users/xx/avro_schemas/file.avsc"));
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
GenericRecord payload = reader.read(null, decoder);
System.out.println("Message received --: " + payload);
लेकिन मैं अपवाद निम्न हो जब पाठक डिकोडर से संदेश पढ़ने की कोशिश .;
java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
उपभोक्ता गुण
enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zookeeper.xx.com\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kafka.xx.com\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144