2016-04-23 13 views
13

पर चलते थे देता है मैं इस चिंगारी कार्यक्रम है और मैं यह सिर्फ उचित भागोंस्पार्क कार्यक्रम अजीब परिणाम जब स्टैंडअलोन क्लस्टर

# Split by delimiter , 
# If the file is in unicode, we need to convert each value to a float in order to be able to 
# treat it as a number 
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist() 

# start with K randomly selected points from the dataset 
# A centroid cannot be an actual data point or else the distance measure between a point and 
# that centroid will be zero. This leads to an undefined membership value into that centroid. 
centroids = points.takeSample(False, K, 34) 
#print centroids 
# Initialize our new centroids 
newCentroids = [[] for k in range(K)] 
tempCentroids = [] 
for centroid in centroids: 
    tempCentroids.append([centroid[N] + 0.5]) 
#centroids = sc.broadcast(tempCentroids) 

convergence = False 

ncm = NCM() 

while(not convergence): 
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m))) 
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value))) 
    # Memberships 
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c))) 
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0])) 
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0])) 
    # Components of new centroids 
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t)))) 
    #print "wTm = " + str(wTm.collect()) 
    print "at first reduce" 
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2)) 
    #print "sumwTm = " + str(sumwTm.collect()) 
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t)))) 
    print "adding to cnumerator list" 
    #print wTmx.collect() 
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values() 
    print "collected cnumerator, now printing"  
    #print "cnumerator = " + str(cnumerator.collect()) 
    #print str(sumwTm.collect()) 
    # Calculate the new centroids 
    sumwTmCollection = sumwTm.collect()[0][1] 
    cnumeratorCollection = cnumerator.collect() 
    #print "sumwTmCollection = " + str(sumwTmCollection) 
    #cnumeratorCollection =cnumerator.collectAsMap().get(0).items 
    print "cnumeratorCollection = " + str(cnumeratorCollection) 
    for i in range(len(newCentroids)): 
     newCentroids[i] = scalarMult(1/sumwTmCollection[i], [cnumeratorCollection[i]]) 
    centroids = newCentroids 
    # Test for convergence 
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon) 

    #convergence = True 
    # Replace our old centroids with the newly found centroids and repeat if convergence not met 
    # Clear out space for a new set of centroids 
    newCentroids = [[] for k in range(K)] 

इस कार्यक्रम के अपने स्थानीय मशीन पर बहुत अच्छी तरह से काम करता है के लिए सीमित करने के लिए कोशिश करता हूँ, फिर भी, स्टैंडअलोन क्लस्टर पर चलने पर यह अपेक्षित व्यवहार नहीं करता है। यह जरूरी नहीं कि एक त्रुटि फेंक दे, लेकिन यह मेरी स्थानीय मशीन पर चलने पर प्राप्त होने वाले विभिन्न आउटपुट से अलग आउटपुट देता है। क्लस्टर और 3 नोड्स ठीक काम कर रहे हैं। मुझे लगता है कि समस्या यह है कि मैं centroids अद्यतन करता रहता हूं, जो एक पायथन सूची है, और यह हर बार while-loop के माध्यम से बदलता है। क्या यह संभव है कि प्रत्येक नोड में उस सूची की नवीनतम प्रतिलिपि न हो? मुझे ऐसा लगता है इसलिए मैंने broadcast variable का उपयोग करने की कोशिश की लेकिन उनको अपडेट नहीं किया जा सकता (केवल पढ़ने के लिए)। मैंने accumulator का उपयोग करने का भी प्रयास किया लेकिन वे केवल संचय के लिए हैं। मैंने प्रत्येक नोड के लिए एचडीएफएस पर एक फाइल के रूप में पाइथन सूचियों को सहेजने की कोशिश की, लेकिन यह अच्छी तरह से काम नहीं कर सका। क्या आपको लगता है कि मैं समस्या को सही ढंग से समझ रहा हूं? क्या यहां कुछ और चल रहा है? मैं कोड कैसे प्राप्त कर सकता हूं जो मेरी स्थानीय मशीन पर ठीक काम करता है, लेकिन क्लस्टर पर नहीं?

+0

क्षमा करें, मैं यह नहीं समझ सकता कि आप अपने कोड में अपने सेंट्रॉइड को अपडेट कर रहे हैं .. क्या आप इसे मेरे लिए हाइलाइट कर सकते हैं? धन्यवाद – mgaido

+0

इसे देखने के लिए धन्यवाद। यह कपास की ओर है। 'सेंट्रॉइड = न्यू कंट्रोइड्स '। –

+0

यदि आप अपने कोड को साफ़/पेयर करते हैं, तो मैं आपके प्रश्न का उत्तर देने के लिए और अधिक प्रेरित हूं, और अधिक महत्वपूर्ण बात यह है कि आपकी स्क्रिप्ट क्लस्टर पर अलग-अलग आउटपुट देता है। –

उत्तर

4

इस समस्या के लिए हर समय और ध्यान के लिए धन्यवाद, खासकर जब ऐसा लगता है जैसे मैं आपकी नौकरियों को आसान बनाने के लिए अधिक जानकारी पोस्ट कर सकता था। समस्या यहाँ

centroids = points.takeSample(False, K, 34) 

में मैं इस एहसास नहीं था, लेकिन एक छोटे से प्रयोग के बाद, इस समारोह में एक ही उत्पादन हर बार लौटता है, किया जा रहा है कि मैं क्या सोचा था कि नमूने के तौर पर था के बावजूद। जब तक आप एक ही बीज (इस मामले में 34) का उपयोग करते हैं, तब तक आपको वही आरडीडी मिल जाएगा। मेरे क्लस्टर पर आरडीडी किसी अन्य कारण से अलग था जो मेरी स्थानीय मशीन पर लौट आया था। किसी भी मामले में, चूंकि यह हर बार एक ही आरडीडी था, मेरा आउटपुट कभी नहीं बदला। "यादृच्छिक" सेंट्रॉइड मेरे साथ लौटने की समस्या यह है कि इन विशेष लोगों ने गणित में एक गद्दे बिंदु की तरह कुछ जन्म दिया, जहां सेंट्रॉइड का कोई अभिसरण नहीं मिलेगा। उत्तर का यह हिस्सा गणितीय और प्रोग्रामिंग एक है, इसलिए मैं इसका और उल्लेख नहीं करूँगा। इस बिंदु पर मेरी असली आशा व्यक्त की कि अगर आप

centroids = points.takeSample(False, K, 34) 

चाहते है कि आप अपने बीज कुछ यादृच्छिक संख्या के लिए हर बार बदल विभिन्न नमूनों हर बार यह कहा जाता है का उत्पादन करने, कि दूसरों धारणा से मदद कर रहे हैं।

मुझे उम्मीद है कि यह सब मदद करता है। मैंने अपनी याददाश्त के समाधान पर इतना समय बिताया नहीं है।

फिर से धन्यवाद।

संबंधित मुद्दे