2011-08-26 13 views
41

मैंने पहले शोध किया और मेरे प्रश्न का उत्तर नहीं मिला। मैं पायथन में समानांतर में कई कार्यों को चलाने की कोशिश कर रहा हूं।पायथन: मैं समानांतर में पायथन फ़ंक्शन कैसे चला सकता हूं?

files.py 

import common #common is a util class that handles all the IO stuff 

dir1 = 'C:\folder1' 
dir2 = 'C:\folder2' 
filename = 'test.txt' 
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] 

def func1(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir1) 
     c.getFiles(dir1) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir1) 
     c.getFiles(dir1) 

def func2(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir2) 
     c.getFiles(dir2) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir2) 
     c.getFiles(dir2) 

मैं func1 और func2 फोन और उन्हें एक ही समय में चलाने के लिए चाहते हैं:

मैं कुछ इस तरह की है। कार्य एक दूसरे के साथ या एक ही वस्तु पर बातचीत नहीं करते हैं। अभी मुझे func2 को शुरू करने के लिए func2 से पहले खत्म करने के लिए इंतजार करना होगा। कैसे मैं नीचे की तरह कुछ करते हैं:

process.py 

from files import func1, func2 

runBothFunc(func1(), func2()) 

मैं दोनों निर्देशिका बहुत ही समय के करीब बनाने के लिए सक्षम होना चाहते हैं, क्योंकि हर मिनट मैं भरोसा कर रहा हूँ कि कितने फ़ाइलें बनाई जा रही हैं। यदि निर्देशिका नहीं है तो यह मेरा समय फेंक देगा।

+1

प्रश्न अपडेट करना – lmcadory

+1

आप इसे फिर से आर्किटेक्ट करना चाहते हैं; यदि आप हर मिनट फाइल/फ़ोल्डरों की संख्या गिन रहे हैं, तो आप दौड़ की स्थिति बना रहे हैं। प्रत्येक फ़ंक्शन को काउंटर अपडेट करने के बारे में क्या है, या यह सुनिश्चित करने के लिए लॉकफ़ाइल का उपयोग करें कि आवधिक प्रक्रिया गिनती को अद्यतन न करे जब तक कि दोनों कार्य निष्पादित नहीं हो जाते? –

उत्तर

73

आप threading या multiprocessing का उपयोग कर सकते हैं।

peculiarities of CPython, threading के कारण वास्तविक समांतरता प्राप्त करने की संभावना नहीं है। इस कारण से, multiprocessing आम तौर पर एक बेहतर शर्त है।

from multiprocessing import Process 

def func1(): 
    print 'func1: starting' 
    for i in xrange(10000000): pass 
    print 'func1: finishing' 

def func2(): 
    print 'func2: starting' 
    for i in xrange(10000000): pass 
    print 'func2: finishing' 

if __name__ == '__main__': 
    p1 = Process(target=func1) 
    p1.start() 
    p2 = Process(target=func2) 
    p2.start() 
    p1.join() 
    p2.join() 

शुरुआती के यांत्रिकी/बच्चे प्रक्रियाओं में शामिल होने आसानी से अपने runBothFunc की तर्ज पर एक समारोह में समझाया जा सकता है::

यहां एक संपूर्ण उदाहरण है

def runInParallel(*fns): 
    proc = [] 
    for fn in fns: 
    p = Process(target=fn) 
    p.start() 
    proc.append(p) 
    for p in proc: 
    p.join() 

runInParallel(func1, func2) 
+2

मैंने आपके कोड का उपयोग किया लेकिन फ़ंक्शंस अभी भी एक ही समय में शुरू नहीं हुआ था। – lmcadory

+2

@ लामर मैकडॉरी: कृपया समझाएं कि "एक ही समय में" आप वास्तव में क्या मतलब रखते हैं, शायद आपने जो किया, उसके बारे में एक ठोस उदाहरण दिया, जो आप होने की उम्मीद कर रहे थे, और वास्तव में क्या हुआ। – NPE

+3

@ लामर: आपको "बिल्कुल एक ही समय" की कोई गारंटी नहीं हो सकती है और सोच रहा है कि आप केवल सादा गलत हो सकते हैं। आपके पास कितने सीपीयू हैं, मशीन का भार, कंप्यूटर पर होने वाली कई चीजों का समय सभी धागे/प्रक्रिया शुरू होने पर प्रभाव डालता है। साथ ही, चूंकि सृजन के बाद प्रक्रियाएं शुरू हो जाती हैं, इसलिए प्रक्रिया बनाने के ऊपरी हिस्से को भी आपके द्वारा देखे जाने वाले समय के अंतर में गणना की जानी चाहिए। – Martin

