2016-03-01 12 views
6

मैं स्पार्क पर एक नौसिखिया हूँ दिया (मेरे संस्करण 1.6.0 है) और अब मैं नीचे दिए गए समस्या को हल करने की कोशिश कर रहा हूँ:कैसे प्रदर्शन करने के लिए "लुक" चिंगारी dataframes पर आपरेशन अनेक शर्तें

वहाँ मान लीजिए दो स्रोत फाइलें हैं:

  • पहला (ए शॉर्ट के लिए) एक बड़ा है जिसमें ए 1, बी 1, सी 1 और अन्य 80 कॉलम नामक कॉलम हैं। अंदर 230 के रिकॉर्ड हैं।
  • दूसरा (बी शॉर्ट के लिए) एक छोटी लुकअप टेबल है जिसमें ए 2, बी 2, सी 2 और डी 2 नामक कॉलम शामिल हैं। अंदर 250 रिकॉर्ड हैं।

    • सबसे पहले देखने A1, बी 1 और बी में सी 1 (इसी कॉलम ए 2, बी 2 और सी 2 कर रहे हैं), सफल अगर:

    अब हम नीचे तर्क दिया, एक में एक नया स्तंभ डालने की आवश्यकता , नए जोड़े गए कॉलम के मान के रूप में डी 2 वापस करें। यदि कुछ भी नहीं मिला ...

  • फिर बी में लुकअप ए 1, बी 1 यदि सफल हो, तो डी 2 लौटाएं। तो कुछ भी नहीं मिला ...
  • डिफ़ॉल्ट मान "NA"

मैं पहले से ही फाइलों में पढ़ सकते हैं और उन्हें डेटा फ्रेम में परिवर्तित कर दिया है निर्धारित करें। पहली स्थिति के लिए, मुझे परिणाम मिलाकर उन्हें एक साथ जोड़ दिया गया। लेकिन मुझे अगले चरण में अच्छा रास्ता नहीं मिल रहा है।

मेरी वर्तमान कोशिश कम सख्त स्थिति का उपयोग करके ए और बी में शामिल होने से एक नया डेटा फ्रेम बनाना है। हालांकि मुझे कोई संकेत नहीं है कि वर्तमान डेटा फ्रेम को दूसरे से कैसे अपडेट किया जाए। या क्या पूरी समस्या से निपटने के लिए कोई और अधिक सहज और कुशल तरीका है?

सभी उत्तरों के लिए धन्यवाद।

----------------------------- 2016030 9 -------------- पर अपडेट करें ------------------

अंततः @mlk का जवाब स्वीकार कर लिया। यूडीएफ बनाम जुड़ने पर उनकी महान टिप्पणियों के लिए @ zero323 पर अभी भी बहुत बढ़िया धन्यवाद, टंगस्टन कोड पीढ़ी वास्तव में एक और समस्या है जिसका हम सामना कर रहे हैं। लेकिन चूंकि हम हर देखने के लिए देखने के स्कोर और औसत 4 की स्थिति क्या करने की जरूरत, पूर्व समाधान अधिक उपयुक्त है ...

अंतिम समाधान किसी भी तरह से नीचे की तरह लग रहा है स्निपेट:

``` 
import sqlContext.implicits._ 
import com.github.marklister.collections.io._ 

case class TableType(A: String, B: String, C: String, D: String) 
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("...")) 
val lkupD = udf { 
    (aStr: String, bStr: String, cStr: String) => 
    tableBroadcast.value.find { 
     case TableType(a, b, c, _) => 
     (a == aStr && b == bStr && c == cStr) || 
     (a == aStr && b == bStr) 
    }.getOrElse(TableType("", "", "", "NA")).D 
} 
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C")) 
``` 

उत्तर

4

बी है छोटा मुझे लगता है कि ऐसा करने का सबसे अच्छा तरीका प्रसारण चर और उपयोगकर्ता परिभाषित फ़ंक्शन होगा।

// However you get the data... 
case class BType(A2: Int, B2: Int, C2 : Int, D2 : String) 
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200")) 

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER") 


// Broadcast B so all nodes have a copy of it. 
val Bbradcast = sc.broadcast(B) 

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {(a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 } 

// Use the UDF in a select 
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show 
+1

शायद यह जाने का तरीका है। मैंने 'जॉइन' के साथ एक वैकल्पिक समाधान भी प्रदान किया। – zero323

+0

धन्यवाद mlk। यदि लुकअप टेबल बड़ा है (500 के * 50), क्या यह अभी भी प्रसारण करना अच्छा है? –

+0

और मेरा दूसरा सवाल यह है कि, मुझे अलग-अलग कॉलम पर 30 लुकअप करने की ज़रूरत है, और 50 यूडीएफ लिखना है, क्या प्रदर्शन प्रभावित होगा? –

2

बस संदर्भ के लिए UDFs के बिना एक समाधान:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")) 
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")) 

// Match A, B and C 
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1") 
// Match A and B mismatch C 
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2") 

val toDrop = b1.columns ++ b2.columns 

toDrop.foldLeft(a 
    .join(b1, expr1, "leftouter") 
    .join(b2, expr2, "leftouter") 
    // If there is match on A, B, C then D_1 should be not NULL 
    // otherwise we fall-back to D_2 
    .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c)) 

इसका मतलब यह है प्रत्येक श्रेणी में सबसे कम एक मैच (सभी तीन स्तंभों, या पहले दो) या डुप्लिकेट पंक्तियों उत्पादन में हैं नहीं है चाहा हे।

यूडीएफ बनाम शामिल हों:

वहाँ कई कारकों पर विचार कर रहे हैं और वहाँ यहाँ कोई सीधा जवाब है:

विपक्ष:

  • प्रसारण joins आंकड़ों के दो बार गुजर की आवश्यकता होती है कार्यकर्ता नोड्स अब के लिए broadcasted टेबल कैश नहीं किए गए हैं (SPARK-3863) और निकटतम भविष्य में यह बदलने की संभावना नहीं है (संकल्प: बाद में)।
  • join पूर्ण मिलान होने पर भी ऑपरेशन दो बार लागू होता है।

पेशेवरों:

  • join और coalesce जबकि UDFs नहीं हैं अनुकूलक के लिए पारदर्शी है।
  • सीधे SQL अभिव्यक्तियों के साथ परिचालन कर सकते हैं, सभी टंगस्टन अनुकूलन से कोड पीढ़ी सहित लाभ हो सकता है जबकि यूडीएफ नहीं कर सकता है।
संबंधित मुद्दे