2009-02-10 6 views
11

मेरे पास एक विषय है जो ग्राहकों को Subscribe(Observer*) और Unsubscribe(Observer*) प्रदान करता है। विषय अपने स्वयं के धागे में चलता है (जिसमें से यह सब्सक्राइब किए गए पर्यवेक्षकों पर Notify() पर कॉल करता है) और एक म्यूटेक्स पर्यवेक्षकों की अपनी आंतरिक सूची की सुरक्षा करता है।मैं multithreaded C++ में पर्यवेक्षक संबंध कैसे फाड़ सकता हूं?

मुझे क्लाइंट कोड चाहिए - जिसे मैं नियंत्रित नहीं करता - सदस्यता के बाद पर्यवेक्षक को सुरक्षित रूप से हटाने में सक्षम होने के लिए। यह कैसे हासिल किया जा सकता है?

  • म्युटेक्स होल्डिंग - यहां तक ​​कि एक पुनरावर्ती म्युटेक्स - जब मैं सूचित पर्यवेक्षकों क्योंकि गतिरोध जोखिम के एक विकल्प नहीं है।
  • मैं सदस्यता रद्द करने में हटाने के लिए पर्यवेक्षक को चिह्नित कर सकता हूं और विषय धागे से हटा सकता हूं। फिर ग्राहक विशेष 'सुरक्षित करने के लिए सुरक्षित' अधिसूचना के लिए प्रतीक्षा कर सकते हैं। यह सुरक्षित दिखता है, लेकिन ग्राहकों के लिए अतिसंवेदनशील है।

संपादित

कुछ उदाहरण कोड इस प्रकार है। समस्या यह है कि जब सदस्यता 'यहां समस्या' टिप्पणी पर है, तो सदस्यता समाप्त होने से रोकने के लिए कैसे करें। फिर मैं एक हटाए गए ऑब्जेक्ट पर वापस कॉल कर सकता था। वैकल्पिक रूप से, अगर मैं प्रतिलिपि बनाने के बजाए म्यूटेक्स को पकड़ता हूं, तो मैं कुछ ग्राहकों को डेडलॉक कर सकता हूं।

#include <set> 
#include <functional> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

using namespace std; 
using namespace boost; 

class Observer 
{ 
public: 
    void Notify() {} 
}; 

class Subject 
{ 
public: 
    Subject() : t(bind(&Subject::Run, this)) 
    { 
    } 

    void Subscribe(Observer* o) 
    { 
     mutex::scoped_lock l(m); 
     observers.insert(o); 
    } 

    void Unsubscribe(Observer* o) 
    { 
     mutex::scoped_lock l(m); 
     observers.erase(o); 
    } 

    void Run() 
    { 
     for (;;) 
     { 
      WaitForSomethingInterestingToHappen(); 
      set<Observer*> notifyList; 
      { 
       mutex::scoped_lock l(m); 
       notifyList = observers; 
      } 
      // Problem here 
      for_each(notifyList.begin(), notifyList.end(), 
        mem_fun(&Observer::Notify)); 
     } 
    } 

private: 
    set<Observer*> observers; 
    thread t; 
    mutex m; 
}; 

संपादित

मैं क्योंकि गतिरोध जोखिम के म्युटेक्स दबाते हुए पर्यवेक्षकों को सूचित नहीं कर सकते। यह सबसे स्पष्ट तरीका हो सकता है - क्लाइंट कॉल नोटिफ़ाई के अंदर सदस्यता लें या सदस्यता छोड़ें - म्यूटेक्स रिकर्सिव बनाकर आसानी से उपचार किया जाता है। अधिक थकाऊ विभिन्न धागे पर अस्थायी डेडलॉक का खतरा है।

