2015-11-15 9 views
5

मैं एक मान से समूह करना चाहता हूं और फिर प्रत्येक समूह में pieSpark का उपयोग करके अधिकतम मान पाता हूं। मेरे पास निम्न कोड है लेकिन अब मैं अधिकतम मूल्य निकालने के तरीके पर थोड़ा फंस गया हूं।pyspark: grouby और उसके बाद प्रत्येक समूह का अधिकतम मान

# some file contains tuples ('user', 'item', 'occurrences') 
data_file = sc.textData('file:///some_file.txt') 
# Create the triplet so I index stuff 
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2]))) 
# Group by the user i.e. r[0] 
grouped = data_file.groupBy(lambda r: r[0]) 
# Here is where I am stuck 
group_list = grouped.map(lambda x: (list(x[1]))) #? 

रिटर्न कुछ की तरह:

[[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]] 

मैं अब प्रत्येक उपयोगकर्ता के लिए अधिकतम 'घटना' लगाना चाहते हैं। अधिकतम करने के बाद अंतिम परिणाम एक RDD है कि इस तरह देखा परिणाम होगा:

[[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]] 

कहाँ केवल अधिकतम डाटासेट फाइल में उन में से प्रत्येक के लिए ही बने रहेंगे। दूसरे शब्दों में, मैं आरडीडी के मूल्य को बदलना चाहता हूं ताकि प्रत्येक उपयोगकर्ता अधिकतम घटनाओं में केवल एक तिहाई हो।

उत्तर

9

groupBy यहाँ कोई जरूरत नहीं है। (X1, x2, कुंजी = लैम्ब्डा अधिकतम एक्स:: सरल reduceByKey ठीक करना होगा और समय की सबसे अधिक कुशल हो जाएगा:

data_file = sc.parallelize([ 
    (u'u1', u's1', 20), (u'u1', u's2', 5), 
    (u'u2', u's3', 5), (u'u2', u's2', 10)]) 

max_by_group = (data_file 
    .map(lambda x: (x[0], x)) # Convert to PairwiseRD 
    # Take maximum of the passed arguments by the last element (key) 
    # equivalent to: 
    # lambda x, y: x if x[-1] > y[-1] else y 
    .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])) 
    .values()) # Drop keys 

max_by_group.collect() 
## [('u2', 's2', 10), ('u1', 's1', 20)] 
+1

आप इस '(लैम्ब्डा x1, x2 समझा सकता है एक्स [-1])) 'यदि संभव हो तो? – WoodChopper

+1

@WoodChopper 'max' सिर्फ एक मानक पायथन' अधिकतम' है। यह तत्व लेता है और सबसे बड़ा लौटाता है। 'कुंजी' तर्क वर्णन करता है कि तत्वों की तुलना कैसे की जानी चाहिए (यहां अंतिम वस्तु से)। – zero323

2

मुझे लगता है मैं समाधान नहीं मिला:

from pyspark import SparkContext, SparkConf 

def reduce_by_max(rdd): 
    """ 
    Helper function to find the max value in a list of values i.e. triplets. 
    """ 
    max_val = rdd[0][2] 
    the_index = 0 

    for idx, val in enumerate(rdd): 
     if val[2] > max_val: 
      max_val = val[2] 
      the_index = idx 

    return rdd[the_index] 

conf = SparkConf() \ 
    .setAppName("Collaborative Filter") \ 
    .set("spark.executor.memory", "5g") 
sc = SparkContext(conf=conf) 

# some file contains tuples ('user', 'item', 'occurrences') 
data_file = sc.textData('file:///some_file.txt') 

# Create the triplet so I can index stuff 
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2]))) 

# Group by the user i.e. r[0] 
grouped = data_file.groupBy(lambda r: r[0]) 

# Get the values as a list 
group_list = grouped.map(lambda x: (list(x[1]))) 

# Get the max value for each user. 
max_list = group_list.map(reduce_by_max).collect() 
संबंधित मुद्दे