2016-08-09 4 views
6

में थ्रेडपूल में प्रक्रियाओं को रोकना मैं कुछ हार्डवेयर को नियंत्रित करने वाली लाइब्रेरी के लिए एक इंटरैक्टिव रैपर (ipython में उपयोग के लिए) लिखने की कोशिश कर रहा हूं। कुछ कॉल आईओ पर भारी हैं इसलिए कार्यों को समानांतर में करने के लिए समझ में आता है। एक ThreadPool (लगभग) का उपयोग अच्छी तरह से काम करता है:पायथन

from multiprocessing.pool import ThreadPool 

class hardware(): 
    def __init__(IPaddress): 
     connect_to_hardware(IPaddress) 

    def some_long_task_to_hardware(wtime): 
     wait(wtime) 
     result = 'blah' 
     return result 

pool = ThreadPool(processes=4) 
Threads=[] 
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)] 
for tt in range(4): 
    task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000)) 
    threads.append(task) 
alive = [True]*4 
Try: 
    while any(alive) : 
     for tt in range(4): alive[tt] = not threads[tt].ready() 
     do_other_stuff_for_a_bit() 
except: 
    #some command I cannot find that will stop the threads... 
    raise 
for tt in range(4): print(threads[tt].get()) 

समस्या आता उपयोगकर्ता प्रक्रिया को रोकने के लिए करना चाहता है या वहाँ do_other_stuff_for_a_bit() में एक आईओ त्रुटि है यदि। दबाकर Ctrl + सी मुख्य प्रक्रिया को रोकता है लेकिन कार्यकर्ता धागे तब तक चलते रहते हैं जब तक उनका वर्तमान कार्य पूरा न हो जाए।
लाइब्रेरी को फिर से लिखने के बिना इन थ्रेड को रोकने के लिए कोई तरीका है या उपयोगकर्ता को पायथन से बाहर निकलना है? pool.terminate() और pool.join() जो मैंने अन्य उदाहरणों में उपयोग किया है, वे नौकरी नहीं लगते हैं।

वास्तविक दिनचर्या (उपरोक्त सरलीकृत संस्करण की बजाय) लॉगिंग का उपयोग करता है और हालांकि सभी कार्यकर्ता धागे किसी बिंदु पर बंद हो जाते हैं, मैं उन प्रक्रियाओं को देख सकता हूं जिन्हें उन्होंने पूरा करना जारी रखा है (और हार्डवेयर होने के नाते मैं देख सकता हूं कमरे में देखकर उनका प्रभाव)।

यह अजगर 2.7 में है।

अद्यतन:

समाधान एक धागा पूल के बजाय multiprocessing.Process उपयोग पर स्विच करना हो रहा है। परीक्षण कोड मैंने कोशिश की foo_pulse चलाने के लिए है: आप ThreadPool का उपयोग कर तो Ctrl-C चल (भले ही यह सूत्र सही दूर मारने करता से foo_pulse नहीं रुकती इस चल कोशिश तो

class foo(object): 
    def foo_pulse(self,nPulse,name): #just one method of *many* 
     print('starting pulse for '+name) 
     result=[] 
     for ii in range(nPulse): 
      print('on for '+name) 
      time.sleep(2) 
      print('off for '+name) 
      time.sleep(2) 
      result.append(ii) 
     return result,name 

, प्रिंट बयान पर रखने आने वाले:

from multiprocessing.pool import ThreadPool 
import time 
def test(nPulse): 
    a=foo() 
    pool=ThreadPool(processes=4) 
    threads=[] 
    for rn in range(4) : 
     r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn))) 
     threads.append(r) 
    alive=[True]*4 
    try: 
     while any(alive) : #wait until all threads complete 
      for rn in range(4): 
       alive[rn] = not threads[rn].ready() 
       time.sleep(1) 
    except : #stop threads if user presses ctrl-c 
     print('trying to stop threads') 
     pool.terminate() 
     print('stopped threads') # this line prints but output from foo_pulse carried on. 
     raise 
    else : 
     for t in threads : print(t.get()) 

हालांकि multiprocessing.Process का उपयोग कर एक संस्करण में काम करता है के रूप में उम्मीद:

