2011-11-02 13 views
7

मैं चौड़ाई 10 की एक निश्चित थ्रेड पूल ExecutorService, और 100 Callable की की एक सूची है, प्रत्येक 20 सेकंड के लिए इंतज़ार कर रहे हैं और उनके बीच में आता है रिकॉर्डिंग की है।जावा ExecutorService invokeAll() में दखल

मैं उस सूची में invokeAll को एक अलग थ्रेड में कॉल कर रहा हूं, और लगभग तुरंत इस धागे को बाधित करता हूं। ExecutorService निष्पादन अपेक्षित के रूप में बाधित है, लेकिन Callable एस द्वारा दर्ज इंटरप्ट की वास्तविक संख्या 10 से लगभग 20-40 की अपेक्षा कहीं अधिक है। ऐसा क्यों है, अगर ExecutorService एक साथ 10 से अधिक धागे निष्पादित नहीं कर सकता है?

पूर्ण स्रोत:,

@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

का उपयोग WinXP 32-बिट (आप इसे और अधिक है कि संगामिति की वजह से एक बार चलाने के लिए आवश्यकता हो सकती है) ओरेकल JRE 1.6.0_27 और JUnit4

+0

हम्म ... इसे एक मुख्य विधि के साथ एक प्रोग्राम में परिवर्तित करना, मुझे हमेशा 10 मिल रहा है ... (विंडोज़ पर जावा 7) –

+0

वही किया गया, 37 (1.6.0_27, विंडोज एक्सपी) मिला। परीक्षण करने के लिए जावा 7 नहीं है, क्या कोई पुष्टि कर सकता है? –

+0

मैं काम पर कोशिश करूंगा। शायद यह जावा 6 बग है ... –

उत्तर

4

मैं परिकल्पना से असहमत कि आपको केवल 10 इंटरप्ट प्राप्त करना चाहिए।

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

अनिवार्य रूप से, मेरे तर्क कोई गारंटी नहीं कि चौकीदार धागा पहले यह समय टुकड़ा उपज और ExecutorService के कार्यकर्ता धागे अधिक शुरू करने के लिए अनुमति देने के लिए है सभी 100 "वेटर" RunnableFuture उदाहरणों को रद्द करने की अनुमति दी जाएगी है कि वहाँ है वेटर कार्यों।

अद्यतन: दिखा कोड AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

से अंत में ब्लॉक जो f.cancel(true) होता है जब बाधा कार्य जो अभी चल रहा है करने के लिए प्रचारित किया जाएगा। जैसा कि आप देख सकते हैं, यह एक तंग लूप है, लेकिन इस बात की कोई गारंटी नहीं है कि लूप को निष्पादित करने वाला थ्रेड एक बार स्लाइस में Future के सभी उदाहरणों के माध्यम से फिर से सक्रिय हो जाएगा।

+0

तो आप कहते हैं, 'invokeAll() 'के बाधा से चलने वाले लोगों के बाधा से पहले सभी कतारबद्ध कार्यों को तुरंत रद्द करना नहीं है? मेरे लिए, यह कम से कम आश्चर्यजनक सिद्धांत के प्रत्यक्ष ब्रेक की तरह दिखता है। –

+1

सही। कार्यों को संसाधित करने वाले कार्यकर्ता थ्रेड को 'invokeAll() 'निष्पादित थ्रेड से अलग होते हैं। एक थ्रेड पर बाधा डालने से कोई अन्य थ्रेड बाधित नहीं होना चाहिए, इसलिए मुझे आश्चर्य नहीं हुआ कि आप अवसर पर कार्यकर्ता धागे के 10 से अधिक इंटरप्ट प्राप्त कर सकते हैं।जैसा कि मैंने पोस्ट किए गए एनोटेटेड कोड में उल्लेख किया है, एक बाधा केवल कार्यकर्ता थ्रेड को भेजी गई है जो कि 'भविष्य.cancel' विधि को पारित बूलियन तर्क के गुण के रूप में कार्य को संसाधित करती है। –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

आप इस ब्लॉक को जोड़ने से समान व्यवहार प्राप्त करने के लिए

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

चाहते हैं इससे पहले कि ThreadPool बीच में।

यह सभी प्रतीक्षा कतार को नई सूची में हटा देगा।

तो यह केवल थ्रेड चलाने में बाधा डालेगा।

संबंधित मुद्दे