मैं एक बहुप्रचारित वातावरण में हूं, इसलिए थ्रेड के निष्पादन में किसी भी बिंदु पर, यह आमतौर पर ताले एल 1, एल 2, ... एलएन का अनुक्रम आयोजित करेगा। एक और धागा ताले K1, K2, ... Km होगा। एक उचित लिखित ग्राहक यह सुनिश्चित करेगा कि अलग-अलग धागे हमेशा एक ही क्रम में ताले हासिल करेंगे। लेकिन जब ग्राहक मेरे विषय के म्यूटेक्स से बातचीत करते हैं - इसे एक्स कहते हैं - यह रणनीति टूटी जाएगी: ऑर्डर करने के लिए कॉल करें/सदस्यता रद्द करें एल 1, एल 2, ... एलएन, एक्स। मेरे विषय धागे से अधिसूचित करने के लिए कॉल लॉक प्राप्त करें ऑर्डर एक्स, के 1, के 2, ... किमी। यदि ली या केजे में से कोई भी कॉल कॉल को कम कर सकता है, तो क्लाइंट को डिबग करने की थोड़ी संभावना के साथ, एक अस्थायी डेडलॉक पीड़ित है। चूंकि मैं क्लाइंट कोड को नियंत्रित नहीं करता, इसलिए मैं ऐसा नहीं कर सकता।

+0

मुझे कुछ याद आ रही है। जब कोई विशिष्ट पर्यवेक्षक किसी अन्य पर्यवेक्षक की सदस्यता लेता है?विशिष्ट पर्यवेक्षक केवल स्वयं को सदस्यता छोड़ना चाहिए। अधिसूचना सूची में केवल एक ही पर्यवेक्षक हो सकता है, तो पहले से ही निकाला जाने वाला अधिसूचना कैसे पुरानी हो सकती है? – jmucchiello

+0

ग्राहक किसी भी समय एक पर्यवेक्षक को सदस्यता छोड़ सकता है, न केवल अधिसूचना के अंदर। यह इस सामान्य मामले में है कि हमें विषय धागे के खिलाफ दौड़ मिलती है। – fizzer

+0

@ फिज़ज़र: यह अस्पष्ट है कि आपने मेमोरी सुरक्षा कैसे कार्यान्वित की है। आप कैसे 'ऑब्जर्वर *' में 'पर्यवेक्षक' के सभी उदाहरण 'पर्यवेक्षक' के वैध उदाहरण की गारंटी देते हैं? –

उत्तर

1

'आदर्श' समाधान shared_ptr और weak_ptr का उपयोग कर शामिल होगा। हालांकि, जेनेरिक होने के लिए, इसे Subject के मुद्दे के लिए भी Observer (हाँ, यह भी हो सकता है) से पहले गिरा दिया जाना चाहिए।

class Subject { 
public: 
    void Subscribe(std::weak_ptr<Observer> o); 
    void Unsubscribe(std::weak_ptr<Observer> o); 

private: 
    std::mutex mutex; 
    std::set< std::weak_ptr<Observer> > observers; 
}; 

class Observer: boost::noncopyable { 
public: 
    ~Observer(); 

    void Notify(); 

private: 
    std::mutex; 
    std::weak_ptr<Subject> subject; 
}; 
इस संरचना के साथ

, हम एक चक्रीय ग्राफ बनाने के लिए, लेकिन weak_ptr के विवेकपूर्ण उपयोग ताकि दोनों Observer और Subject समन्वय के बिना नष्ट किया जा सकता है।

नोट: मैं सादगी के लिए मान लिया है, कि एक Observer का मानना ​​है एक समय में एक ही Subject, लेकिन यह आसानी से अनेक विषयों का निरीक्षण कर सकते हैं।


अब, ऐसा लगता है कि आप असुरक्षित स्मृति प्रबंधन के साथ फंस रहे हैं। यह एक कठिन परिस्थिति है, जैसा कि आप कल्पना कर सकते हैं। इस मामले में, मैं एक प्रयोग का सुझाव दूंगा: एक असीमित Unsubscribe। या कम से कम, Unsubscribe पर कॉल बाहर से तुल्यकालिक होगा, लेकिन असीमित रूप से कार्यान्वित किया जाएगा।