import multiprocessing as mp 
import time 
def test_pro(nPulse): 
    pros=[] 
    ans=[] 
    a=foo() 
    for rn in range(4) : 
     q=mp.Queue() 
     ans.append(q) 
     r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))}) 
     r.start() 
     pros.append(r) 
    try: 
     for p in pros : p.join() 
     print('all done') 
    except : #stop threads if user stops findRes 
     print('trying to stop threads') 
     for p in pros : p.terminate() 
     print('stopped threads') 
    else : 
     print('output here') 
     for q in ans : 
      print(q.get()) 
    print('exit time') 

मैं पुस्तकालय foo के लिए एक आवरण कहाँ परिभाषित किया है (ताकि यह किया फिर से लिखे जाने की जरूरत नहीं है)।

def wrapper(a,target,q,args=(),kwargs={}): 
    '''Used when return value is wanted''' 
    q.put(getattr(a,target)(*args,**kwargs)) 

प्रलेखन मैं कोई कारण नहीं क्यों एक पूल कार्य नहीं करेगा (एक बग के अलावा अन्य) को देखने से: वापसी मान की जरूरत नहीं है, तो न तो इस आवरण है।

+1

क्या आपके पास अनियंत्रित वर्गों का उपयोग करने का कोई कारण है? आपको 'concurrent.futures' मॉड्यूल के साथ शायद बेहतर भाग्य होगा। – SuperSaiyan

+0

अनियंत्रित वर्गों का उपयोग करने के लिए कोई वास्तविक कारण नहीं है - इसके अलावा, उदाहरण के कोड में जो कुछ भी किया गया था, उसमें इस्तेमाल किया गया था जब यह शोध किया गया था। – SRD

+0

