5

अद्यतन 1.0 शुरू

ऐसा लगता है जब कॉलपायथन में समानांतर लूप कैसे करें?

for i, Wi in enumerate(W.T): 
    idx.append(i) 
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 

तर्क समारोह ALS_Y/ALS_X में पारित नहीं संदर्भ हैं, यह arguments..So की नकल की है, जब X या Y बहुत large matrixes जैसे है, मेरे मामले में, यह 6000*40 या तो है (और यह for-loop है, मान लें कि संख्या पुनरावृत्तियों 50 000 है, इसलिए ...), यह स्मृति की सीमा से अधिक है।
और फिर मैं वैश्विक तर्कों का उपयोग, बस कार्यों में पैरामीटर के रूप में सूचकांक गुजर की कोशिश की,

import multiprocessing 
import time 
import numpy as np 

def func(idx): 
    global a 
    a[idx] += 1 



if __name__ == "__main__": 
    a=range(10) 
    for j in xrange(2): 
     pool = multiprocessing.Pool(processes=8) 
     result = [] 
     for i in xrange(10): 
      result.append(pool.apply_async(func, (i,))) 
     pool.close() 
     pool.join() 
     print a 
     print "Sub-process(es) done." 

यह आउटपुट: `

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 
Sub-process(es) done. 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 
Sub-process(es) done. 

So, this means it still copied A`! अब, मुझे आश्चर्य है कि इस मुद्दे को संभालने का कोई तरीका है? सराहना!

अद्यतन 1.0 अंत


नीचे मैट्रिक्स गुणन समस्या को हल करने अजगर में मेरी कोड है। डब्ल्यू = एक्सवाई। हालांकि, नीचे दिए गए कोड कुशल नहीं हैं, और मुझे उम्मीद है कि इसे समानांतर संस्करण में परिवर्तित किया जा सकता है, जीपीयू का उपयोग करना सबसे अच्छा है, सीपीयू भी ठीक है। मुझे समांतर प्रोग्रामिंग के बारे में कोई अनुभव नहीं है, तो क्या कोई मुझे कुछ सलाह दे सकता है?

नीचे (, कम से कम वर्ग बारी विवरण here) मैट्रिक्स का उपयोग करते हुए ए एल एस गुणनखंड के लिए कोड है

for ii in range(n_iterations): 
    for u, Wu in enumerate(W): 
     X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop 

    for i, Wi in enumerate(W.T): 
     Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop 
    error = get_error(Q, X, Y, W) 
    weighted_errors.append(error) 
    print '{}th iteration is completed'.format(ii) 

प्रयुक्त बहु lib के बाद, अब मेरी कोड:

def ALS_X(Y, Wu, Q, lambda_, n_factors, u): 
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
          np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T 

for ii in range(n_iterations): 
pool = multiprocessing.Pool(processes=12)#create pool 
result = []#store each row for X 
idx = []#store the row number 
for u, Wu in enumerate(W): 
    idx.append(u) 
    result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,))) 
pool.close() 
pool.join() 
for u, vector in zip(idx, result): 
    X[u] = vector.get()#assign the result to X 
###################################### 
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X 
result = [] 
idx = [] 
for i, Wi in enumerate(W.T): 
    idx.append(i) 
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 
pool.close() 
pool.join() 
for i, vector in zip(idx, result): 
    Y[:,i] = vector.get() 
error = get_error(Q, X, Y, W) 
weighted_errors.append(error) 
print '{}th iteration is completed'.format(ii), 'error: ',error 

लेकिन कुछ हद तक दुख, कार्यक्रम हमेशा चुपचाप दुर्घटनाग्रस्त हो गया ...

नीचे मेरे कोड का पूरा समूह है .. यह सब गन्दा है। बस load_dataget_error और vec2str उपेक्षा, यहाँ के बाद से मैं मैट्रिक्स बेतरतीब ढंग से ..

import pandas as pd 
import numpy as np 
import multiprocessing 

def vec2str(vec): 
    res = '' 
    for dim in len(vec): 
     res += str(vec[dim]) + ',' 
    return res 

