2009-10-28 11 views
23

मुझे अब दो बार एक समस्या का सामना करना पड़ा है जिससे निर्माता निर्माता थ्रेड एन कार्य आइटम उत्पन्न करता है, उन्हें ExecutorService पर सबमिट करता है और फिर सभी एन आइटम संसाधित होने तक प्रतीक्षा करने की आवश्यकता होती है।लचीला CountDownLatch?

चेतावनियां

  • एन अग्रिम में ज्ञात नहीं है। अगर ऐसा होता तो मैं बस CountDownLatch बनाउंगा और उसके बाद निर्माता थ्रेड await() होगा जब तक कि सभी काम पूरा नहीं हो जाते।
  • का उपयोग करते हुए एक CompletionService अनुचित है क्योंकि ऐसा होने पर मेरा निर्माता धागा ब्लॉक करने के लिए की जरूरत है (अर्थात take() फोन करके) संकेत सब काम पूरा है कि का कोई रास्ता नहीं है, इंतज़ार कर बंद करने के लिए निर्माता धागा पैदा करने के लिए।

मेरे वर्तमान इष्ट समाधान एक पूर्णांक काउंटर का उपयोग करना है, और वेतन वृद्धि को यह जब भी काम के एक आइटम प्रस्तुत और घटती है यह जब एक काम मद संसाधित किया जाता है। सभी एन कार्यों के सबमिशन के बाद मेरे निर्माता थ्रेड को लॉक पर इंतजार करना होगा, यह जांच कर लें कि counter == 0 चाहे इसे अधिसूचित किया गया हो। उपभोक्ता धागा (रों) अगर यह काउंटर कम कर दिया है और नया मान 0.

इस समस्या का एक बेहतर दृष्टिकोण है है या वहाँ java.util.concurrent में एक उपयुक्त निर्माण मैं नहीं बल्कि का उपयोग करना चाहिए है निर्माता को सूचित करने की आवश्यकता होगी "अपना खुद का रोलिंग" से?

अग्रिम धन्यवाद।

+0

निर्माता किस समय पता लगाता है कि कितने काम आइटम हैं? जब आखिरी वस्तु का उत्पादन किया गया है? –

+0

आपका वर्तमान समाधान रेस हालत से पीड़ित हो सकता है: आइटम 1 -> काउंटर ++ -> प्रक्रिया आइटम 1 -> काउंटर-- -> आइटम 2 का उत्पादन करें। चूंकि निर्माता ने अगले आइटम का उत्पादन करने से पहले काउंटर को कम कर दिया है, निर्माता सोचता है कि वह तैयार है। –

+0

@rwwilden: आप इस परिदृश्य में सही हो सकते हैं। हालांकि, मेरे निर्माता केवल * सभी * कार्य वस्तुओं को सबमिट करने के बाद काउंटर पर निरीक्षण/प्रतीक्षा करेंगे और इसलिए यह इस विशेष मामले में दौड़ की स्थिति का प्रतिनिधित्व नहीं करता है। – Adamski

उत्तर

24

java.util.concurrent.Phaser ऐसा लगता है कि यह आपके लिए अच्छा काम करेगा। यह जावा 7 में रिलीज होने की योजना है लेकिन सबसे स्थिर संस्करण jsr166 की रुचि समूह वेबसाइट पर पाया जा सकता है।

फ़ेज़र एक गौरवशाली चक्रीय बैरियर है। आप एन पार्टियों को पंजीकृत कर सकते हैं और जब आप तैयार हो जाते हैं तो विशिष्ट चरण में उनकी अग्रिम प्रतीक्षा करें।

कि यह कैसे काम करेगा पर एक त्वरित उदाहरण:

final Phaser phaser = new Phaser(); 

public Runnable getRunnable(){ 
    return new Runnable(){ 
     public void run(){ 
      ..do stuff... 
      phaser.arriveAndDeregister(); 
     } 
    }; 
} 
public void doWork(){ 
    phaser.register();//register self 
    for(int i=0 ; i < N; i++){ 
     phaser.register(); // register this task prior to execution 
     executor.submit(getRunnable()); 
    } 
    phaser.arriveAndAwaitAdvance(); 
} 
+3

