2016-03-09 14 views
5

के साथ स्पार्क नौकरी मैं यूआरएल की सूची से आरडीडी का निर्माण करता हूं, और उसके बाद कुछ एसिंक http कॉल के साथ डेटा लाने की कोशिश करता हूं। मुझे अन्य गणना करने से पहले सभी परिणामों की आवश्यकता है। आदर्श रूप से, मुझे विचारों को स्केल करने के लिए अलग-अलग नोड्स पर http कॉल करने की आवश्यकता है।Async HTTP कॉल

मैं कुछ इस तरह किया:

//init spark 
val sparkContext = new SparkContext(conf) 
val datas = Seq[String]("url1", "url2") 

//create rdd 
val rdd = sparkContext.parallelize[String](datas) 

//httpCall return Future[String] 
val requests = rdd.map((url: String) => httpCall(url)) 

//await all results (Future.sequence may be better) 
val responses = requests.map(r => Await.result(r, 10.seconds)) 

//print responses 
response.collect().foreach((s: String) => println(s)) 

//stop spark 
sparkContext.stop() 

इस काम है, लेकिन स्पार्क काम खत्म कभी नहीं!

तो मुझे आश्चर्य है कि क्या है स्पार्क का उपयोग कर भविष्य के साथ काम कर (भविष्य [RDD] या) के लिए सर्वोत्तम प्रथाओं हैं।

मुझे लगता है कि यह उपयोग केस बहुत आम दिखता है, लेकिन अभी तक कोई जवाब नहीं मिला।

सादर

उत्तर

4

उपयोग के इस मामले

नहीं वास्तव में बहुत आम लग रहा है, क्योंकि यह बस के रूप में आप (शायद) की उम्मीद काम नहीं करता। चूंकि प्रत्येक कार्य मानक स्कैला Iterators पर संचालित होता है, इसलिए इन परिचालनों को एक साथ squashed किया जाएगा। इसका मतलब है कि सभी परिचालन अभ्यास में अवरुद्ध हो जाएंगे। मान लें कि आप तीन URL की [ "एक्स", "वाई", "Z"] क्या आप कोड एक निम्न क्रम में निष्पादित किया जाएगा है:

Await.result(httpCall("x", 10.seconds)) 
Await.result(httpCall("y", 10.seconds)) 
Await.result(httpCall("z", 10.seconds)) 

आप आसानी से स्थानीय स्तर पर एक ही व्यवहार को पुनः कर सकते हैं। आप अपने कोड निष्पादित करने के लिए चाहते हैं एसिंक्रोनस रूप से आप स्पष्ट रूप से mapPartitions का उपयोग कर इस संभाल चाहिए:

rdd.mapPartitions(iter => { 
    ??? // Submit requests 
    ??? // Wait until all requests completed and return Iterator of results 
}) 

लेकिन यह अपेक्षाकृत मुश्किल है। किसी दिए गए विभाजन के लिए सभी डेटा मेमोरी में फ़िट होने की कोई गारंटी नहीं है, इसलिए आपको शायद कुछ बैचिंग तंत्र की भी आवश्यकता होगी।

यह सब कहा जा रहा है कि मैंने जिस समस्या का वर्णन किया है उसे पुन: उत्पन्न नहीं कर सकता है, यह कुछ कॉन्फ़िगरेशन समस्या हो सकती है या httpCall के साथ समस्या हो सकती है।

एक साइड नोट पर पूरे कार्य को मारने के लिए एक टाइमआउट की अनुमति देने से एक अच्छा विचार नहीं दिखता है।

1

यह काम नहीं करेगा।

आप अनुरोध वस्तुओं को वितरित नहीं कर सकते हैं और क्लस्टर पर अन्य नोड्स द्वारा एकत्रित प्रतिक्रियाओं की अपेक्षा नहीं कर सकते हैं। यदि आप करते हैं तो भविष्य के लिए स्पार्क कॉल कभी खत्म नहीं होगा। इस मामले में वायदा कभी काम नहीं करेगा।

तो अपने नक्शे() बनाने के सिंक (HTTP) अनुरोध तो एक ही कार्रवाई/परिवर्तन कॉल के भीतर प्रतिक्रियाओं इकट्ठा कृपया और उसके बाद परिणाम (प्रतिक्रियाएं) आगे मानचित्र के अधीन/अन्य कॉल को कम /।

आपके मामले में, तर्क सिंक में प्रत्येक कॉल के लिए प्रतिक्रियाओं को इकट्ठा पुनर्लेखन और वायदा की धारणा को दूर तो सब ठीक होना चाहिए कृपया।

+0

समस्या 'अनुरोध' और 'प्रतिक्रियाओं' के बीच कोई डेटा आंदोलन नहीं होना चाहिए, इसलिए दोनों परिवर्तन एक ही चरण में निष्पादित किए जाने चाहिए, इसलिए एक ही निष्पादक और संदर्भ। – zero323

1

अंत में मैंने इसे डिस्पैच के बजाय स्केलज-http का उपयोग करके बनाया। कॉल तुल्यकालिक हैं, लेकिन यह मेरे उपयोग के मामले से मेल खाता है।

मुझे लगता है कि स्पार्क नौकरी डिस्पैच का उपयोग कभी खत्म नहीं करती है क्योंकि एचटीपी कनेक्शन ठीक से बंद नहीं किया गया था।

बेस्ट सादर

1

मैं इस लक्ष्य को हासिल करने के लिए एक आसान तरीका लगता है सके। लेकिन कई बार पुनरावृत्ति के पुनरावृत्ति के बाद मैंने यही किया और यह प्रश्नों की एक बड़ी सूची के लिए काम कर रहा है। असल में हमने कई उप प्रश्नों में एक विशाल क्वेरी के लिए बैच ऑपरेशन करने के लिए इसका इस्तेमाल किया।

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries 
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing 
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq) 

// Then map each one those to a Spark Task, in this case its a Future that returns a string 
val tasks: RDD[Future[String]] = queries.map(query => { 
    val task = makeHttpCall(query) // Method returns http call response as a Future[String] 
    task.recover { 
     case ex => logger.error("recover: " + ex.printStackTrace()) } 
    task onFailure { 
     case t => logger.error("execution failed: " + t.getMessage) } 
    task 
}) 

// Note:: Http call is still not invoked, you are including this as part of the lineage 

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it 
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved 

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] => 
    val searchFuture: Future[Iterator[String]] = Future sequence f 
    Await.result(searchFuture, threadWaitTime.seconds) 
} 

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

आप प्रतिक्रिया पेलोड पार्स करने, आदि तो फिर तुम foreachPartition बजाय mapPartitions इस्तेमाल कर सकते हैं उन सभी के लिए http तुरंत कॉल प्रदर्शन करने के लिए की तरह सामग्री पर कोई परिवर्तन करना चाहते न तो।

संबंधित मुद्दे