11

मैं एक एल्गोरिदम के लिए पाइथन मल्टीप्रोसेसिंग लाइब्रेरी का उपयोग करता हूं जिसमें मेरे पास कुछ डेटा प्रोसेसिंग करते हैं और मूल प्रक्रिया में परिणाम लौटाते हैं। मैं मल्टीप्रोसेसिंग का उपयोग करता हूं। कर्मचारियों को नौकरियों को पारित करने के लिए क्यूयू, और परिणाम एकत्र करने के लिए दूसरा।श्रमिकों में पाइथन मल्टीप्रोसेसिंग और हैंडलिंग अपवाद

यह सब कुछ अच्छी तरह से काम करता है, जब तक कि कार्यकर्ता डेटा के कुछ हिस्से को संसाधित करने में विफल रहता है। असफल हो सकता है इस मामले कार्यकर्ता को नष्ट कर दिया जाना चाहिए में,

  • डाटा प्रोसेसिंग - - प्रसंस्करण डेटा का एक हिस्सा असफल हो सकता है इस मामले कार्यकर्ता में, इसे छोड़ देना चाहिए

    • प्रारंभ: प्रत्येक कार्यकर्ता नीचे सरल उदाहरण में दो चरणों है खंड और अगले डेटा के साथ जारी रखें।

    जब इनमें से कोई भी चरण विफल रहता है तो मुझे स्क्रिप्ट पूर्ण होने के बाद एक डेडलॉक मिलता है। इस कोड को अपनी समस्या simulates:

    1. जब init() विफल रहता है, कैसे पता लगाने के लिए कि कार्यकर्ता अमान्य है और यह समाप्त होने की प्रतीक्षा करने के लिए नहीं:

      import multiprocessing as mp 
      import random 
      
      workers_count = 5 
      # Probability of failure, change to simulate failures 
      fail_init_p = 0.2 
      fail_job_p = 0.3 
      
      
      #========= Worker ========= 
      def do_work(job_state, arg): 
          if random.random() < fail_job_p: 
           raise Exception("Job failed") 
          return "job %d processed %d" % (job_state, arg) 
      
      def init(args): 
          if random.random() < fail_init_p: 
           raise Exception("Worker init failed") 
          return args 
      
      def worker_function(args, jobs_queue, result_queue): 
          # INIT 
          # What to do when init() fails? 
          try: 
           state = init(args) 
          except: 
           print "!Worker %d init fail" % args 
           return 
          # DO WORK 
          # Process data in the jobs queue 
          for job in iter(jobs_queue.get, None): 
           try: 
            # Can throw an exception! 
            result = do_work(state, job) 
            result_queue.put(result) 
           except: 
            print "!Job %d failed, skip..." % job 
           finally: 
            jobs_queue.task_done() 
          # Telling that we are done with processing stop token 
          jobs_queue.task_done() 
      
      
      
      #========= Parent ========= 
      jobs = mp.JoinableQueue() 
      results = mp.Queue() 
      for i in range(workers_count): 
          mp.Process(target=worker_function, args=(i, jobs, results)).start() 
      
      # Populate jobs queue 
      results_to_expect = 0 
      for j in range(30): 
          jobs.put(j) 
          results_to_expect += 1 
      
      # Collecting the results 
      # What if some workers failed to process the job and we have 
      # less results than expected 
      for r in range(results_to_expect): 
          result = results.get() 
          print result 
      
      #Signal all workers to finish 
      for i in range(workers_count): 
          jobs.put(None) 
      
      #Wait for them to finish 
      jobs.join() 
      

      मैं इस कोड के बारे में दो सवाल है?

    2. जब do_work() विफल रहता है, तो माता-पिता प्रक्रिया को कैसे सूचित किया जाए कि परिणाम कतार में कम परिणाम की अपेक्षा की जानी चाहिए?

    मदद के लिए धन्यवाद!

  • उत्तर

    10

    मैंने इसे अपना काम करने के लिए थोड़ा सा कोड बदल दिया (नीचे स्पष्टीकरण देखें)।

    import multiprocessing as mp 
    import random 
    
    workers_count = 5 
    # Probability of failure, change to simulate failures 
    fail_init_p = 0.5 
    fail_job_p = 0.4 
    
    
    #========= Worker ========= 
    def do_work(job_state, arg): 
        if random.random() < fail_job_p: 
         raise Exception("Job failed") 
        return "job %d processed %d" % (job_state, arg) 
    
    def init(args): 
        if random.random() < fail_init_p: 
         raise Exception("Worker init failed") 
        return args 
    
    def worker_function(args, jobs_queue, result_queue): 
        # INIT 
        # What to do when init() fails? 
        try: 
         state = init(args) 
        except: 
         print "!Worker %d init fail" % args 
         result_queue.put('init failed') 
         return 
        # DO WORK 
        # Process data in the jobs queue 
        for job in iter(jobs_queue.get, None): 
         try: 
          # Can throw an exception! 
          result = do_work(state, job) 
          result_queue.put(result) 
         except: 
          print "!Job %d failed, skip..." % job 
          result_queue.put('job failed') 
    
    
    #========= Parent ========= 
    jobs = mp.Queue() 
    results = mp.Queue() 
    for i in range(workers_count): 
        mp.Process(target=worker_function, args=(i, jobs, results)).start() 
    
    # Populate jobs queue 
    results_to_expect = 0 
    for j in range(30): 
        jobs.put(j) 
        results_to_expect += 1 
    
    init_failures = 0 
    job_failures = 0 
    successes = 0 
    while job_failures + successes < 30 and init_failures < workers_count: 
        result = results.get() 
        init_failures += int(result == 'init failed') 
        job_failures += int(result == 'job failed') 
        successes += int(result != 'init failed' and result != 'job failed') 
        #print init_failures, job_failures, successes 
    
    for ii in range(workers_count): 
        jobs.put(None) 
    

    मेरे परिवर्तन:

    1. बदल दिया jobs सिर्फ एक सामान्य Queue (JoinableQueue के बजाय) किया जाना है।
    2. श्रमिक अब विशेष परिणाम स्ट्रिंग "init विफल" और "नौकरी विफल" वापस संवाद करते हैं।
    3. मास्टर प्रक्रिया विशेष विशेष परिणामों के लिए मॉनीटर करती है जब तक कि विशिष्ट स्थितियां प्रभावी न हों।
    4. अंत में, हालांकि, आपके पास कई श्रमिकों के लिए "रोकें" अनुरोध (यानी None नौकरियां) डालें। ध्यान दें कि इन सभी को कतार से नहीं खींचा जा सकता है (यदि कार्यकर्ता इनटाइलाइज करने में असफल रहा)।

    वैसे, आपका मूल कोड अच्छा और काम करने में आसान था। यादृच्छिक संभावनाएं बिट बहुत अच्छी है।

    +2

    या आप त्रुटियों के लिए इन-बैंड संचार से बचने के लिए परिणाम कतार में एक tuple '(परिणाम, त्रुटि)' (त्रुटि सफलता पर कोई नहीं) डाल सकते हैं। – jfs

    संबंधित मुद्दे