2010-08-10 11 views
42

मैं ThreadPoolExecutor का उपयोग करके कई कार्यों को निष्पादित करने का प्रयास कर रहा हूं। नीचे एक काल्पनिक उदाहरण है:कतार पूर्ण होने पर ThreadPoolExecutor ब्लॉक?

def workQueue = new ArrayBlockingQueue<Runnable>(3, false) 
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) 
for(int i = 0; i < 100000; i++) 
    threadPoolExecutor.execute(runnable) 

समस्या यह है कि मैं जल्दी से एक java.util.concurrent.RejectedExecutionException पाने के बाद से कार्यों की संख्या काम कतार के आकार से अधिक है। हालांकि, वांछित व्यवहार जो मैं खोज रहा हूं वह मुख्य धागा ब्लॉक है जब तक कि कतार में कमरा न हो। इसे निष्पादित करने का श्रेष्ठ तरीका क्या है?

+2

इस प्रश्न पर एक नज़र डालें: http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated – Kiril

+2

[यह उत्तर] (http : //stackoverflow.com/a/4522411/394431) एक अन्य प्रश्न के लिए एक कस्टम 'अवरुद्ध क्यूई' उपclass का उपयोग करने का सुझाव देता है जो 'put()' को प्रस्तुत करके 'प्रस्ताव()' पर ब्लॉक करता है। मुझे लगता है कि 'अस्वीकृत निष्पादन हैंडलर' के रूप में उतना ही कम काम करना समाप्त होता है जो 'getQueue() डालता है()। –

+2

सीधे कतार में डालना गलत होगा, जैसा कि इस उत्तर में बताया गया है http://stackoverflow.com/a/3518588/585903 –

उत्तर

45

कुछ बहुत संकीर्ण परिस्थितियों में, आप एक java.util.concurrent लागू कर सकते हैं। अस्वीकृत एक्सेक्यूशन हैंडलर जो आपको चाहिए।

RejectedExecutionHandler block = new RejectedExecutionHandler() { 
    rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    executor.getQueue().put(r); 
    } 
}; 

ThreadPoolExecutor pool = new ... 
pool.setRejectedExecutionHandler(block); 

अब। यह निम्न कारणों

  • यह गतिरोध है क्योंकि पूल में सभी धागे बात आप कतार में डाल दिख रहा है से पहले मर सकते होने का खतरा है के लिए एक बहुत बुरा विचार है। एक उचित समय जीवित समय निर्धारित करके इसे मिटाना।
  • कार्य आपके एक्जिक्यूटर की अपेक्षा से लपेटा नहीं गया है। बहुत से निष्पादक कार्यान्वयन निष्पादन से पहले किसी प्रकार की ट्रैकिंग ऑब्जेक्ट में अपने कार्यों को लपेटते हैं। तुम्हारा स्रोत देखें।
  • getQueue() के माध्यम से जोड़ना एपीआई द्वारा दृढ़ता से निराश होता है, और किसी बिंदु पर निषिद्ध हो सकता है।

थ्रेडपूलएक्ससेलर.केलररन्स पॉलिसी स्थापित करने के लिए लगभग हमेशा बेहतर रणनीति है जो आपके एप को थ्रेड पर कार्य चलाकर निष्पादित करेगा()।

लेकिन, कभी कभी एक अवरुद्ध रणनीति, अपने सभी निहित जोखिम के साथ, वास्तव में आप क्या चाहते है। मैं इन शर्तों

  • तहत कहेंगे आप केवल एक ही धागे पर अमल()
  • आप के लिए है बुला है (या करना चाहते हैं) एक बहुत छोटे से कतार की लंबाई
  • है आप बिल्कुल की संख्या को सीमित करने की जरूरत है इस काम को चलाने वाले थ्रेड (आमतौर पर बाहरी कारणों से), और एक कॉलर-रन रणनीति उसको तोड़ देगी।
  • आपके कार्य, अप्रत्याशित आकार के होते हैं तो फोन करने वाले-रन भुखमरी परिचय सकता है अगर पूल क्षण भर के 4 छोटे कार्यों के साथ व्यस्त था और अपने एक धागा कॉल करना एक बड़ा एक साथ फंस मिला निष्पादित।

तो, मैं कहता हूँ के रूप में। इसकी शायद ही कभी आवश्यकता है और खतरनाक हो सकता है, लेकिन वहां आप जाते हैं।

गुड लक।

