2016-11-30 9 views
10

मैं एक RDD संरचनास्पार्क: RDD सूची में

RDD[(String, String)] 

है और मैं 2 सूची (RDD के प्रत्येक आयाम के लिए एक) बनाना चाहते हैं।

मैंने rdd.foreach() का उपयोग करने की कोशिश की और दो ListBuffers भरें और फिर उन्हें सूची में परिवर्तित करें, लेकिन मुझे लगता है कि प्रत्येक नोड अपनी सूची बफर बनाता है क्योंकि पुनरावृत्ति के बाद बफरलिस्ट खाली होते हैं। मैं यह कैसे कर सकता हूं ?

संपादित करें: मेरे दृष्टिकोण

val labeled = data_labeled.map { line => 
    val parts = line.split(',') 
    (parts(5), parts(7)) 
}.cache() 

var testList : ListBuffer[String] = new ListBuffer() 

labeled.foreach(line => 
    testList += line._1 
) 
    val labeledList = testList.toList 
    println("rdd: " + labeled.count) 
    println("bufferList: " + testList.size) 
    println("list: " + labeledList.size) 

और परिणाम है:

rdd: 31990654 
bufferList: 0 
list: 0 
+1

कृपया आप क्या के कोड के साथ अद्यतन कोशिश की है और कुछ इनपुट डेटा नमूना और अपेक्षित आउटपुट! आपका सवाल मेरे लिए बहुत स्पष्ट नहीं है। – eliasah

उत्तर

9

तुम सच में दो सूचियाँ बनाना चाहते हैं - जिसका अर्थ है, आप चाहते हैं में एकत्र होने के लिए सभी वितरित डेटा चालक अनुप्रयोग (धीमा या OutOfMemoryError) - आप collect का उपयोग कर सकते हैं और फिर परिणाम पर map संचालन का उपयोग कर सकते हैं:

val list: List[(String, String)] = rdd.collect().toList 
val col1: List[String] = list.map(_._1) 
val col2: List[String] = list.map(_._2) 

वैकल्पिक रूप से - अगर आप चाहते हैं करने के लिए "विभाजन" दो RDDs में अपने RDD - यह डेटा एकत्र किए बिना बहुत समान है: इन दो RDDs में

rdd.cache() // to make sure calculation of rdd is not repeated twice 
val rdd1: RDD[String] = rdd.map(_._1) 
val rdd2: RDD[String] = rdd.map(_._2) 

एक तीसरा विकल्प पहला नक्शा करने के लिए है और फिर उनमें से प्रत्येक को इकट्ठा करें, लेकिन यह पहले विकल्प से बहुत अलग नहीं है और उसी जोखिम और सीमाओं से ग्रस्त है।

+0

@Yuriy कैसे प्रसारण चर (जो केवल पढ़ने के लिए) यहां आते हैं? क्या आप इसका अधिक वर्णन कर सकते हैं? – avr

+0

@avr ListBuffer उत्परिवर्तनीय है और '+ = 'उत्परिवर्ती आंतरिक स्थिति, नया संदर्भ नहीं बना रहा है। लेकिन आप सवाल अच्छा है, और अपरिवर्तनीय बयान (जहां किसी भी ऑपरेशन के लिए संदर्भ बदल रहा है) के लिए इसे कुछ (Serializable) से लपेटने की आवश्यकता है। सूची के लिए सरल उदाहरण: 'वैल testList = sc.broadcast (नई Serializable {var list = List.empty [स्ट्रिंग]}) ', और आंतरिक स्थिति को उत्परिवर्तित करने के बाद। – Yuriy

+0

@Yuriy मुझे लगता है कि एवीआर सही है और आपने उसके प्रश्न को गलत समझा - यह म्यूटेबल बनाम अपरिवर्तनीय संग्रह का मामला नहीं है - प्रसारण चर केवल _read केवल इस अर्थ में है कि अगर उनके मान निष्पादक पर बदल जाते हैं, तो ड्राइवर कोड ' इस परिवर्तन को नहीं देखते (स्पार्क सभी निष्पादकों द्वारा किए गए परिवर्तनों को कैसे बढ़ाएगा?)। तथ्य यह है कि यह स्थानीय मोड में काम करता है ज्यादातर बग की तरह दिखता है, यह काम नहीं करेगा जहां क्लस्टर वास्तव में वितरित किया जाता है। –

1

रूप Tzach ज़ोहर के जवाब के लिए एक विकल्प है, तो आप सूची में unzip उपयोग कर सकते हैं:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d"))) 
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val (l1, l2) = myRDD.collect.toList.unzip 
l1: List[String] = List(a, c) 
l2: List[String] = List(b, d) 

या keys और valuesRDD रों पर:

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values) 
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33 
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33 

scala> rdd1.foreach{println} 
a 
c 

scala> rdd2.foreach{println} 
d 
b 
संबंधित मुद्दे