2011-10-13 20 views
20

यहाँ मेरी कार्यान्वयन है:बूस्ट asio async_write: async_write कॉल को अंतःस्थापित नहीं करना है?

  • ग्राहक एक संदेश ग्राहक बी के लिए
  • सर्वर संदेश प्रक्रिया async_read से सही मात्रा में डेटा की (भेजने और ब्लॉक करने के लिए नहीं आदेश में ग्राहक एक से नए डेटा के लिए इंतजार करेंगे क्लाइंट ए)
  • बाद में सर्वर जानकारी को संसाधित करेगा (शायद एक mysql क्वेरी करें) और फिर async_write के साथ क्लाइंट बी को संदेश भेजें।

समस्या यह है कि यदि क्लाइंट ए संदेश भेजता है तो वास्तव में तेज़, async_writes पिछले async_write हैंडलर से पहले इंटरलीव करेगा।

क्या इस समस्या से बचने के लिए कोई आसान तरीका है?

संपादित करें 1: एक ग्राहक सी बस ग्राहक एक के बाद, एक ही मुद्दा दिखाई देनी चाहिए ग्राहक बी को एक संदेश भेजता है ...

संपादित करें 2: यह काम करेगा? क्योंकि यह ब्लॉक करने के लिए लगता है, मैं नहीं जानता कि जहां ...

namespace structure {                
    class User {                  
    public:                   
    User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) : 
     m_socket(io_service, context), m_strand(io_service), is_writing(false) {}  

    ssl_socket& getSocket() {              
     return m_socket;                
    }                    

    boost::asio::strand getStrand() {            
     return m_strand;                
    }                    

    void push(std::string str) {             
     m_strand.post(boost::bind(&structure::User::strand_push, this, str));   
    }                    

    void strand_push(std::string str) {            

     std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;  
     m_queue.push(str);               
     if (!is_writing) {               
     write();                 
     std::cout << "going to write" << std::endl;         
     }                    
     std::cout << "Already writing" << std::endl;         
    }                    

    void write() {                 
     std::cout << "writing" << std::endl;           
     is_writing = true;               
     std::string str = m_queue.front();           
     boost::asio::async_write(m_socket,           
           boost::asio::buffer(str.c_str(), str.size()),  
           boost::bind(&structure::User::sent, this)   
           );             
    }                    

    void sent() {                 
     std::cout << "sent" << std::endl;            
     m_queue.pop();                
     if (!m_queue.empty()) {              
     write();                 
     return;                  
     }                    
     else                   
     is_writing = false;               
     std::cout << "done sent" << std::endl;          
    }           

    private:          
    ssl_socket   m_socket;    
    boost::asio::strand m_strand;    
    std::queue<std::string>  m_queue;  
    bool      is_writing;  
    };           
}            

#endif 
+0

ध्यान दें कि async लिखने बहुत कम मूल्यवान से async पढ़ा है। अधिकांश लिखने वस्तुतः तत्काल होते हैं क्योंकि ओएस स्थानीय रूप से डेटा को बफर करेगा। दूसरी तरफ पढ़ता है रिमोट साइड के लिए इंतजार कर रहा है, और आप स्थानीय रूप से इसके बारे में कुछ भी नहीं कर सकते हैं। सिंक्रोनस लेखन इसलिए अनुक्रमण को कार्यान्वित करने का एक व्यवहार्य तरीका है। यह डेटा स्वामित्व के मुद्दे को भी हल करता है - ऊपर दिया गया कोड गलत है क्योंकि 'लिखना ('' रिटर्न' होता है, जो 'बूस्ट :: asio_async_write()' बफर तक पहुंचने से पहले हो सकता है। – MSalters

उत्तर

37

इस समस्या से बचने के लिए एक आसान तरीका है?

हां, प्रत्येक ग्राहक के लिए आउटगोइंग कतार बनाए रखें। async_write पूर्णता हैंडलर में कतार आकार का निरीक्षण करें, यदि शून्य न हो, तो async_write ऑपरेशन शुरू करें। यहां एक नमूना

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 

#include <deque> 
#include <iostream> 
#include <string> 

class Connection 
{ 
public: 
    Connection(
      boost::asio::io_service& io_service 
      ) : 
     _io_service(io_service), 
     _strand(_io_service), 
     _socket(_io_service), 
     _outbox() 
    { 

    } 

