2009-09-16 16 views
5

को निर्दिष्ट कोई (अस्वीकार कर दिया है कोई कार्य जिसका अर्थ है) कार्यों के लिए निर्वाहक हमेशा कम से कम 5 धागे होगा कि बनाने के लिए जिस तरह से है, और 20 धागे की अधिकतम, और असीम कतारThreadPoolExecutor समस्या

मैं के साथ नए ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) की कोशिश की है सभी संभावनाओं है कि मैं कतार के लिए के बारे में सोचा:

new LinkedBlockingQueue() // never runs more than 5 threads 
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new SynchronousQueue() // no tasks can wait, after 20, they are rejected 

और कोई भी चाहता था के रूप में काम किया।

+0

आपका क्या मतलब है कि यह काम नहीं करता है? एक अपवाद? – Gandalf

+4

जो कि बहुत करीब है: Gandalf और Sarmun! – akf

+0

एंट्स यहां किसी भी मिनट होना चाहिए। । । और वह अजीब ईगल कहाँ है ... – Gandalf

उत्तर

5

शायद ऐसा कुछ आपके लिए काम करेगा? मैंने बस इसे चाबुक कर दिया ताकि कृपया उस पर पोक करें। असल में, यह एक अतिप्रवाह थ्रेड पूल कि खिलाने के लिए प्रयोग किया जाता है लागू करता है अंतर्निहित ThreadPoolExecutor

दो प्रमुख ड्रा पीठ मैं इसके साथ देख रहे हैं:

  • submit() पर एक लौटे भविष्य वस्तु की कमी है। लेकिन शायद यह आपके लिए कोई मुद्दा नहीं है।
  • नौकरियां जमा होने पर द्वितीयक कतार केवल ThreadPoolExecutor में खाली हो जाएगी। एक सुरुचिपूर्ण समाधान होना है, लेकिन मुझे अभी तक यह नहीं दिख रहा है। यदि आप जानते हैं कि StusMagicExecutor में कार्यों की एक स्थिर स्ट्रीम होगी तो यह कोई समस्या नहीं हो सकती है। ("मई" कुंजी शब्द है।) एक विकल्प हो सकता है कि आपके सबमिट किए गए कार्य StusMagicExecutor पर पूरा होने के बाद हो जाएं?

स्टू के मैजिक निर्वाहक:

public class StusMagicExecutor extends ThreadPoolExecutor { 
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE. 

    public StusMagicExecutor() { 
     super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler()); 
    } 
    public void queueRejectedTask(Runnable task) { 
     try { 
      secondaryQueue.put(task); 
     } catch (InterruptedException e) { 
      // do something 
     } 
    } 
    public Future submit(Runnable newTask) { 
     //drain secondary queue as rejection handler populates it 
     Collection<Runnable> tasks = new ArrayList<Runnable>(); 
     secondaryQueue.drainTo(tasks); 

     tasks.add(newTask); 

     for (Runnable task : tasks) 
      super.submit(task); 

     return null; //does not return a future! 
    } 
} 

class RejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 
     ((StusMagicExecutor)executor).queueRejectedTask(runnable); 
    } 
} 
+1

ग्रेट, टीएनएक्स, यह वही है जो मैं चाहता था। रनटाइबल के साथ नए टास्क को लपेटने के साथ जो खत्म होने के बाद एक कार्य को पोक करता है, 20 में से केवल एक थ्रेड निष्क्रिय हो सकता है जबकि हमारे पास माध्यमिक कतार में कार्य होते हैं। – Sarmun

1

ThreadPoolExecutor के लिए javadocs स्पष्ट हैं कि एक बार corePoolSize धागे बनाए गए हैं, कतार पूर्ण होने के बाद ही नए थ्रेड बनाए जाएंगे। तो यदि आप core को 5 और max से 20 पर सेट करते हैं, तो आपको कभी भी वांछित व्यवहार नहीं मिलेगा।

हालांकि, यदि आप core और max से 20 दोनों सेट करते हैं, तो सभी 20 थ्रेड व्यस्त होने पर कार्य केवल कतार में जोड़े जाएंगे। बेशक, यह आपके "5 धागे न्यूनतम" की आवश्यकता को थोड़ा सा अर्थहीन बनाता है, क्योंकि सभी 20 जीवित रहेंगे (जब तक वे निष्क्रिय नहीं हो जाते हैं)।

+0

"idling out" कभी नहीं होगा, जब तक कि आप कहते हैं कि सभी कोर धागे मर सकते हैं। किसी भी तरह से, कोर और अधिकतम आकार दोनों के बिंदु को न देखें यदि इसे ठीक से उपयोग नहीं किया जा सकता है। क्या कोई अन्य वर्ग है (ThreadPoolExecutor के अलावा), जो मेरी आवश्यकताओं को पूरा करेगा? – Sarmun

1

मुझे लगता है कि इस समस्या को वर्ग के एक कमी है और बहुत निर्माता पैरामीटर संयोजन दिए गए भ्रामक है। यहां स्विंगवर्कर के आंतरिक थ्रेडपूलएक्सएटर से लिया गया एक समाधान है जिसे मैंने शीर्ष स्तर की कक्षा में बनाया है। इसमें न्यूनतम नहीं है लेकिन कम से कम ऊपरी बाउंड का उपयोग करता है। एकमात्र चीज जो मुझे नहीं पता वह लॉकिंग निष्पादन से आपको क्या प्रदर्शन मारा जाता है।

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { 
    private final ReentrantLock pauseLock = new ReentrantLock(); 
    private final Condition unpaused = pauseLock.newCondition(); 
    private boolean isPaused = false; 
    private final ReentrantLock executeLock = new ReentrantLock(); 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       handler); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, 
      RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory, handler); 
    } 

    @Override 
    public void execute(Runnable command) { 
     executeLock.lock(); 
     try { 
      pauseLock.lock(); 
      try { 
       isPaused = true; 
      } finally { 
       pauseLock.unlock(); 
      } 
      setCorePoolSize(getMaximumPoolSize()); 
      super.execute(command); 
      setCorePoolSize(0); 
      pauseLock.lock(); 
      try { 
       isPaused = false; 
       unpaused.signalAll(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } finally { 
      executeLock.unlock(); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     pauseLock.lock(); 
     try { 
      while (isPaused) { 
       unpaused.await(); 
      } 
     } catch (InterruptedException ignore) { 

     } finally { 
      pauseLock.unlock(); 
     } 
    } 
}