2013-05-29 5 views
8

मेरे पास scipy.sparse.csr_matrix प्रारूप में एक बड़ा स्पैर मैट्रिक्स एक्स है और मैं समानांतरता का उपयोग करने वाले एक numpy array w द्वारा इसे गुणा करना चाहता हूं। कुछ शोधों के बाद मुझे पता चला कि मुझे प्रक्रियाओं के बीच एक्स और डब्ल्यू की प्रतिलिपि बनाने से बचने के लिए मल्टीप्रोसेसिंग में ऐरे का उपयोग करने की आवश्यकता है (उदा। यहां: How to combine Pool.map with Array (shared memory) in Python multiprocessing? और Is shared readonly data copied to different processes for Python multiprocessing?)। (4.431, 0.165) समानांतर संस्करण का संकेत गैर समानांतर गुणा तुलना में बहुत धीमी है: यहाँ मेरी नवीनतम प्रयासscipy sparse matrix गुणा को समानांतर करने के लिए कैसे करें

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp): 
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray 
    global WShape 

    WArray = Warr  
    WShape = Wshp 

def dot2(args): 
    rowInds, i = args  

    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    data = numpy.frombuffer(XData, dtype=numpy.float) 
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32) 
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32) 
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape) 

    global WArray 
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape) 

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W) 

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count() 
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int) 

    #Store the data in X as RawArray objects so we can share it amoung processes 
    XData = multiprocessing.RawArray("d", X.data) 
    XIndices = multiprocessing.RawArray("i", X.indices) 
    XIndptr = multiprocessing.RawArray("i", X.indptr) 

    def matmat(W): 
     WArray = multiprocessing.RawArray("d", W.flatten()) 
     pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape)) 
     params = [] 

     for i in range(numJobs): 
      params.append((rowInds, i)) 

     iterator = pool.map(dot2, params) 
     P = numpy.zeros((X.shape[0], W.shape[1])) 

     for i in range(numJobs): 
      P[rowInds[i]:rowInds[i+1], :] = iterator[i] 

     return P 

    return matmat 

if __name__ == '__main__': 
    #Create a random sparse matrix X and a random dense one W  
    X = scipy.sparse.rand(10000, 8000, 0.1) 
    X = X.tocsr() 
    W = numpy.random.rand(8000, 20) 

    startTime = time.time() 
    A = getMatmat(X)(W) 
    parallelTime = time.time()-startTime 

    startTime = time.time() 
    B = X.dot(W) 
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime) 

हालांकि उत्पादन की तरह कुछ है।

मेरा मानना ​​है कि मंदी का कारण इसी तरह की परिस्थितियों में हो सकता है जब कोई प्रक्रियाओं में बड़े डेटा की प्रतिलिपि बना रहा है, लेकिन यह मामला यहां नहीं है क्योंकि मैं साझा चर को संग्रहीत करने के लिए ऐरे का उपयोग करता हूं (जब तक यह numpy.frombuffer में नहीं होता है या जब एक csr_matrix बनाना, लेकिन तब मुझे सीधे csr_matrix साझा करने का कोई तरीका नहीं मिला)। धीमी गति का एक अन्य संभावित कारण प्रत्येक प्रक्रिया के लिए प्रत्येक मैट्रिक्स गुणा का एक बड़ा परिणाम लौटा रहा है, हालांकि मुझे इस बारे में कोई जानकारी नहीं है।

क्या कोई देख सकता है कि मैं कहां गलत हूं? किसी भी मदद के लिए धन्यवाद!

