2013-07-31 12 views
5

के साथ एएसओ पैटर्न को बढ़ावा दें मैं जीयूआई के लिए थ्रेड का उपयोग करके बूस्ट एएसओ पैटर्न और कुछ सॉकेट आईओ के लिए एक वर्कर थ्रेड का उपयोग करना चाहता हूं।जीयूआई और वर्कर थ्रेड

कार्यकर्ता धागा सॉकेट क्लाइंट को प्रबंधित करने के लिए boost::asio::io_service का उपयोग करेगा। सॉकेट पर सभी संचालन केवल कार्यकर्ता धागे द्वारा किया जाएगा।

जीयूआई थ्रेड को कार्यकर्ता थ्रेड से संदेश भेजने और प्राप्त करने की आवश्यकता है।

मैं बिल्कुल नहीं समझ सकता कि बूस्ट एएसओ का उपयोग करके इस पैटर्न को कैसे कार्यान्वित किया जाए।

मैंने पहले से ही मानक एएसओ तरीके से सॉकेट संचार लागू किया है (मैं वर्कर थ्रेड से io_service.run() पर कॉल करता हूं और मैं async_read_some/async_send का उपयोग करता हूं)। मुझे strands की आवश्यकता नहीं है क्योंकि io_service.run() केवल कार्यकर्ता धागे से ही कहा जाता है।

अब मैं क्रॉस थ्रेड संदेश कतार जोड़ने की कोशिश कर रहा हूं। मैं इसे कैसे कार्यान्वित कर सकता हूं?

क्या मुझे runio_service GUI थ्रेड से भी चाहिए?

या मैं बस (जीयूआई धागे से io_service.run() या io_service.poll_one() बुला के बिना) कार्यकर्ता धागा करने के लिए जीयूआई धागे से संदेश पोस्ट करने post साथ strands का उपयोग करना चाहिए, और कार्यकर्ता धागे से संदेश पोस्ट करने ऑपरेटिंग सिस्टम के GUI संदेश पाश का उपयोग जीयूआई धागे के लिए?

मैं, के बाद से io_service दो धागे के बीच साझा किया जाता है io_service.run() या io_service.poll_one() जीयूआई धागे से भी कॉल करने की आवश्यकता मैं सॉकेट कार्यों पर strands का उपयोग करने की आवश्यकता है, अगर

संपादित करें: मेरे प्रश्न को स्पष्ट करने के लिए, मैं जो कुछ भी कर सकता हूं, वह संदेश कतार को लागू करने के लिए, बूस्ट एएसओ का उपयोग करके, अन्य पुस्तकालयों पर निर्भर करता है, केवल बूस्ट एशियाई नौकरी नहीं कर सकता है।

+0

इस तरह जीयूआई से कार्यकर्ता धागा करने के लिए पोस्ट करने के बारे में है: http://stackoverflow.com/questions/17321506/c-non-blocking-asio-run/17321862 # 17321862। यह इसके विपरीत है: http: //stackoverflow.com/questions/17311512/howto-post-messages-between-threads-with-boostasio/17315567#17315567 –

उत्तर

7

संदेश पासिंग काफी सामान्य है। समस्या से निपटने के कई तरीके हैं, और समाधान वांछित व्यवहार संबंधी विवरणों पर निर्भर करेगा। उदाहरण के लिए, अवरुद्ध या गैर अवरुद्ध, स्मृति आवंटन, संदर्भ, आदि को नियंत्रित करने के लिए

  • Boost.Lockfree झुलसाना/बहु उपभोक्ता/उत्पादकों के लिए धागा सुरक्षित ताला मुक्त गैर अवरुद्ध कतारों प्रदान करता है। यह इवेंट लूप के लिए खुद को काफी अच्छी तरह से उधार देता है, जहां उपभोक्ता को अवरुद्ध करने के लिए आदर्श नहीं है, निर्माता को सिंक्रनाइज़ेशन निर्माण सिग्नल करने का इंतजार है।

    boost::lockfree::queue<message_type> worker_message_queue; 
    
    void send_worker_message(const message_type& message) 
    { 
        // Add message to worker message queue. 
        worker_message_queue.push(message); 
    
        // Add work to worker_io_service that will process the queue. 
        worker_io_service.post(&process_message); 
    } 
    
    void process_message() 
    { 
        message_type message; 
    
        // If the message was not retrieved, then return early. 
        if (!worker_message_queue.pop(message)) return; 
    
        ... 
    } 
    
  • वैकल्पिक रूप से, Boost.Asio के io_service एक कतार के रूप में कार्य कर सकते हैं। संदेश को केवल निर्दिष्ट हैंडलर से बाध्य होना चाहिए।

    void send_worker_message(const message_type& message) 
    { 
        // Add work to worker_io_service that will process the message. 
        worker_io_service.post(boost::bind(&process_message, message)); 
    } 
    
    void process_message(message_type& message) 
    { 
        ... 
    } 
    

