2012-04-27 12 views
10

मुझे कतार सर्वर से डेटा मिल रहा है और मुझे इसे संसाधित करने और एक पावती भेजने की आवश्यकता है। कुछ इस तरह:यदि मैं बहुत अधिक डेटा पर काम करने की ज़रूरत है तो मैं ThreadPoolExecutor कमांड कैसे कर सकता हूं?

while (true) { 
    queueserver.get.data 
    ThreadPoolExecutor //send data to thread 
    queueserver.acknowledgement 

मैं पूरी तरह से समझ में नहीं आता क्या धागे में क्या होता है, लेकिन मैं इस कार्यक्रम डेटा, तुरंत हो जाता है यह धागा भेजता है और फिर इसे स्वीकार करता है लगता है। तो यहां तक ​​कि यदि मेरे पास प्रत्येक कतार की सीमा है तो केवल 200 अनजान आइटम हो सकते हैं, यह केवल उतना तेज़ हो जाएगा जितना इसे प्राप्त हो सकता है। यह एक अच्छा है जब मैं एक सर्वर पर एक प्रोग्राम लिखता हूं, लेकिन यदि मैं एकाधिक श्रमिकों का उपयोग कर रहा हूं तो यह एक मुद्दा बन जाता है क्योंकि थ्रेड कतार में वस्तुओं की मात्रा उसके काम के प्रतिबिंब नहीं है बल्कि यह कितनी तेज़ है कतार सर्वर से आइटम प्राप्त कर सकते हैं।

क्या थ्रेड कतार काम से भरा हुआ है, तो प्रोग्राम को प्रतीक्षा करने के लिए मैं कुछ भी कर सकता हूं?

उत्तर

20

यदि मैं बहुत अधिक डेटा पर काम करने की ज़रूरत है तो मैं थ्रेडपूलएक्सटेक्टर कमांड कैसे कर सकता हूं?

मुझे 100% यकीन नहीं है कि मैं आपकी समस्या को समझ रहा हूं।

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200); 

बजाय डिफ़ॉल्ट ExecutorService रों बनाया का उपयोग कर Executors है, जो एक असीम का उपयोग उपयोग करने का एक ExecutorService को प्रस्तुत नौकरियों के संदर्भ में,: निश्चित रूप से एक खुला समाप्त कतार के बजाय, आप एक BlockingQueue उस पर एक सीमा के साथ उपयोग कर सकते हैं कतार, आप अपने खुद बना सकते हैं:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
       new ArrayBlockingQueue<Runnable>(200)); 

एक बार कतार भरता, यह किसी भी नए कार्य है कि प्रस्तुत कर रहे हैं अस्वीकार करने के लिए कारण होगा। आपको RejectedExecutionHandler सेट करना होगा जो कतार में सबमिट हो। कुछ की तरह:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200); 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 
      0L, TimeUnit.MILLISECONDS, queue); 
// by default (unfortunately) the ThreadPoolExecutor will throw an exception 
// when you submit the 201st job, to have it block you do: 
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     // this will block if the queue is full 
     executor.getQueue().put(r); 
    } 
}); 

मुझे लगता है कि यह एक बड़ा याद आती है कि जावा एक ThreadPoolExecutor.CallerBlocksPolicy नहीं है है।

+0

धन्यवाद ग्रे। बस स्पष्ट करने के लिए इसका मतलब है कि अगर मैंने कतार में 200 आइटम रखे हैं, तो यह तब तक अनुमति नहीं देगा जब तक कि उस कतार से काम कम नहीं हो जाता? और अगर ऐसा होता है तो यह उस आदेश पर इंतजार कर रहा है जिसका उपयोग मैं थ्रेड में सबमिट करने के लिए कर रहा हूं जब तक कि ब्लॉकिंग कतार में भेजने के लिए कोई कमरा न हो? –

+0

यदि आप 'BlockingQueue' @learningJava पर 'put()' का उपयोग करते हैं, तो हाँ, कतार पूर्ण होने पर यह अवरुद्ध हो जाएगा और फिर कतार आकार घटने पर जारी रहेगा। – Gray

+0

+1: एक अन्य विकल्प है कि अस्वीकृत एक्सेक्यूशन हैंडलर इसके बजाय कार्य चलाएं। i.e. 'r.run();' –

2

यदि आप कार्यकर्ता पर कार्य शुरू करने पर पावती चाहते हैं, तो आप एक कस्टम ThreadFactory कर सकते हैं जो वास्तविक कार्य करने से पहले थ्रेड से पावती भेजता है। या आप ThreadPoolExecutor के beforeExecute को ओवरराइड कर सकते हैं।

