2017-06-10 6 views
5

द्वारा स्पार्क आरडीडी फिल्टर मेरे पास विभिन्न प्रकार के तत्वों के साथ एक आरडीडी है, और मैं उन्हें अपने प्रकार से गिनना चाहता हूं, उदाहरण के लिए, नीचे दिया गया कोड सही तरीके से काम करेगा।तत्व वर्ग

scala> val rdd = sc.parallelize(List(1, 2.0, "abc")) 
rdd: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[0] at parallelize at <console>:24 

scala> rdd.filter{case z:Int => true; case _ => false}.count 
res0: Long = 1 

scala> rdd.filter{case z:String => true; case _ => false}.count 
res1: Long = 1 

अब यदि उपयोगकर्ता उपयोगकर्ता परिभाषित प्रकार के हैं, तो नीचे दिया गया कोड अपेक्षा के अनुसार काम नहीं करेगा।

scala> class TypeA extends Serializable    // this is the base class 
defined class TypeA 

scala> case class TypeB(id:Long) extends TypeA  // derived class 1 
defined class TypeB 

scala> case class TypeC(name:String) extends TypeA // derived class 2 
defined class TypeC 

scala> val rdd1 = sc.parallelize(List(TypeB(123), TypeC("jack"), TypeB(456))) // create an rdd with different types of elements 
rdd1: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[3] at parallelize at <console>:29 

scala> rdd1.count    // total size is correct 
res2: Long = 3 

scala> rdd1.filter{case z:TypeB => true; case _ => false}.count // what the hell? 
res3: Long = 0 

scala> rdd1.filter{case z:TypeC => true; case _ => false}.count // again ? 
res4: Long = 0 

scala> rdd1.filter{case z:TypeA => true; case _ => false}.count // only works for the base class? 
res5: Long = 3 

क्या मुझे यहां कुछ याद आया? कृपया मदद करें!

उत्तर

3

यह Spark-1199 की भिन्नता जैसा दिखता है और संभवतः एक आरईपीएल बग है।

यह पैदावार अपेक्षित व्यवहार जब स्थानीय स्तर पर विचार के अंदर चल रहा है:

import org.apache.spark.SparkContext 

class TypeA extends Serializable 
case class TypeB(id:Long) extends TypeA 
case class TypeC(name:String) extends TypeA 

val sc = new SparkContext("local[*]", "swe") 
val rdd = sc.parallelize(List(TypeB(12), TypeC("Hsa"))) 

rdd.filter { case x: TypeB => true; case _ => false }.count() 

पैदावार:

import org.apache.spark.SparkContext 

defined class TypeA 
defined class TypeB 
defined class TypeC 

sc: org.apache.spark.SparkContext = [email protected] 
rdd: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[0] at parallelize at <console>:18 

[Stage 0:>....... (0 + 0)/4] 
res0: Long = 1 
+1

दरअसल, उफ़ !! अपने स्पार्क एनवी पर कोशिश करने के लिए धन्यवाद :-) – avocado

संबंधित मुद्दे