3

वहाँ करने के लिए कोई रास्ता नहीं है गारंटी दें कि दो कार्य एक दूसरे के साथ सिंक में निष्पादित होंगे जो ऐसा लगता है कि आप क्या करना चाहते हैं।

सबसे अच्छा आप फ़ंक्शन को कई चरणों में विभाजित करना है, फिर Process.join का उपयोग करके दोनों सिंक्रनाइज़ेशन पॉइंट्स पर समाप्त होने के लिए प्रतीक्षा करें, जैसे कि Aix के उत्तर उल्लेख।

यह time.sleep(10) से बेहतर है क्योंकि आप सटीक समय की गारंटी नहीं दे सकते हैं। स्पष्ट रूप से प्रतीक्षा करने के साथ, आप कह रहे हैं कि कार्यों को अगले चरण में जाने से पहले उस चरण को निष्पादित किया जाना चाहिए, यह मानने के बजाय कि यह 10ms के भीतर किया जाएगा, जो कि मशीन पर और क्या चल रहा है, इस पर आधारित नहीं है।

3

यदि आप एक विंडोज उपयोगकर्ता हैं और पायथन 3 का उपयोग कर रहे हैं, तो यह पोस्ट आपको पाइथन में समानांतर प्रोग्रामिंग करने में मदद करेगी। जब आप एक सामान्य मल्टीप्रोसेसिंग लाइब्रेरी के पूल प्रोग्रामिंग चलाते हैं, तो आपको अपने प्रोग्राम में मुख्य फ़ंक्शन के बारे में एक त्रुटि मिलेगी । ऐसा इसलिए है क्योंकि विंडोज़ में कोई कांटा() कार्यक्षमता नहीं है। नीचे दी गई पोस्ट उल्लिखित समस्या का समाधान दे रही है।

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

जब से मैं अजगर 3 उपयोग कर रहा था, मैं एक छोटे से इस तरह कार्यक्रम बदल दिया है:

from types import FunctionType 
import marshal 

def _applicable(*args, **kwargs): 
    name = kwargs['__pw_name'] 
    code = marshal.loads(kwargs['__pw_code']) 
    gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls']) 
    defs = marshal.loads(kwargs['__pw_defs']) 
    clsr = marshal.loads(kwargs['__pw_clsr']) 
    fdct = marshal.loads(kwargs['__pw_fdct']) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    del kwargs['__pw_name'] 
    del kwargs['__pw_code'] 
    del kwargs['__pw_defs'] 
    del kwargs['__pw_clsr'] 
    del kwargs['__pw_fdct'] 
    return func(*args, **kwargs) 

def make_applicable(f, *args, **kwargs): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    kwargs['__pw_name'] = f.__name__ # edited 
    kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited 
    kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited 
    kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited 
    kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited 
    return _applicable, args, kwargs 

def _mappable(x): 
    x,name,code,defs,clsr,fdct = x 
    code = marshal.loads(code) 
    gbls = globals() #gbls = marshal.loads(gbls) 
    defs = marshal.loads(defs) 
    clsr = marshal.loads(clsr) 
    fdct = marshal.loads(fdct) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    return func(x) 

def make_mappable(f, iterable): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    name = f.__name__ # edited 
    code = marshal.dumps(f.__code__) # edited 
    defs = marshal.dumps(f.__defaults__) # edited 
    clsr = marshal.dumps(f.__closure__) # edited 
    fdct = marshal.dumps(f.__dict__) # edited 
    return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable) 

इस समारोह के बाद, ऊपर समस्या कोड भी एक छोटे से इस तरह बदल गई है:

from multiprocessing import Pool 
from poolable import make_applicable, make_mappable 

def cube(x): 
    return x**3 

if __name__ == "__main__": 
    pool = Pool(processes=2) 
    results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)] 
    print([result.get(timeout=10) for result in results]) 

और मैं के रूप में उत्पादन हो गया है:

[1, 8, 27, 64, 125, 216] 

मुझे लगता है कि यह पोस्ट कुछ विंडोज उपयोगकर्ताओं के लिए उपयोगी हो सकती है।

संबंधित मुद्दे