मैं [डाटाबेस] [1] से उदाहरण को पुन: उत्पन्न करने की कोशिश कर रहा था और इसे कफका और स्पार्क संरचित स्ट्रीमिंग के नए कनेक्टर पर लागू करता हूं, हालांकि मैं स्पार्क में आउट-ऑफ-द-बॉक्स विधियों का उपयोग करके जेसन को सही तरीके से पार्स नहीं कर सकता ..kafka कनेक्ट 0.10 और स्पार्क संरचित स्ट्रीमिंग से_json का उपयोग कैसे करें?
नोट: विषय जेसन प्रारूप में कफका में लिखा गया है।
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
निम्नलिखित कोड काम नहीं करेगा, क्योंकि मेरा मानना है कि स्तंभ json एक स्ट्रिंग है और विधि from_json हस्ताक्षर से मेल नहीं खाता है कि है ...
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
किसी भी सुझाव दिए गए?
[अद्यतन] उदाहरण काम कर रहे: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala
आप संकलक चेतावनी अगर "मूल्य $ सदस्य नहीं है ..." कृपया मत भूलना आयात के बारे में spark.implicits._ यह मेरे अतिरिक्त 5-10 मिनट लग गए सवाल मेरे लिए – user1459144
यह पता लगाने की क्या, कौन सी लाइब्रेरी "from_json" नामक फ़ंक्शन प्रदान कर रही है? मुझे लगता है कि ऐसा नहीं लगता !!! कृपया मदद करें .. – Gyan
@ राघव -> आयात org.apache.spark.sql.functions._ यहां उदाहरण देखें: https://github.com/katsou55/kafka-spark- संरचित- स्ट्रीमिंग-example/blob/ मास्टर/src/मुख्य/स्केला-2.11/Main.scala –