2015-06-26 14 views
7

पर जेएसओएन संदेश से स्पार्क स्ट्रीमिंग में स्पार्क डेटाफ्रेम बनाएं, मैं स्कैला में स्पार्क स्ट्रीमिंग के कार्यान्वयन पर काम कर रहा हूं, जहां मैं एक कफका विषय से JSON स्ट्रिंग खींच रहा हूं और उन्हें डेटाफ्रेम में लोड करना चाहता हूं। क्या ऐसा करने का कोई तरीका है जहां स्पार्क आरडीडी [स्ट्रिंग] से स्वयं के स्कीमा का उल्लंघन करता है?कफका

sqlContext.read 
//.schema(schema) //optional, makes it a bit faster, if you've processed it before you can get the schema using df.schema 
.json(jsonRDD) //RDD[String] 

मैं क्या करने की कोशिश कर रहा हूँ:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
    val yourDataFrame = hiveContext.createDataFrame(yourRDD) 

उत्तर

2

चिंगारी 1.4 में, आप RDD से एक Dataframe उत्पन्न करने के लिए निम्न विधि की कोशिश कर सकते इस समय एक ही। मुझे उत्सुकता है कि आपको कफका से आरडीडी [स्ट्रिंग] कैसे मिला, हालांकि, मैं अभी भी इंप्रेशन के तहत हूं स्पार्क + कफका केवल एक बार बैच के बाहर "बाहर निकलने के बजाय स्ट्रीमिंग करता है"। :)

+1

यह निम्नलिखित प्रश्न के रूप में समान है: http: //stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark – sparklearner

3

हाँ, आप निम्न का उपयोग कर सकते हैं:

+3

आप काफ्का से गैर-स्ट्रीमिंग आरडीडी प्राप्त करने के लिए KafkaUtils.createRDD का उपयोग कर सकते हैं –

1

आप, काफ्का से संदेश की धारा में पढ़ने के लिए नीचे दिए गए कोड का उपयोग कर सकते JSON मूल्यों निकालें और उन्हें DataFrame में बदलने का:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

messages.foreachRDD { rdd => 
//extracting the values only 
    val df = sqlContext.read.json(rdd.map(x => x._2)) 
    df.show() 
} 
संबंधित मुद्दे