2009-05-29 16 views
8

मैं एक वर्ग बनाना चाहता हूं जिनकी विधियों को एकाधिक धागे से बुलाया जा सकता है। लेकिन जिस धागे से इसे बुलाया गया था, उस विधि को निष्पादित करने के बजाय, इसे सभी को अपने स्वयं के धागे में करना चाहिए। किसी भी परिणाम को वापस करने की आवश्यकता नहीं है और इसे कॉलिंग थ्रेड को अवरुद्ध नहीं करना चाहिए।इवेंट/टास्क कतार मल्टीथ्रेडिंग सी ++

एक पहला प्रयास कार्यान्वयन मैंने नीचे शामिल किया है। सार्वजनिक विधियां एक कार्य सूचक और डेटा को नौकरी में डालती हैं कतार, जिसे कार्यकर्ता धागा तब उठाता है। हालांकि यह विशेष रूप से अच्छा कोड नहीं है और नई विधियां जोड़ना बोझिल है।

आदर्श रूप से मैं इसे बेस क्लास के रूप में उपयोग करना चाहता हूं, जिसे मैं न्यूनतम जल्दी और कोड डुप्लिकेशंस के साथ विधियों को जोड़ सकता हूं (तर्कों की एक चर संख्या के साथ)।

ऐसा करने का बेहतर तरीका क्या है? क्या कोई मौजूदा कोड उपलब्ध है जो कुछ समान करता है? धन्यवाद

#include <queue> 

using namespace std; 

class GThreadObject 
{ 
    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     void * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 

private: 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionOneProxy(void * buffer); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 



#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* It should block on a condition, but Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    //Execute the function pointer with the attached data 
    (*this.*receivedEvent->funcPtr)(receivedEvent->data); 
} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 

    //Malloc an object the size of the function arguments 
    int argumentSize = sizeof(char*)+sizeof(int); 
    void * myData = malloc(argumentSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, &argOne, argumentSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = myData; 
    myEvent->funcPtr = &GThreadObject::functionOneProxy; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

/* 
* This is the function I would like to remove if possible 
* Split the void * buffer into arguments for the internal Function 
*/ 
void GThreadObject::functionOneProxy(void * buffer) 
{ 
    char * cBuff = (char*)buffer; 
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*))); 
}; 

int main() 
{ 
    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 

    return 0; 
} 

उत्तर

6

Futures लाइब्रेरी Boost और सी ++ मानक पुस्तकालय में अपना रास्ता बना रही है। एसीई में एक ही तरह का कुछ भी है, लेकिन मैं इसे किसी को भी सिफारिश करने से नफरत करता हूं (जैसा कि @ लोथार पहले से ही इंगित करता है, यह सक्रिय ऑब्जेक्ट है।)

+0

मैं बूस्ट :: वायदा की तलाश में था, लेकिन चूंकि यह रिलीज़ किए गए बूस्ट संस्करण का हिस्सा नहीं है, इसलिए मुझे अपने विश्वसनीय एसीई में वापस आना पड़ा :-) – lothar

+0

वायदा लाइब्रेरी बूस्ट 1.41 का हिस्सा होगा। यह http://www.stdthread.co.uk –

+0

पर धन्यवाद, एंथनी पर मेरे सी ++ 0x थ्रेड लाइब्रेरी कार्यान्वयन के हिस्से के रूप में भी उपलब्ध है। आपसे सुनकर अच्छा लगा :) –

1

आप ACE framework की ACE Patterns की Active Object एक में रुचि हो सकती। (यमक इरादा)

के रूप में निकोलाई ने बताया futures मानक सी ++ के लिए भविष्य में कुछ समय planned हैं।

2

POCO लाइब्रेरी में ActiveMethod नामक एक ही पंक्ति के साथ कुछ है (साथ ही थ्रेडिंग सेक्शन में कुछ संबंधित कार्यक्षमता जैसे ActiveResult)। स्रोत कोड आसानी से उपलब्ध है और आसानी से समझा जाता है।

1

