2015-07-22 6 views
6

में उपभोक्ता को निलंबित करें मेरे पास उत्पादक और उपभोक्ता BlockingQueue से जुड़ा हुआ है।निर्माता/उपभोक्ता पैटर्न

कतार से उपभोक्ता प्रतीक्षा रिकॉर्ड और इसे संसाधित:

Record r = mQueue.take(); 
process(r); 

मैं दूसरे धागे से थोड़ी देर के लिए इस प्रक्रिया को रोकने की आवश्यकता। इसे कैसे कार्यान्वित करें?

अब मैं इसे लागू इस तरह लगता है, लेकिन यह है बुरा समाधान की तरह दिखता है:

private Object mLock = new Object(); 
private boolean mLocked = false; 

public void lock() { 
    mLocked = true; 
} 

public void unlock() { 
    mLocked = false; 
    mLock.notify(); 

} 

public void run() { 
    .... 
      Record r = mQueue.take(); 
      if (mLocked) { 
       mLock.wait(); 
      } 
      process(r); 
} 
+1

क्या आप अपने विशिष्ट आवश्यकता पर थोड़ा अधिक संदर्भ प्रदान मन? कारण मैं पूछता हूं कि अगर मुझे वास्तव में श्रृंखला में किसी को रोकने की ज़रूरत है, तो मैं उपभोक्ता की बजाय निर्माता को रोक दूंगा। यह मूल रूप से रोकथाम के दौरान कतार क्षमता के बिना बढ़ने के समान प्रभाव प्राप्त करता है। – sstan

उत्तर

2

मुझे लगता है कि अपने समाधान सरल और सुरुचिपूर्ण है, और लगता है कि आप कुछ संशोधनों के साथ यह रखना चाहिए। मेरे द्वारा प्रस्तावित संशोधन synchronization हैं।

इसके बिना, थ्रेड हस्तक्षेप और स्मृति consistancy त्रुटियां (और अक्सर होता है) हो सकता है। उस पर, आप लॉक पर wait या notify नहीं कर सकते हैं (और यदि आप synchronized ब्लॉक के अंदर हैं तो आप इसका स्वामी हैं ..)। फिक्स आसान है, बस mLock ब्लॉक को सिंक्रनाइज़ करें जहां आप प्रतीक्षा/अधिसूचित करते हैं। साथ ही, जैसा कि आप mLocked को एक अलग थ्रेड से बदल रहे हैं, आप इसे volatile चिह्नित करना चाहते हैं।

private Object mLock = new Object(); 
private volatile boolean mLocked = false; 

public void lock() { 
    mLocked = true; 
} 

public void unlock() { 
    synchronized(mlock) { 
     mLocked = false; 
     mLock.notify(); 
    } 

} 

public void run() { 
    .... 
      Record r = mQueue.take(); 
      synchronized(mLock) { 
       while (mLocked) { 
        mLock.wait(); 
       } 
      } 
      process(r); 
} 
+0

मैंने एक ही कोड लिखा था। लेकिन फिर मैंने सोचा कि यदि आप प्रतीक्षा के साथ सिंक ब्लॉक में जाते हैं, तो मुझे सूचित करने के साथ सिंक ब्लॉक में नहीं जाना है। लेकिन मैं गलत था: http://stackoverflow.com/questions/13249835/java-does-wait-release-lock-from-synchronized- – HotIceCream

+2

अवरुद्ध करें (mLocked) जबकि (mLocked) को बेहतर स्थानांतरित किया जा सकता है। http://stackoverflow.com/a/2779565/1173794 – HotIceCream

+0

@HotIceCream हाँ, ध्यान नहीं दिया - संपादित किया गया। – ddmps

2

आप java.util.concurrent.locks.ConditionJava docs एक, जबकि एक ही शर्त के आधार पर के लिए रुकते हैं उपयोग कर सकते हैं।

यह दृष्टिकोण मेरे लिए साफ दिखता है और पुनर्वित्त लॉक तंत्र सिंक्रनाइज़ सिंक्रनाइज़ किए गए बेहतर थ्रूपुट है। से IBM article

