13

के लिए अपवादों को संभालना मेरे पास निम्न कोड स्निपेट है जो मूल रूप से कार्य की सूची के माध्यम से स्कैन करता है जिसे निष्पादित करने की आवश्यकता होती है और प्रत्येक कार्य निष्पादक को निष्पादन के लिए दिया जाता है।ThreadPoolExecutor

JobExecutor बदले में एक और निष्पादक (डीबी सामान करने के लिए ... कतार में डेटा पढ़ने और लिखने के लिए) बनाता है और कार्य को पूरा करता है।

JobExecutor सबमिट किए गए कार्यों के लिए Future<Boolean> देता है। जब कोई कार्य विफल रहता है, तो मैं सभी धागे को गहराई से बाधित करना चाहता हूं और सभी अपवादों को पकड़कर निष्पादक को बंद करना चाहता हूं। मुझे क्या परिवर्तन करने की ज़रूरत है?

public class DataMovingClass { 
    private static final AtomicInteger uniqueId = new AtomicInteger(0); 

    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator(); 

    ThreadPoolExecutor threadPoolExecutor = null ; 

    private List<Source> sources = new ArrayList<Source>(); 

    private static class IDGenerator extends ThreadLocal<Integer> { 
     @Override 
     public Integer get() { 
      return uniqueId.incrementAndGet(); 
     } 
    } 

    public void init(){ 

    // load sources list 

    } 

    public boolean execute() { 

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10, 
       10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
       new ThreadFactory() { 
        public Thread newThread(Runnable r) { 
         Thread t = new Thread(r); 
         t.setName("DataMigration-" + uniqueNumber.get()); 
         return t; 
        }// End method 
       }, new ThreadPoolExecutor.CallerRunsPolicy()); 

    List<Future<Boolean>> result = new ArrayList<Future<Boolean>>(); 

    for (Source source : sources) { 
        result.add(threadPoolExecutor.submit(new JobExecutor(source))); 
    } 

    for (Future<Boolean> jobDone : result) { 
       try { 
        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { 
         // in case of successful DbWriterClass, we don't need to change 
         // it. 
         success = false; 
        } 
       } catch (Exception ex) { 
        // handle exceptions 
       } 
      } 

    } 

    public class JobExecutor implements Callable<Boolean> { 

     private ThreadPoolExecutor threadPoolExecutor ; 
     Source jobSource ; 
     public SourceJobExecutor(Source source) { 
      this.jobSource = source; 
      threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
        new ThreadFactory() { 
         public Thread newThread(Runnable r) { 
          Thread t = new Thread(r); 
          t.setName("Job Executor-" + uniqueNumber.get()); 
          return t; 
         }// End method 
        }, new ThreadPoolExecutor.CallerRunsPolicy()); 
     } 

     public Boolean call() throws Exception { 
      boolean status = true ; 
      System.out.println("Starting Job = " + jobSource.getName()); 
      try { 

         // do the specified task ; 


      }catch (InterruptedException intrEx) { 
       logger.warn("InterruptedException", intrEx); 
       status = false ; 
      } catch(Exception e) { 
       logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); 
       status = false ; 
      } 
      System.out.println("Ending Job = " + jobSource.getName()); 
      return status ; 
     } 
    } 
} 

उत्तर

14

जब आप निष्पादक के लिए कोई कार्य सबमिट करते हैं, तो यह आपको एक FutureTask उदाहरण देता है।

FutureTask.get()ExecutorException के रूप में कार्य द्वारा फेंक दिया गया कोई भी अपवाद फिर से फेंक देगा।

तो जब आप List<Future> के माध्यम से फिर से प्रयास करते हैं और प्रत्येक पर कॉल करते हैं, तो ExecutorException पकड़ें और व्यवस्थित शट डाउन करें।

+0

ठीक है .. क्या आपको कोई अन्य त्रुटियां या स्थान दिखाई देता है जहां मुझे अपवादों को संभालने की आवश्यकता होती है? – jagamot

1

सबक्लास ThreadPoolExecutor और इसकी protected afterExecute (Runnable r, Throwable t) विधि ओवरराइड करें।

यदि आप java.util.concurrent.Executors सुविधा वर्ग (जो आप नहीं हैं) के माध्यम से एक थ्रेड पूल बना रहे हैं, तो यह देखने के लिए अपने स्रोत को देखें कि यह ThreadPoolExecutor का आविष्कार कैसे कर रहा है।

0

चूंकि आप ThreadPoolExecutor पर कार्य सबमिट कर रहे हैं, अपवाद FutureTask द्वारा निगल रहे हैं।

} इस code

**Inside FutureTask$Sync** 

void innerRun() { 
    if (!compareAndSetState(READY, RUNNING)) 
     return; 

    runner = Thread.currentThread(); 
    if (getState() == RUNNING) { // recheck after setting thread 
     V result; 
     try { 
      result = callable.call(); 
     } catch (Throwable ex) { 
      setException(ex); 
      return; 
     } 
     set(result); 
    } else { 
     releaseShared(0); // cancel 
    } 

पर एक नज़र

protected void setException(Throwable t) { 
    sync.innerSetException(t); 
} 

कोड ऊपर से, यह स्पष्ट है कि setException विधि को पकड़ने Throwable है लो।कारण इस कारण के लिए, FutureTask अगर आप का उपयोग ThreadPoolExecutor

प्रति जावा documentation के रूप में पर "submit()" विधि सभी अपवादों को निगल जाता है, आप प्रति प्रलेखन के रूप में ThreadPoolExecutor

protected void afterExecute(Runnable r, 
          Throwable t) 

नमूना कोड में afterExecute() विधि का विस्तार कर सकते हैं:

class ExtendedExecutor extends ThreadPoolExecutor { 
    // ... 
    protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    if (t == null && r instanceof Future<?>) { 
     try { 
     Object result = ((Future<?>) r).get(); 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
    } 
    if (t != null) 
     System.out.println(t); 
    } 
} 

आप तीन में Exceptions तरीके पकड़ कर सकते हैं

  1. Future.get() रूप

ऊपर दिखाए गए try{}catch{}Exceptoion{} ब्लॉक

  • ThreadPoolExecutor विधि का ओवरराइड afterExecute में
  • चादर पूरे run() या call() विधि स्वीकार किए जाते हैं जवाब में सुझाव के रूप में करने के लिए शान से अन्य धागे बाधा डालते हैं, नीचे पर एक नजर है एसई प्रश्न:

    How to stop next thread from running in a ScheduledThreadPoolExecutor

    How to forcefully shutdown java ExecutorService

  • संबंधित मुद्दे