2014-06-23 3 views
59

स्पार्क में स्काला का उपयोग कर, जब भी मैं saveAsTextFile का उपयोग कर बाहर परिणाम डंप, यह कई भागों में उत्पादन विभाजित करने के लिए लगता है। मैं बस इसके लिए एक पैरामीटर (पथ) गुजर रहा हूँ।saveAsTextFile को कैसे सहेजने के लिए एकाधिक फ़ाइल में आउटपुट विभाजित करें?

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) 
year.saveAsTextFile("year") 
  1. आउटपुट की संख्या reducers इसे इस्तेमाल करता है की संख्या के अनुरूप है?
  2. इस उत्पादन मतलब यह है संकुचित है?
  3. मैं जानता हूँ कि मैं उत्पादन एक साथ पार्टी का उपयोग कर को जोड़ सकते हैं, लेकिन वहाँ एक भी पाठ फ़ाइल में उत्पादन स्टोर करने के लिए एक विकल्प है, बंटवारे के बिना ?? मैंने एपीआई दस्तावेज़ों को देखा, लेकिन यह इसके बारे में ज्यादा कुछ नहीं कहता है।
+1

यह आम तौर पर खराब व्यवहार का केवल बिग डाटा में एक फ़ाइल का उपयोग करने के लिए करता है, तो उस फ़ाइल बड़ी है। – samthebest

+0

क्या सबसे अच्छा अभ्यास तो उत्पादन किया गया था, कहते हैं, एक क्रमबद्ध फ़ाइल यदि है? फ़ाइलों का एक संग्रह के रूप में रखना और कई आउटपुट फ़ाइल नाम सूचकांक में किसी प्रकार का हो कर (यानी प्रथम फ़ाइल की तरह कुछ, "आ" नाम दिया गया है, बीच वाले "FG" जैसा होगा पिछले एक "zzy")? – Rdesmond

+0

अक्सर यह मामला है कि एक भारी स्पार्क नौकरी केवल एक बहुत ही छोटा आउटपुट (एकत्रीकरण, केपीआईएस, लोकप्रियता, ...) उत्पन्न करती है जो एचडीएफएस पर उत्पादित होती है, लेकिन बाद में इसे बड़े डेटा से संबंधित अनुप्रयोगों द्वारा उपयोग किया जाने की संभावना है। इस मामले में क्लीनर और आसान स्थानान्तरण और खपत के लिए एक अच्छी तरह से नामित एकल फ़ाइल है। –

उत्तर

84

कारण यह कई फाइलों के रूप में सहेजता है क्योंकि गणना वितरित की जाती है। उत्पादन, इतने छोटे ऐसे आपको लगता है कि आप एक मशीन पर फिट कर सकते हैं, तो आप

val arr = year.collect() 

के साथ अपने कार्यक्रम समाप्त कर सकते हैं और फिर एक फ़ाइल के रूप में जिसके परिणामस्वरूप सरणी बचाने के लिए, एक और तरीका है एक कस्टम का उपयोग किया जाएगा विभाजनकर्ता, partitionBy, और इसे सब कुछ एक विभाजन में जाता है हालांकि यह सलाह नहीं दी जाती है क्योंकि आपको कोई समांतरता नहीं मिलेगी।

आप फ़ाइल की आवश्यकता होती है saveAsTextFile साथ बचाया जा आप coalesce(1,true).saveAsTextFile() उपयोग कर सकते हैं। इसका मूल रूप से गणना है कि गणना को 1 विभाजन में जोड़ दें। आप repartition(1) का भी उपयोग कर सकते हैं जो coalesce के लिए सिर्फ एक रैपर है जो शफल तर्क को सत्य पर सेट करता है। RDD.scala के स्रोत के माध्यम से देख रहा है कि मैंने इनमें से अधिकांश चीजों को कैसे देखा, आपको एक नज़र रखना चाहिए।

+1

आप सरणी को टेक्स्ट फ़ाइल के रूप में कैसे सहेजते हैं ?? किसी सरणी के लिए कोई saveAsTextFile फ़ंक्शन नहीं है। सिर्फ आरडीडी के लिए। – user2773013