विस्तारशीलता और रखरखाव (और अन्य-क्षमताओं) के लिए आप "नौकरी" के लिए एक अमूर्त वर्ग (या इंटरफेस) परिभाषित कर सकते हैं जो थ्रेड करना है। फिर आपके थ्रेड पूल के उपयोगकर्ता इस इंटरफ़ेस को कार्यान्वित करेंगे और ऑब्जेक्ट को थ्रेड पूल में संदर्भ देंगे। यह सिम्बियन सक्रिय ऑब्जेक्ट डिज़ाइन के समान ही है: प्रत्येक एओ सबक्लास कैक्टिव है और इसे रन() और रद्द करें() जैसे विधियों को कार्यान्वित करना है।

सादगी अपने इंटरफेस (अमूर्त वर्ग) के लिए के रूप में सरल हो सकता है के रूप में:

class IJob 
{ 
    virtual Run()=0; 
}; 

तब थ्रेड पूल, या एकल थ्रेड अनुरोध स्वीकार की तरह कुछ होगा:

class CThread 
{ 
    <...> 
public: 
    void AddJob(IJob* iTask); 
    <...> 
}; 

स्वाभाविक रूप से आप चाहते हैं ऐसे कई कार्य हैं जिनमें सभी प्रकार के अतिरिक्त सेटर्स/गेटर्स/विशेषताओं और जीवन के किसी भी चलने की आवश्यकता हो सकती है। हालांकि, केवल जरूरी विधि रन() को लागू करने की है, जो लंबा परिकलन होता है:

class CDumbLoop : public IJob 
{ 
public: 
    CDumbJob(int iCount) : m_Count(iCount) {}; 
    ~CDumbJob() {}; 
    void Run() 
    { 
     // Do anything you want here 
    } 
private: 
    int m_Count; 
}; 
+0

यह सही तरीका है जिसका उपयोग हम अपनी कंपनी में अगली-जेन सिस्टम पर गेम विकसित करने के लिए करते हैं (एक अतिरिक्त प्रदर्शन बोनस के लिए, लॉक-फ्री विधियों के माध्यम से कार्य कतार में आइटम जोड़ने में देखें) –

+0

कोई भी सुझाव लॉक-फ्री कतार को लागू करने पर? –

+0

नेट पर लॉक-फ्री कतारों के कुछ अच्छे उदाहरण हैं (यह लागू करने के लिए सबसे आसान लॉक मुक्त संरचनाओं में से एक है)। यह वह है जिसे मैंने लॉक-फ्री बैंडवागन पर जाने पर अपने दांतों को काट दिया: http://www.boyet.com/Articles/LockfreeQueue.html –

2

आपको बूस्ट का धागा -library का उपयोग करके इस का समाधान कर सकते हैं। इस (आधे छद्म) की तरह कुछ:


class GThreadObject 
{ 
     ... 

     public: 
       GThreadObject() 
       : _done(false) 
       , _newJob(false) 
       , _thread(boost::bind(&GThreadObject::workerThread, this)) 
       { 
       } 

       ~GThreadObject() 
       { 
         _done = true; 

         _thread.join(); 
       } 

       void functionOne(char *argOne, int argTwo) 
       { 
         ... 

         _jobQueue.push(myEvent); 

         { 
           boost::lock_guard l(_mutex); 

           _newJob = true; 
         } 

         _cond.notify_one(); 
       } 

     private: 
       void workerThread() 
       { 
         while (!_done) { 
           boost::unique_lock l(_mutex); 

           while (!_newJob) { 
             cond.wait(l); 
           } 

           Event *receivedEvent = _jobQueue.front(); 

           ... 
         } 
       } 

     private: 
       volatile bool    _done; 
       volatile bool    _newJob; 
       boost::thread    _thread; 
       boost::mutex    _mutex; 
       boost::condition_variable _cond; 
       std::queue<Event*>  _jobQueue; 
}; 

कृपया यह भी नोट कैसे RAII हमें इस कोड को छोटे और बेहतर प्रबंधन करने के लिए प्राप्त करने के लिए अनुमति देते हैं।