कूल - धन्यवाद; ऐसा लगता है कि मैं वास्तव में क्या कर रहा हूं ... विश्वास नहीं कर सकता कि उन्होंने इसे फ़ेज़र कहा था। – Adamski

+2

इच्छा है कि मैं x100 को और अधिक बार बढ़ा सकता हूं। बहुत बहुत धन्यवाद। –

+2

यदि 'getRunnable()' का बिंदु उस कार्य का प्रतिनिधित्व करना है जो केवल फ़ेज़र को सिग्नल करता है और फिर समाप्त होता है, तो आप अन्यथा 'phaser.arriveAndDeregister() '** ** ** ** phaser.arrive()' को आमंत्रित करना चाहते हैं जब अभिभावक कार्य 'phaser.arriveAndAwaitAdvance()' को दूसरी बार डेडलॉक करेगा, क्योंकि कार्य चरणबद्ध कार्यों के लिए इंतजार कर रहा है जो अभी भी फ़ेज़र में पंजीकृत हैं। –

2

आप निश्चित रूप से एक CountDownLatch तो एक AtomicReference द्वारा संरक्षित इस्तेमाल कर सकते हैं कि अपने कार्यों को इस प्रकार लिपटे हो:

public class MyTask extends Runnable { 
    private final Runnable r; 
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; } 

    public void run() { 
     r.run(); 
     while (l.get() == null) Thread.sleep(1000L); //handle Interrupted 
     l.get().countDown(); 
    } 
} 

सूचना कि कार्य अपने काम चलाने के लिए और उसके बाद स्पिन तक गिनती देने वाला है, सेट (यानी कार्यों की कुल संख्या पता है)। जैसे ही गिनती सेट हो जाती है, वे इसे नीचे गिनते हैं और बाहर निकलते हैं। ये इस प्रकार प्रस्तुत हो:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>(); 
executor.submit(new MyTask(r, l)); 

अपने काम, के निर्माण/प्रस्तुत करने की बात है जब आप जानते हैं कि कैसे कई कार्यों आप बनाया है के बाद:

latch.set(new CountDownLatch(nTasks)); 
latch.get().await(); 
+0

परमाणु संदर्भ के साथ CountDownLatch को लपेटना क्यों आपको इस मामले में मदद करता है? संदर्भ को संरक्षित करने की आवश्यकता नहीं है, क्योंकि यह अपने कन्स्ट्रक्टर में माईटास्क को पास कर दिया गया है, और इसके बाद कभी भी बदलाव नहीं होता है। – Avi

+0

इसके अलावा, अंत में {} ब्लॉक में कम से कम coundDown() डालकर रन() विधि में मनमाने ढंग से फेंकने योग्य (रनटाइम अपवाद और त्रुटियां) को संभालना सुनिश्चित करें। – Avi

+0

और निश्चित रूप से, प्रश्न विशेष रूप से इस मामले के बारे में था जब आप कार्य निर्माण के समय कार्यों की संख्या नहीं जानते थे। – Avi

0

मुझे लगता है अपने निर्माता करता है कतार खाली होने पर जानने की आवश्यकता नहीं है, लेकिन आखिरी कार्य पूरा होने पर यह जानने की जरूरत है।

मैं उपभोक्ता को waitforWorkDone(producer) विधि जोड़ दूंगा। निर्माता अपने एन कार्यों को जोड़ सकता है और प्रतीक्षा विधि को कॉल कर सकता है। प्रतीक्षा विधि आने वाली थ्रेड को अवरुद्ध करती है अगर काम कतार खाली नहीं है और इस समय कोई कार्य निष्पादित नहीं हो रहा है।

उपभोक्ता धागे notifyAll() वेटोर लॉक iff पर अपना कार्य पूरा हो गया है, कतार खाली है और कोई अन्य कार्य निष्पादित नहीं किया जा रहा है।