अपडेट: मुझे यकीन नहीं है लेकिन मुझे लगता है कि प्रक्रियाओं के बीच बड़ी मात्रा में डेटा साझा करना केवल इतना कुशल नहीं है, और आदर्श रूप में मुझे मल्टीथ्रेडिंग का उपयोग करना चाहिए (हालांकि ग्लोबल इंटरप्रेटर लॉक (जीआईएल) बहुत कठिन बनाता है)। इसके आस-पास एक तरीका है उदाहरण के लिए साइथन का उपयोग करके जीआईएल जारी करना (http://docs.cython.org/src/userguide/parallelism.html देखें), हालांकि बहुत सारे निष्क्रिय कार्यों को जीआईएल के माध्यम से जाना होगा।

+0

क्या आपके पास एक अनुकूलित, बहुप्रचारित ATLAS निर्माण से जुड़ा हुआ numpy/scipy है?यदि आप ऐसा करते हैं, तो आप np.dot का उपयोग करते समय समानांतर मैट्रिक्स गुणा को मुफ्त में प्राप्त करना चाहिए। –

+1

मैं एक मल्टीथ्रेडेड बीएलएएस लाइब्रेरी (ओपनबीएलएस) का उपयोग कर रहा हूं जो numpy/scipy से जुड़ा हुआ है लेकिन मैंने X.dot (W) और numpy.dot (X, W) का परीक्षण किया है (बाद वाला स्पैस एक्स के लिए काम नहीं करता है) और यह नहीं है parallelised। – Charanpal

उत्तर

1

आपकी सर्वश्रेष्ठ शर्त साइथन के साथ सी को छोड़ना है। इस तरह आप जीआईएल को हरा सकते हैं और ओपनएमपी का उपयोग कर सकते हैं। मुझे आश्चर्य नहीं है कि मल्टीप्रोसेसिंग धीमी है - वहां बहुत सारे ओवरहेड हैं।

यहां साइप्रस के स्पैर मैट्रिक्स का एक बेवकूफ रैपर ओपनएमपी रैपर है - पाइथन में वेक्टर उत्पाद कोड।

मेरे लैपटॉप पर, यह तेजी से थोड़ा तेज चलता है। लेकिन मेरे पास इतने सारे कोर नहीं हैं। कोड, जिसमें setup.py स्क्रिप्ट और सी हेडर फाइलें और सामान शामिल हैं, इस आलेख में है: https://gist.github.com/rmcgibbo/6019670

मुझे संदेह है कि यदि आप वास्तव में समानांतर कोड तेज़ होना चाहते हैं (मेरे लैपटॉप पर, यह केवल 20% तेज है सिंगल थ्रेडेड सिसी से, 4 धागे का उपयोग करते समय भी), आपको कैश इलाके पर ध्यान देना, मेरे समानांतरता के बारे में और अधिक सावधानी से सोचने की आवश्यकता है।

# psparse.pyx 

#----------------------------------------------------------------------------- 
# Imports 
#----------------------------------------------------------------------------- 
cimport cython 
cimport numpy as np 
import numpy as np 
import scipy.sparse 
from libc.stddef cimport ptrdiff_t 
from cython.parallel import parallel, prange 

#----------------------------------------------------------------------------- 
# Headers 
#----------------------------------------------------------------------------- 

ctypedef int csi 

ctypedef struct cs: 
    # matrix in compressed-column or triplet form 
    csi nzmax  # maximum number of entries 
    csi m   # number of rows 
    csi n   # number of columns 
    csi *p   # column pointers (size n+1) or col indices (size nzmax) 
    csi *i   # row indices, size nzmax 
    double *x  # numerical values, size nzmax 
    csi nz   # # of entries in triplet matrix, -1 for compressed-col 

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil 
cdef extern csi cs_print (cs *A, csi brief) nogil 

assert sizeof(csi) == 4 

#----------------------------------------------------------------------------- 
# Functions 
#----------------------------------------------------------------------------- 

@cython.boundscheck(False) 
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None): 
    """Multiply a sparse CSC matrix by a dense matrix 

    Parameters 
    ---------- 
    X : scipy.sparse.csc_matrix 
     A sparse matrix, of size N x M 
    W : np.ndarray[dtype=float564, ndim=2, mode='fortran'] 
     A dense matrix, of size M x P. Note, W must be contiguous and in 
     fortran (column-major) order. You can ensure this using 
     numpy's `asfortranarray` function. 

    Returns 
    ------- 
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran'] 
     A dense matrix, of size N x P, the result of multiplying X by W. 

    Notes 
    ----- 
    This function is parallelized over the columns of W using OpenMP. You 
    can control the number of threads at runtime using the OMP_NUM_THREADS 
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis. 
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the 
    GNU LGPL v2.1+. 

    References 
    ---------- 
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems 
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136 
    """ 
    if X.shape[1] != W.shape[0]: 
     raise ValueError('matrices are not aligned') 

    cdef int i 
    cdef cs csX 
    cdef np.ndarray[double, ndim=2, mode='fortran'] result 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr = X.indptr 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices 
    cdef np.ndarray[double, ndim=1, mode = 'c'] data = X.data 

    # Pack the scipy data into the CSparse struct. This is just copying some 
    # pointers. 
    csX.nzmax = X.data.shape[0] 
    csX.m = X.shape[0] 
    csX.n = X.shape[1] 
    csX.p = &indptr[0] 
    csX.i = &indices[0] 
    csX.x = &data[0] 
    csX.nz = -1 # to indicate CSC format 

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double) 
    for i in prange(W.shape[1], nogil=True): 
     # X is in fortran format, so we can get quick access to each of its 
     # columns 
     cs_gaxpy(&csX, &W[0, i], &result[0, i]) 

    return result 