+2

@ user2773013 अच्छी तरह से इसके लिए दृष्टिकोण 'coalesce' या' विभाजन 'दृष्टिकोण सुझाया गया है, लेकिन वास्तव में एचडीएफएस पर संग्रहीत करने में कोई बात नहीं है अगर यह केवल 1 नोड पर है, इसलिए क्यों संग्रह का उपयोग करना वास्तव में सही तरीका है – aaronman

+1

धन्यवाद @ अरोमान !!! – user2773013

16

आप coalesce(1) और फिर saveAsTextFile() कह सकते हैं - लेकिन यह एक बुरा विचार हो सकता है अगर आप डेटा का एक बहुत कुछ है। अलग-अलग मैपर्स और रेड्यूसर अलग-अलग फ़ाइलों को लिखने के लिए प्रति विभाजन अलग-अलग फ़ाइलों को हडोप में उत्पन्न किया जाता है। एक आउटपुट फ़ाइल होने पर केवल एक अच्छा विचार है यदि आपके पास बहुत कम डेटा है, तो इस मामले में आप एकत्र() भी कर सकते हैं, जैसा कि @ अरोमान ने कहा था।

+0

नाइस ने विभाजनकर्ता के साथ गड़बड़ करने की तुलना में 'coalesce' क्लीनर के बारे में नहीं सोचा था, कहा जा रहा है कि मुझे अभी भी लगता है कि आपका लक्ष्य इसे एक फ़ाइल 'संग्रह' में लाने के लिए यह संभवतः – aaronman

+1

करने का सही तरीका है। लेकिन, यदि आप कोलेस का उपयोग करते हैं, तो इसका मतलब है कि आप केवल 1 रेड्यूसर का उपयोग कर रहे हैं। क्या यह प्रक्रिया धीमा नहीं करेगा क्योंकि केवल 1 reducer का उपयोग किया जाता है ?? – user2773013

+1

हां, लेकिन यही वह है जिसे आप पूछ रहे हैं। स्पार्क प्रति विभाजन एक फ़ाइल आउटपुट करता है।othe ओर, तुम क्यों फ़ाइलों की संख्या के बारे में परवाह करते हैं? जब चिंगारी में फ़ाइलों को पढ़ने तुम सिर्फ मूल निर्देशिका निर्दिष्ट कर सकते हैं और विभाजन के सभी एक ही RDD – David

2

आप वर्तमान संस्करण 1.0.0 में स्पार्क के अगले संस्करण में ऐसा करने में सक्षम होंगे, यह संभव नहीं है जब तक कि आप मैन्युअल रूप से ऐसा नहीं करते हैं, उदाहरण के लिए, जैसा कि आपने बताया है, एक बैश स्क्रिप्ट कॉल के साथ।

+0

धन्यवाद! – user2773013

+0

