2011-09-05 10 views
11

मेरे पास 256x256x256 नकली सरणी है, जिसमें प्रत्येक तत्व एक मैट्रिक्स है। मुझे इनमें से प्रत्येक मैट्रिक्स पर कुछ गणना करने की ज़रूरत है, और मैं चीजों को गति देने के लिए multiprocessing मॉड्यूल का उपयोग करना चाहता हूं।itertools और multiprocessing संयोजन?

इन गणनाओं के परिणाम, एक 256x256x256 सरणी मूल एक की तरह में संग्रहित किया जाना चाहिए ताकि मूल सरणी में तत्व [i,j,k] में मैट्रिक्स का नतीजा नई सरणी के [i,j,k] तत्व में डाल दिया जाना चाहिए।

ऐसा करने के लिए, मैं एक सूची बनाना चाहता हूं जो छद्म-आश मार्ग में [array[i,j,k], (i, j, k)] के रूप में लिखा जा सकता है और इसे "मल्टीप्रोसेस्ड" होने के लिए एक समारोह में पास कर सकता है। यह मानते हुए कि matrices सभी मैट्रिक्स मूल सरणी और myfunc से निकाले की एक सूची है समारोह गणना कर रही है, कोड इस तरह कुछ हद तक दिखेगा:

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

हालांकि, यह की तरह map_async वास्तव में इस पैदा कर रही है लगता है विशाल finput- पहले सूची: मेरा सीपीयू बहुत कुछ नहीं कर रहा है, लेकिन स्मृति और स्वैप सेकंड के मामले में पूरी तरह से उपभोग हो जाता है, जो स्पष्ट रूप से मैं नहीं चाहता हूं।

क्या इस विशाल सूची को मल्टीप्रोसेसिंग फ़ंक्शन में पास करने का कोई तरीका है बिना इसे स्पष्ट रूप से बनाने की आवश्यकता के? या आप इस समस्या को हल करने का एक और तरीका जानते हैं?

धन्यवाद एक गुच्छा! :-)

+1

चूंकि आप 'map_async()' पर 'get() 'का उपयोग कर रहे हैं, तो आप शायद * एसिंक्रोनस * ऑपरेशन नहीं चाहते हैं और इसके बजाय' Pool.map() 'का उपयोग करना चाहिए। –

+0

शायद मैं समस्या को सही ढंग से समझ नहीं पा रहा हूं, लेकिन क्या आपने imap या imap_unordered माना है? –

उत्तर

10

सभी multiprocessing.Pool.map* विधियों को पूरी तरह से (demo code) जैसे फ़ंक्शन कहा जाता है, इसे पूरी तरह से उपभोग करने वाले यंत्रों का उपभोग करते हैं। एक समय में इटरेटर एक हिस्सा के मानचित्र समारोह मात्रा को खिलाने के लिए, grouper_nofill का उपयोग करें:

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

पी एस। pool.map_async का chunksize पैरामीटर कुछ अलग करता है: यह टुकड़ों में पुनरावर्तनीय तोड़ देता है, फिर प्रत्येक कार्य को कार्यकर्ता प्रक्रिया में देता है जो map(func,chunk) पर कॉल करता है। यह func(item) बहुत जल्दी खत्म होने पर कार्यकर्ता को अधिक डेटा को चबाने के लिए प्रक्रिया दे सकता है, लेकिन यह आपकी स्थिति में मदद नहीं करता है क्योंकि map_async कॉल जारी होने के बाद हीटरेटर अभी भी पूरी तरह से उपभोग हो जाता है।

+0

बहुत बहुत धन्यवाद! आपका समाधान वास्तव में काम करता प्रतीत होता है! संदर्भ के लिए, मुझे pool.map_async (myfunc, finput) .get (99 99 99) का उपयोग करना पड़ा, लेकिन यह काम करता है! हालांकि, यह अभी भी बहुत सारी मेमोरी का उपयोग करता है (निश्चित रूप से सटीक चंक्साइज के आधार पर), और पाइथन रन के दौरान कचरा इकट्ठा नहीं लगता है। कोई विचार क्यों हो सकता है? – digitaldingo

+0

@ डिजीटलडिंगो: एचएम, कुछ भी दिमाग में नहीं आता है। यह आदर्श होगा यदि आप अपने कोड को [एसएससीसीई] (http://sscce.org/) पर भंग कर सकते हैं और इसे यहां पोस्ट कर सकते हैं। – unutbu

0

Pool.map_async() को कई श्रमिकों को काम भेजने के लिए पुनरावर्तनीय की लंबाई जानने की आवश्यकता है। चूंकि izip में __len__ नहीं है, यह पहले सूची में पुनरावर्तनीय रूपांतरित हो जाता है, जिसके कारण आप बड़ी मेमोरी उपयोग कर रहे हैं।

आप -स्टाइल इटरेटर __len__ के साथ स्टाइल इटरेटर बनाकर इसे हटाने का प्रयास कर सकते हैं।

+0

इसे जानने की आवश्यकता क्यों है?यह सभी निष्क्रिय श्रमिकों और इंतजार को क्यों नहीं खिला सकता है? –

+0

@andrew - 'map_async()' ('multiprocessing/pool.py') में पहली पंक्तियां वास्तव में हैं 'अगर हैटर नहीं है (पुन: प्रयोज्य,' __len__ '): iterable = list (iterable)'। श्रमिकों के पूरा होने के आदेश के रूप में पर्याप्त रूप से बड़ी आउटपुट सूची बनाने के लिए इसकी लंबाई की आवश्यकता है। –

+0

हम्म। यह गतिशील रूप से निर्माण कर सकता है, है ना? मैं बस सोच रहा हूं कि इसे एक मुद्दे के रूप में उठाया जा सकता है। यह एक वैध अनुरोध की तरह लगता है। –

2

मैं भी इस समस्या में भाग गया। बजाय इस की:

res = p.map(func, combinations(arr, select_n)) 

res = p.imap(func, combinations(arr, select_n)) 

imap उपभोग नहीं करता है!

संबंधित मुद्दे