जब स्पार्क 2.0 का उपयोग कर काफ्का से स्ट्रीमिंग, मैं निम्न त्रुटि हो रही है साथ रिकॉर्ड:Serializable नहीं अपवाद है जब पढ़ने काफ्का स्पार्क स्ट्रीमिंग
org.apache.spark.SparkException:
Job aborted due to stage failure:
Task 0.0 in stage 1.0 (TID 1) had a not serializable result:
org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class:
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337,
CreateTime = 1472871209063, checksum = 2826679694,
serialized key size = -1, serialized value size = 95874,
key = null, value = <JSON GOES HERE...>
यहाँ कोड के संबंधित भाग हैं:
val ssc = new StreamingContext(sc, Seconds(2))
val topics = Array("ecfs")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream
.map(_.value())
.flatMap(message => {
// parsing here...
})
.foreachRDD(rdd => {
// processing here...
})
ssc.start()
जो मैं बता सकता हूं, यह वह पंक्ति है जो .map(_.value())
समस्या का कारण बन रही है, यह कैसे तय किया जा सकता है?