अद्यतन 1.0 शुरू
ऐसा लगता है जब कॉलपायथन में समानांतर लूप कैसे करें?
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
तर्क समारोह ALS_Y/ALS_X
में पारित नहीं संदर्भ हैं, यह arguments..So की नकल की है, जब X
या Y
बहुत large matrixes
जैसे है, मेरे मामले में, यह 6000*40
या तो है (और यह for-loop
है, मान लें कि संख्या पुनरावृत्तियों 50 000
है, इसलिए ...), यह स्मृति की सीमा से अधिक है।
और फिर मैं वैश्विक तर्कों का उपयोग, बस कार्यों में पैरामीटर के रूप में सूचकांक गुजर की कोशिश की,
import multiprocessing
import time
import numpy as np
def func(idx):
global a
a[idx] += 1
if __name__ == "__main__":
a=range(10)
for j in xrange(2):
pool = multiprocessing.Pool(processes=8)
result = []
for i in xrange(10):
result.append(pool.apply_async(func, (i,)))
pool.close()
pool.join()
print a
print "Sub-process(es) done."
यह आउटपुट: `
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
So, this means it still copied
A`! अब, मुझे आश्चर्य है कि इस मुद्दे को संभालने का कोई तरीका है? सराहना!
अद्यतन 1.0 अंत
नीचे मैट्रिक्स गुणन समस्या को हल करने अजगर में मेरी कोड है। डब्ल्यू = एक्सवाई। हालांकि, नीचे दिए गए कोड कुशल नहीं हैं, और मुझे उम्मीद है कि इसे समानांतर संस्करण में परिवर्तित किया जा सकता है, जीपीयू का उपयोग करना सबसे अच्छा है, सीपीयू भी ठीक है। मुझे समांतर प्रोग्रामिंग के बारे में कोई अनुभव नहीं है, तो क्या कोई मुझे कुछ सलाह दे सकता है?
नीचे (, कम से कम वर्ग बारी विवरण here) मैट्रिक्स का उपयोग करते हुए ए एल एस गुणनखंड के लिए कोड है
for ii in range(n_iterations):
for u, Wu in enumerate(W):
X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop
for i, Wi in enumerate(W.T):
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii)
प्रयुक्त बहु lib के बाद, अब मेरी कोड:
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T
for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)#create pool
result = []#store each row for X
idx = []#store the row number
for u, Wu in enumerate(W):
idx.append(u)
result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
X[u] = vector.get()#assign the result to X
######################################
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X
result = []
idx = []
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
Y[:,i] = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error
लेकिन कुछ हद तक दुख, कार्यक्रम हमेशा चुपचाप दुर्घटनाग्रस्त हो गया ...
नीचे मेरे कोड का पूरा समूह है .. यह सब गन्दा है। बस load_data
get_error
और vec2str
उपेक्षा, यहाँ के बाद से मैं मैट्रिक्स बेतरतीब ढंग से ..
import pandas as pd
import numpy as np
import multiprocessing
def vec2str(vec):
res = ''
for dim in len(vec):
res += str(vec[dim]) + ','
return res
def load_data(heads, filename, sep,header=None):
data = pd.read_table(filename, sep=sep, header=header, names=heads)
rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally...
Q = rp.fillna(0)
Q = Q.values
W = Q >0.5
W[W == True] = 1
W[W == False] = 0
W = W.astype(np.float64, copy=False)
return Q, W, rp
def get_error(Q, X, Y, W):
return np.sum((W * (Q - np.dot(X, Y)))**2)
'''
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(),
np.dot(, np.dot(np.diag(), Q[u].T))).T
'''
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T
'''
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
'''
def ALS_Y(X, Wi, Q, lambda_, n_factors, i):
return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
if __name__ == "__main__":
lambda_ = 0.1
n_factors = 40
filename = 'data_songID'
n_iterations = 20
#Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
Q = np.random.rand(1000,1000)
m, n = Q.shape
W = np.eye(1000)
print 'Loading data finished, ', 'size: ', Q.shape
print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
X = 5 * np.random.rand(m, n_factors)
Y = 5 * np.random.rand(n_factors, n)
errors = []
for ii in range(n_iterations):
X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors),
np.dot(Y, Q.T)).T
Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors),
np.dot(X.T, Q))
if ii % 100 == 0:
print('{}th iteration is completed'.format(ii))
errors.append(get_error(Q, X, Y, W))
Q_hat = np.dot(X, Y)
print('Error of rated movies: {}'.format(get_error(Q, X, Y, W)))
print errors
#####ALS start....#####
print '*'*100
weighted_errors = []
for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)
result = []
idx = []
for u, Wu in enumerate(W):
idx.append(u)
result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
X[u] = vector.get()
######################################
pool = multiprocessing.Pool(processes=12)
result = []
idx = []
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
Y[:,i] = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error
weighted_Q_hat = np.dot(X,Y)
print weighted_errors
X.tofile('X.bin')
Y.tofile('Y.bin')
latent_user_file = open('user_latent','w')
for idx in len(rp.axes[0]):
latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n')
latent_mid_file = open('mid_latent', 'w')
for idx in len(rp.axes[1]):
latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n')
मल्टीथ्रेडिंग मॉड्यूल का उपयोग करके आप 2 धागे क्यों नहीं बनाते हैं और फिर उनसे जुड़ें –
@binayr: पाइथन में थ्रेड ग्लोबल इंटरप्रेटर लॉक के अधीन हैं, लेकिन एन सीपी जैसे सी एक्सटेंशन बिना काम किए सार्थक मात्रा में काम करने में सक्षम हो सकते हैं जीआईएल, तो यह कहना मुश्किल है कि कितनी मदद होगी। – Kevin
चूंकि X_inner लूप के बाद Y_inner लूप को निष्पादित किया जाना है। लेकिन X_inner लूप में प्रत्येक पुनरावृत्ति एक साथ निष्पादित किया जा सकता है। और मुझे उम्मीद है कि X_inner लूप को समानांतर निष्पादित किया जा सकता है ... मेरी मशीन एक 12-सीपीयू और 3-जीपीयू है ... इसलिए, मुझे उम्मीद है कि मैं इनका पूरी तरह से उपयोग कर सकता हूं, लेकिन मैं यह कैसे नहीं कर सकता .. उदास .. –