This टिप्पणी का सुझाव है कि इच्छा संदेश गुजर से अधिक है। ऐसा लगता है कि अंतिम लक्ष्य एक थ्रेड को किसी अन्य थ्रेड को मनमाने ढंग से कार्य करने के लिए अनुमति देना है।

यदि यह मामला है, तो विचार करें:

  • एक प्रबंधित संकेतों और स्लॉट कार्यान्वयन के लिए Boost.Signals2 का उपयोग करना। यह एक सिग्नल के साथ पंजीकरण करने के लिए मनमानी कार्यों की अनुमति देता है।
  • सिग्नल उत्सर्जन सेट करने के लिए Boost.Asio के io_service का उपयोग करना। यदि जीयूआई थ्रेड और वर्कर थ्रेड के पास प्रत्येक का अपना io_service है, तो वर्कर थ्रेड एक हैंडलर को जीयूआई थ्रेड के io_service में पोस्ट कर सकता है जो सिग्नल उत्सर्जित करेगा। जीयूआई थ्रेड के मुख्य पाश में, यह io_service पर मतदान करेगा, सिग्नल उत्सर्जित करेगा, और जीयूआई थ्रेड के संदर्भ में स्लॉट का आह्वान किया जाएगा।

यहाँ पूरा उदाहरण है, जहां दो धागे संदेश एक-दूसरे के पास (एक unsigned int के रूप में) के साथ ही पैदा कर रहा मनमाना कार्यों एक और धागा भीतर लागू किया जा रहा है।

#include <iostream> 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/signals2.hpp> 
#include <boost/thread.hpp> 

/// @brief io_service dedicated to gui. 
boost::asio::io_service gui_service; 

/// @brief io_service dedicated to worker. 
boost::asio::io_service worker_service; 

/// @brief work to keep gui_service from stopping prematurely. 
boost::optional<boost::asio::io_service::work> gui_work; 

/// @brief hello slot. 
void hello(int x) 
{ 
    std::cout << "hello with " << x << " from thread " << 
       boost::this_thread::get_id() << std::endl; 
} 

/// @brief world slot. 
void world(int x) 
{ 
    std::cout << "world with " << x << " from thread " << 
       boost::this_thread::get_id() << std::endl; 
} 

/// @brief Type for signals. 
typedef boost::signals2::signal<void (int)> signal_type; 

void emit_then_notify_gui(signal_type& signal, unsigned int x); 

/// @brief Emit signals then message worker. 
void emit_then_notify_worker(signal_type& signal, unsigned int x) 
{ 
    // Emit signal, causing registered slots to run within this thread. 
    signal(x); 

    // If x has been exhausted, then cause gui service to run out of work. 
    if (!x) 
    { 
    gui_work = boost::none; 
    } 
    // Otherwise, post work into worker service. 
    else 
    { 
    std::cout << "GUI thread: " << boost::this_thread::get_id() << 
       " scheduling other thread to emit signals" << std::endl; 
    worker_service.post(boost::bind(
     &emit_then_notify_gui, 
     boost::ref(signal), --x)); 
    } 
} 

/// @brief Emit signals then message worker. 
void emit_then_notify_gui(signal_type& signal, unsigned int x) 
{ 
    // Emit signal, causing registered slots to run within this thread. 
    signal(x); 

    // If x has been exhausted, then cause gui service to run out of work. 
    if (!x) 
    { 
    gui_work = boost::none; 
    } 
    // Otherwise, post more work into gui. 
    else 
    { 
    std::cout << "Worker thread: " << boost::this_thread::get_id() << 
       " scheduling other thread to emit signals" << std::endl; 
    gui_service.post(boost::bind(
     &emit_then_notify_worker, 
     boost::ref(signal), --x)); 
    } 
} 

void worker_main() 
{ 
    std::cout << "Worker thread: " << boost::this_thread::get_id() << std::endl; 
    worker_service.run(); 
} 

