2017-08-11 26 views
5

1) हम संरचित स्ट्रीमिंग का उपयोग करके कफका से उपभोग कर रहे हैं और संसाधित डेटा सेट को एस 3 पर लिख रहे हैं। हम भी संसाधित डेटा को कफका आगे बढ़ाना चाहते हैं, क्या यह एक ही स्ट्रीमिंग क्वेरी से करना संभव है? (स्पार्क संस्करण 2.1.1)स्पार्क संरचित स्ट्रीमिंग: एकाधिक सिंक

2) लॉग में, मुझे स्ट्रीमिंग क्वेरी प्रगति आउटपुट दिखाई देता है और मेरे पास लॉग से जेएसओएन नमूना अवधि है, क्या कोई कृपया एडबैच के बीच अंतर के बारे में अधिक स्पष्टता प्रदान कर सकता है और getBatch?

3) ट्रिगर एक्सेक्यूशन - क्या यह समय निकाला गया डेटा संसाधित करने और सिंक को लिखने का समय है?

"durationMs" : { 
    "addBatch" : 2263426, 
    "getBatch" : 12, 
    "getOffset" : 273, 
    "queryPlanning" : 13, 
    "triggerExecution" : 2264288, 
    "walCommit" : 552 
    }, 

संबंध aravias

उत्तर

6

1) हां।

स्पार्क 2.1.1 में, आप काफ़का में अपना डेटा लिखने के लिए writeStream.foreach का उपयोग कर सकते हैं। इस ब्लॉग में एक उदाहरण है: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

या आप स्पार्क 2.2.0 का उपयोग कर सकते हैं जो कफका को आधिकारिक तौर पर कफका को लिखने का समर्थन करने के लिए जोड़ता है।

2) getBatch मापता है कि स्रोत से डेटाफ्रेम बनाने में कितना समय लगता है। यह आमतौर पर बहुत तेज़ है। addBatch एक सिंक में डेटाफ्रेम को चलाने में कितना समय लगता है।

3) triggerExecution उपायों कितनी देर तक एक ट्रिगर निष्पादन को चलाने के लिए, आम तौर पर लगभग रूप में getOffset + getBatch + addBatch में ही है।

+0

धन्यवाद, आप कृपया स्पष्ट कर सकते हैं निम्नलिखित - जब एक डेटासेट एक स्रोत विषय से बनाए लिखने के लिए दोनों S3 और काफ्का की जांच की ओर इशारा करते हैं, उन डूब से प्रत्येक के लिए अलग से निर्दिष्ट किया जाना है तो यह उचित होगा उम्मीद है कि डेटा को स्रोत स्रोत से दो बार पढ़ा जाएगा, भले ही उस स्रोत से बनाए गए समान डेटासेट का उपयोग इन 2 diff सिंकों को लिखने के लिए किया जाए? – user2221654

+0

यदि आपके पास दो सिंक हैं, तो इसका मतलब है कि आपके पास दो प्रश्न हैं। प्रत्येक प्रश्न का अपना काफ्का उपभोक्ता होता है और स्वतंत्र रूप से काफ्का से डेटा प्राप्त करता है। – zsxwing

0

इसी तरह की स्थिति से संबंधित प्रश्न मिला, मैं दो कफका सिंकों को डेटा लिखने की कोशिश कर रहा हूं। मुझे नीचे क्लासएक्सएक्सप्शन मिल रहा है। कोड इस तरह दिखता है

final Dataset<String> eventDataset = feedMessageDataset 
      .map(toEvent(nodeCodeToAliasBroadcast), OBSERVED_EVENT_ENCODER) 
      .map(SparkFeedReader::serializeToJson, STRING()); 
    final StreamingQuery eventQuery = kafkaStreamWriterForEvents(eventDataset, configuration, feedReaderEngineName).start(); 

    final Dataset<String> splunkEventDataset = feedMessageDataset 
      .map(toSplunkEvent(), SPLUNK_OBSERVED_EVENT_ENCODER) 
      .filter(event -> !event.getIndicatorCode().equals(HEARBEAT_INDICATOR_CODE)) 
      .map(SparkFeedReader::serializeToJson, STRING()); 

    final StreamingQuery splunkEventQuery = kafkaStreamWriterForSplunkEvents(splunkEventDataset, configuration, feedReaderEngineName).start(); 

यदि मैं किसी एक सिंक पर टिप्पणी करता हूं तो यह ठीक काम करता है। यह स्पार्क 2.2.0 में होता है। प्रतिक्रिया के लिए

java.lang.ClassCastException: x.SplunkObservedEvent cannot be cast to x.ObservedEvent 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 
संबंधित मुद्दे