+1

std :: queue :: push threadsafe है? ऐसा लगता है कि फ़ंक्शन एक लॉक_गार्ड _jobQueue.push कॉल से पहले जाना चाहिए। –

1

यहां एक वर्ग है जिसे मैंने इसी उद्देश्य के लिए लिखा है (मैं इसे ईवेंट हैंडलिंग के लिए उपयोग करता हूं लेकिन आप इसे एक्शनक्यूयू का नाम बदल सकते हैं - और इसके तरीकों का नाम बदल सकते हैं)।

आप इस तरह इसका इस्तेमाल: void foo (const int x, const int y) { /*...*/ }

और::

समारोह के साथ आप कॉल करना चाहते हैं EventQueue q;

q.AddEvent (बढ़ावा :: बाँध (foo, 10, 20));

कार्यकर्ता धागा में

q.PlayOutEvents();

नोट: सीपीयू चक्रों का उपयोग करने से बचने के लिए स्थिति को अवरुद्ध करने के लिए कोड जोड़ने के लिए यह काफी आसान होना चाहिए।

कोड (बढ़ावा 1.34.1 साथ दृश्य स्टूडियो 2003):

#pragma once 

#include <boost/thread/recursive_mutex.hpp> 
#include <boost/function.hpp> 
#include <boost/signals.hpp> 
#include <boost/bind.hpp> 
#include <boost/foreach.hpp> 
#include <string> 
using std::string; 


// Records & plays out actions (closures) in a safe-thread manner. 

class EventQueue 
{ 
    typedef boost::function <void()> Event; 

public: 

    const bool PlayOutEvents() 
    { 
     // The copy is there to ensure there are no deadlocks. 
     const std::vector<Event> eventsCopy = PopEvents(); 

     BOOST_FOREACH (const Event& e, eventsCopy) 
     { 
      e(); 
      Sleep (0); 
     } 

     return eventsCopy.size() > 0; 
    } 

    void AddEvent (const Event& event) 
    { 
     Mutex::scoped_lock lock (myMutex); 

     myEvents.push_back (event); 
    } 

protected: 

    const std::vector<Event> PopEvents() 
    { 
     Mutex::scoped_lock lock (myMutex); 

     const std::vector<Event> eventsCopy = myEvents; 
     myEvents.clear(); 

     return eventsCopy; 
    } 

private: 

    typedef boost::recursive_mutex Mutex; 
    Mutex myMutex; 

    std::vector <Event> myEvents; 

}; 

मुझे आशा है कि इस मदद करता है। :)

मार्टिन Bilski

+0

मैं अत्यधिक म्यूटेक्स (या "सिंक्रनाइज़ेशन आदिम" का कोई अन्य रूप) का उपयोग करने की अत्यधिक अनुशंसा करता हूं क्योंकि वे एकाधिक प्रोसेसर के साथ स्केल नहीं करते हैं (लगभग 4-8 के बाद वे वास्तव में perfomance कम करते हैं)। वास्तव में स्केलेबल इम्प्लेमेनेशंस के लिए लॉक-फ्री कोडिंग में देखें। इसके अलावा, यदि आपको सिंक्रनाइज़ेशन आदिम का उपयोग करने की आवश्यकता है, तो एक महत्वपूर्ण सेक्शन का उपयोग करें क्योंकि वे एक म्यूटेक्स से तेज हैं (म्यूटेक्स प्रक्रिया सुरक्षित है, महत्वपूर्ण खंड थ्रेड सुरक्षित है, यानी प्रक्रियाओं के बीच सिंक्रनाइज़ करते समय एक म्यूटेक्स का उपयोग करें, सीएस में थ्रेड को सिंक करते समय प्रक्रिया) –

0

आपको बूस्ट ASIO पुस्तकालय पर एक नज़र रखना चाहिए। यह घटनाओं को असंकालिक रूप से प्रेषित करने के लिए डिज़ाइन किया गया है। आपके द्वारा वर्णित सिस्टम बनाने के लिए इसे बूस्ट थ्रेड लाइब्रेरी के साथ जोड़ा जा सकता है।