आप पावती जब एक नया कार्यकर्ता एक नया कार्य के लिए उपलब्ध हो जाती है चाहते हैं, मुझे लगता है कि आप एक SynchronousQueue और एक ThreadPoolExecutor.CallerRunsPolicy के साथ एक ThreadPoolExecutor प्रारंभ कर सकते हैं, या अपने स्वयं नीति जहां फोन करने वाले ब्लॉक के साथ।

0

पहले, मुझे लगता है कि आपका रवैया गलत है, क्योंकि आप अपने छद्म कोड में किया था व्यस्त प्रतीक्षा है, तो आप toturial http://docs.oracle.com/javase/tutorial/essential/concurrency/

जावा से संगामिति ट्यूटोरियल के माध्यम से पढ़ना चाहिए व्यस्त के साथ एक समाधान अनदेखी कि, बीमार प्रस्ताव आप इंतजार (जो recommanded नहीं है):

 ExecutorService e1 = Executors.newFixedThreadPool(20); 
    while (true) { 
      if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll()); 
      if (!myq.isEmpty()) e1.execute(myq.poll()); 
    } 

नोट:

1.Make सुनिश्चित करें कि आपके MYQ, सिंक्रनाइज़ है के रूप में अन्य उत्तर में कहा। सिंक्रनाइज़ेशन सही है यह सुनिश्चित करने के लिए आप कुछ अवरुद्ध कतार का विस्तार कर सकते हैं।

2।आप एक रननेबल क्लास को कार्यान्वित करते हैं जो कि सर्वर से सेवा में सर्वर से निकलता है, उन रननेबल को कन्स्ट्रक्टर को पैरामीटर के रूप में myq प्राप्त करना होगा और इसे वैश्विक चर के रूप में सहेजना होगा।

3.myq रननेबल प्राप्त करता है, कि इसकी रन विधि के अंत में, आपको यह सुनिश्चित करना होगा कि रनने योग्य myq से हटा दें।

+0

ओपी का समय (सत्य) 'इंतजार में व्यस्त नहीं था। आपका 'जबकि (सत्य)' ** ** व्यस्त प्रतीक्षा करने की कोशिश कर रहा है और समस्या का समाधान नहीं करता है, जो कि निष्पादक की डिफ़ॉल्ट कतार असंबद्ध है, इसलिए यह कितनी पूर्ण हो जाती है, इस बारे में स्वतंत्र रूप से नई नौकरियों को स्वीकार करना जारी रखेगी। – trutheality

+0

myp उन लोगों को होना चाहिए जो बाध्य हैं, आप कोड और नोट्स में लिखे गए शब्दों पर ध्यान नहीं दे रहे हैं –

0

ब्लॉकिंगपूल होने के बारे में क्या है जो 200 से अधिक कार्यों को निष्पादित नहीं करेगा और 201 कार्य सबमिट करने से पहले कार्य पूरा करने की प्रतीक्षा करेगा। मैंने इसे अपने आवेदन में सेमफोर का उपयोग करके हासिल किया है। आप अपने कन्स्ट्रक्टर को मान पास करके सीमा भी बदल सकते हैं।

@Gray उत्तर से केवल अंतर यह है कि शायद ही कभी इस मामले में कोई भी कार्य अस्वीकार कर दिया जाएगा। सेमफोर किसी भी कार्य को तब तक इंतजार कर देगा जब तक कि कोई अन्य कार्य खत्म न हो जाए। फिर भी, हमने किसी भी अस्वीकृति के मामले में निष्पादक को उस कार्य को दोबारा सबमिट करने के लिए अस्वीकार करने वाले हैंडलर को अस्वीकार कर दिया है।

private class BlockingPool extends ThreadPoolExecutor { 
    private final Semaphore semaphore;  
    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){  
     super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() { 
      @Override 
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
       executor.execute(r); 
      } 
     }); 
     semaphore = new Semaphore(tasksAllowedInThreads); 
    } 

    @Override 
    public void execute(Runnable task){ 
     boolean acquired = false; 
     do{ 
      try{ 
       semaphore.acquire(); 
       acquired = true; 
      } catch (final InterruptedException e){ 
       // log 
      } 
     } while (!acquired); // run in loop to handle InterruptedException 
     try{ 
      super.execute(task); 
     } catch (final RejectedExecutionException e){ 
      System.out.println("Task Rejected"); 
      semaphore.release(); 
      throw e; 
     } 
    }  

    @Override 
    protected void afterExecute(Runnable r, Throwable t){ 
     super.afterExecute(r, t); 
     if (t != null){ 
      t.printStackTrace(); 
     } 
     semaphore.release(); 
    } 
} 

क्या यह समझ में आता है!

संबंधित मुद्दे

 संबंधित मुद्दे