2012-04-03 13 views
5

मैं धागे की श्रृंखला से परिणामों को संसाधित करने के लिए एक पूर्ण सेवा सेवा का उपयोग करना चाहता हूं क्योंकि वे पूरा कर चुके हैं। मैं एक पाश भविष्य जाने वाली वस्तुओं को प्रदान करता है के रूप में वे उपलब्ध हो जाते हैं लेने के लिए सेवा है, लेकिन मैं यह निर्धारित करने के लिए जब सभी धागे पूरा कर लिया है सबसे अच्छा तरीका है पता नहीं है (और इस प्रकार पाश बाहर निकलने के लिए):कैसे पता चलेगा कि एक पूरा सेवा कब समाप्त हो रही है?

import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.ThreadPoolExecutor; 

public class Bar { 

    final static int MAX_THREADS = 4; 
    final static int TOTAL_THREADS = 20; 

    public static void main(String[] args) throws Exception{ 

     final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS); 
     final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool); 

     for (int i=0; i<TOTAL_THREADS; i++){ 
      service.submit(new MyCallable(i)); 
     } 

     int finished = 0; 
     Future<Integer> future = null; 
     do{ 
      future = service.take(); 
      int result = future.get(); 
      System.out.println(" took: " + result); 
      finished++;    

     }while(finished < TOTAL_THREADS); 

     System.out.println("Shutting down"); 
     threadPool.shutdown(); 
    } 


    public static class MyCallable implements Callable<Integer>{ 

     final int id; 

     public MyCallable(int id){ 
      this.id = id; 
      System.out.println("Submitting: " + id); 
     } 

     @Override 
     public Integer call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("finished: " + id); 
      return id; 
     } 
    } 
} 

मैंने ThreadPoolExecutor की स्थिति की जांच करने का प्रयास किया है, लेकिन मुझे पता है कि getCompletedTaskCount प्राप्त करें और GetTaskCount विधियां केवल अनुमान हैं और इन पर भरोसा नहीं किया जाना चाहिए। क्या यह सुनिश्चित करने का एक बेहतर तरीका है कि मैंने खुद को गिनने की तुलना में CompletionService से सभी वायदा पुनर्प्राप्त किए हैं?


संपादित करें: दोनों लिंक कि Nobeh प्रदान की, और this link का सुझाव है कि, प्रस्तुत कार्यों की संख्या की गणना फिर टेक बुला() कि कई बार, जाने के लिए रास्ता है। मुझे आश्चर्य है कि CompletionService या उसके निष्पादक से पूछने का कोई तरीका नहीं है कि वापस आने के लिए क्या छोड़ा गया है।

उत्तर

3

इन सवालों के जवाब आपको उत्तर देता है?

  • क्या आपके असीमित कार्य CompletionService पर सबमिट किए गए अन्य कार्यों को बनाते हैं?
  • service एकमात्र ऐसा ऑब्जेक्ट है जो आपके एप्लिकेशन में बनाए गए कार्यों को संभालने के लिए माना जाता है?

पर reference documentation आधार पर, एक उपभोक्ता/निर्माता दृष्टिकोण पर CompletionService कार्य करता है और एक आंतरिक Executor का लाभ लेता है। इसलिए, जब तक आप एक ही स्थान पर कार्यों का उत्पादन करते हैं और उन्हें किसी अन्य स्थान पर उपभोग करते हैं, CompletionService.take() बताएगा कि क्या कोई और परिणाम देने के लिए कोई परिणाम हैं।

मेरा मानना ​​है कि this question भी आपकी मदद करता है।

+0

धन्यवाद, nobeh। ऐसा लगता है कि वे थ्रेड गिनती पर भी लूप करते हैं, उनके लिए "int (int कार्यों हैंडल = 0; कार्यों हैंडल <सबमिट टास्क; कार्य हैंडल ++)" लूप। मैं बस सोच रहा था कि क्या समवर्ती एपीआई में कुछ और सूक्ष्म है जो मुझे उपयोग करना चाहिए। –

+1

एपीआई में उदाहरण पंक्ति में() n बार निष्पादित करने के समान दृष्टिकोण का उपयोग करता है। http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html –

+0

यदि एक्सेक्टर को कार्यों को सबमिट किए गए कार्यों की तुलना में किसी भिन्न थ्रेड में CompletionService से परिणाम लेते हैं, तो क्या यह थ्रेड सुरक्षित है? – raffian

3

http://www.javaspecialists.eu/archive/Issue214.html देखें कि आप जो खोज रहे हैं उसे करने के लिए निष्पादक कॉम्प्लेशन सेवा को विस्तारित करने के बारे में एक सभ्य सुझाव के लिए। मैंने आपकी सुविधा के लिए नीचे प्रासंगिक कोड चिपकाया है। लेखक इटरटेबल को लागू करने के लिए सेवा भी सुझाता है, जो मुझे लगता है कि एक अच्छा विचार होगा।

एफडब्ल्यूआईडब्ल्यू, मैं आपसे सहमत हूं कि यह वास्तव में मानक कार्यान्वयन का हिस्सा होना चाहिए, लेकिन हां, यह नहीं है।

import java.util.concurrent.*; 
import java.util.concurrent.atomic.*; 

public class CountingCompletionService<V> extends ExecutorCompletionService<V> { 
    private final AtomicLong submittedTasks = new AtomicLong(); 
    private final AtomicLong completedTasks = new AtomicLong(); 

    public CountingCompletionService(Executor executor) { 
    super(executor); 
    } 

    public CountingCompletionService(
     Executor executor, BlockingQueue<Future<V>> queue) { 
    super(executor, queue); 
    } 

    public Future<V> submit(Callable<V> task) { 
    Future<V> future = super.submit(task); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> submit(Runnable task, V result) { 
    Future<V> future = super.submit(task, result); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> take() throws InterruptedException { 
    Future<V> future = super.take(); 
    completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll() { 
    Future<V> future = super.poll(); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll(long timeout, TimeUnit unit) 
     throws InterruptedException { 
    Future<V> future = super.poll(timeout, unit); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public long getNumberOfCompletedTasks() { 
    return completedTasks.get(); 
    } 

    public long getNumberOfSubmittedTasks() { 
    return submittedTasks.get(); 
    } 

    public boolean hasUncompletedTasks() { 
    return completedTasks.get() < submittedTasks.get(); 
    } 
} 
संबंधित मुद्दे

 संबंधित मुद्दे