@SuperSaiyan: यह एक अलग नाम के तहत दस्तावेज है; 'थ्रेडपूल' को 'multiprocessing.dummy.Pool' के तहत एक प्रलेखित फैशन में उजागर किया गया है, जहां ['multiprocessing.dummy' प्रक्रियाओं के बजाय थ्रेड द्वारा समर्थित' मल्टीप्रोसेसिंग 'एपीआई की एक करीबी प्रति है] (https: // docs। python.org/3/library/multiprocessing.html#module-multiprocessing.dummy)। – ShadowRanger

उत्तर

1

यह समांतरता का एक बहुत ही रोचक उपयोग है।

हालांकि, यदि आप multiprocessing का उपयोग कर रहे हैं, तो लक्ष्य कई प्रक्रियाओं को समानांतर में चलाना है, क्योंकि एक प्रक्रिया के विपरीत कई धागे चल रहे हैं।

इन कुछ परिवर्तन पर विचार करें multiprocessing का उपयोग कर इसे लागू करने:

आप इन कार्यों कि समानांतर में चलेंगे है:

import time 
import multiprocessing as mp 


def some_long_task_from_library(wtime): 
    time.sleep(wtime) 


class MyException(Exception): pass 

def do_other_stuff_for_a_bit(): 
    time.sleep(5) 
    raise MyException("Something Happened...") 

के बना सकते हैं और प्रक्रियाओं शुरू करते हैं, का कहना है कि 4:

procs = [] # this is not a Pool, it is just a way to handle the 
      # processes instead of calling them p1, p2, p3, p4... 
for _ in range(4): 
    p = mp.Process(target=some_long_task_from_library, args=(1000,)) 
    p.start() 
    procs.append(p) 
mp.active_children() # this joins all the started processes, and runs them. 

प्रक्रियाएं समानांतर में चल रही हैं, संभावित रूप से एक अलग सीपीयू कोर में, लेकिन यह तय करने के लिए ओएस के लिए है।आप अपने सिस्टम मॉनिटर में देख सकते हैं।

इस बीच में आप एक प्रक्रिया है कि टूट जाएगा चलाने के लिए, और आप चल रहे प्रक्रियाओं को रोकने के लिए, उन्हें अनाथ छोड़ने नहीं चाहता:

try: 
    do_other_stuff_for_a_bit() 
except MyException as exc: 
    print(exc) 
    print("Now stopping all processes...") 
    for p in procs: 
     p.terminate() 
print("The rest of the process will continue") 

यह मतलब नहीं है तो मुख्य प्रक्रिया जब साथ जारी रखने के लिए एक या सभी उपप्रमुखों को समाप्त कर दिया गया है, आपको मुख्य कार्यक्रम के बाहर निकलना चाहिए।

उम्मीद है कि यह मदद करता है, और आप अपनी लाइब्रेरी के लिए इस बिट्स को अनुकूलित कर सकते हैं।

+0

मेरे मामले में इससे कोई फर्क नहीं पड़ता कि सबकुछ एक ही सीपीयू पर चलता है, समानांतर में चलने का कारण यह है कि आईओ पर भारी इंतजार है। हालांकि यह विधि एक नकारात्मक पक्ष के साथ काम करती है कि कॉल से मूल्य वापस करना मुश्किल है। अभी के लिए मैंने इसे एक रैपर फ़ंक्शन के साथ हल किया - मेरी अद्यतन पोस्ट देखें। – SRD

+0

कॉल से वापस आने के लिए आपको किस प्रकार के मूल्यों की आवश्यकता है, आप 'कतार', 'पाइप', साझा मेमोरी 'वैल्यू 'या' ऐरे 'या यहां तक ​​कि एक डिस्क फ़ाइल का उपयोग कर सकते हैं। इनमें से कुछ मामलों में आपको 'लॉक' का उपयोग करने की आवश्यकता हो सकती है। – chapelo

0

क्यों पूल तो यह काम नहीं किया के सवाल के जवाब में इस परियोजना की प्रकृति इंटरैक्टिव अजगर है के लिए बच्चे प्रक्रियाओं द्वारा और कारण आयात योग्य होने के लिए की वजह से (Documentation में उद्धृत) तो मुख्य की जरूरत है उपयोग किया जा रहा है।

साथ ही यह स्पष्ट नहीं था कि थ्रेडपूल क्यों होगा - हालांकि सुराग नाम में सही है। ThreadPool multiprocessing.dummy का उपयोग करके कार्यकर्ता प्रक्रियाओं के अपने पूल को बनाता है, जैसा कि here नोट किया गया है, थ्रेडिंग मॉड्यूल के चारों ओर सिर्फ एक रैपर है। पूल multiprocessing.Process का उपयोग करता है। यह इस परीक्षण के द्वारा देखा जा सकता है: सूत्र एक विधि समाप्त कार्यकर्ता धागे तक वे अपने वर्तमान कार्य पूरा कर लिया है चल रहा है पर ले जाने के की जरूरत नहीं है

p=ThreadPool(processes=3) 
p._pool[0] 
<DummyProcess(Thread23, started daemon 12345)> #no terminate() method 

p=Pool(processes=3) 
p._pool[0] 
<Process(PoolWorker-1, started daemon)> #has handy terminate() method if needed 

के रूप में। थ्रेड मारना गन्दा है (यही कारण है कि मैंने मल्टीप्रोसेसिंग मॉड्यूल का उपयोग करने की कोशिश की) लेकिन समाधान here हैं।

एक ऊपर का उपयोग कर समाधान के बारे में चेतावनी:

def wrapper(a,target,q,args=(),kwargs={}): 
    '''Used when return value is wanted''' 
    q.put(getattr(a,target)(*args,**kwargs)) 

उस वस्तु के कहने के अंदर विशेषताओं में बदलाव के मुख्य कार्यक्रम के लिए वापस पारित नहीं कर रहे हैं। उदाहरण के तौर पर ऊपर दिए गए वर्ग फू में विधियां भी हो सकती हैं जैसे: डीफ़ एडिप (नया आईपीआईपी): self.hardwareIP = newIP r=mp.Process(target=a.addIP,args=(127.0.0.1)) पर कॉल a अपडेट नहीं करता है।

एक जटिल वस्तु के लिए एक ही रास्ता इस दौर एक कस्टम manager जो दोनों तरीकों और वस्तु a की विशेषताओं एक बहुत बड़ी जटिल वस्तु के लिए एक पुस्तकालय यह सबसे अच्छा किया जा सकता है के आधार पर पर पहुंच सकते हैं का उपयोग कर स्मृति साझा किया जा रहा है प्रबंधक को पॉप्युलेट करने के लिए dir(foo) का उपयोग करना। अगर मैं यह समझ सकता हूं कि मैं इस उत्तर को एक उदाहरण के साथ कैसे अपडेट करूंगा (मेरे भविष्य के लिए दूसरों के जितना अधिक)।