2008-09-17 19 views
13

में डेडलॉक मुझे रूबी के लिए एक सभ्य थ्रेडपूल कार्यान्वयन नहीं मिला, इसलिए मैंने अपना लिखा (आंशिक रूप से यहाँ से कोड पर आधारित: http://snippets.dzone.com/posts/show/3276, लेकिन थ्रेडपूल शट डाउन के लिए प्रतीक्षा/सिग्नल और अन्य कार्यान्वयन में बदल गया। हालांकि कुछ समय बाद (100 धागे होने और 1300 के बारे में कार्यों को प्रबंधित करने) चल रहा है, यह लाइन 25 पर गतिरोध के साथ मर जाता है -। यह वहाँ एक नया काम के लिए इंतजार कर रहा है कोई भी विचार, क्यों यह हो सकता हैथ्रेडपूल

require 'thread' 
begin 
    require 'fastthread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(callback) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @callback = callback 
     @mutex.synchronize {@running = true} 
     @thread = Thread.new do 
     while @mutex.synchronize {@running} 
      block = get_block 
      if block 
      block.call 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
      else 
      # Wait for a new job 
      @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25? 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @mutex.synchronize {@block} 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @mutex.synchronize {@block = nil} 
    end 

    def busy? 
     @mutex.synchronize {[email protected]?} 
    end 

    def stop 
     @mutex.synchronize {@running = false} 
     # Signal the thread not to wait for a new job 
     @cv.signal 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @workers = [] 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    end 

    def size 
    @mutex.synchronize {@workers.size} 
    end 

    def busy? 
    @mutex.synchronize {@workers.any? {|w| w.busy?}} 
    end 

    def shutdown 
    @mutex.synchronize {@workers.each {|w| w.stop}} 
    end 
    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    while true 
     @mutex.synchronize do 
     worker = get_worker 
     if worker 
      return worker.set_block(block) 
     else 
      # Wait for a free worker 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 

    # Used by workers to report ready status 
    def signal 
    @cv.signal 
    end 

    private 
    def get_worker 
    free_worker || create_worker 
    end 

    def free_worker 
    @workers.each {|w| return w unless w.busy?}; nil 
    end 

    def create_worker 
    return nil if @workers.size >= @max_size 
    worker = Worker.new(self) 
    @workers << worker 
    worker 
    end 
end 

उत्तर

10

ठीक है, इसलिए कार्यान्वयन के साथ मुख्य समस्या यह है कि यह सुनिश्चित करने के लिए कि कोई संकेत गुम हो गया है और मृत ताले से बचें?

मेरे अनुभव में, यह स्थिति चर और म्यूटेक्स के साथ वास्तव में हासिल करना मुश्किल है, लेकिन सेमफोरों के साथ आसान है। ऐसा इसलिए होता है कि रूबी क्यूई (या साइज्ड क्यूयू) नामक ऑब्जेक्ट को कार्यान्वित करती है जो समस्या को हल करनी चाहिए।

require 'thread' 
begin 
    require 'fasttread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(thread_queue) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @queue = thread_queue 
     @running = true 
     @thread = Thread.new do 
     @mutex.synchronize do 
      while @running 
      @cv.wait(@mutex) 
      block = get_block 
      if block 
       @mutex.unlock 
       block.call 
       @mutex.lock 
       reset_block 
      end 
      @queue << self 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @block 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @block = nil 
    end 

    def busy? 
     @mutex.synchronize { [email protected]? } 
    end 

    def stop 
     @mutex.synchronize do 
     @running = false 
     @cv.signal 
     end 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @queue = Queue.new 
    @workers = [] 
    end 

    def size 
    @workers.size 
    end 

    def busy? 
    @queue.size < @workers.size 
    end 

    def shutdown 
    @workers.each { |w| w.stop } 
    @workers = [] 
    end 

    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    worker = get_worker 
    worker.set_block(block) 
    end 

    private 

    def get_worker 
    if [email protected]? or @workers.size == @max_size 
     return @queue.pop 
    else 
     worker = Worker.new(@queue) 
     @workers << worker 
     worker 
    end 
    end 

end 

और यहाँ एक साधारण परीक्षण कोड है::

tp = ThreadPool.new 500 
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } } 
tp.shutdown 
+0

1. @workers तक पहुंच को सिंक्रनाइज़ नहीं किया जाना चाहिए? 2. कार्यकर्ता थ्रेड में अभी भी लॉक और अनलॉक करने की आवश्यकता क्यों है? – Roman

+0

कार्यकर्ता तक पहुंच हमेशा एक ही थ्रेड से की जाती है ... इसलिए सिंक्रनाइज़ेशन की आवश्यकता नहीं होती है। कार्यकर्ता धागे में लॉक के लिए, आपको थ्रेड को सुरक्षित रूप से उठाने की आवश्यकता है। – PierreBdR

+0