    void write( 
      const std::string& message 
      ) 
    { 
     _strand.post(
       boost::bind(
        &Connection::writeImpl, 
        this, 
        message 
        ) 
       ); 
    } 

private: 
    void writeImpl(
      const std::string& message 
      ) 
    { 
     _outbox.push_back(message); 
     if (_outbox.size() > 1) { 
      // outstanding async_write 
      return; 
     } 

     this->write(); 
    } 

    void write() 
    { 
     const std::string& message = _outbox[0]; 
     boost::asio::async_write(
       _socket, 
       boost::asio::buffer(message.c_str(), message.size()), 
       _strand.wrap(
        boost::bind(
         &Connection::writeHandler, 
         this, 
         boost::asio::placeholders::error, 
         boost::asio::placeholders::bytes_transferred 
         ) 
        ) 
       ); 
    } 

    void writeHandler(
      const boost::system::error_code& error, 
      const size_t bytesTransferred 
      ) 
    { 
     _outbox.pop_front(); 

     if (error) { 
      std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl; 
      return; 
     } 

     if (!_outbox.empty()) { 
      // more messages to send 
      this->write(); 
     } 
    } 


private: 
    typedef std::deque<std::string> Outbox; 

private: 
    boost::asio::io_service& _io_service; 
    boost::asio::io_service::strand _strand; 
    boost::asio::ip::tcp::socket _socket; 
    Outbox _outbox; 
}; 

int 
main() 
{ 
    boost::asio::io_service io_service; 
    Connection foo(io_service); 
} 

कुछ महत्वपूर्ण बिंदुओं

  • boost::asio::io_service::strandConnection::_outbox
  • हैंडलर Connection::write() से भेजा जाता है, क्योंकि यह सार्वजनिक है

यह स्पष्ट नहीं था के लिए उपयोग की रक्षा करता है मेरे लिए यदि आप अपने प्रश्न में उदाहरण में समान प्रथाओं का उपयोग कर रहे थे क्योंकि सभी विधियां सार्वजनिक हैं।

+0

मैंने इस समाधान की कोशिश की है, बात यह है कि मेरे पास एकाधिक थ्रेड रनिंग रन() के साथ एक एकल io_service है, कतार पर डेटा को धक्का देने के लिए स्ट्रैंडपोस्ट का उपयोग करने के साथ भी बात यह है कि यह segfault लगता है क्योंकि इसे 2 अलग-अलग धागे से बुलाया जाता है ... कोई विचार क्यों? – TheSquad

+0

@TheSquad जो मुझे एक अलग प्रश्न की तरह लगता है। आपने संभवतः अपने तर्क को गलत तरीके से कार्यान्वित किया है, यह स्ट्रैंड और एकाधिक धागे के साथ करना आसान हो सकता है। कतार का उपयोग आपके मूल प्रश्न के लिए एक उपयुक्त समाधान है। –

+0

कतार से डेटा कब पॉप करना चाहिए, यह जानने के लिए आप क्या उपयोग करेंगे? – TheSquad

4

बस सैम के महान उत्तर में सुधार करने की कोशिश कर रहा है। सुधार बिंदु हैं:

  • async_write को पूरा करने, जिसका अर्थ है आप सभी इनपुट डेटा आप लिखने आपरेशन करने के लिए है कि आपूर्ति से पहले बफर (रों) से हर एक बाइट भेजने के लिए कठिन प्रयास करता है, अन्यथा टीसीपी पैकेट्स की तुलना में छोटे होने के कारण फ़्रेमिंग ओवरहेड बढ़ सकता है।

  • asio::streambuf, जबकि उपयोग करने में बहुत सुविधाजनक है, शून्य प्रतिलिपि नहीं है।नीचे दिया गया उदाहरण दर्शाता है कि शून्य-प्रति दृष्टिकोण: इनपुट डेटा भाग रखें जहां वे हैं और async_write के स्कैटर/इकट्ठा अधिभार का उपयोग करें जो इनपुट बफर का अनुक्रम लेता है (जो वास्तविक इनपुट डेटा के पॉइंटर्स हैं) ।

पूर्ण स्रोत कोड:

#include <boost/asio.hpp> 
#include <iostream> 
#include <memory> 
#include <mutex> 
#include <string> 
#include <thread> 
#include <unordered_set> 
#include <vector> 

using namespace std::chrono_literals; 
using boost::asio::ip::tcp; 