विचार सरल है: हम सिंक्रनाइज़ेशन प्राप्त करने के लिए ईवेंट कतार का उपयोग करेंगे। यही कारण है: कतार (पेलोड Observer*) में एक घटना

  • कॉल Unsubscribe करने के लिए पोस्ट और फिर इंतजार कर रहा है
  • जब Subject धागा Unsubscribe घटना (रों) संसाधित हो जाए, यह प्रतीक्षा धागा जाग (रों)

आप या तो व्यस्त प्रतीक्षा या एक शर्त चर का उपयोग कर सकते हैं, मैं एक शर्त चर की सलाह दूंगा जब तक प्रदर्शन अन्यथा निर्देशित न हो।

नोट: यह समाधान पूरी तरह से Subject के लिए समय पर मरने में विफल रहता है।

+0

मैं किसी ईवेंट कतार को नहीं मानना ​​चाहता हूं, या ग्राहकों पर विशेष स्वामित्व अर्थशास्त्र लागू नहीं करना चाहता हूं, लेकिन मुझे यह विचार पसंद है कि सदस्यता छोड़ें() सदस्यता को थ्रेड को अधिसूचना के लिए छोड़ने के लिए प्रतीक्षा करता है। – fizzer

3

क्या आप सदस्यता() की सदस्यता रद्द कर सकते हैं() एक सदस्यता छोड़ें()? पर्यवेक्षक * को कुछ साझा किया गया जैसे shared_ptr < पर्यवेक्षक > चीजों को आसान बना देगा।

संपादित करें: उपरोक्त "आसान" द्वारा "आसान" प्रतिस्थापित किया गया। उदाहरण के लिए "सही होने" के लिए मुश्किल कैसे है, Boost.Signals और adopted -but-not-yet-in-the-वितरण Boost.Signals2 (पूर्व में Boost.ThreadSafeSignals) पुस्तकालयों का इतिहास देखें।

+0

मैंने इस बारे में सोचा - मैं विषय में कमजोर_प्टर भी पकड़ सकता हूं, और ग्राहकों को सदस्यता समाप्त करने की भी आवश्यकता नहीं होगी। दुर्भाग्य से, मैं इस इंटरफ़ेस को नहीं बदल सकता, लेकिन मैं निश्चित रूप से भविष्य में ऐसा करूँगा। – fizzer

+0

मैंने साझा_ptr और weak_ptr का उपयोग करके चीजों को बहुत कुछ बनाया है और वे अच्छी तरह से काम करते हैं। बहुत बुरा आप एपीआई नहीं बदल सकते हैं। – timday

+0

हालांकि यह सिंक्रनाइज़ेशन समस्या को सुलझाने के बिना "कौन हटता है" संसाधन समस्या हल करता है, फिर भी आप उस स्थिति में भाग ले सकते हैं जहां एक पर्यवेक्षक पर एक अधिसूचना होती है जिसे पहले से ही सदस्यता समाप्त कर दिया गया हो - शायद यह कोई समस्या नहीं है। –

1

एमएमएम ... मैं वास्तव में आपके प्रश्न को नहीं समझता, क्योंकि यदि कोई ग्राहक सदस्यता छोड़ता है तो आपको क्लाइंट को इसे हटाने में सक्षम होना चाहिए (यह आपके द्वारा उपयोग नहीं किया जाता है)।हालांकि, यदि किसी कारण से क्लाइंट पर्यवेक्षक की सदस्यता समाप्त करने के बाद आप रिश्ते को बंद नहीं कर सकते हैं, तो आप पर्यवेक्षक को सुरक्षित रूप से हटाने के लिए "विषय" को एक नया ऑपरेशन जोड़ सकते हैं, या केवल ग्राहकों के लिए यह संकेत दे सकते हैं कि वे किसी पर्यवेक्षक में रुचि नहीं रखते हैं ।

