2011-10-29 20 views
12

मैं एक कक्षा लिखने की कोशिश कर रहा हूं जो एकाधिक प्रक्रियाओं का उपयोग करके चेकसम की गणना करेगा, जिससे एकाधिक कोर का लाभ उठाया जा सकेगा। मेरे पास इसके लिए एक साधारण वर्ग है, और यह एक साधारण मामला निष्पादित करते समय बहुत अच्छा काम करता है। लेकिन जब भी मैं कक्षा के दो या दो से अधिक उदाहरण बना देता हूं, तो कर्मचारी कभी बाहर नहीं निकलता है। ऐसा लगता है कि यह संदेश कभी नहीं मिलता है कि पाइप को माता-पिता द्वारा बंद कर दिया गया है।पायथन मल्टीप्रोसेसिंग पाइप का उपयोग

सभी कोड नीचे मिल सकते हैं। मैं पहले md5 और sha1 चेकसम को अलग से गणना करता हूं, जो काम करता है, और फिर मैं समानांतर में गणना करने की कोशिश करता हूं, और फिर पाइप को बंद करने का समय होने पर प्रोग्राम लॉक हो जाता है।

यहां क्या हो रहा है? मैं उम्मीद कर रहे पाइप क्यों काम नहीं कर रहे हैं? मुझे लगता है कि मैं कतार पर "स्टॉप" संदेश भेजकर एक कामकाज कर सकता हूं और बच्चे को इस तरह से छोड़ सकता हूं, लेकिन मैं वास्तव में जानना चाहता हूं कि यह क्यों काम नहीं कर रहा है।

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 
    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     self.parent_conn.close() # This is the child. Close unused end. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 


def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums in parallel causes a lockup! 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here! 

main() 

पीएस। यह समस्या हल किया गया है यहाँ है ऊपर कोड की एक काम संस्करण अगर कोई रुचि रखता है:

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 

    all_open_parent_conns = [] 

    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     ChecksumPipe.all_open_parent_conns.append(self.parent_conn) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     for conn in ChecksumPipe.all_open_parent_conns: 
      conn.close() # This is the child. Close unused ends. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 

def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums also works fine now 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() 

main() 
+0

कनेक्शन को नष्ट करने के लिए 'self.parent_conn.close() 'के बाद आप' ChecksumPipe.all_open_parent_conns.remove (self.parent_conn) 'जोड़ना चाहेंगे। –

+0

'self.summer = eval ("हैशिलिब% s()"% csname)' बदसूरत लग रहा है। 'Self.summer = getattr (हैशिलिब, csname)()' के बारे में क्या? – glglgl

उत्तर

7

हाँ, कि वास्तव में आश्चर्य की बात व्यवहार है।

हालांकि, यदि आप दो समानांतर बाल प्रक्रियाओं के लिए lsof के आउटपुट को देखते हैं तो यह ध्यान रखना आसान है कि दूसरे बच्चे की प्रक्रिया में अधिक फ़ाइल डिस्क्रिप्टर खुले हैं।

क्या होता है कि जब दो समानांतर बाल प्रक्रियाएं शुरू होती हैं तो दूसरे बच्चे को माता-पिता की पाइप विरासत मिलती है, ताकि जब माता-पिता self.parent_conn.close() पर कॉल करें तो दूसरे बच्चे के पास अभी भी उस पाइप फ़ाइल डिस्क्रिप्टर को खोल दिया गया है, ताकि पाइप फ़ाइल विवरण न हो कर्नेल में बंद नहीं होता है (संदर्भ संख्या 0 से अधिक है), प्रभाव के साथ self.child_conn.recv_bytes() पहले समांतर बाल प्रक्रिया में कभी भी read() s EOF और EOFError कभी नहीं फेंक दिया जाता है।

आपको एक स्पष्ट शट डाउन संदेश भेजने की आवश्यकता हो सकती है, बल्कि फ़ाइल डिस्क्रिप्टर को बंद करना पड़ सकता है क्योंकि फ़ाइल प्रक्रियाओं के बीच कौन सी फाइल डिस्क्रिप्टर साझा की जाती हैं, इस पर थोड़ा नियंत्रण नहीं होता है (कोई क्लोज-ऑन-फोर्क फ़ाइल डिस्क्रिप्टर ध्वज नहीं है)।

+0

धन्यवाद! यह मेरे लिए चीजों को मंजूरी दे दी। मैंने इसे अपने उदाहरण में एक साझा वर्ग चर का उपयोग करके हल किया जिसमें सभी उदाहरणों में सभी खुले कनेक्शन शामिल हैं, ताकि बच्चे उन सभी सॉकेट को बंद कर सकें जिनकी उन्हें आवश्यकता नहीं है। –

संबंधित मुद्दे