स्पार्क के अगले संस्करण यहाँ है और यह स्पष्ट नहीं है कि यह कैसे करना है :( –

1

मैं भी कि प्रलेखन स्पष्ट रूप से कहा गया है कि उपयोगकर्ताओं को सावधान जब बुला विभाजन का एक वास्तविक छोटी संख्या के साथ सम्मिलित किया जाना चाहिए उल्लेख करना चाहते हैं। यह विभाजन की इस संख्या को प्राप्त करने के लिए अपस्ट्रीम विभाजन का कारण बन सकता है।

मैं सम्मिलित (1) जब तक वास्तव में आवश्यक उपयोग करने की अनुशंसा नहीं होता।

2

स्पार्क में 1.6.1 प्रारूप के रूप में नीचे दिखाया गया है। यह एक आउटपुट फ़ाइल बनाता है। अगर आउटपुट को संभालने के लिए पर्याप्त छोटा होता है तो इसका उपयोग करने के लिए सबसे अच्छा अभ्यास है। वास्तव में यह क्या करता है कि यह एक नया आरडीडी देता है जो numPartitions विभाजन में कम हो जाता है। अगर आप एक कठोर सहवास कर रहे हैं, जैसे numPartitions = 1 करने के लिए, यह आपके गणना में हो सकता है कम नोड्स से आप

pair_result.coalesce(1).saveAsTextFile("/app/data/") 
2

दूसरों के रूप में उल्लेख किया है, तो आप को इकट्ठा करने या अपने डेटा सम्मिलित कर सकते हैं (उदाहरण के लिए numPartitions के मामले = 1 में एक नोड) की तरह है पर जगह ले जा एक फ़ाइल बनाने के लिए स्पार्क को मजबूर करने के लिए सेट करें। लेकिन यह स्पार्क कार्यों की संख्या को भी सीमित करता है जो आपके डेटासेट पर समानांतर में काम कर सकते हैं।मैं इसे उत्पादन HDFS निर्देशिका में एक सौ फ़ाइलों को बनाने के लिए, तो hadoop fs -getmerge /hdfs/dir /local/file.txt का उपयोग स्थानीय फाइल सिस्टम में एक एकल फाइल में परिणाम निकालने के लिए जाने के लिए पसंद करते हैं। यह सबसे अधिक समझ में आता है जब आपका आउटपुट अपेक्षाकृत छोटी रिपोर्ट है।

0

यहाँ एक एकल फाइल उत्पादन के लिए मेरा उत्तर है। मैं हाल ही में जोड़े coalesce(1)

val year = sc.textFile("apat63_99.txt") 
       .map(_.split(",")(1)) 
       .flatMap(_.split(",")) 
       .map((_,1)) 
       .reduceByKey((_+_)).map(_.swap) 
year.saveAsTextFile("year") 

कोड:

year.coalesce(1).saveAsTextFile("year") 
1

आप repartition() फोन और इस तरह से पालन कर सकते हैं:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) 

var repartitioned = year.repartition(1) 
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00") 

enter image description here

1

एक बड़ा डाटासेट के साथ काम कर रहे लोगों के लिए और अभी भी चिंगारी के समानांतरवाद से लाभ के लिए तैयार है, rdd.coalesce(1).saveAsTextFile("path")समाधान नहीं है। पूरी पाइपलाइन (अंतिम स्पार्क एक्शन से स्टोरेज तक) को 1 एक्जिक्यूटर पर निष्पादित किया जाएगा।

इसके बजाय आप पहले निष्पादकों में चाहे जितने पर अपने पाइपलाइन निष्पादित और का उपयोग saveAsTextFile (जो उत्पादन में कई फ़ाइलों का उत्पादन करेगा) और फिर केवल apache FileSystem एपीआई का उपयोग कर इन सभी फ़ाइलों को मर्ज कर सकते हैं।

निम्न विधि स्टोर करने के लिए RDD दिया जाता है और पथ जहां यह स्टोर करने के लिए:

import org.apache.spark.rdd.RDD 
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} 
import org.apache.hadoop.conf.Configuration 

def saveAsSingleTextFile(
    outputRDD: RDD[String], 
    outputFile: String 
): Unit = { 

    // Classic saveAsTextFile in a temporary folder: 
    outputRDD.saveAsTextFile(outputFile + ".tmp") 

    // The facility allowing file manipulations on hdfs: 
    val hdfs = FileSystem.get(new Configuration()) 

    // Merge the folder into a single file: 
    FileUtil.copyMerge(
    hdfs, 
    new Path(outputFile + ".tmp"), 
    hdfs, 
    new Path(outputFile), 
    true, 
    new Configuration(), 
    null) 

    // And we delete the intermediate folder: 
    hdfs.delete(new Path(outputFile + ".tmp"), true) 
} 

इस तरह प्रसंस्करण अभी भी वितरित किया जाता है और विलय हिस्सा बाद में किया जाता है, जो प्रदर्शन में नुकसान की सीमा।

बोनस में आप आउटपुट फ़ाइल का सही नाम प्रदान कर सकते हैं, इसके विपरीत rdd.coalesce (1) .saveAsTextFile ("मेरी/पथ") जो फ़ाइल मेरी/path/अंशकालिक 00000 पैदा करता है।

संबंधित मुद्दे