आपको एक boost::asio::io_service ऑब्जेक्ट को तुरंत चालू करने और एसिंक्रोनस ईवेंट (boost::asio::io_service::post या boost::asio::io_service::dispatch) की एक श्रृंखला शेड्यूल करने की आवश्यकता होगी। इसके बाद, आप सदस्य फ़ंक्शन को n थ्रेड से कॉल करें। io_service ऑब्जेक्ट थ्रेड-सुरक्षित है और गारंटी देता है कि आपके एसिंक्रोनस हैंडलर केवल उस थ्रेड में प्रेषित किए जाएंगे, जिससे आपने io_service::run कहा था।

boost::asio::strand ऑब्जेक्ट सरल थ्रेड सिंक्रनाइज़ेशन के लिए भी उपयोगी है।

इसके लायक होने के लिए, मुझे लगता है कि एएसआईओ लाइब्रेरी इस समस्या का एक बहुत ही सुरुचिपूर्ण समाधान है।

1

नीचे एक कार्यान्वयन है जिसे "functionProxy" विधि की आवश्यकता नहीं है। हालांकि नई विधियों को जोड़ना आसान है, फिर भी यह गन्दा है।

बूस्ट :: बाइंड और "वायदा" ऐसा लगता है जैसे वे इसे बहुत साफ करेंगे। मुझे लगता है कि मुझे बूस्ट कोड पर एक नज़र डालेंगी और देखें कि यह कैसे काम करता है। आपके सुझावों के लिए सभी को धन्यवाद।

GThreadObject.h

#include <queue> 

using namespace std; 

class GThreadObject 
{ 

    template <int size> 
    class VariableSizeContainter 
    { 
     char data[size]; 
    }; 

    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     int dataSize; 
     char * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 
    void functionTwo(int argTwo, int arg2); 


private: 
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize); 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionTwoInternal(int argTwo, int arg2); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 

GThreadObject.cpp

#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument. 
    * This is the bit i would like to remove 
    * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize 
    * Subsequent data sizes would need to be added with an else if 
    * */ 
    if (receivedEvent->dataSize == 8) 
    { 
     const int size = 8; 

     void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>); 
     newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr; 

     //Execute the function 
     (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data)); 
    } 

    //Clean up 
    free(receivedEvent->data); 
    delete receivedEvent; 

} 

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize) 
{ 

    //Malloc an object the size of the function arguments 
    void * myData = malloc(argSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, (char*)argStart, argSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = (char*)myData; 
    myEvent->dataSize = argSize; 
    myEvent->funcPtr = funcPtr; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 

} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

void GThreadObject::functionTwo(int argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionTwoInternal(int argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl; 
} 

main.cpp

#include <iostream> 
#include "GThreadObject.h" 

int main() 
{ 

    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 
    myObj.functionTwo(456, 23); 


    return 0; 
} 

संपादित करें: बस संपूर्णता के लिए मैं बूस्ट :: बाँध के साथ एक कार्यान्वयन किया था।मुख्य अंतर:

queue<boost::function<void()> > jobQueue; 

void GThreadObjectBoost::functionOne(char * argOne, int argTwo) 
{ 
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo)); 

    workerThread(); 
} 

void GThreadObjectBoost::workerThread() 
{ 
    boost::function<void()> func = jobQueue.front(); 
    func(); 
} 

functionOne के 10,000,000 पुनरावृत्ति के लिए बढ़ावा कार्यान्वयन का उपयोग करना() यह ~ 19sec ले लिया। हालांकि गैर वृद्धि कार्यान्वयन केवल ~ 6.5 सेकंड लिया। तो लगभग 3x धीमा। मैं अनुमान लगा रहा हूं कि एक अच्छी गैर-लॉकिंग कतार यहां सबसे बड़ी प्रदर्शन बोतल गर्दन होगी। लेकिन यह अभी भी काफी बड़ा अंतर है।

संबंधित मुद्दे