2013-05-31 11 views
9

की आवश्यकता है मैं एसिंक्रोनस 'प्रसारण शैली' संदेश प्रबंधित करने के लिए एक पायथन कक्षा (अधिमानतः एक तृतीय पक्ष लाइब्रेरी की बजाय मानक भाषा का हिस्सा) की तलाश में हूं।एक थ्रेड-सुरक्षित एसिंक्रोनस संदेश कतार

मेरे पास एक थ्रेड होगा जो कतार पर संदेश रखता है ('putMessageOnQueue' विधि को ब्लॉक नहीं करना चाहिए) और फिर कई अन्य धागे जो सभी संदेशों के लिए इंतजार कर रहे हैं, संभवतः कुछ अवरुद्ध 'waitForMessage' फ़ंक्शन को अवरुद्ध करते हुए। जब कतार पर एक संदेश रखा जाता है, तो मैं प्रत्येक प्रतीक्षा धागे को संदेश की अपनी प्रति प्राप्त करना चाहता हूं।

मैंने अंतर्निहित Queue कक्षा को देखा है, लेकिन मुझे नहीं लगता कि यह उपयुक्त है क्योंकि उपभोग करने वाले संदेश उन्हें कतार से हटाने में शामिल होते हैं, इसलिए केवल 1 क्लाइंट थ्रेड प्रत्येक को देखेगा।

ऐसा लगता है कि यह एक सामान्य उपयोग-मामला होना चाहिए, क्या कोई समाधान का सुझाव दे सकता है?

+0

मैं है मान लीजिए कि आप अपनी खुद की कक्षा बना सकते हैं जो ट्रैक के ट्रैक को रखता है, किस संदेश को बिना किसी समस्या के संदेश मिला। – Bakuriu

उत्तर

7

मुझे लगता है कि इसके लिए सामान्य दृष्टिकोण प्रत्येक थ्रेड के लिए एक अलग संदेश कतार का उपयोग करना है, और संदेश को प्रत्येक कतार में धक्का देना है जिसने पहले ऐसे संदेशों को प्राप्त करने में रुचि दर्ज की है।

इस तरह कुछ काम करना चाहिए, लेकिन यह अपरीक्षित कोड है ...

from time import sleep 
from threading import Thread 
from Queue import Queue 

class DispatcherThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(DispatcherThread, self).__init__(*args, **kwargs) 
     self.interested_threads = [] 

    def run(self): 
     while 1: 
      if some_condition: 
       self.dispatch_message(some_message) 
      else: 
       sleep(0.1) 

    def register_interest(self, thread): 
     self.interested_threads.append(thread) 

    def dispatch_message(self, message): 
     for thread in self.interested_threads: 
      thread.put_message(message) 



class WorkerThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.queue = Queue() 


    def run(self): 

     # Tell the dispatcher thread we want messages 
     dispatcher_thread.register_interest(self) 

     while 1: 
      # Wait for next message 
      message = self.queue.get() 

      # Process message 
      # ... 

    def put_message(self, message): 
     self.queue.put(message) 


dispatcher_thread = DispatcherThread() 
dispatcher_thread.start() 

worker_threads = [] 
for i in range(10): 
    worker_thread = WorkerThread() 
    worker_thread.start() 
    worker_threads.append(worker_thread) 

dispatcher_thread.join() 
+0

बिल्कुल सही, यह बहुत अच्छा काम करता है! शर्मिंदा एक तैयार संस्करण नहीं है, लेकिन मुझे लगता है कि सिद्धांत जटिल नहीं है जब कोई इसे स्पष्ट रूप से समझाता है (जैसा आपने किया था)। – codebox

+0

@codebox ठीक है, ['multiprocessing'] (http://docs.python.org/2/library/multiprocessing.html) मॉड्यूल में बेहतर समर्थन है, लेकिन यह थ्रेड के बजाय उपप्रोसेसेस के लिए है। मुझे लगता है कि ऐसा इसलिए है क्योंकि इंटर-प्रोसेस संचार आमतौर पर इंटर-थ्रेड संचार से अधिक जटिल होता है, क्योंकि धागे स्वाभाविक रूप से एक ही ढेर को साझा करते हैं। – Aya

2

मुझे लगता है कि यह एक अधिक सीधे आगे उदाहरण (Python Lib में कतार उदाहरण से लिया)

from threading import Thread 
from Queue import Queue 


num_worker_threads = 2 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+0

यह प्रश्न की आवश्यकताओं को कैसे पूरा करता है? उन्होंने स्पष्ट रूप से कहा कि कतार काम नहीं करती है क्योंकि प्रत्येक धागे को आइटम की एक प्रति की आवश्यकता होती है। – Wlerin

संबंधित मुद्दे