int main() 
{ 
    signal_type signal; 

    // Connect slots to signal. 
    signal.connect(&hello); 
    signal.connect(&world); 

    boost::optional<boost::asio::io_service::work> worker_work(
    boost::ref(worker_service)); 
    gui_work = boost::in_place(boost::ref(gui_service)); 

    std::cout << "GUI thread: " << boost::this_thread::get_id() << std::endl; 

    // Spawn off worker thread. 
    boost::thread worker_thread(&worker_main); 

    // Add work to worker. 
    worker_service.post(boost::bind(
     &emit_then_notify_gui, 
     boost::ref(signal), 3)); 

    // Mocked up GUI main loop. 
    while (!gui_service.stopped()) 
    { 
    // Do other GUI actions. 

    // Perform message processing. 
    gui_service.poll_one(); 
    } 

    // Cleanup. 
    worker_work = boost::none; 
    worker_thread.join(); 
} 

और इसके उत्पादन:

GUI thread: b7f2f6d0 
Worker thread: b7f2eb90 
hello with 3 from thread b7f2eb90 
world with 3 from thread b7f2eb90 
Worker thread: b7f2eb90 scheduling other thread to emit signals 
hello with 2 from thread b7f2f6d0 
world with 2 from thread b7f2f6d0 
GUI thread: b7f2f6d0 scheduling other thread to emit signals 
hello with 1 from thread b7f2eb90 
world with 1 from thread b7f2eb90 
Worker thread: b7f2eb90 scheduling other thread to emit signals 
hello with 0 from thread b7f2f6d0 
world with 0 from thread b7f2f6d0
+0

धन्यवाद, यह वही है जो मैं पूरा करना चाहता था। केवल एक प्रश्न: जब आप 'worker_io_service.post (boost :: bind (& process_message, message)) करते हैं;' या 'worker_service.post (boost :: bind (& emit_then_notify_gui, boost :: ref (signal), --x) ; 'क्या आपको' bind' को लपेटने के लिए बूस्ट एएसियो 'स्ट्रैंड्स' का उपयोग करने की आवश्यकता है, क्योंकि आप 'io_service' की तुलना में एक differente थ्रेड से कॉल करते हैं? –

+0

यह भी इंगित करने के लिए धन्यवाद कि सही पैटर्न दो अलग-अलग 'io_service' का उपयोग GUI amd कार्यकर्ता थ्रेड के लिए करेगा, और दो थ्रेड के बीच साझा एक 'io_service' नहीं। बूस्ट एएसओ का मेरा ज्ञान काफी बुनियादी है और आपके उत्तर ने इसके सही उपयोग पर कुछ प्रकाश डाला है। –

+2