class Server 
{ 
    class Connection : public std::enable_shared_from_this<Connection> 
    { 
    friend class Server; 
    void ProcessCommand(const std::string& cmd) { 
     if (cmd == "stop") { 
     server_.Stop(); 
     return; 
     } 
     if (cmd == "") { 
     Close(); 
     return; 
     } 
     std::thread t([this, self = shared_from_this(), cmd] { 
     for (int i = 0; i < 30; ++i) { 
      Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n"); 
     } 
     server_.io_service_.post([this, self] { 
      DoReadCmd(); 
     }); 
     }); 
     t.detach(); 
    } 

    void DoReadCmd() { 
     read_timer_.expires_from_now(server_.read_timeout_); 
     read_timer_.async_wait([this](boost::system::error_code ec) { 
     if (!ec) { 
      std::cout << "Read timeout\n"; 
      Shutdown(); 
     } 
     }); 
     boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) { 
     read_timer_.cancel(); 
     if (!ec) { 
      const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data()); 
      std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1)); 
      buf_in_.consume(bytes_read); 
      ProcessCommand(cmd); 
     } 
     else { 
      Close(); 
     } 
     }); 
    } 

    void DoWrite() { 
     active_buffer_ ^= 1; // switch buffers 
     for (const auto& data : buffers_[active_buffer_]) { 
     buffer_seq_.push_back(boost::asio::buffer(data)); 
     } 
     write_timer_.expires_from_now(server_.write_timeout_); 
     write_timer_.async_wait([this](boost::system::error_code ec) { 
     if (!ec) { 
      std::cout << "Write timeout\n"; 
      Shutdown(); 
     } 
     }); 
     boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) { 
     write_timer_.cancel(); 
     std::lock_guard<std::mutex> lock(buffers_mtx_); 
     buffers_[active_buffer_].clear(); 
     buffer_seq_.clear(); 
     if (!ec) { 
      std::cout << "Wrote " << bytes_transferred << " bytes\n"; 
      if (!buffers_[active_buffer_^1].empty()) // have more work 
      DoWrite(); 
     } 
     else { 
      Close(); 
     } 
     }); 
    } 
    bool Writing() const { return !buffer_seq_.empty(); } 

    Server& server_; 
    boost::asio::streambuf buf_in_; 
    std::mutex buffers_mtx_; 
    std::vector<std::string> buffers_[2]; // a double buffer 
    std::vector<boost::asio::const_buffer> buffer_seq_; 
    int active_buffer_ = 0; 
    bool closing_ = false; 
    bool closed_ = false; 
    boost::asio::deadline_timer read_timer_, write_timer_; 
    tcp::socket socket_; 
    public: 
    Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) { 
    } 

    void Start() { 
     socket_.set_option(tcp::no_delay(true)); 
     DoReadCmd(); 
    } 

    void Close() { 
     closing_ = true; 
     if (!Writing()) 
     Shutdown(); 
    } 

    void Shutdown() { 
     if (!closed_) { 
     closing_ = closed_ = true; 
     boost::system::error_code ec; 
     socket_.shutdown(tcp::socket::shutdown_both, ec); 
     socket_.close(); 
     server_.active_connections_.erase(shared_from_this()); 
     } 
    } 

    void Write(std::string&& data) { 
     std::lock_guard<std::mutex> lock(buffers_mtx_); 
     buffers_[active_buffer_^1].push_back(std::move(data)); // move input data to the inactive buffer 
     if (!Writing()) 
     DoWrite(); 
    } 

    }; 

    void DoAccept() { 
    if (acceptor_.is_open()) { 
     auto session = std::make_shared<Connection>(*this); 
     acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) { 
     if (!ec) { 
      active_connections_.insert(session); 
      session->Start(); 
     } 
     DoAccept(); 
     }); 
    } 
    } 

    boost::asio::io_service io_service_; 
    tcp::acceptor acceptor_; 
    std::unordered_set<std::shared_ptr<Connection>> active_connections_; 
    const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30); 
    const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30); 

public: 
    Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { } 

    void Run() { 
    std::cout << "Listening on " << acceptor_.local_endpoint() << "\n"; 
    DoAccept(); 
    io_service_.run(); 
    } 

    void Stop() { 
    acceptor_.close(); 
    { 
     std::vector<std::shared_ptr<Connection>> sessionsToClose; 
     copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose)); 
     for (auto& s : sessionsToClose) 
     s->Shutdown(); 
    } 
    active_connections_.clear(); 
    io_service_.stop(); 
    } 

}; 

int main() { 
    try { 
    Server srv(8888); 
    srv.Run(); 
    } 
    catch (const std::exception& e) { 
    std::cerr << "Error: " << e.what() << "\n"; 
    } 
} 
संबंधित मुद्दे