2013-01-08 12 views
5

का मल्टीप्रोसेसिंग और ऐरे मैं 3 प्रक्रियाओं को लॉन्च कर रहा हूं और मैं चाहता हूं कि वे एक साझा सरणी, प्रक्रिया के अनुरूप सूचकांक में एक स्ट्रिंग डालें। (I)। नीचे दिए गए कोड मेंपायथन: c_char_p

देखो, उत्पन्न उत्पादन होता है:

['test 0', None, None] 
['test 1', 'test 1', None] 
['test 2', 'test 2', 'test 2'] 

क्यों 'परीक्षण 0' test 1 द्वारा ओवरराइट, और test 1test 2 द्वारा?

मैं क्या चाहते हैं (क्रम महत्वपूर्ण नहीं है) है:

['test 0', None, None] 
['test 0', 'test 1', None] 
['test 0', 'test 1', 'test 2'] 

कोड:

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Value, Lock, Process, Array 
import ctypes 
from ctypes import c_int, c_char_p 

class Consumer(multiprocessing.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      multiprocessing.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr=self.arr, lock=self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr=None, lock=None): 
     with lock: 
      arr[self.i] = "test %d" % self.i 
      print arr[:] 

    def __str__(self): 
     return 'ARC' 

    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    arr = Array(ctypes.c_char_p, 3) 

    lock = multiprocessing.Lock() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

मैं अजगर 2.7.3 (उबंटू) चल रहा हूँ

उत्तर

5

यह समस्या लगती है this one के समान। वहां, जेएफ सेबेस्टियन ने अनुमान लगाया कि arr[i] अंक arr[i] को एक स्मृति पते पर असाइनमेंट जो असाइनमेंट बनाने के लिए केवल सार्थक था। अन्य उपप्रोसेस उस पते को देखते समय कचरा पुनर्प्राप्त करते हैं।

इस समस्या से बचने के लिए कम से कम दो तरीके हैं।

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, lock, lst): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.lock = lock 
      self.lst = lst 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(lock = self.lock, lst = self.lst) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, lock, lst): 
     with lock: 
      lst[self.i] = "test {}".format(self.i) 
      print([lst[i] for i in range(3)]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    manager = mp.Manager() 
    lst = manager.list(['']*3) 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

एक और तरीका है एक निश्चित आकार mp.Array('c', 10) रूप में इस तरह के साथ एक साझा सरणी का उपयोग करने के लिए है: एक एक multiprocessing.manager सूची का उपयोग करने के लिए है।

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr = self.arr, lock = self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr, lock): 
     with lock: 
      arr[self.i].value = "test {}".format(self.i) 
      print([a.value for a in arr]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    arr = [mp.Array('c', 10) for i in range(3)] 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

मैं अटकलें कारण है कि इस काम करता है जब mp.Array(ctypes.c_char_p, 3) नहीं है, यह है कि क्योंकि mp.Array('c', 10) एक निश्चित आकार इतना स्मृति पता कभी नहीं बदलता है, जबकि mp.Array(ctypes.c_char_p, 3) एक चर आकार की है, इसलिए जब arr[i] है स्मृति पते बदल सकता है एक बड़ी स्ट्रिंग को सौंपा गया।

शायद यह क्या the docs, जब यह कहा गया है के बारे में चेतावनी रहे हैं

हालांकि यह संभव है स्टोर करने के लिए साझा स्मृति में एक सूचक याद रखें कि यह एक विशिष्ट का पता स्थान में एक स्थान के पास भेजेगा प्रक्रिया। हालांकि, सूचक दूसरी प्रक्रिया के संदर्भ में संदर्भ में अमान्य होने की संभावना है और से पॉइंटर को हटाने की कोशिश कर रहा है दूसरी प्रक्रिया क्रैश का कारण बन सकती है।

+0

धन्यवाद अरबों बार! आपके दोनों समाधान वास्तव में काम कर रहे हैं :) मैं जेएफ सेबेस्टियन के उस पद पर आया लेकिन किसी कारण से इसे लागू नहीं कर सका ... दोह! अब आप मुझे बताओ कि मुझे अपनी मूर्ति बनाना चाहिए! फिर से धन्यवाद ... – Ujoux

+0

दिलचस्प सवाल और उत्साह के लिए धन्यवाद! Stackoverflow पर आपको और अधिक देखने की उम्मीद है। मूर्तियों के लिए - मुझे लगता है कि चेक मार्क के ऊपर उपरोक्त पर क्लिक करना एक बहुत ही बढ़िया बनाता है; ^) – unutbu

+0

जैसे ही मेरी 15 की जरूरी प्रतिष्ठा होगी, मैं नहीं भूलूंगा;) – Ujoux