2012-09-19 18 views
28

क्या आपके पास कोई संकेत है कि सदस्यता समस्या कब हुई है, तो मैं फिर से कनेक्ट कर सकता हूं?RabbitMQ C# ड्राइवर संदेश प्राप्त करना बंद कर देता है

मेरी सेवा RabbitMQ.Client.MessagePatterns का उपयोग करती है। इसकी सदस्यता के लिए सदस्यता। कुछ समय बाद, मेरा ग्राहक चुपचाप संदेशों को प्राप्त करना बंद कर देता है। मुझे नेटवर्क के मुद्दों पर संदेह है क्योंकि मेरा वीपीएन कनेक्शन सबसे विश्वसनीय नहीं है।

मैंने थोड़ी देर के लिए दस्तावेज़ों को पढ़ा है, यह पता लगाने के लिए कि जब यह नेटवर्क बिना किसी किस्मत के नेटवर्क समस्या के कारण टूटा जा सकता है। मैंने यह जांचने की कोशिश की है कि कनेक्शन और चैनल अभी भी खुले हैं, लेकिन यह हमेशा रिपोर्ट करता है कि यह अभी भी खुला है।

संदेश जो संदेश करता है वह काफी अच्छी तरह से काम करता है और कतार में वापस स्वीकार किया जाता है, इसलिए मुझे नहीं लगता कि यह "एएके" के साथ एक मुद्दा है।

मुझे यकीन है कि मुझे बस कुछ आसान याद आना चाहिए, लेकिन मुझे अभी तक यह नहीं मिला है।

public void Run(string brokerUri, Action<byte[]> handler) 
{ 
    log.Debug("Connecting to broker: {0}".Fill(brokerUri)); 
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri }; 

    using (IConnection connection = factory.CreateConnection()) 
    { 
     using (IModel channel = connection.CreateModel()) 
     { 
      channel.QueueDeclare(queueName, true, false, false, null); 

      using (Subscription subscription = new Subscription(channel, queueName, false)) 
      { 
       while (!Cancelled) 
       { 
        BasicDeliverEventArgs args; 

        if (!channel.IsOpen) 
        { 
         log.Error("The channel is no longer open, but we are still trying to process messages."); 
         throw new InvalidOperationException("Channel is closed."); 
        } 
        else if (!connection.IsOpen) 
        { 
         log.Error("The connection is no longer open, but we are still trying to process message."); 
         throw new InvalidOperationException("Connection is closed."); 
        } 

        bool gotMessage = subscription.Next(250, out args); 

        if (gotMessage) 
        { 
         log.Debug("Received message"); 
         try 
         { 
          handler(args.Body); 
         } 
         catch (Exception e) 
         { 
          log.Debug("Exception caught while processing message. Will be bubbled up.", e); 
          throw; 
         } 

         log.Debug("Acknowledging message completion"); 
         subscription.Ack(args); 
        } 
       } 
      } 
     } 
    } 
} 

अद्यतन:

मैं एक आभासी मशीन में सर्वर चल रहा है और मैं एक अपवाद (RabbitMQ.Client.Exceptions.OperationInterruptedException: AMQP आपरेशन बाधित किया गया था) मिलता है द्वारा एक नेटवर्क विफलता नकली जब मैं लंबे समय तक कनेक्शन तोड़ता हूं तो शायद यह नेटवर्क समस्या नहीं है। अब मुझे नहीं पता कि यह क्या होगा लेकिन यह केवल कुछ घंटों के चलने में विफल रहता है।

उत्तर

50

संपादित करें: जब से मैं चौखट इस पर upvotes हो रही हूँ, मैं कहना चाहिए नेट RabbitMQ ग्राहक अब इस कार्यक्षमता में बनाया गया है कि: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

आदर्श रूप में, आप और मैन्युअल रूप से बचने के लिए इस का उपयोग करने में सक्षम होना चाहिए पुन: कनेक्शन तर्क लागू करना।


मुझे हाल ही में लगभग एक ही चीज़ लागू करना पड़ा। जो मैं बता सकता हूं, सेबिटएमक्यू पर उपलब्ध अधिकांश जानकारी मानती है कि या तो आपका नेटवर्क बहुत विश्वसनीय है या आप एक ही मशीन पर एक खरगोश एमक्यू ब्रोकर चलाते हैं क्योंकि कोई क्लाइंट संदेश भेज रहा है या प्राप्त कर रहा है, जिससे खरगोश किसी भी कनेक्शन के मुद्दों से निपटने की इजाजत देता है।

खरगोश क्लाइंट को गिराए गए कनेक्शन के खिलाफ मजबूत होने के लिए वास्तव में मुश्किल नहीं है, लेकिन कुछ idiosyncrasies हैं जिनसे आपको निपटने की आवश्यकता है।

पहली बात दिल की धड़कन को चालू करने की जरूरत है:

ConnectionFactory factory = new ConnectionFactory() 
{ 
    Uri = brokerUri, 
    RequestedHeartbeat = 30, 
}; 

30 करने के लिए "RequestedHeartbeat" की स्थापना ग्राहक की जांच हर 30 सेकंड कर देगा अगर कनेक्शन अभी भी जीवित है। इसके बिना, संदेश ग्राहक खुशी से बैठेगा कि किसी अन्य संदेश के बिना किसी सुराग के आने का इंतजार है कि उसका कनेक्शन खराब हो गया है।

