2016-05-24 9 views
8

के साथ udf के लिए डेटा फ्रेम कॉलम और बाहरी सूची को पास करना मेरे पास निम्न संरचना के साथ स्पार्क डेटाफ्रेम है। BodyText_token में टोकन (संसाधित/शब्दों का सेट) है। और मैं परिभाषित कीवर्डकॉलम

root 
|-- id: string (nullable = true) 
|-- body: string (nullable = true) 
|-- bodyText_token: array (nullable = true) 

keyword_list=['union','workers','strike','pay','rally','free','immigration',], 
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']] 

मैं कितने टोकन प्रत्येक कीवर्ड सूची के अंतर्गत आते हैं की जाँच करें और मौजूदा dataframe का एक नया स्तंभ के रूप में परिणाम को जोड़ने के लिए की जरूरत के लिए एक आंतरिक सूची है। उदाहरण: यदि tokens =["become", "farmer","rally","workers","student"] परिणाम होगा -> [1,2,0]

निम्न कार्य अपेक्षित के रूप में काम करता है।

def label_maker_topic(tokens,topic_words): 
    twt_list = [] 
    for i in range(0, len(topic_words)): 
     count = 0 
     #print(topic_words[i]) 
     for tkn in tokens: 
      if tkn in topic_words[i]: 
       count += 1 
     twt_list.append(count) 

    return twt_list 

मैंने फ़ंक्शन तक पहुंचने के लिए कॉलम के साथ udf का उपयोग किया और मुझे एक त्रुटि मिली। मुझे लगता है कि यह एक udf को बाहरी सूची पास करने के बारे में है। क्या कोई तरीका है कि मैं बाहरी सूची और डेटाफ्रम कॉलम को udf पर पास कर सकता हूं और अपने डेटाफ्रेम में एक नया कॉलम जोड़ सकता हूं?

topicWord = udf(label_maker_topic,StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list)) 

उत्तर

20

साफ समाधान बंद का उपयोग करके अतिरिक्त तर्क पारित करने के लिए है:

def make_topic_word(topic_words): 
    return udf(lambda c: label_maker_topic(c, topic_words)) 

df = sc.parallelize([(["union"],)]).toDF(["tokens"]) 

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) 
    .show()) 

यह keyword_list में किसी भी बदलाव या समारोह आप यूडीएफ के साथ रैप आवश्यकता नहीं है। आप मनमाने ढंग से ऑब्जेक्ट को पास करने के लिए इस विधि का भी उपयोग कर सकते हैं। इसका उपयोग उदाहरण के लिए कुशल लुकअप के लिए sets की एक सूची को पास करने के लिए किया जा सकता है।

आप अपने वर्तमान यूडीएफ का उपयोग करें और topic_words पास करना चाहते हैं सीधे आप इसे एक स्तंभ शाब्दिक पहले कन्वर्ट करने के लिए होगा: अपने डेटा और आवश्यकताओं को वहां वैकल्पिक, और अधिक कुशल समाधान कर सकते हैं पर निर्भर करता है

from pyspark.sql.functions import array, lit 

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) 
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show() 

, जिसके लिए यूडीएफ (विस्फोट + कुल + पतन) या लुकअप (हैशिंग + वेक्टर ऑपरेशंस) की आवश्यकता नहीं होती है।

7

निम्नलिखित काम करता है ठीक है, जहां किसी भी बाहरी पैरामीटर यूडीएफ के लिए पारित किया जा सकता है (एक ट्वीक कोड किसी को मदद करने के लिए)

topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token)) 
+0

यह काम करता है लेकिन मैं इस के साथ सावधान रहना होगा, क्योंकि यूडीएफ 'topic_words होगा इस समय मूल्य udf परिभाषित किया गया था। तो 'topic_words' को बदलना और बाद में udf का पुनः उपयोग करना काम नहीं करेगा - यह अभी भी udf परिभाषित किए जाने पर 'topic_words' के मान का उपयोग करेगा। – CHP