2013-05-14 22 views
5

पूल के साथ पाइथन में मल्टीप्रोसेसिंग पैकेज का उपयोग करने का प्रयास कर रहा है।पाइथन मल्टीप्रोसेसिंग पूल map_async

Traceback (most recent call last): 
    File "pool-test.py", line 9, in <module> 
    pool.map_async(f,hosts,"test") 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 290, in map_async 
    result = MapResult(self._cache, chunksize, len(iterable), callback) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 557, in __init__ 
    self._number_left = length//chunksize + bool(length % chunksize) 
TypeError: unsupported operand type(s) for //: 'int' and 'str' 

मैं कैसे 1 से अधिक तर्क पारित करने के लिए पता नहीं है:

from multiprocessing import Pool 

def f(host, x): 
    print host 
    print x 

hosts = ['1.1.1.1', '2.2.2.2'] 
pool = Pool(processes=5) 
pool.map_async(f,hosts,"test") 
pool.close() 
pool.join() 

इस कोड को अगले त्रुटि है:

मैं समारोह च जो map_async समारोह द्वारा कहा जाता है एफ समारोह के लिए। क्या कोई रास्ता है?

+0

तुम बस 'pool.map' का उपयोग करें और' "परीक्षण" 'डमी चर छोड़ पूरी तरह कर सकते हैं। – danodonovan

उत्तर

7

"test"map_async के chunksize कीवर्ड तर्क (the docs देखें) के रूप में व्याख्या की है।

आपका कोड शायद होना चाहिए (यहाँ मेरी IPython सत्र से प्रतिलिपि चिपकाया):

from multiprocessing import Pool 

def f(arg): 
    host, x = arg 
    print host 
    print x 

hosts = ['1.1.1.1', '2.2.2.2'] 
args = ((host, "test") for host in hosts) 
pool = Pool(processes=5) 
pool.map_async(f, args) 
pool.close() 
pool.join() 
## -- End pasted text -- 

1.1.1.1 
test 
2.2.2.2 
test 

नोट: अजगर 3 में आप starmap उपयोग कर सकते हैं, जो tuples से तर्क खोल देगा। आप स्पष्ट रूप से host, x = arg करने से बचने में सक्षम होंगे।

+0

मैंने इसका परीक्षण किया लेकिन परिणाम अच्छा नहीं है; यह दोनों होस्टों को प्रिंट करता है लेकिन "टेस्ट" शब्द का "टी" और "ई" केवल प्रिंट करता है। – dseira

+0

अजीब। यह निश्चित रूप से मेरे कंप्यूटर पर ऐसा नहीं करता है। मेरे परिणामों के लिए अद्यतन देखें - मैंने कॉपी-पेस्ट किया और उन्हें दोबारा जांच लिया। –

+0

एक्स = ["टेस्ट", "टेस्ट"] के साथ यह काम करता है लेकिन इसका कोई मतलब नहीं है क्योंकि कल्पना करें कि मेजबान सूची लगभग 10000 है और मैं केवल परिणामों की तुलना करने के लिए एक एक्स चाहता हूं। एक ही परिणाम के साथ 10000 प्रविष्टियों के साथ एक एक्स सूची होना व्यवहार्य नहीं है। कोई बात नहीं धन्यवाद। – dseira

1

जैसा कि मुझे याद है, पूल()। नक्शा() और .map_async() विशेष रूप से केवल एक ही तर्क स्वीकार करते हैं। इस सीमा को एक सूची उत्तीर्ण करके चारों ओर काम किया जा सकता है, लेकिन निश्चित रूप से आपको एक सूची (जैसे) ऑब्जेक्ट को तर्क के रूप में लेने के लिए डिज़ाइन किए गए एक अनुकूलित फ़ंक्शन की आवश्यकता होती है।

एक दृष्टिकोण कस्टम कोड को एक बार लिखना है - उर्फ, एक सामान्य "फ़ंक्शन + तर्क" रैपर। (: यह केवल आंशिक रूप से परीक्षण किया जाता है नोट):

def tmp_test(): 
    # a short test script: 
    # 
    A=[[1,2], [2,3], [4,5], [6,7]] 
    P=mpp.Pool(mpp.cpu_count()) 
    X=P.map_async(map_helper, [[operator.eq]+a for a in A]) 
    # 
    return X.get() 


def null_funct(args=[], kwargs={}): 
    # a place-holder 
    pass 
# 
def map_helper(args_in = [null_funct, [], {}]): 
    # helper function for pool.map_async(). pass data as a list(-like object): 
    # [function, [args], {kwargs}] (though we'll allow for some mistakes). 
    # 
    funct = args_in[0] 
    # 
    # allow for different formatting options: 
    if not (isinstance(args_in[1], list) or isinstance(args_in[1], tuple) or isinstance(args_in[1], dict)): 
     # probably passed a list of parameters. just use them: 
     args = args_in[1:] 
     # 
     return funct(*args) 
    # 
    # if the args are "properly" formatted: 
    args=[] 
    kwargs = {} 
    for arg in args_in[1:]: 
     # assign list types to args, dict types to kwargs... 
     if isinstance(arg, list) or isinstance(arg, tuple): args += arg 
     if isinstance(arg, dict): kwargs.update(arg) 
    return funct(*args, **kwargs) 
3

पूल अजगर 3 में एक संदर्भ प्रबंधक वापस आती है और इसलिए एक बयान के साथ इस्तेमाल किया जा सकता है मैं कुछ इस तरह ऊपर काम किया। यह अपवादों के साथ समस्याओं से बचाता है और इसका मतलब है कि बंद करने और शामिल होने की कोई आवश्यकता नहीं है। इस मामले में फ़ंक्शन हमेशा एक्स चर के लिए निरंतर प्राप्त कर रहा है और इसलिए इसे आंशिक मूल्यांकन के साथ संभाला जा सकता है। map_async आलसी है और इसलिए हमें कार्यों के होने के परिणाम प्राप्त करने की आवश्यकता है, साथ ही मानचित्र का उपयोग भी कर सकते हैं। इस प्रकार:

from multiprocessing import Pool 
from functools import partial 

def f(host, x): 
    print(host) 
    print(x) 

hosts = ('1.1.1.1', '2.2.2.2') 
with Pool(processes=5) as pool: 
    pool.map(partial(f, x='test'), hosts) 

परिणामों में:

 
1.1.1.1 
test 
2.2.2.2 
test 
संबंधित मुद्दे