2017-02-28 18 views
7

मैं [डाटाबेस] [1] से उदाहरण को पुन: उत्पन्न करने की कोशिश कर रहा था और इसे कफका और स्पार्क संरचित स्ट्रीमिंग के नए कनेक्टर पर लागू करता हूं, हालांकि मैं स्पार्क में आउट-ऑफ-द-बॉक्स विधियों का उपयोग करके जेसन को सही तरीके से पार्स नहीं कर सकता ..kafka कनेक्ट 0.10 और स्पार्क संरचित स्ट्रीमिंग से_json का उपयोग कैसे करें?

नोट: विषय जेसन प्रारूप में कफका में लिखा गया है।

val ds1 = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", IP + ":9092") 
      .option("zookeeper.connect", IP + ":2181") 
      .option("subscribe", TOPIC) 
      .option("startingOffsets", "earliest") 
      .option("max.poll.records", 10) 
      .option("failOnDataLoss", false) 
      .load() 

निम्नलिखित कोड काम नहीं करेगा, क्योंकि मेरा मानना ​​है कि स्तंभ json एक स्ट्रिंग है और विधि from_json हस्ताक्षर से मेल नहीं खाता है कि है ...

val df = ds1.select($"value" cast "string" as "json") 
       .select(from_json("json") as "data") 
       .select("data.*") 

किसी भी सुझाव दिए गए?

[अद्यतन] उदाहरण काम कर रहे: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

उत्तर

9

सबसे पहले आप अपने JSON संदेश के लिए स्कीमा को परिभाषित करने की जरूरत है। उदाहरण के लिए

val schema = new StructType() 
    .add($"id".string) 
    .add($"name".string) 

अब आप इस स्कीमा का उपयोग नीचे from_json विधि में कर सकते हैं।

val df = ds1.select($"value" cast "string" as "json") 
      .select(from_json($"json", schema) as "data") 
      .select("data.*") 
+0

आप संकलक चेतावनी अगर "मूल्य $ सदस्य नहीं है ..." कृपया मत भूलना आयात के बारे में spark.implicits._ यह मेरे अतिरिक्त 5-10 मिनट लग गए सवाल मेरे लिए – user1459144

+0

यह पता लगाने की क्या, कौन सी लाइब्रेरी "from_json" नामक फ़ंक्शन प्रदान कर रही है? मुझे लगता है कि ऐसा नहीं लगता !!! कृपया मदद करें .. – Gyan

+0

@ राघव -> आयात org.apache.spark.sql.functions._ यहां उदाहरण देखें: https://github.com/katsou55/kafka-spark- संरचित- स्ट्रीमिंग-example/blob/ मास्टर/src/मुख्य/स्केला-2.11/Main.scala –

संबंधित मुद्दे