आप NumPy और RDDs का एक सा के साथ की कोशिश कर सकते में कैश - प्रत्येक पाश में चेकप्वाइंट -
बातें नीचे पहले से ही की कोशिश की - पुनः विभाजन DF 10,000 करने के लिए। सबसे पहले आयात का एक गुच्छा:
keys = ["key1", "key2", "key3"] # list of key column names
xs = ["x1", "x2", "x3"] # list of column names to compare
y = "y" # name of the reference column
और कुछ सहायकों:
def as_pair(keys, y, xs):
""" Given key names, y name, and xs names
return a tuple of key, array-of-values"""
key = itemgetter(*keys)
value = itemgetter(y, * xs) # Python 3 syntax
def as_pair_(row):
return key(row), np.array(value(row))
return as_pair_
def init(x):
""" Init function for combineByKey
Initialize new StatCounter and merge first value"""
return StatCounter().merge(x)
def center(means):
"""Center a row value given a
dictionary of mean arrays
"""
def center_(row):
key, value = row
return key, value - means[key]
return center_
def prod(arr):
return arr[0] * arr[1:]
def corr(stddev_prods):
"""Scale the row to get 1 stddev
given a dictionary of stddevs
"""
def corr_(row):
key, value = row
return key, value/stddev_prods[key]
return corr_
और जोड़े की RDD
को DataFrame
कन्वर्ट:
pairs = df.rdd.map(as_pair(keys, y, xs))
from operator import itemgetter
import numpy as np
from pyspark.statcounter import StatCounter
के कुछ चर निर्धारित करते हैं
अगला समूह प्रति आंकड़ों की गणना करते हैं:
stats = (pairs
.combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
.collectAsMap())
means = {k: v.mean() for k, v in stats.items()}
नोट: 5000 सुविधाओं और 7000 समूह के साथ वहाँ होना चाहिए स्मृति में इस संरचना रखने के साथ कोई समस्या नहीं। बड़े डेटासेट के साथ आपको आरडीडी और join
का उपयोग करना पड़ सकता है लेकिन यह धीमा हो जाएगा।
केंद्र डेटा:
centered = pairs.map(center(means))
कंप्यूट सहप्रसरण:
covariance = (centered
.mapValues(prod)
.combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
.mapValues(StatCounter.mean))
और अंत में सह-संबंध:
stddev_prods = {k: prod(v.stdev()) for k, v in stats.items()}
correlations = covariance.map(corr(stddev_prods))
उदाहरण डेटा:
df = sc.parallelize([
("a", "b", "c", 0.5, 0.5, 0.3, 1.0),
("a", "b", "c", 0.8, 0.8, 0.9, -2.0),
("a", "b", "c", 1.5, 1.5, 2.9, 3.6),
("d", "e", "f", -3.0, 4.0, 5.0, -10.0),
("d", "e", "f", 15.0, -1.0, -5.0, 10.0),
]).toDF(["key1", "key2", "key3", "y", "x1", "x2", "x3"])
साथ
परिणाम:,
correlations.collect()
[(('a', 'b', 'c'), array([ 1. , 0.99723002, 0.65133607])),
(('d', 'e', 'f'), array([-1., -1., 1.]))]
यह समाधान है, जबकि एक सा शामिल है, काफी लोचदार है और आसानी से संभाल करने के लिए समायोजित किया जा सकता:
df.groupBy(*keys).agg(*[corr(y, x) for x in xs]).show()
+----+----+----+-----------+------------------+------------------+
|key1|key2|key3|corr(y, x1)| corr(y, x2)| corr(y, x3)|
+----+----+----+-----------+------------------+------------------+
| d| e| f| -1.0| -1.0| 1.0|
| a| b| c| 1.0|0.9972300220940342|0.6513360726920862|
+----+----+----+-----------+------------------+------------------+
और विधि ऊपर प्रदान की विभिन्न डेटा वितरण। जेआईटी के साथ और बढ़ावा देना भी संभव होना चाहिए।
समूह 100 पंक्तियों से 5-10 मिलियन तक भिन्न हो सकते हैं, समूहों की संख्या 7000 होगी। – Harish