रीथिंक संपादित करें: ठीक है, अब मुझे लगता है कि मैं समझता हूं कि आपकी समस्या क्या है। मुझे लगता है कि आपकी समस्या का सबसे अच्छा समाधान निम्न कार्य कर रहा है:

  1. प्रत्येक संग्रहीत पर्यवेक्षक तत्व को "मान्य" ध्वज रखने के लिए रखें। इस ध्वज का उपयोग अधिसूचना लूप में होने पर इसे सूचित करने के लिए किया जाएगा या नहीं।
  2. आपको उस "वैध" ध्वज तक पहुंच की सुरक्षा के लिए एक म्यूटेक्स की आवश्यकता है। फिर, सदस्यता रद्द करने के ऑपरेशन "वैध" ध्वज के लिए mutex को लॉक करता है, इसे चयनित पर्यवेक्षक के लिए गलत पर सेट करता है।
  3. अधिसूचना पाश को वैध ध्वज के म्यूटेक्स को लॉक और अनलॉक करना होगा, और केवल "मान्य" पर्यवेक्षकों पर कार्य करना होगा।

यह देखते हुए कि सदस्यता रद्द करने का ऑपरेशन वैध ध्वज को रीसेट करने के लिए म्यूटेक्स पर अवरुद्ध करेगा (और वह विशेष पर्यवेक्षक आपके धागे में और भी उपयोग नहीं किया जाएगा), कोड थ्रेड सुरक्षित है, और ग्राहक किसी भी को हटा सकते हैं जैसे ही सदस्यता समाप्त हो गई है पर्यवेक्षक वापस आ गए हैं।

1

क्या ऐसा कुछ संतोषजनक होगा? यद्यपि अधिसूचित होने पर भी पर्यवेक्षक की सदस्यता समाप्त करना सुरक्षित नहीं है, इसके लिए आपको बताए गए इंटरफ़ेस की आवश्यकता होगी (जहां तक ​​मैं कह सकता हूं)। (ग्राहक पर IMO एक बुरा डिजाइन निर्णय ...) आप अपने डेटा में सूचक धागे की धागा आईडी जोड़ सकते हैं -

Subscribe(Observer *x) 
{ 
    mutex.lock(); 
    // add x to the list 
    mutex.unlock(); 
} 

Unsubscribe(Observer *x) 
{ 
    mutex.lock(); 
    while (!ok_to_delete) 
     cond.wait(mutex); 
    // remove x from list 
    mutex.unlock(); 
} 

NotifyLoop() 
{ 
    while (true) { 
     // wait for something to trigger a notify 

     mutex.lock(); 
     ok_to_delete = false; 
     // build a list of observers to notify 
     mutex.unlock(); 

     // notify all observers from the list saved earlier 

     mutex.lock(); 
     ok_to_delete = true; 
     cond.notify_all(); 
     mutex.unlock(); 
    } 
} 

आप (सदस्यता रद्द करें करने में सक्षम होना चाहते हैं) के अंदर सूचित करें() संरचना। सदस्यता रद्द करने के फ़ंक्शन में आप उस थ्रेड आईडी को वर्तमान थ्रेड की आईडी के विरुद्ध देख सकते हैं (अधिकांश थ्रेडिंग लाइब्रेरी इसे प्रदान करते हैं - उदाहरण के लिए। Pthread_self)। यदि वे वही हैं, तो आप शर्त चर पर प्रतीक्षा किए बिना आगे बढ़ सकते हैं।

नोट: यदि ग्राहक पर्यवेक्षक को हटाने के लिए ज़िम्मेदार है, तो इसका मतलब है कि आप उस स्थिति में भाग लेते हैं जहां अधिसूचना कॉलबैक के अंदर, आपने सदस्यता रद्द कर दी होगी और पर्यवेक्षक को हटा दिया होगा, लेकिन अभी भी इस पॉइंटर के साथ कुछ निष्पादित कर रहे हैं। यह कुछ ऐसा है जो ग्राहक को सूचित करना होगा और केवल अधिसूचना() के अंत में इसे हटाना होगा।

+0

लेकिन अगर कोई पर्यवेक्षक अनसब्सक्राइब() अपने नोटिफ़िक कॉलबैक से स्वयं है तो वह डेडलॉक नहीं होगा? –

+0