public void executeBlocking(Runnable command) { 
    if (threadPool == null) { 
     logger.error("Thread pool '{}' not initialized.", threadPoolName); 
     return; 
    } 
    ThreadPool threadPoolMonitor = this; 
    boolean accepted = false; 
    do { 
     try { 
      threadPool.execute(new Runnable() { 
       @Override 
       public void run() { 
        try { 
         command.run(); 
        } 
        // to make sure that the monitor is freed on exit 
        finally { 
         // Notify all the threads waiting for the resource, if any. 
         synchronized (threadPoolMonitor) { 
          threadPoolMonitor.notifyAll(); 
         } 
        } 
       } 
      }); 
      accepted = true; 
     } 
     catch (RejectedExecutionException e) { 
      // Thread pool is full 
      try { 
       // Block until one of the threads finishes its job and exits. 
       synchronized (threadPoolMonitor) { 
        threadPoolMonitor.wait(); 
       } 
      } 
      catch (InterruptedException ignored) { 
       // return immediately 
       break; 
      } 
     } 
    } while (!accepted); 
} 

ThreadPool java.util.concurrent.ExecutorService की एक स्थानीय उदाहरण है, जिसमें पहले से ही शुरू कर दिया गया है:

+2

एक बहुत अच्छी तरह से विचार-विमर्श प्रतिक्रिया। मैं आपकी हालत के साथ मामूली समस्या लेता हूं कि> "आपको बहुत छोटी कतार की लंबाई है (या चाहते हैं)।" आप भविष्यवाणी करने में सक्षम नहीं हो सकते कि दिए गए नौकरी के कितने कार्य कतारबद्ध होंगे। हो सकता है कि आप एक दैनिक नौकरी चला रहे हैं जो कुछ डीबी से डेटा संसाधित करता है और सोमवार को प्रक्रिया के लिए 500 रिकॉर्ड होते हैं लेकिन मंगलवार को 50,000 होते हैं। आपको अपनी कतार पर ऊपरी बाउंड सेट करना होगा जैसे कि जब कोई बड़ी नौकरी आती है तो आप अपना ढेर नहीं उड़ाएंगे। उस स्थिति में कतार से पहले कुछ कार्यों को पूरा करने के इंतजार में कोई नुकसान नहीं होता है। – skelly

+0

शानदार उत्तर, धन्यवाद –

+0

"यह डेडलॉक के लिए प्रवण है क्योंकि पूल में मौजूद सभी धागे आपके द्वारा कतार में रखी गई चीज़ से पहले मर सकते हैं। उचित समय को जीवंत रखने के द्वारा इसे मिटाना।" शून्य पूल आकार को शून्य से अधिक मान पर सेट करके पूरी तरह से टाला नहीं जा सकता है? हर दूसरे कारण जावा का पतन है जिसमें निष्पादक कतारों को अवरुद्ध करने के लिए अंतर्निहित समर्थन नहीं है। जो दिलचस्प है, क्योंकि यह एक बहुत ही उचित रणनीति प्रतीत होता है। मुझे आश्चर्य है कि तर्क क्या है। –

1

यहाँ इस मामले में मेरी कोड स्निपेट है।

0

मैं एक कस्टम RejectedExecutionHandler, जो केवल थोड़ी देर के लिए बुला धागा ब्लॉक और उसके बाद काम फिर से सबमिट करने का प्रयास का उपयोग कर इस समस्या का समाधान:

public class BlockWhenQueueFull implements RejectedExecutionHandler { 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 

     // The pool is full. Wait, then try again. 
     try { 
      long waitMs = 250; 
      Thread.sleep(waitMs); 
     } catch (InterruptedException interruptedException) {} 

     executor.execute(r); 
    } 
} 

इस वर्ग के सिर्फ धागा पूल में इस्तेमाल किया जा सकता एक अस्वीकृत निष्पादन के रूप में निष्पादक किसी अन्य की तरह हैंडलर। इस उदाहरण में:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull()) 

केवल नकारात्मक पक्ष यह है मुझे लगता है कि फोन करने धागा थोड़ा अत्यंत आवश्यक होता से अधिक समय (250ms तक) पहुंच पाते हो सकता है। कई शॉर्ट-रनिंग कार्यों के लिए, शायद प्रतीक्षा समय को 10ms या उससे भी कम कर दें। इसके अलावा, चूंकि इस निष्पादक को प्रभावी रूप से पुनरावर्ती कहा जा रहा है, इसलिए एक थ्रेड उपलब्ध होने के लिए बहुत लंबा इंतजार कर रहा है (घंटों) के परिणामस्वरूप एक ढेर ओवरफ्लो हो सकता है।

फिर भी, मुझे व्यक्तिगत रूप से इस विधि को पसंद है। यह कॉम्पैक्ट, समझने में आसान है, और अच्छी तरह से काम करता है। क्या मुझे कुछ भी याद आ रही है?

संबंधित मुद्दे