इसमें अभी भी एक समस्या है - डेडलॉक के लिए एक मौका है - जब कार्यकर्ता धागा स्वयं को कतार में जोड़ता है, तो थ्रेडपूल इसे कतार से ले जा सकता है और कार्य सौंप सकता है। उस मामले में एक संकेत भेजा जाएगा। हालांकि, अगर कार्यकर्ता धागा सीवी पर इंतजार नहीं कर रहा है, तो सिग्नल खो जाएगा। – Roman

1

मैं थोड़ा यहाँ पक्षपाती हूँ,? लेकिन मैं कुछ प्रक्रिया भाषा में मॉडलिंग का सुझाव दूंगा और मॉडल इसे जांच सकता हूं। उदाहरण के लिए, एमसीआरएल 2 टूलसेट (एसीपी-आधारित भाषा का उपयोग करके), मोबिलिटी वर्कबेंच (पीआई-कैलकुस) और स्पिन (प्रोमेला)।

अन्यथा मैं कोड के हर बिट को हटाने का सुझाव दूंगा जो समस्या के लिए आवश्यक नहीं है और एक मामूली मामला ढूंढना जहां डेडलॉक होता है। मुझे संदेह है कि डेडलॉक प्राप्त करने के लिए 100 धागे और 1300 कार्य आवश्यक हैं। एक छोटे से मामले के साथ आप शायद कुछ डीबग प्रिंट जोड़ सकते हैं जो समस्या को हल करने के लिए पर्याप्त जानकारी प्रदान करते हैं।

+0

प्रश्न में कोड केवल 180000 में से 1300 कार्यों को संसाधित करने में असफल रहा, दुर्भाग्य से इसे छोटे सेट के साथ पुन: उत्पन्न नहीं कर सका ... – Roman

1

ठीक है, समस्या आपके थ्रेडपूल # सिग्नल विधि में प्रतीत होती है। क्या हो सकता है है:

1 - अपने सभी कार्यकर्ता व्यस्त हैं और आप एक नया काम

2 संसाधित करने का प्रयास - लाइन 90 एक शून्य कार्यकर्ता

3 हो जाता है - एक कार्यकर्ता को मुक्त कर दिया है और यह संकेत मिलता है, लेकिन सिग्नल खो गया है क्योंकि थ्रेडपूल इसके लिए इंतजार नहीं कर रहा है

4 - आप लाइन 95 पर आते हैं, भले ही एक मुफ्त कर्मचारी हो।

यहां त्रुटि यह है कि आप एक नि: शुल्क कार्यकर्ता को सिग्नल कर सकते हैं, भले ही कोई भी सुन न सके। यह थ्रेडपूल # सिग्नल विधि होना चाहिए:

def signal 
    @mutex.synchronize { @cv.signal } 
end 

और समस्या वर्कर ऑब्जेक्ट में समान है। क्या हो सकता है है:

1 - यह जाँच करता है (लाइन 17) एक नौकरी प्रतीक्षा है तो - कार्यकर्ता सिर्फ एक नौकरी

2 पूरा: वहाँ

3 नहीं है - थ्रेड पूल भेजने एक नया काम है और यह संकेत ... लेकिन संकेत

4 खो दिया है - एक संकेत के लिए कार्यकर्ता इंतजार भले ही यह के रूप में व्यस्त चिह्नित है

आप के रूप में अपने इनिशियलाइज़ विधि रखना चाहिए:

def initialize(callback) 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    @callback = callback 
    @mutex.synchronize {@running = true} 
    @thread = Thread.new do 
    @mutex.synchronize do 
     while @running 
     block = get_block 
     if block 
      @mutex.unlock 
      block.call 
      @mutex.lock 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
     else 
      # Wait for a new job 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 
end 

अगला, कार्यकर्ता # get_block और वर्कर # reset_block विधियों को अब सिंक्रनाइज़ नहीं किया जाना चाहिए। इस तरह, आपके पास एक ब्लॉक के लिए परीक्षण और सिग्नल के इंतजार के बीच एक कार्यकर्ता को असाइन किया गया ब्लॉक नहीं हो सकता है।

+0

मुझे लगता है कि आप सही हैं! मैं इसका तुरंत परीक्षण करूंगा, धन्यवाद! – Roman

+0

हम्म .. अब जब मैं धागे को पूरा करने की प्रतीक्षा कर रहा हूं तो एक डेडलॉक है (उदा। थ्रेडपूल के लिए शामिल होने के लिए कॉल करना)। मैं समझने की कोशिश कर रहा हूं क्यों। – Roman

8

आप work_queue मणि, एक निर्माता और कार्यकर्ता धागे का एक पूल के बीच कार्य का समन्वय करने के लिए डिज़ाइन की कोशिश कर सकते यहाँ मेरी सुझाई गई कार्यान्वयन है।

संबंधित मुद्दे