def load_data(heads, filename, sep,header=None): 
    data = pd.read_table(filename, sep=sep, header=header, names=heads) 
    rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally... 
    Q = rp.fillna(0) 
    Q = Q.values 
    W = Q >0.5 
    W[W == True] = 1 
    W[W == False] = 0 
    W = W.astype(np.float64, copy=False) 
    return Q, W, rp 

def get_error(Q, X, Y, W): 
    return np.sum((W * (Q - np.dot(X, Y)))**2) 

''' 
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(), 
           np.dot(, np.dot(np.diag(), Q[u].T))).T 
''' 
def ALS_X(Y, Wu, Q, lambda_, n_factors, u): 
    return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T 

''' 
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i]))) 
''' 

def ALS_Y(X, Wi, Q, lambda_, n_factors, i): 
    return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i]))) 



if __name__ == "__main__": 

    lambda_ = 0.1 
    n_factors = 40 
    filename = 'data_songID' 
    n_iterations = 20 
    #Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',') 
    Q = np.random.rand(1000,1000) 
    m, n = Q.shape 
    W = np.eye(1000) 
    print 'Loading data finished, ', 'size: ', Q.shape 
    print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors) 
    X = 5 * np.random.rand(m, n_factors) 
    Y = 5 * np.random.rand(n_factors, n) 
    errors = [] 
    for ii in range(n_iterations): 
     X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors), 
         np.dot(Y, Q.T)).T 
     Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors), 
         np.dot(X.T, Q)) 
     if ii % 100 == 0: 
      print('{}th iteration is completed'.format(ii)) 
     errors.append(get_error(Q, X, Y, W)) 
     Q_hat = np.dot(X, Y) 
     print('Error of rated movies: {}'.format(get_error(Q, X, Y, W))) 
    print errors 
    #####ALS start....##### 
    print '*'*100 
    weighted_errors = [] 
    for ii in range(n_iterations): 
     pool = multiprocessing.Pool(processes=12) 
     result = [] 
     idx = [] 
     for u, Wu in enumerate(W): 
      idx.append(u) 
      result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,))) 
     pool.close() 
     pool.join() 
     for u, vector in zip(idx, result): 
      X[u] = vector.get() 
     ###################################### 
     pool = multiprocessing.Pool(processes=12) 
     result = [] 
     idx = [] 
     for i, Wi in enumerate(W.T): 
      idx.append(i) 
      result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 
     pool.close() 
     pool.join() 
     for i, vector in zip(idx, result): 
      Y[:,i] = vector.get() 
     error = get_error(Q, X, Y, W) 
     weighted_errors.append(error) 
     print '{}th iteration is completed'.format(ii), 'error: ',error 

    weighted_Q_hat = np.dot(X,Y) 
    print weighted_errors 
    X.tofile('X.bin') 
    Y.tofile('Y.bin') 
    latent_user_file = open('user_latent','w') 
    for idx in len(rp.axes[0]): 
     latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n') 

    latent_mid_file = open('mid_latent', 'w') 
    for idx in len(rp.axes[1]): 
     latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n') 
+0

मल्टीथ्रेडिंग मॉड्यूल का उपयोग करके आप 2 धागे क्यों नहीं बनाते हैं और फिर उनसे जुड़ें –

+1

@binayr: पाइथन में थ्रेड ग्लोबल इंटरप्रेटर लॉक के अधीन हैं, लेकिन एन सीपी जैसे सी एक्सटेंशन बिना काम किए सार्थक मात्रा में काम करने में सक्षम हो सकते हैं जीआईएल, तो यह कहना मुश्किल है कि कितनी मदद होगी। – Kevin

+0

चूंकि X_inner लूप के बाद Y_inner लूप को निष्पादित किया जाना है। लेकिन X_inner लूप में प्रत्येक पुनरावृत्ति एक साथ निष्पादित किया जा सकता है। और मुझे उम्मीद है कि X_inner लूप को समानांतर निष्पादित किया जा सकता है ... मेरी मशीन एक 12-सीपीयू और 3-जीपीयू है ... इसलिए, मुझे उम्मीद है कि मैं इनका पूरी तरह से उपयोग कर सकता हूं, लेकिन मैं यह कैसे नहीं कर सकता .. उदास .. –

उत्तर

1