@ एनीरिको: 'io_service' थ्रेड सुरक्षित है, इसलिए 'स्ट्रैंड' की कोई आवश्यकता नहीं है। हालांकि, एक 'स्ट्रैंड' कुछ अतिरिक्त ओवरहेड के साथ एक ही परिणाम प्रदान करेगा। एक 'स्ट्रैंड' एक [हैंडलर की कतार] रखता है (http://stackoverflow.com/a/14320871/1053968), केवल एक समय में अपने हैंडलर को अपने 'io_service' में पोस्ट कर रहा है [' प्रेषण() '] के माध्यम से (http://stackoverflow.com/a/14414809/1053968)। उपरोक्त उदाहरण के संदर्भ में, 'प्रेषण() '' post() 'को प्रतिनिधि करेगा। तारों के बारे में अधिक जानकारी के लिए और जब उनकी आवश्यकता होती है, तो यह [यह] पढ़ने के लायक हो सकता है (http://stackoverflow.com/a/12801042/1053968) उत्तर। –

0

यदि आपके पास केवल एक कार्यकर्ता है, तो यह आसान है।

एएसआईओ के हैंडलर को थ्रेड (ओं) द्वारा निष्पादित किया जाता है जो io_service.run() पर कॉल कर रहे हैं। आपके मामले में, इसका मतलब है कि केवल एक धागा, कार्यकर्ता एक, कॉलबैक हैंडलर निष्पादित कर सकता है। तो आपको यहां थ्रेड सुरक्षा के बारे में चिंता करने की आवश्यकता नहीं है।

आपका जीयूआई थ्रेड, यह मानते हुए कि उसके पास किसी की सॉकेट तक पहुंच है, बिना किसी समस्या के boost::asio::async_write() पर कॉल कर सकते हैं। कॉलबैक हैंडलर, हालांकि, कार्यकर्ता धागे में निष्पादित किया जाएगा। boost::asio::async_write() फोन करके

  1. व्यापार तर्क धागा (अपने जीयूआई धागा हो सकता है) ने अपने ग्राहक से एक के लिए एक लेख को आसानी से निर्धारित कर सकते हैं:

    मेरे अनुभव (admitedly सीमित) से, मैं इस पद्धति का इस्तेमाल किया कार्यकर्ता धागा इसका ख्याल रखेगा।

  2. कार्यकर्ता धागा कुछ boost::asio::async_read() शुरू करता है, और "व्यापार तर्क पैकेट" बना सकता है। मेरा मतलब यह है कि, यह अर्थपूर्ण संदेश बनाता है (कच्चे डेटा से कस्टम वर्ग Packet या Event या w/e you what) का उप-वर्ग हो सकता है।
  3. जब कार्यकर्ता धागे के पास ऐसा संदेश बनाने के लिए पर्याप्त डेटा होता है, तो यह करता है, और उसके बाद इसे थ्रेड-सुरक्षित कतार में लगाया जाता है जो GUI थ्रेड खींच जाएगा।
  4. जीयूआई (या व्यापार तर्क) धागा संदेश को संसाधित करता है।

मुझे बताएं कि क्या यह स्पष्ट नहीं है/अगर मैं और अधिक सहायता कर सकता हूं।

+0

मुझे यह बताने के लिए धन्यवाद कि boost :: asio :: async_write() को जीयूआई थ्रेड से बुलाया जा सकता है, यह एक अतिरिक्त जानकारी है जो भविष्य में आसान हो सकती है (अभी के लिए, मुझे सीधे सॉकेट पर लिखने के लिए जीयूआई थ्रेड की आवश्यकता नहीं है: कार्यकर्ता थ्रेड पूरे व्यवसाय तर्क का प्रबंधन करता है)। लेकिन मेरा सवाल अलग था: मुझे दो धागे के बीच संदेशों का आदान-प्रदान करने की ज़रूरत है, यानी जीयूआई थ्रेड से वर्कर थ्रेड तक मनमानी तरीके से कॉल करें, और इसके विपरीत। सिग्नल और स्लॉट का उपयोग करके क्यूटी में वास्तव में क्या हासिल किया जाता है। –

+0

ओह, मैं देखता हूं, मुझे इस तरह समझ में नहीं आया, क्षमा करें। मुझे डर है कि मुझे जवाब देने के लिए पर्याप्त अनुभव नहीं हुआ है। मनमाने ढंग से विधि के कॉल के लिए बहुत सारे काम की आवश्यकता होती है (जिसे क्यूटी-एमओसी फाइल, आईआईआरसी में कुछ कोड उत्पन्न करके संभाला जाता है)। मुझे नहीं पता कि बूस्ट ऐसे उपकरण प्रदान करता है या नहीं। मैं संदेश पास करने की अनुशंसा करता हूं, लेकिन मुझे यकीन नहीं है कि यह काम करेगा, क्योंकि 'io_service.run()' एक अवरुद्ध कॉल है। आइए आशा करते हैं कि अधिक अनुभव वाला कोई व्यक्ति मदद करने में सक्षम होगा :)। – Xaqq

+0

असल में मुझे संदेह है कि आप केवल io_service का उपयोग करके GUI => कार्यकर्ता, और कार्यकर्ता => GUI संदेश भेज सकते हैं। मुझे यकीन नहीं है, लेकिन एएसओ स्ट्रैंड्स का उपयोग करके, आप io_service को "जागृत" कर सकते हैं और एक हैंडलर को कॉल कर सकते हैं। मैं जानना चाहता हूं कि किसी को ऐसे तरीके से कैसे कार्यान्वित किया जाए, जो बूस्ट एएसओ –

1

जिस तरह से मैं 2+ धागे के बीच संदेशों का आदान-प्रदान करता हूं वह एक कतार की तरह एक कंटेनर का उपयोग करना है और उन्हें वहां स्टोर करना है और फिर कार्यकर्ता धागे को जागने और संसाधित करने के लिए सूचित करने के लिए एक ईवेंट का उपयोग करना है।

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf) 
{ 
    // This method creates a msg object and saves it in the SendMsgQ object. 
    // 
    Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf); 
    SendMsgQ.Push(pMsg); 
    // Signal the send worker thread to wake up and send the msg to the server. 
    SetEvent(hEvent); 
} 

हेडर फाइल में:

std::queue<Message*> SendMsgQueue; // Queue of msgs to send to the server. 