+1 क्योंकि यह समस्या के क्रूक्स को संबोधित करता है। मैं चाहता हूं कि क्लाइंट कॉलबैक से सब्सक्राइब करने में सक्षम हों, हालांकि, मुझे थोड़ी और सोच की आवश्यकता होगी। यह डरावना है कि इस समस्या के लिए एक प्रसिद्ध मुहावरे नहीं है। मैं हर दिन इस पैटर्न को हमेशा टूटा हुआ देखता हूं। – fizzer

+0

फिज़ज़र - कॉलबैक से सदस्यता लेना ठीक है। सब्सक्राइब करते समय आपको कंडीशन वैरिएबल पर कभी भी इंतजार करने की आवश्यकता नहीं है, केवल म्यूटेक्स (जो स्पष्ट रूप से कॉल करने से पहले रिलीज़ हो जाता है()। यह केवल सदस्यता छोड़ रहा है कि समस्या है। –

1

ग्राहकों को "सुरक्षित करने के लिए" अधिसूचना प्राप्त करने के बजाय, उन्हें एक सदस्यता (पर्यवेक्षक *) विधि प्रदान करें। क्लाइंट कोड तब बन जाता है:

subject.Unsubscribe(obsever);l 
while(subject.IsSubscribed(observer)) { 
    sleep_some_short_time; // OS specific sleep stuff 
} 
delete observer; 

जो बहुत कठिन नहीं है।

+0

यदि सदस्यता है() बस पर्यवेक्षकों के सेट में दिखता है, यह अभी भी टूटा हुआ है। पूरी चीज पूरी हो सकती है जबकि विषय धागा मैंने पोस्ट किए गए उदाहरण कोड में 'समस्या यहां' टिप्पणी पर सोया है। – fizzer

7

सदस्यता छोड़ें() सिंक्रोनस होना चाहिए, ताकि यह तब तक वापस न आए जब तक पर्यवेक्षक की विषय की सूची में नहीं होने की गारंटी दी जाती है। इसे सुरक्षित रूप से करने का यही एकमात्र तरीका है।

ईटीए (उत्तर करने के लिए मेरी टिप्पणी चलती):

के बाद से समय, एक मुद्दा हो लेते हैं और प्रत्येक पर्यवेक्षक को अधिसूचित करने के बीच म्युटेक्स जारी करने के लिए प्रतीत नहीं होता। आप जिस तरह से हैं, उसके लिए आप इसका उपयोग करने में सक्षम नहीं होंगे, और आपको यह सुनिश्चित करने के लिए इटेटरेटर को जांचना होगा कि यह अभी भी वैध है।

for (...) 
{ 
    take mutex 
    check iterator validity 
    notify 
    release mutex 
} 

वह वही करेगा जो आप चाहते हैं।

+0

मुझे वह समाधान पसंद है। यह अच्छा रहेगा. – MarkR

+0

बहुत अच्छा। अगर केवल इसे कोड करने का कोई तरीका था ... – fizzer

+0

चूंकि समय कोई समस्या नहीं प्रतीत होता है, इसलिए प्रत्येक पर्यवेक्षक को सूचित करने के बीच म्यूटेक्स को ले जाएं और छोड़ दें। –

1

आप CSubject प्रकार में "टू-डिलीट कतार" बना सकते हैं।जब आप पर्यवेक्षक को हटाते हैं, तो आप pSubject-> QueueForDelete (PObserver) को कॉल कर सकते हैं। फिर जब विषय धागा अधिसूचनाओं के बीच होता है, तो यह कतार से पर्यवेक्षकों को सुरक्षित रूप से हटा सकता है। एक map को observers कुंजी Observer* और मूल्य Observer का एक आवरण के साथ

class Subject { 
public: 
Subject() : t(bind(&Subject::Run, this)),m_key(0) { } 
void Subscribe(Observer* o) { 
    mutex::scoped_lock l(m); 
    InternalObserver io(o); 
    boost::shared_ptr<InternalObserver> sp(&io); 
    observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp)); 
} 

void Unsubscribe(Observer* o) { 
    mutex::scoped_lock l(m); 
    observers.find(MakeKey(o))->second->exists = false; } 

