2016-12-14 14 views
6

को ऑब्जेक्ट्स मैं अपने कस्टम जावा वस्तु है और लाभ उठाने के लिए इच्छा JVM निर्मित क्रमबद्धता में यह एक काफ्का विषय के लिए भेजने के लिए है, लेकिन क्रमबद्धता नीचे त्रुटिभेजें कस्टम जावा काफ्का विषय

org.apache.kafka साथ विफल रहता है। common.errors.SerializationException: वर्ग org.apache.kafka.common.serialization.ByteArraySerializer निर्दिष्ट करने के लिए वर्ग com.spring.kafka.Payload की मूल्य कनवर्ट नहीं कर सकता value.serializer

पेलोड में। जावा

public class Payload implements Serializable { 

    private static final long serialVersionUID = 123L; 

    private String name="vinod"; 

    private int anInt = 5; 

    private Double aDouble = new Double("5.0"); 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public int getAnInt() { 
     return anInt; 
    } 

    public void setAnInt(int anInt) { 
     this.anInt = anInt; 
    } 

    public Double getaDouble() { 
     return aDouble; 
    } 

    public void setaDouble(Double aDouble) { 
     this.aDouble = aDouble; 
    } 

} 

निर्माता की मेरी रचना के दौरान, मैं निम्नलिखित गुण निर्धारित किया है

<entry key="key.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 
       <entry key="value.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 

मेरे भेजने आह्वान क्या एक कस्टम जावा ऑब्जेक्ट भेजने का सही तरीका क्या है

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload())); 

नीचे के रूप में है एक निर्माता के माध्यम से एक कफका विषय के माध्यम से?

+0

अन्य विकल्प है JSON प्रारूप में बदलने के लिए और भेज – ravthiru

उत्तर

8

हम 2 विकल्प

1) नीचे सूचीबद्ध के रूप में हम निर्माता के लिए कस्टम जावा वस्तुओं भेजने का इरादा है, हम एक serializer जो org.apache.kafka.common.serialization.Serializer और लागू करता है बनाने की जरूरत नीचे

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { 

    public void configure(Map map, boolean b) { 

    } 

    public byte[] serialize(String s, Object o) { 

     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(o); 
      oos.close(); 
      byte[] b = baos.toByteArray(); 
      return b; 
     } catch (IOException e) { 
      return new byte[0]; 
     } 
    } 

    public void close() { 

    } 
} 

अपने निर्माता

कोड संदर्भ के निर्माण के दौरान कि serializer वर्ग पारित और तदनुसार

०१२३५१६४१०६ मूल्य serializer सेट
<entry key="value.serializer" 
         value="com.spring.kafka.PayloadSerializer" /> 

2) कस्टम धारावाहिक वर्ग बनाने की कोई आवश्यकता नहीं है। मौजूदा ByteArraySerializer का प्रयोग करें, लेकिन भेजने के दौरान प्रक्रिया

जावा वस्तु का पालन करें -> स्ट्रिंग (अधिमानतः JSON के बजाय represenation toString) -> bytearray

3

चूंकि आप ByteArraySerializer का उपयोग कर रहे हैं, तो आपको बाइट [] निर्माता को तुरंत चालू करने की आवश्यकता है।

Producer<byte[],byte[]> producer = new KafkaProducer<>(props); 

और फिर, जबकि उदाहरण के लिए, [] serializing या किसी अन्य विधि के बाद बाइट पारित उत्पादन,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes())); 

आप निर्माता के लिए सिर्फ एक पेलोड वस्तु से गुजर रहे हैं, तो यह करने के लिए बेहतर होगा कुंजी धारावाहिक और मूल्य serializer है जो भी आप पास करना चाहते हैं और पढ़ने के दौरान आपको उस डेटा से पढ़ने की जरूरत है।

सीरियलज़ेबल और बाइटअरेयसेरियलाइज़र/बाइटएरे डीसेरियलाइज़र का उपयोग करना अच्छा अभ्यास है।

संबंधित मुद्दे