यह सीएसपरसे से कुछ सी कहता है।

// src/cs_gaxpy.c 

#include "cs.h" 
/* y = A*x+y */ 
csi cs_gaxpy (const cs *A, const double *x, double *y) 
{ 
    csi p, j, n, *Ap, *Ai ; 
    double *Ax ; 
    if (!CS_CSC (A) || !x || !y) return (0) ;  /* check inputs */ 
    n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ; 
    for (j = 0 ; j < n ; j++) 
    { 
     for (p = Ap [j] ; p < Ap [j+1] ; p++) 
     { 
     y [Ai [p]] += Ax [p] * x [j] ; 
     } 
    } 
    return (1) ; 
} 
+0

इस प्रतिक्रिया के लिए धन्यवाद! मेरे पास समान विचार थे और उन्होंने ईजिन पर आधारित एक साइथन/ओपनएमपी डॉट उत्पाद लिखा था (https://github.com/charanpald/sppy/blob/master/sppy/csarray_base.pyx के pdot2d देखें)। यहां, मैंने एक्स की पंक्तियों को cpu_count ब्लॉक में विभाजित किया है और यह मेरी 8 कोर मशीन पर लगभग 2x तेज चलता है (मुझे यकीन है कि इसे बेहतर किया जा सकता है)। जैसे ही मैं संकलन के साथ कुछ मुद्दों को हल करता हूं, मैं आपके समाधान के साथ तुलना करूंगा। – Charanpal

1

शायद प्रतिक्रिया के साथ थोड़ा देर हो चुकी है। PyTrilinos पैकेज का उपयोग करके विश्वसनीय समांतर गतिशीलता प्राप्त करना संभव हो सकता है जो ट्रिलिनोस में कई कार्यों में पाइथन रैपर प्रदान करता है।

from PyTrilinos import Epetra 
from scipy.sparse import rand 
import numpy as np 

n_rows = 10000 
n_cols = 8000 
n_vecs = 20 
fill_factor = 0.1 

comm = Epetra.PyComm() 
my_id = comm.MyPID() 

row_map = Epetra.Map(n_rows, 0, comm) 
out_vec_map = row_map 
in_vec_map = Epetra.Map(n_cols, 0, comm) 
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm) 

n_local_rows = row_map.NumMyElements() 

# Create local block matrix in scipy and convert to Epetra 
X = rand(n_local_rows, n_cols, fill_factor).tocoo() 

A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True) 
A.InsertMyValues(X.row, X.col, X.data) 
A.FillComplete() 

# Create sub-vectors in numpy and convert to Epetra format 
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs) 
V = Epetra.MultiVector(in_vec_map, n_vecs) 

V[:] = W.T # order of indices is opposite 

B = Epetra.MultiVector(out_vec_map, n_vecs) 

# Multiply 
A.Multiply(False, V, B) 

फिर आप एमपीआई

का उपयोग कर
mpiexec -n 2 python scipy_to_trilinos.py 

PyTrilinos के अन्य उदाहरण GitHub भंडार here पर पाया जा सकता इस कोड को चला सकते हैं: यहाँ अपने उदाहरण pyTrilinos उपयोग करने के लिए परिवर्तित है। बेशक अगर कोई pyTrilinos का उपयोग करना था, तो scipy का उपयोग करके मैट्रिक्स को शुरू करने का यह तरीका सबसे इष्टतम नहीं हो सकता है।

संबंधित मुद्दे