2012-08-28 16 views
6

मैंने boost::asio का उपयोग करके एक थ्रेड पूल लागू किया है, और कुछ संख्या boost::thread ऑब्जेक्ट्स boost::asio::io_service::run() पर कॉल कर रही हैं। हालांकि, मुझे जो आवश्यकता है वह "स्वास्थ्य" के लिए सभी धागे की निगरानी करने का एक तरीका है। मेरा इरादा एक साधारण सेंटीनेल ऑब्जेक्ट बनाना है जिसे थ्रेड पूल के माध्यम से पारित किया जा सकता है - अगर यह इसे बनाता है, तो हम मान सकते हैं कि थ्रेड अभी भी काम कर रहा है।बूस्ट :: एएसओ, थ्रेड पूल और थ्रेड मॉनिटरिंग

हालांकि, मेरे कार्यान्वयन के बाद, मुझे यकीन नहीं है कि कैसे (अगर) मैं पूल में सभी धागे को विश्वसनीय रूप से देख सकता हूं। मैंने बस थ्रेड फ़ंक्शन को boost::asio::io_service::run() पर सौंप दिया है, इसलिए io_service इंस्टेंस में एक सेंटीनेल ऑब्जेक्ट पोस्ट करना यह गारंटी नहीं देगा कि कौन सा थ्रेड वास्तव में उस सेंटीनेल को प्राप्त करेगा और काम करेगा।

एक विकल्प समय-समय पर सेंटीनेल डालने के लिए हो सकता है, और उम्मीद है कि यह प्रत्येक थ्रेड द्वारा कम से कम एक बार उचित समय में उठाया जाता है, लेकिन यह स्पष्ट रूप से आदर्श नहीं है।

निम्नलिखित उदाहरण लें। जिस तरह से हैंडलर को कोड किया गया है, इस उदाहरण में हम देख सकते हैं कि प्रत्येक थ्रेड एक ही काम करेगा, लेकिन असल में मुझे हैंडलर कार्यान्वयन पर नियंत्रण नहीं होगा, कुछ लंबे समय तक चल सकते हैं जबकि अन्य लगभग तत्काल।

#include <iostream> 
#include <boost/asio.hpp> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

void handler() 
{ 
    std::cout << boost::this_thread::get_id() << "\n"; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 
} 

int main(int argc, char **argv) 
{ 
    boost::asio::io_service svc(3); 

    std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc)); 

    boost::thread one(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread two(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread three(boost::bind(&boost::asio::io_service::run, &svc)); 

    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 

    work.reset(); 

    three.join(); 
    two.join(); 
    one.join(); 

    return 0; 
} 

उत्तर

2

समाधान है कि मैं प्रयोग किया जाता है तथ्य यह है कि मैं चलने पूल वस्तुओं के कार्यान्वयन के मालिक हैं पर निर्भर करता है:

अपने थ्रेड पूल में एक कार्य आप कर सकते हैं पोस्ट करने के लिए। मैंने एक रैपर प्रकार बनाया जो आंकड़ों को अपडेट करेगा, और थ्रेड पूल पर पोस्ट किए गए उपयोगकर्ता परिभाषित हैंडलर की प्रतिलिपि बनाएँ। केवल इस रैपर प्रकार को अंतर्निहित io_service पर पोस्ट किया गया है। यह विधि मुझे उपयोगकर्ता कोड में घुसपैठ किए बिना, पोस्ट/निष्पादित किए गए हैंडलरों का ट्रैक रखने की अनुमति देती है।

यहाँ एक नीचे छीन और सरल उदाहरण है:

#include <iostream> 
#include <memory> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

// Supports scheduling anonymous jobs that are 
// executable as returning nothing and taking 
// no arguments 
typedef std::function<void(void)> functor_type; 

// some way to store per-thread statistics 
typedef std::map<boost::thread::id, int> thread_jobcount_map; 

// only this type is actually posted to 
// the asio proactor, this delegates to 
// the user functor in operator() 
struct handler_wrapper 
{ 
    handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics) 
     : user_functor_(user_functor) 
     , statistics_(statistics) 
    { 
    } 

    void operator()() 
    { 
     user_functor_(); 

     // just for illustration purposes, assume a long running job 
     boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 

     // increment executed jobs 
     ++statistics_[boost::this_thread::get_id()]; 
    } 

    functor_type   user_functor_; 
    thread_jobcount_map& statistics_; 
}; 

// anonymous thread function, just runs the proactor 
void thread_func(boost::asio::io_service& proactor) 
{ 
    proactor.run(); 
} 