एक बोनस के रूप अंश नीचे पढ़ें, ReentrantLock के कार्यान्वयन में कहीं अधिक स्केलेबल सिंक्रनाइज़ का वर्तमान क्रियान्वयन से विवाद चल रहा है। (यह संभावना है कि जेवीएम के भविष्य के संस्करण में सिंक्रनाइज़ किए गए contended प्रदर्शन में सुधार होगा। इसका मतलब है कि जब कई धागे सभी एक ही लॉक के लिए संघर्ष कर रहे हैं, तो कुल थ्रूपुट आमतौर पर जा रहा है सिंक्रनाइज़ किए गए से ReentrantLock के साथ बेहतर रहें।


BlockingQueue सबसे अच्छा निर्माता-उपभोक्त समस्या का समाधान के लिए जाना जाता है, और यह भी इंतजार कर के लिए Condition उपयोग करता है।

जावा डॉक के Condition से लिया गया उदाहरण नीचे देखें, जो निर्माता - उपभोक्ता पैटर्न का नमूना कार्यान्वयन है।

class BoundedBuffer { 
    final Lock lock = new ReentrantLock(); 
    final Condition notFull = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100]; 
    int putptr, takeptr, count; 

    public void put(Object x) throws InterruptedException { 
    lock.lock(); 
    try { 
     while (count == items.length) 
     notFull.await(); 
     items[putptr] = x; 
     if (++putptr == items.length) putptr = 0; 
     ++count; 
     notEmpty.signal(); 
    } finally { 
     lock.unlock(); 
    } 
    } 

    public Object take() throws InterruptedException { 
    lock.lock(); 
    try { 
     while (count == 0) 
     notEmpty.await(); 
     Object x = items[takeptr]; 
     if (++takeptr == items.length) takeptr = 0; 
     --count; 
     notFull.signal(); 
     return x; 
    } finally { 
     lock.unlock(); 
    } 
    } 
} 

अतिरिक्त पठन:

+3

श्री डाउनवॉटर डाउनवॉटर, क्या आप अपना नीचे वोट देने के लिए दर्द ले सकते हैं? – hagrawal

+0

मैन मुझे डाउनवॉट से नफरत है जब व्यक्ति वास्तव में एक अंतर्दृष्टिपूर्ण स्पष्टीकरण प्रदान करने के लिए कुछ समय लेता है। यदि यह समाधान आपके लिए काम नहीं करता है, तो ठीक है। लेकिन यह एक डाउनवोट के लायक नहीं है। मेरे लिए +1 अगर मुझे पहले एक आसान लगता है, तो दूसरा धागे और समरूपता पर अंतर्दृष्टि प्रदान करता है। –

+2

मेरा डाउनवोट नहीं है, लेकिन 10+ साल के आलेख के आधार पर प्रदर्शन निर्णय देने पर iffy लगता है। –

1

एक नई कक्षा बनाएं जो BlockingQueue के कार्यान्वयन को बढ़ाती है।दो नई विधियां pause() और unpause() जोड़ें। कहाँ की जरूरत है, (put() में केवल take() विधि में मेरे उदाहरण में, और नहीं) paused ध्वज पर विचार और अन्य blockingQueue2 का उपयोग प्रतीक्षा करने के लिए:

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> { 

    private static final long serialVersionUID = 184661285402L; 

    private Object lock1 = new Object();//used in pause() and in take() 
    private Object lock2 = new Object();//used in pause() and unpause() 

    //@GuardedBy("lock") 
    private volatile boolean paused; 

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>(); 

    public void pause() { 
     if (!paused) { 
      synchronized (lock1) { 
      synchronized (lock2) { 
       if (!paused) { 
        paused = true; 
        blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty 
       } 
      } 
      } 
     } 
    } 

    public void unpause() throws InterruptedException { 
     if (paused) { 
      synchronized (lock2) { 
       paused = false; 
       blockingQueue2.put(new Object());//release waiting thread, if there is one 
      } 
     } 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E result = super.take(); 

     if (paused) { 
      synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting 
       if (paused) { 
        blockingQueue2.take(); 
       } 
      } 
     } 

     return result; 
    } 

    //TODO override similarly the poll() method. 
} 
संबंधित मुद्दे