void WaitForSomethingInterestingToHappen() {} 
void Run() 
{ 
    for (;;) 
    { 
     WaitForSomethingInterestingToHappen(); 
     for(unsigned int i = 0; i < observers.size(); ++ i) 
     { 
      mutex::scoped_lock l(m); 
      if(observers[i]->exists) 
      { 
       mem_fun(&Observer::Notify);//needs changing 
      } 
      else 
      { 
       observers.erase(i); 
       --i; 
      } 
     } 
    } 
} 
private: 

int MakeKey(Observer* o) { 
    return ++m_key;//needs changeing, sha of the object? 
} 
class InternalObserver { 
public: 
    InternalObserver(Observer* o) : m_o(o), exists(true) {} 
    Observer* m_o; 
    bool exists; 
}; 

map< int, boost::shared_ptr<InternalObserver> > observers; 
thread t; 
mutex m; 
int m_key; 
}; 
0

मैं यह बहुत ही सुंदर ढंग से काम कर देता है, तो नहीं लगता। Observer मान्य है या नहीं, यह इंगित करने के लिए रैपर में volatile बूलियन शामिल है। subscribe विधि में, रैपर ऑब्जेक्ट मान्य स्थिति में बनाया गया है। unsubscribe विधि में, रैपर को अमान्य के रूप में चिह्नित किया गया है। Notify को रैपर पर वास्तविक पर्यवेक्षक के बजाय पर बुलाया जाता है। आवरण वास्तविक ऑब्जर्वर पर Notify फोन करेगा अगर यह वैध (अभी भी इसके सदस्य) है

#include <map> 
#include <functional> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

using namespace std; 
using namespace boost; 

class Observer 
{ 
public: 
    void Notify() {} 
}; 

class ObserverWrapper : public Observer 
{ 
public: 
    Observer* wrappee; 
    volatile bool valid; 
    ObserverWrapper(Observer* o) 
    { 
     wrappee = o; 
     valid = true; 
    } 

    void Notify() 
    { 
     if (valid) wrappee->Notify(); 
    } 
} 
class Subject 
{ 
public: 
    Subject() : t(bind(&Subject::Run, this)) 
    { 
    } 

    void Subscribe(Observer* o) 
    { 
     mutex::scoped_lock l(m); 
     boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o)); 
     observers.insert(pair<Observer*, sptr)); 
    } 

    void Unsubscribe(Observer* o) 
    { 
     mutex::scoped_lock l(m); 
     observers.find(o)->second->valid = false; 
     observers.erase(o); 
    } 

    void Run() 
    { 
     for (;;) 
     { 
      WaitForSomethingInterestingToHappen(); 
      vector<ObserverWrapper*> notifyList; 
      { 
       mutex::scoped_lock l(m); 
       boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList)); 
      } 
      // Should be no problem here 
      for_each(notifyList.begin(), notifyList.end(), 
        mem_fun(&ObserverWrapper::Notify)); 
     } 
    } 

private: 
    map<Observer*, ObserverWrapper*> observers; 
    thread t; 
    mutex m; 
}; 
0

बदलें:

+0

कोई भी क्लाइंट थ्रेड सदस्यता रद्द नहीं करता है अगर विषय धागा को 'यहां कोई समस्या नहीं होनी चाहिए' पर निलंबित कर दिया गया है। – fizzer

+0

@ फिज़ज़र हां, सदस्यता समाप्त हो सकती है। मेरा मानना ​​है कि कोई समस्या नहीं होनी चाहिए क्योंकि सदस्यता रद्द करने का ऑपरेशन पर्यवेक्षक रैपर में गलत झंडा सेट करेगा। इसलिए पर्यवेक्षक को सूचित नहीं किया जाएगा। – Sameer

+1

मेरी माफ़ी। भेद्यता वैध ध्वज का परीक्षण करने और रैपिपी पर अधिसूचना को कॉल करने के बीच है। – fizzer

संबंधित मुद्दे

 संबंधित मुद्दे