class ThreadPool 
{ 
public: 
    ThreadPool(size_t thread_count) 
    { 
     threads_.reserve(thread_count); 

     work_.reset(new boost::asio::io_service::work(proactor_)); 

     for(size_t curr = 0; curr < thread_count; ++curr) 
     { 
     boost::thread th(thread_func, boost::ref(proactor_)); 

     // inserting into this map before any work can be scheduled 
     // on it, means that we don't have to look it for lookups 
     // since we don't dynamically add threads 
     thread_jobcount_.insert(std::make_pair(th.get_id(), 0)); 

     threads_.emplace_back(std::move(th)); 
     } 
    } 

    // the only way for a user to get work into 
    // the pool is to use this function, which ensures 
    // that the handler_wrapper type is used 
    void schedule(const functor_type& user_functor) 
    { 
     handler_wrapper to_execute(user_functor, thread_jobcount_); 
     proactor_.post(to_execute); 
    } 

    void join() 
    { 
     // join all threads in pool: 
     work_.reset(); 
     proactor_.stop(); 

     std::for_each(
     threads_.begin(), 
     threads_.end(), 
     [] (boost::thread& t) 
     { 
     t.join(); 
     }); 
    } 

    // just an example showing statistics 
    void log() 
    { 
     std::for_each(
     thread_jobcount_.begin(), 
     thread_jobcount_.end(), 
     [] (const thread_jobcount_map::value_type& it) 
     { 
     std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; 
     }); 
    } 

private: 
    std::vector<boost::thread> threads_; 
    std::unique_ptr<boost::asio::io_service::work> work_; 
    boost::asio::io_service proactor_; 
    thread_jobcount_map  thread_jobcount_; 
}; 

struct add 
{ 
    add(int lhs, int rhs, int* result) 
     : lhs_(lhs) 
     , rhs_(rhs) 
     , result_(result) 
    { 
    } 

    void operator()() 
    { 
     *result_ = lhs_ + rhs_; 
    } 

    int lhs_,rhs_; 
    int* result_; 
}; 

int main(int argc, char **argv) 
{ 
    // some "state objects" that are 
    // manipulated by the user functors 
    int x = 0, y = 0, z = 0; 

    // pool of three threads 
    ThreadPool pool(3); 

    // schedule some handlers to do some work 
    pool.schedule(add(5, 4, &x)); 
    pool.schedule(add(2, 2, &y)); 
    pool.schedule(add(7, 8, &z)); 

    // give all the handlers time to execute 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 

    std::cout 
     << "x = " << x << "\n" 
     << "y = " << y << "\n" 
     << "z = " << z << "\n"; 

    pool.join(); 

    pool.log(); 
} 

आउटपुट:

x = 9 
y = 4 
z = 15 
Thread: 0000000000B25430 executed 1 jobs 
Thread: 0000000000B274F0 executed 1 jobs 
Thread: 0000000000B27990 executed 1 jobs 
+0

क्या आप अपने उत्तर @Chad में कोड जोड़ सकते हैं? –

+0

हो गया। इस पर किसी भी प्रतिक्रिया के लिए खुश है। – Chad

6

आप के बीच सभी धागे और हर धागे के लिए एक निजी io_service उदाहरण एक आम io_service उदाहरण का उपयोग कर सकते हैं। हर धागा इस तरह की एक विधि पर अमल होगा:

void Mythread::threadLoop() 
{ 
    while(/* termination condition */) 
    { 
     commonIoService.run_one(); 
     privateIoService.run_one(); 

     commonConditionVariable.timed_wait(time); 
    } 
} 

इस वैसे, अगर आप यह सुनिश्चित करना है कि कुछ काम के लिए एक सूत्र में निष्पादित किया जाता है चाहते हैं, आप केवल अपने स्वामित्व io_service में इस कार्य पोस्ट करने के लिए किया है।

void MyThreadPool::post(Hander handler) 
{ 
    commonIoService.post(handler); 
    commonConditionVariable.notify_all(); 
} 
+0

एक दिलचस्प दृष्टिकोण है, लेकिन मैं थोड़ा और सीधे आगे कुछ के लिए देख रहा हूँ। अगर अगले कुछ दिनों में कुछ भी नहीं आता है, तो मैं यह जवाब स्वीकार कर सकता हूं। – Chad

+0

मुझे लगता है कि बूट एएसओ का उपयोग करके कोई आसान समाधान नहीं है। मैंने कुछ कोड के साथ इस तरह का एक समाधान विकसित किया और यह काम करता है। –

संबंधित मुद्दे