ऊपर कोड माइक्रोसॉफ्ट कुलपति ++ के लिए है यहाँ एक उदाहरण है। यदि आपका विकास वातावरण अलग है तो आपको एक अलग वर्ग या विधियों का उपयोग करना पड़ सकता है। लेकिन, विचार एक जैसा होना चाहिए।

संपादित करें - अधिक पूरा कोड उदाहरण

#include "StdAfx.h" 
#include "SSLSocket.h" 

boost::shared_ptr<boost::asio::io_service> SSLSocket::IOService; 
bool SSLSocket::LobbySocketOpen = false; 
SSLSocket* SSLSocket::pSSLLobby = 0; 
int SSLSocket::StaticInit = 0; 
Callback SSLSocket::CallbackFunction; 
BufferManagement SSLSocket::BufMang; 
volatile bool SSLSocket::ReqAlive = true; 
Logger SSLSocket::Log; 
HANDLE SSLSocket::hEvent; 
bool SSLSocket::DisplayInHex; 
ConcurrentMsgQueue SSLSocket::SendMsgQ; 
bool SSLSocket::RcvThreadCreated = 0; 
BufferManagement* Message::pBufMang; 
bool SSLSocket::ShuttingDown = false; 
std::vector<SSLSocket *> SocketList; 

SSLSocket::SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, 
    const LogLevel levelOfLog, const string& logFileName, const int bufMangLen) : pSocket(0) 
{ 
    // SSLSocket Constructor. 
    // If the static members have not been intialized yet, then initialize them. 
    LockCode = new Lock(); 
    if (!StaticInit) 
    { 
     SocketList.push_back(this); 
     DisplayInHex = displayInHex; 
     BufMang.Init(bufMangLen); 
     Message::SetBufMang(&BufMang); 
     // This constructor enables logging according to the vars passed in. 
     Log.Init(logToFile, logToConsole, levelOfLog, logFileName); 
     StaticInit = 1; 
     hEvent = CreateEvent(NULL, false, false, NULL); 
     // Define the ASIO IO service object. 
     // IOService = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service); 
     boost::shared_ptr<boost::asio::io_service> IOServ(new boost::asio::io_service); 
     IOService = IOServ; 
     pSSLLobby = this; 
    } 
} 

SSLSocket::~SSLSocket(void) 
{ 
    if (pSocket) 
     delete pSocket; 
    if (--StaticInit == 0) 
     CloseHandle(hEvent); 
} 

void SSLSocket::Connect(SSLSocket* psSLS, const string& serverPath, string& port) 
{ 
    // Connects to the server. 
    // serverPath - specifies the path to the server. Can be either an ip address or url. 
    // port - port server is listening on. 
    // 
    try 
    { 
     LockCode->Acquire(); // Single thread the code. 
     // If the user has tried to connect before, then make sure everything is clean before trying to do so again. 
     if (pSocket) 
     { 
     delete pSocket; 
     pSocket = 0; 
     }                         
     // If serverPath is a URL, then resolve the address. 
     if ((serverPath[0] < '0') || (serverPath[0] > '9')) // Assumes that the first char of the server path is not a number when resolving to an ip addr. 
     { 
     // Create the resolver and query objects to resolve the host name in serverPath to an ip address. 
     boost::asio::ip::tcp::resolver resolver(*IOService); 
     boost::asio::ip::tcp::resolver::query query(serverPath, port); 
     boost::asio::ip::tcp::resolver::iterator EndpointIterator = resolver.resolve(query); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificiate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // The thread we are on now, is most likely the user interface thread. Create a thread to handle all incoming socket work messages. 
     // Only one thread is created to handle the socket I/O reading and another thread is created to handle writing. 
     if (!RcvThreadCreated) 
     { 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::RcvWorkerThread, this)); 
      RcvThreadCreated = true; 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread, this)); 
     } 
     // Try to connect to the server. Note - add timeout logic at some point. 
     boost::asio::async_connect(pSocket->lowest_layer(), EndpointIterator, 
      boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
     else 
     { 
     // serverPath is an ip address, so try to connect using that. 
     // 
     stringstream ss1; 
     boost::system::error_code EC; 
     ss1 << "SSLSocket::Connect: Preparing to connect to game server " << serverPath << " : " << port << ".\n"; 
     Log.LogString(ss1.str(), LogInfo); 
     // Create an endpoint with the specified ip address. 
     const boost::asio::ip::address IP(boost::asio::ip::address::from_string(serverPath)); 
     int iport = atoi(port.c_str()); 
     const boost::asio::ip::tcp::endpoint EP(IP, iport); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificiate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // Try to connect to the server. Note - add timeout logic at some point. 
     pSocket->next_layer().connect(EP, EC); 
     if (EC) 
     { 
      // Log an error. This worker thread should exit gracefully after this. 
      stringstream ss; 
      ss << "SSLSocket::Connect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << EC.message() + ".\n"; 
      Log.LogString(ss.str(), LogError); 
     } 
     stringstream ss; 
     ss << "SSLSocket::Connect: Calling HandleConnect for game server " << serverPath << " : " << port << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     HandleConnect(EC); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::Connect: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
    LockCode->Release(); 
} 

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf) 
{ 
    // This method creates a msg object and saves it in the SendMsgQ object. 
    // sends the number of bytes specified by bytesInMsg in pBuf to the server. 
    // 
    Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf); 
    SendMsgQ.Push(pMsg); 
    // Signal the send worker thread to wake up and send the msg to the server. 
    SetEvent(hEvent); 
} 


