2016-09-03 16 views
5

जब स्पार्क 2.0 का उपयोग कर काफ्का से स्ट्रीमिंग, मैं निम्न त्रुटि हो रही है साथ रिकॉर्ड:Serializable नहीं अपवाद है जब पढ़ने काफ्का स्पार्क स्ट्रीमिंग

org.apache.spark.SparkException: 
Job aborted due to stage failure: 
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: 
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337, 
CreateTime = 1472871209063, checksum = 2826679694, 
serialized key size = -1, serialized value size = 95874, 
key = null, value = <JSON GOES HERE...> 

यहाँ कोड के संबंधित भाग हैं:

val ssc = new StreamingContext(sc, Seconds(2)) 

val topics = Array("ecfs") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

stream 
    .map(_.value()) 
    .flatMap(message => { 
    // parsing here... 
    }) 
    .foreachRDD(rdd => { 
    // processing here... 
    }) 

ssc.start() 

जो मैं बता सकता हूं, यह वह पंक्ति है जो .map(_.value()) समस्या का कारण बन रही है, यह कैसे तय किया जा सकता है?

उत्तर

0

आप डीस्ट्रीम पर मैप का उपयोग नहीं कर सकते: [स्ट्रिंग, स्ट्रिंग] जैसे आप वहां इस्तेमाल करते थे। मुझे लगता है कि आप को बदलने का उपयोग करें और फिर नक्शे लागू के रूप में

val streamed_rdd_final = streamed_rdd.transform{ rdd => rdd.map(x => x.split("\t")).map(x=>Array(check_time_to_send.toString,check_time_to_send_utc.toString,x(1),x(2),x(3),x(4),x(5))).map(x => x(1)+"\t"+x(2)+"\t"+x(3)+"\t"+x(4)+"\t"+x(5)+"\t"+x(6)+"\t"+x(7)+"\t")}

इस प्रकार है या आप .map उपयोग कर सकते हैं के रूप में आप का इस्तेमाल किया बल्कि _.value() आप नक्शे में एक समारोह भेजने का प्रयास करना चाहिए, मैं की तरह कर सकते हैं

stream.map{case (x, y) => (y.toString)} 
से नीचे किया
संबंधित मुद्दे