2017-02-15 10 views
5

मेरे पास 100 मिलियन पंक्तियों और 5000+ कॉलम के साथ एक डीएफ है। मैं कोल्क्स और शेष 5000+ कॉलम के बीच कोर खोजने की कोशिश कर रहा हूं।डीएफ में प्रत्येक समूह के लिए pyspark कोर (5K से अधिक कॉलम)

aggList1 = [mean(col).alias(col + '_m') for col in df.columns] #exclude keys 
df21= df.groupBy('key1', 'key2', 'key3', 'key4').agg(*aggList1) 
df = df.join(broadcast(df21),['key1', 'key2', 'key3', 'key4'])) 
df= df.select([func.round((func.col(colmd) - func.col(colmd + '_m')), 8).alias(colmd)\ 
        for colmd in all5Kcolumns]) 


aggCols= [corr(colx, col).alias(col) for col in colsall5K] 
df2 = df.groupBy('key1', 'key2', 'key3').agg(*aggCols) 

अभी स्पार्क 64 केबी कोडेजन समस्या (यहां तक ​​कि स्पार्क 2.2) की वजह से यह काम नहीं कर रहा है। इसलिए मैं प्रत्येक 300 कॉलम के लिए लूपिंग कर रहा हूं और अंत में सभी विलय कर रहा हूं। लेकिन क्लस्टर में 30 नोड्स (10 कोर प्रत्येक और प्रत्येक नोड 100 जीबी के साथ) में 30 घंटे से अधिक समय ले रहा है। इसे ट्यून करने में कोई मदद? प्रत्येक पाश

+1

समूह 100 पंक्तियों से 5-10 मिलियन तक भिन्न हो सकते हैं, समूहों की संख्या 7000 होगी। – Harish

उत्तर

1

आप NumPy और RDDs का एक सा के साथ की कोशिश कर सकते में कैश - प्रत्येक पाश में चेकप्वाइंट -

बातें नीचे पहले से ही की कोशिश की - पुनः विभाजन DF 10,000 करने के लिए। सबसे पहले आयात का एक गुच्छा:

keys = ["key1", "key2", "key3"] # list of key column names 
xs = ["x1", "x2", "x3"] # list of column names to compare 
y = "y"       # name of the reference column 

और कुछ सहायकों:

def as_pair(keys, y, xs): 
    """ Given key names, y name, and xs names 
    return a tuple of key, array-of-values""" 
    key = itemgetter(*keys) 
    value = itemgetter(y, * xs) # Python 3 syntax 

    def as_pair_(row): 
     return key(row), np.array(value(row)) 
    return as_pair_ 

def init(x): 
    """ Init function for combineByKey 
    Initialize new StatCounter and merge first value""" 
    return StatCounter().merge(x) 

def center(means): 
    """Center a row value given a 
    dictionary of mean arrays 
    """ 
    def center_(row): 
     key, value = row 
     return key, value - means[key] 
    return center_ 

def prod(arr): 
    return arr[0] * arr[1:] 

def corr(stddev_prods): 
    """Scale the row to get 1 stddev 
    given a dictionary of stddevs 
    """ 
    def corr_(row): 
     key, value = row 
     return key, value/stddev_prods[key] 
    return corr_ 

और जोड़े की RDD को DataFrame कन्वर्ट:

pairs = df.rdd.map(as_pair(keys, y, xs)) 

from operator import itemgetter 
import numpy as np 
from pyspark.statcounter import StatCounter 

के कुछ चर निर्धारित करते हैं

अगला समूह प्रति आंकड़ों की गणना करते हैं:

stats = (pairs 
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats) 
    .collectAsMap()) 

means = {k: v.mean() for k, v in stats.items()} 

नोट: 5000 सुविधाओं और 7000 समूह के साथ वहाँ होना चाहिए स्मृति में इस संरचना रखने के साथ कोई समस्या नहीं। बड़े डेटासेट के साथ आपको आरडीडी और join का उपयोग करना पड़ सकता है लेकिन यह धीमा हो जाएगा।

केंद्र डेटा:

centered = pairs.map(center(means)) 

कंप्यूट सहप्रसरण:

covariance = (centered 
    .mapValues(prod) 
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats) 
    .mapValues(StatCounter.mean)) 

और अंत में सह-संबंध:

stddev_prods = {k: prod(v.stdev()) for k, v in stats.items()} 

correlations = covariance.map(corr(stddev_prods)) 

उदाहरण डेटा:

df = sc.parallelize([ 
    ("a", "b", "c", 0.5, 0.5, 0.3, 1.0), 
    ("a", "b", "c", 0.8, 0.8, 0.9, -2.0), 
    ("a", "b", "c", 1.5, 1.5, 2.9, 3.6), 
    ("d", "e", "f", -3.0, 4.0, 5.0, -10.0), 
    ("d", "e", "f", 15.0, -1.0, -5.0, 10.0), 
]).toDF(["key1", "key2", "key3", "y", "x1", "x2", "x3"]) 
साथ

परिणाम:,

correlations.collect() 
[(('a', 'b', 'c'), array([ 1.  , 0.99723002, 0.65133607])), 
(('d', 'e', 'f'), array([-1., -1., 1.]))] 

यह समाधान है, जबकि एक सा शामिल है, काफी लोचदार है और आसानी से संभाल करने के लिए समायोजित किया जा सकता:

df.groupBy(*keys).agg(*[corr(y, x) for x in xs]).show() 
+----+----+----+-----------+------------------+------------------+ 
|key1|key2|key3|corr(y, x1)|  corr(y, x2)|  corr(y, x3)| 
+----+----+----+-----------+------------------+------------------+ 
| d| e| f|  -1.0|    -1.0|    1.0| 
| a| b| c|  1.0|0.9972300220940342|0.6513360726920862| 
+----+----+----+-----------+------------------+------------------+ 

और विधि ऊपर प्रदान की विभिन्न डेटा वितरण। जेआईटी के साथ और बढ़ावा देना भी संभव होना चाहिए।

संबंधित मुद्दे