1

मैं कुछ इस तरह के लिए एक ExecutorCompletionService उपयोग किया है:

ExecutorCompletionService executor = ...; 
int count = 0; 
while (...) { 
    executor.submit(new Processor()); 
    count++; 
} 

//Now, pull the futures out of the queue: 
for (int i = 0; i < count; i++) { 
    executor.take().get(); 
} 

यह कार्य, प्रस्तुत किया गया है, इसलिए यदि आपकी सूची मनमाने ढंग से लंबी है की एक कतार रखने शामिल है, अपने विधि बेहतर हो सकता है।

लेकिन समन्वय के लिए AtomicInteger का उपयोग करना सुनिश्चित करें, ताकि आप इसे एक थ्रेड में बढ़ा सकें और कार्यकर्ता धागे में इसे कम कर सकें।

+0

आप 'उत्सुकता' का इंतजार कैसे कर सकते हैं? आपकी सेवा बंद कर रहा है क्या? साथ ही, आप अपने उत्तर में पूर्णता सेवा को फिर से उपयोग नहीं कर सकते हैं, क्योंकि आप ऐसे कार्यों पर इंतजार कर रहे हैं जो * किसी अन्य सेट * के तार्किक रूप से भाग थे। और (जैसा कि मेरे उत्तर में टिप्पणियों में उल्लिखित है), आपको अभी भी कुछ बिंदु –

+0

@oxbow_lakes पर 'nTasks' पता होना चाहिए: सत्य। यही कारण है कि मेरे पास केवल एक विकल्प के रूप में है (जिसे मैंने संपादित किया है, क्योंकि इससे इस मामले में मदद नहीं मिलती है)। मुझे लगता है कि इस समापन सेवा का उपयोग एक ही समय में (ओवरलैपिंग) कार्यों के किसी अन्य सेट के लिए नहीं किया जाएगा। इसका उपयोग उसी धागे से बाद में किया जा सकता है। – Avi

+0

परमाणु बूलियन का उपयोग करने के बजाय कार्यों की संख्या को ट्रैक रखने के इस तरीके का लाभ यह है कि आपको प्रतीक्षा() और सूचित() को मैन्युअल रूप से संभालना नहीं है - कतार उस पर ध्यान रखेगी। आपको बस कतार में वस्तुओं की संख्या ट्रैक करना है। – Avi

0

आप माना जाता है इस एक:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection)

यहाँ एक उदाहरण मैं ने लिखा है: https://github.com/moulaali/java_fun/blob/master/src/WaitForAllConcurrencyExample.java

+0

क्या आप और समझा सकते हैं? स्टैक ओवरफ़्लो के उत्तरों में लिंक किए गए स्रोतों से पर्याप्त सामग्री समझा जाना चाहिए। – VermillionAzure

0

क्या आप वर्णित एक मानक सेमफोर का उपयोग करने के समान है लेकिन 'पिछड़ा' इस्तेमाल किया जाता है।

  • आपका सेमाफोर 0 परमिट
  • प्रत्येक काम इकाई एक परमिट जारी करता है जब यह
  • आप ब्लॉक पूरा करता एन परमिट

प्राप्त करने के लिए इंतजार कर रहे द्वारा साथ शुरू होता है इसके अलावा आप लचीलापन के लिए एक ही एम प्राप्त है < एन परमिट जो उपयोगी है यदि आप मध्यवर्ती राज्य की जांच करना चाहते हैं। उदाहरण के लिए, मैं एक एसिंक्रोनस बाध्य संदेश कतार का परीक्षण कर रहा हूं, इसलिए मुझे उम्मीद है कि कतार कुछ एम < एन के लिए पूर्ण हो जाएगी, इसलिए मैं एम प्राप्त कर सकता हूं और जांच कर सकता हूं कि कतार वास्तव में पूर्ण है और शेष एन-एम परमिट प्राप्त करने के बाद शेष एन-एम परमिट प्राप्त करें कतार।

संबंधित मुद्दे