void SSLSocket::SendWorkerThread(SSLSocket* psSLS) 
{ 
    // This thread method gets called to process the messages to be sent to the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->ProcessSendRequests(); 
} 

void SSLSocket::ProcessSendRequests() 
{ 
    // This method handles sending msgs to the server. 
    // 
    std::stringstream ss; 
    DWORD WaitResult; 
    Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo); 
    // Loop until the user quits, or an error of some sort is thrown. 
    try 
    { 
     do 
     { 
     // If there are one or more msgs that need to be sent to a server, then send them out. 
     if (SendMsgQ.Count() > 0) 
     { 
      Message* pMsg = SendMsgQ.Front(); 
      SSLSocket* pSSL = pMsg->pSSL; 
      SendMsgQ.Pop(); 
      const Byte* pBuf = pMsg->pBuf; 
      const int BytesInMsg = pMsg->BytesInMsg; 
      boost::system::error_code Error; 
      LockCode->Acquire(); // Single thread the code. 
      try 
      { 
       boost::asio::async_write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), boost::bind(&SSLSocket::HandleWrite, this, 
        boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
      } 
      catch (std::exception& e) 
      { 
       stringstream ss; 
       ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n"; 
       Log.LogString(ss.str(), LogError); 
      } 
      ss.str(std::string()); 
      ss << "SSLSocket::ProcessSendRequests: # bytes sent = " << BytesInMsg << "\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3); 
      LockCode->Release(); 
     } 
     else 
     { 
      // Nothing to send, so go into a wait state. 
      WaitResult = WaitForSingleObject(hEvent, INFINITE); 
      if (WaitResult != 0L) 
      { 
       Log.LogString("SSLSocket::ProcessSendRequests: WaitForSingleObject event error. Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError); 
      } 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. Nothing to do really since reading is handled by the HandleRead method. 
    // 
    std::stringstream ss; 
    try 
    { 
     if (error) 
     { 
     ss << "SSLSocket::HandleWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::RcvWorkerThread(SSLSocket* psSLS) 
{ 
    // This is the method that gets called when the receive thread is created by this class. 
    // This thread method focuses on processing messages received from the server. 
    // 
    // Since this has to be a static method, call an instance method on the class to handle server requests. 
    psSLS->InitAsynchIO(); 
} 

void SSLSocket::InitAsynchIO() 
{ 
    // This method is responsible for initiating asynch i/o. 
    boost::system::error_code Err; 
    string s; 
    stringstream ss; 
    // 
    try 
    { 
     ss << "SSLSocket::InitAsynchIO: Worker thread - " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Enable the handlers for asynch i/o. The thread will hang here until the stop method has been called or an error occurs. 
     // Add a work object so the thread will be dedicated to handling asynch i/o. 
     boost::asio::io_service::work work(*IOService); 
     IOService->run(); 
     Log.LogString("SSLSocket::InitAsynchIO: receive worker thread done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleConnect(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the connect request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     LockCode->Acquire(); // Single thread the code. 
     pSocket->async_handshake(boost::asio::ssl::stream_base::client, 
      boost::bind(&SSLSocket::HandleHandshake, this, boost::asio::placeholders::error)); 
     LockCode->Release(); 
     ss << "SSLSocket::HandleConnect: From worker thread " << Logger::NumberToString(boost::this_thread::get_id()) << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     } 
     else 
     { 
     // Log an error. This worker thread should exit gracefully after this. 
     ss << "SSLSocket::HandleConnect: connect failed. Error: " << error.message() + ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleHandshake(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the handshake request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // Try to send the first message that the server is expecting. This msg tells the server we want to connect. 
     // 
     unsigned char Msg[5] = {0x17, 0x00, 0x00, 0x00, 0x06}; 
     boost::system::error_code Err; 
     // 
     if (pSSLLobby == this) 
      LobbySocketOpen = true; 
     sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string(); 
     uiClientPort = pSocket->lowest_layer().remote_endpoint().port(); 
     ReqAlive = true; 
     LockCode->Acquire(); // Single thread the code. 
     int Count = boost::asio::write(*pSocket, boost::asio::buffer(Msg), boost::asio::transfer_exactly(5), Err); 
     if (Err) 
     { 
      ss << "SSLSocket::HandleHandshake: write failed - " << error.message() << ".\n"; 
      Log.LogString(ss.str(), LogInfo); 
     } 
     HandleFirstWrite(Err, Count); 
     LockCode->Release(); 
     ss.str(""); 
     ss << "SSLSocket::HandleHandshake: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     } 
     else 
     { 
     ss << "SSLSocket::HandleHandshake: failed - " << error.message() << ".\n"; 
     IOService->stop(); 
     } 
     Log.LogString(ss.str(), LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleFirstWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. This method is only called from HandleHandShake. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // Notify the UI that we are now connected. Create a 6 byte msg for this. 
     pDataBuf = BufMang.GetPtr(6); 
     BYTE* p = pDataBuf; 
     // Create msg type 500 
     *p = 244; 
     *++p = 1; 
     CallbackFunction(this, 2, (void*)pDataBuf); 
     // Get the 1st 4 bytes of the next msg, which is always the length of the msg. 
     pDataBuf = BufMang.GetPtr(MsgLenBytes); 
     try 
     { 
      boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, this, 
       boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     } 
     catch (std::exception& e) 
     { 
      stringstream ss; 
      ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n"; 
      Log.LogString(ss.str(), LogError); 
      Stop(); 
     } 
     } 
     else 
     { 
     ss << "SSLSocket::HandleFirstWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleRead(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called to process an incoming message. 
    // 
    std::stringstream ss; 
    int ByteCount; 
    try 
    { 
     // ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     // Log.LogString(ss.str(), LogInfo); 
     // Set to exit this thread if the user is done. 
     if (!ReqAlive) 
     { 
     // IOService->stop(); 
     return; 
     } 
     if (!error) 
     { 
     // Get the number of bytes in the message. 
     if (bytesTransferred == 4) 
     { 
      ByteCount = BytesToInt(pDataBuf); 
     } 
     else 
     { 
      // Call the C# callback method that will handle the message. 
      ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytesTransferred << ".\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      if (bytesTransferred > 0) 
      { 
       Log.LogBuf(pDataBuf, (int)bytesTransferred, true, LogDebug3); 
       Log.LogString("SSLSocket::HandleRead: sending msg to the C# client.\n\n", LogDebug2); 
       CallbackFunction(this, bytesTransferred, (void*)pDataBuf); 
      } 
      else 
      { 
       // # of bytes transferred = 0. Don't do anything. 
       bytesTransferred = 0; // For debugging. 
      } 
      // Prepare to read in the next message length. 
      ByteCount = MsgLenBytes; 
     } 
     pDataBuf = BufMang.GetPtr(ByteCount); 
     boost::system::error_code Err; 
     try 
     { 
      boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
       this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     } 
     catch (std::exception& e) 
     { 
      stringstream ss; 
      ss << "SSLSocket::HandleRead: threw this error - " << e.what() << ".\n"; 
      Log.LogString(ss.str(), LogError); 
     } 
     } 
     else 
     { 
     Log.LogString("SSLSocket::HandleRead failed: " + error.message() + "\n", LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleRead: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::Stop() 
{ 
    // This method calls the shutdown method on the socket in order to stop reads or writes that might be going on. If this is not done, then an exception will be thrown 
    // when it comes time to delete this object. 
    // 
    boost::system::error_code EC; 
    try 
    { 
     // This method can be called from the handler as well. So once the ShuttingDown flag is set, don't go throught the same code again. 
     if (ShuttingDown) 
     return; 
     LockCode->Acquire(); // Single thread the code. 
     if (!ShuttingDown) 
     { 
     ShuttingDown = true; 
     pSocket->next_layer().cancel(); 
     pSocket->shutdown(EC); 
     if (EC) 
     { 
      stringstream ss; 
      ss << "SSLSocket::Stop: socket shutdown error - " << EC.message() << ".\n"; 
     } 
     else 
     { 
      pSocket->next_layer().close(); 
     } 
     delete pSocket; 
     pSocket = 0; 
     ReqAlive = false; 
     SetEvent(hEvent); 
     IOService->stop(); 
     LobbySocketOpen = false; 
     WorkerThreads.join_all(); 
     } 
     LockCode->Release(); 
     delete LockCode; 
     LockCode = 0; 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::Stop: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

तो, के बारे में आप एक कतार है या नहीं का उपयोग करने के लिए कि क्या आपके सवाल का जवाब में। Xaqq पर आपकी टिप्पणी में, आपने कहा "मुझे दो धागे के बीच संदेशों का आदान-प्रदान करने की आवश्यकता है।" तो एक कतार की तरह एक कंटेनर का उपयोग यह है कि कैसे प्रसंस्करण के लिए संदेशों को किसी अन्य धागे में पारित किया जा सकता है। यदि आपको एसटीएल कंटेनर पसंद नहीं हैं, तो बूस्ट में some है। जहां तक ​​मुझे पता है, कोई बूस्ट एएसआईओ आंतरिक कंटेनर नहीं है जिसे एक्सेस किया जा सकता है। संदेशों को संग्रहीत करना और पास करना कुछ ऐसा है जो आपको अपने कोड में करना है।

कॉल के बारे में एक अंतिम नोट io_service :: रन। काम करने के दौरान यह केवल ब्लॉक होगा। यह link देखें। उपरोक्त मेरे उदाहरण कोड में, रन विधि कहने से पहले io_service ऑब्जेक्ट में एक कार्य आइटम जोड़ा जाता है, इसलिए यह अनिश्चित काल तक अवरुद्ध होगा - जो मैं चाहता हूं। अगर मैं वास्तव में केवल एक धागा चाहता था, तो मैं कार्यकर्ता थ्रेड को कार्य विधि के साथ रन विधि को कॉल करने के लिए सेट कर सकता हूं ताकि यह अनिश्चित काल तक अवरुद्ध हो सके। यह सभी एसिंक्रोनस I/O से आने और सर्वर पर जाने से संभाल लेगा। कक्षा के अंदर, मैं एक इंटरफ़ेस विधि या दो लिखूंगा ताकि gui सर्वर पर डेटा भेज सके। ये विधियां async लेखन .vs का उपयोग कर सकती हैं। सिंच लिखने की विधि और इस प्रकार तुरंत वापस आ जाएगी - इसलिए आपका गुई लंबे समय तक अवरुद्ध नहीं होगा। आपको हैंडलवाइट विधि लिखनी होगी। मेरा कोड इसके साथ बहुत कुछ नहीं करता है - अगर कोई होता है तो बस एक त्रुटि लॉग करता है।

+0

आपको कार्यकर्ता थ्रेड में संदेश कैसे प्राप्त होते हैं, जो 'io_service.run()' पर अवरुद्ध है? – Xaqq

+0

ठीक है, मेरा सवाल था: क्या एक संदेश कतार एक ही io_service का उपयोग करके कार्यान्वित किया जा सकता है, जिसे मैं सॉकेट के साथ, स्ट्रैंड्स के साथ उपयोग करता हूं? और, यदि हां, तो कैसे? यह करने के लिए सबसे अच्छा तरीका क्या है? मैं अपनी खुद की संदेश कतार लागू नहीं करना चाहता हूं। यदि संभव हो तो मैं बूस्ट असियो का फायदा उठाना चाहता हूं। –

+0

सबसे पहले, io_service.Run() तब तक वापस आ जाएगा जब तक कि कोई कार्य आइटम लोड नहीं किया गया हो - मान लीजिए कि हम यहां किसी क्लाइंट के बारे में बात कर रहे हैं। सर्वर थोड़ा अलग हैं। इसलिए, सर्वर से आने और आने वाले दोनों अनुरोधों को संसाधित करने के लिए एक धागा का उपयोग किया जा सकता है। मेरा कोड 2 धागे का उपयोग करता है। सर्वर से आने वाले संदेशों को संसाधित करने के लिए एक थ्रेड का उपयोग किया जाता है - जो वह है जो io_service को कॉल करता है। एक कार्य आइटम के साथ चलाएं ताकि वह कभी वापस न आए। अन्य धागा सर्वर को संदेश भेजने के लिए ज़िम्मेदार है। मैं एक और पूर्ण कोड उदाहरण के साथ अपना जवाब संपादित कर रहा हूं। –

संबंधित मुद्दे