पिछले साल मैं एक "समांतर लूप" अजगर में के लिए अपनी इच्छा का सामना करना पड़ा, और ऊपर एक काट दिया एक भौतिक विज्ञान के लिए अपने काम के हिस्से के रूप उत्पन्न कागज। ऐसे कई मॉड्यूल हैं जो आप चाहते हैं, लेकिन मुझे लगता है कि मैं वास्तव में पीपी के साथ काम कर सकता हूं जिस तरह से मैं मनमानी कार्यों के लिए चाहता था।

आप कुछ है कि इस तरह दिखता है चाहते हैं:

ResultList = Library_ParallelLoop.Main(
    Function = ExampleFunction, 
    ListOfArgSets = ListOfArgSets, 
    Algorithm = 'pp', 
    PrintExtra = True 
    ) 

तो मैं आप के बजाय इस पोस्ट में मेरा पूरा स्रोत प्रदान करने के रूप कार्यान्वयन वास्तव में प्राप्त करने के लिए यह दर्दनाक कई लाइनों था काम करने का मेरा Git हब को इंगित, और इसमें गहरी प्रतिलिपि पाइथन फ़ंक्शन शामिल हैं जो स्पष्ट रूप से कुछ और है जो कि अजगर में बहुत अच्छी तरह से निर्मित नहीं था।

ढूँढना primes के उदाहरण:

https://github.com/douglasquincyadams/Main/blob/master/Test_ParallelLoop.py

रेपो:

https://github.com/douglasquincyadams/Main

आप अपने कंप्यूटर के कुछ अंधेरे कोने में मेरी रेपो डाउनलोड करते हैं - फिर y हमारे काम का टुकड़ा होना चाहिए:

import Library_ParallelLoop 

def do_the_thing_function(ii): 
    for u, Wu in enumerate(W): 
    X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
          np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop 

    for i, Wi in enumerate(W.T): 
    Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop 
          np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop 
    error = get_error(Q, X, Y, W) 
    weighted_errors.append(error) 
    print '{}th iteration is completed'.format(ii) 
    return #whatever your result is supposed to be... your code doesn't work on its own 

ListOfArgSets = [] 
for ii in range(n_iterations): 
    ListOfArgSets.append( { "ii" : ii , } ) 

ResultList = Library_ParallelLoop.Main(
    Function = do_the_thing_function, 
    ListOfArgSets = ListOfArgSets, 
    Algorithm = 'pp', 
    PrintExtra = True 
    ) 

यदि आप मुझसे पूछा - एक समानांतर पाश बहुत तरह के ऊपर एक पहले से ही होना चाहिए कुछ है जो अच्छा काम है और भाषाओं करने के लिए बनाया है, लेकिन हमेशा किसी भी तरह रहस्यमय ढंग से जादूगरों द्वारा exampled किया जा रहा है एक टावर में और जब आप अपने क्रैपी लैपटॉप पर कोशिश करते हैं तो काफी काम नहीं करता है। वैसे भी - उम्मीद है कि यह मदद करता है।

अतिरिक्त नोट - मैं भी सुझाव है कि आप एक मनमाना बड़े पैमाने पर चलाना समस्या (कुछ भी सरल छोरों से अधिक) को हल करने के, कि तुम एमपीआई उपयोग करें, क्योंकि यह घंटियाँ और सीटी जो अनुमति दे सकते हैं के सभी प्रकार है चाहता हूँ एक दूसरे के मध्य भाग से बात करने की प्रक्रिया। एमपीआई वह है जो विज्ञान के लोग सबसे बड़े सिमुलेशन के लिए उपयोग करना पसंद करते हैं, इस प्रकार बड़े आकार के क्लस्टर जो बहुत बड़ी नौकरियों (~ 10k + कोर) को सभी समर्थन एमपीआई को संभालने के लिए डिज़ाइन किए गए हैं और निश्चित रूप से पीपी या यहां तक ​​कि मल्टीप्रोसेसिंग मॉड्यूल का समर्थन करने की संभावना नहीं है। यदि आप बस अपने पीसी में सभी कोर का उपयोग करना चाहते हैं, (या नेटवर्क पर कुछ पीसी) तो बस काम करने के लिए सबसे आसान विकल्प चुनें।

संबंधित मुद्दे