दिल की धड़कन को चालू करने से सर्वर यह देखने के लिए जांच करता है कि कनेक्शन अभी भी ऊपर है या नहीं, जो बहुत महत्वपूर्ण हो सकता है। यदि ग्राहक द्वारा संदेश उठाए जाने के बाद कोई कनेक्शन खराब हो जाता है लेकिन इसे स्वीकार करने से पहले, सर्वर बस मानता है कि ग्राहक लंबे समय से ले रहा है, और संदेश बंद होने तक मृत कनेक्शन पर "अटक गया" हो जाता है। दिल की धड़कन चालू होने पर, सर्वर कब पहचानता है कि कनेक्शन खराब हो जाता है और इसे बंद कर देता है, संदेश को कतार में वापस डाल देता है ताकि कोई अन्य ग्राहक इसे संभाल सके। दिल की धड़कन के बिना, मुझे मैन्युअल रूप से जाना होगा और खरगोश प्रबंधन यूआई में कनेक्शन बंद करना होगा ताकि अटक गया संदेश किसी ग्राहक को पास कर सके।

दूसरा, आपको OperationInterruptedException को संभालने की आवश्यकता होगी। जैसा कि आपने देखा है, यह आमतौर पर अपवाद है जब खरगोश क्लाइंट फेंक देगा जब यह नोटिस करता है कि कनेक्शन बाधित हो गया है। यदि IModel.QueueDeclare() को कॉल किया गया है जब कनेक्शन बाधित हो गया है, तो यह अपवाद है जो आपको मिलेगा। अपनी सदस्यता, चैनल, और कनेक्शन का निपटारा करके और नए बनाने के द्वारा इस अपवाद को संभालें।

अंत में, बंद कनेक्शन से संदेशों का उपभोग करने का प्रयास करते समय आपको अपने उपभोक्ता को क्या करना होगा, इसे संभालना होगा। दुर्भाग्यवश, खरगोश के ग्राहक में कतार से संदेश लेने का प्रत्येक अलग तरीका अलग-अलग प्रतिक्रिया करता है। QueueingBasicConsumerEndOfStreamException फेंकता है यदि आप बंद कनेक्शन पर QueueingBasicConsumer.Queue.Dequeue पर कॉल करते हैं। EventingBasicConsumer कुछ भी नहीं करता है, क्योंकि यह सिर्फ एक संदेश की प्रतीक्षा कर रहा है। मैं इसे कोशिश करने से क्या कह सकता हूं, Subscription कक्षा जिसका उपयोग आप कर रहे हैं, कॉल से Subscription.Next पर वापस लौटने लगता है, लेकिन args का मान शून्य है। एक बार फिर, अपने कनेक्शन, चैनल, और सदस्यता का निपटान करके और उन्हें पुनर्निर्माण करके इसे संभाल लें।

connection.IsOpen का मान गलत होने पर अपडेट किया जाएगा जब कनेक्शन दिल की धड़कन के साथ विफल रहता है, ताकि आप यह जांच सकें कि आप चाहें तो। हालांकि, चूंकि दिल की धड़कन एक अलग थ्रेड पर चलती है, इसलिए आपको उस मामले को संभालने की आवश्यकता होगी जहां कनेक्शन जांचने पर कनेक्शन खुलता है, लेकिन subscription.Next() से पहले बंद हो जाता है।

IConnection.Dispose() के लिए बाहर देखने के लिए एक अंतिम बात है। यदि कनेक्शन बंद होने के बाद आप निपटान करते हैं तो यह कॉल EndOfStreamException फेंक देगा। यह मेरे लिए एक बग जैसा लगता है, और मुझे IDisposable ऑब्जेक्ट पर निपटान नहीं करना पसंद है, इसलिए मैं इसे कॉल करता हूं और अपवाद निगलता हूं।

कि सबको एक साथ रखें एक त्वरित और गंदा उदाहरण में:

public bool Cancelled { get; set; } 

IConnection _connection = null; 
IModel _channel = null; 
Subscription _subscription = null; 

public void Run(string brokerUri, string queueName, Action<byte[]> handler) 
{ 
    ConnectionFactory factory = new ConnectionFactory() 
    { 
     Uri = brokerUri, 
     RequestedHeartbeat = 30, 
    }; 

    while (!Cancelled) 
    {    
     try 
     { 
      if(_subscription == null) 
      { 
       try 
       { 
        _connection = factory.CreateConnection(); 
       } 
       catch(BrokerUnreachableException) 
       { 
        //You probably want to log the error and cancel after N tries, 
        //otherwise start the loop over to try to connect again after a second or so. 
        continue; 
       } 

       _channel = _connection.CreateModel(); 
       _channel.QueueDeclare(queueName, true, false, false, null); 
       _subscription = new Subscription(_channel, queueName, false); 
      } 

      BasicDeliverEventArgs args; 
      bool gotMessage = _subscription.Next(250, out args); 
      if (gotMessage) 
      { 
       if(args == null) 
       { 
        //This means the connection is closed. 
        DisposeAllConnectionObjects(); 
        continue; 
       } 

       handler(args.Body); 
       _subscription.Ack(args); 
      } 
     } 
     catch(OperationInterruptedException ex) 
     { 
      DisposeAllConnectionObjects(); 
     } 
    } 
    DisposeAllConnectionObjects(); 
} 

private void DisposeAllConnectionObjects() 
{ 
    if(_subscription != null) 
    { 
     //IDisposable is implemented explicitly for some reason. 
     ((IDisposable)_subscription).Dispose(); 
     _subscription = null; 
    } 

    if(_channel != null) 
    { 
     _channel.Dispose(); 
     _channel = null; 
    } 

    if(_connection != null) 
    { 
     try 
     { 
      _connection.Dispose(); 
     } 
     catch(EndOfStreamException) 
     { 
     } 
     _connection = null; 
    } 
} 
+2

वाह। यह बहुत अच्छा लग रहा है। मैंने आज सुबह मेरी सेवा में कोड किया है और इसे तैनात किया है। आपने मुझे बहुत समय बचा लिया है। –

संबंधित मुद्दे