2015-06-16 6 views
6

मैंने जेडएमक्यू (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching) के अंतिम मूल्य कैशिंग (एलवीसी) उदाहरण को लागू किया है, लेकिन बैकएंड पर पंजीकरण करने के लिए दूसरा ग्राहक नहीं प्राप्त कर सकता है।जेडएमक्यू: एकाधिक ग्राहकों के लिए XPUB सॉकेट पर कोई सदस्यता संदेश नहीं (अंतिम मूल्य कैशिंग पैटर्न)

पहली बार एक ग्राहक बोर्ड पर आता है, event[0] == b'\x01' स्थिति पूरी हो जाती है और कैश किए गए मान को भेजा जाता है, लेकिन दूसरा ग्राहक (एक ही विषय) पंजीकरण भी नहीं करता है (if backend in events: कभी सच नहीं है)। बाकी सब कुछ ठीक काम करता है। डेटा प्रकाशक से ग्राहकों को (सभी) पास हो जाता है।

इसका कारण क्या हो सकता है? बैकएंड सही तरीके से जुड़ा हुआ तरीका है? क्या यह पैटर्न केवल पहले ग्राहक के साथ काम करना चाहिए?

अद्यतन

जब मैं एक और विषय के लिए 2 ग्राहक सदस्यता लेते हैं, मैं सही व्यवहार मिलता है (अर्थात \x01 जब सदस्यता लेने)। यह वास्तव में पहले ग्राहक के लिए काम करता प्रतीत होता है। ZeroMQ में एक बग है?

अद्यतन 2

यहाँ एक न्यूनतम काम कर उदाहरण से पता चलता है कि उस LVC पैटर्न काम नहीं कर रहा (वैसे कम से कम नहीं इसे यहाँ लागू हो जाता है) है।

# subscriber.py 
import zmq 

def main(): 
    ctx = zmq.Context.instance() 
    sub = ctx.socket(zmq.SUB) 
    sub.connect("tcp://127.0.0.1:5558") 

    # Subscribe to every single topic from publisher 
    print 'subscribing (sub side)' 
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic") 

    poller = zmq.Poller() 
    poller.register(sub, zmq.POLLIN) 
    while True: 
     try: 
      events = dict(poller.poll(1000)) 
     except KeyboardInterrupt: 
      print("interrupted") 
      break 

     # Any new topic data we cache and then forward 
     if sub in events: 
      msg = sub.recv_multipart() 
      topic, current = msg 
      print 'received %s on topic %s' % (current, topic) 

if __name__ == '__main__': 
    main() 

और यहाँ दलाल है (उदाहरण के रूप में, लेकिन थोड़ा अधिक शब्दाडंबर और एक एकीकृत प्रकाशक के साथ)।

# broker.py 
# from http://zguide.zeromq.org/py:lvcache 
import zmq 
import threading 
import time 


class Publisher(threading.Thread): 
    def __init__(self): 
     super(Publisher, self).__init__() 

    def run(self): 
     time.sleep(10) 
     ctx = zmq.Context.instance() 
     pub = ctx.socket(zmq.PUB) 
     pub.connect("tcp://127.0.0.1:5557") 

     cnt = 0 
     while True: 
      msg = 'hello %d' % cnt 
      print 'publisher is publishing %s' % msg 
      pub.send_multipart(['my-topic', msg]) 
      cnt += 1 
      time.sleep(5) 


def main(): 
    ctx = zmq.Context.instance() 
    frontend = ctx.socket(zmq.SUB) 
    frontend.bind("tcp://*:5557") 
    backend = ctx.socket(zmq.XPUB) 
    backend.bind("tcp://*:5558") 

    # Subscribe to every single topic from publisher 
    frontend.setsockopt(zmq.SUBSCRIBE, b"") 

    # Store last instance of each topic in a cache 
    cache = {} 

    # We route topic updates from frontend to backend, and 
    # we handle subscriptions by sending whatever we cached, 
    # if anything: 
    poller = zmq.Poller() 
    poller.register(frontend, zmq.POLLIN) 
    poller.register(backend, zmq.POLLIN) 


    # launch a publisher 
    p = Publisher() 
    p.daemon = True 
    p.start() 

    while True: 

     try: 
      events = dict(poller.poll(1000)) 
     except KeyboardInterrupt: 
      print("interrupted") 
      break 

     # Any new topic data we cache and then forward 
     if frontend in events: 
      msg = frontend.recv_multipart() 
      topic, current = msg 
      cache[topic] = current 
      backend.send_multipart(msg) 

     ### this is where it fails for the 2nd subscriber. 
     ### There's never even an event from the backend 
     ### in events when the 2nd subscriber is subscribing. 

     # When we get a new subscription we pull data from the cache: 
     if backend in events: 
      print 'message from subscriber' 
      event = backend.recv() 
      # Event is one byte 0=unsub or 1=sub, followed by topic 
      if event[0] == b'\x01': 
       topic = event[1:] 
       print ' => subscribe to %s' % topic 
       if topic in cache: 
        print ("Sending cached topic %s" % topic) 
        backend.send_multipart([ topic, cache[topic] ]) 
      elif event[0] == b'\x00': 
       topic = event[1:] 
       print ' => unsubscribe from %s' % topic 

if __name__ == '__main__': 
    main() 

इस कोड चल रहा है (1 x broker.py, 2 एक्स subscriber.py) से पता चलता है कि पहले ग्राहक दलाल पर पंजीकृत करता है के रूप में उम्मीद (\x01 और कैश देखने), लेकिन 2 ग्राहक उसी तरह पंजीकृत नहीं है। दिलचस्प बात यह है कि दूसरा ग्राहक पब/उप चैनल तक लगा हुआ है, थोड़ी देर के बाद (10 सेकंड) दोनों ग्राहकों को प्रकाशक से डेटा प्राप्त होता है।

यह बहुत अजीब है। शायद मेरे कुछ पुस्तकालय पुराने हैं। यहाँ मैं क्या मिला है:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import zmq 
>>> zmq.__version__ 
'14.1.1' 

$ brew info zeromq 
zeromq: stable 4.0.5 (bottled), HEAD 
High-performance, asynchronous messaging library 
http://www.zeromq.org/ 
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) * 
    Poured from bottle 
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb 
==> Dependencies 
Build: pkg-config ✔ 
Optional: libpgm ✘, libsodium ✘ 

अद्यतन 3

यह व्यवहार भी (के साथ या libpgm और libsodium स्थापित बिना) zeromq 4.1.2 और pyzmq-14.7.0 में मनाया जा सकता है।

अद्यतन 4

एक और अवलोकन पता चलता है कि पहले ग्राहक किसी भी तरह अलग ढंग से नियंत्रित किया जाता है: पहला ग्राहक \x00 साथ अपनी सदस्यता विषय पूर्ववर्ती द्वारा XPUB सॉकेट (backend) से उम्मीद तरह से सदस्यता समाप्ति के एक ही है । अन्य ग्राहकों (मैंने 2 से अधिक की कोशिश की) बैकएंड चैनल पर म्यूट कर रहे थे (हालांकि संदेश प्राप्त करना)।

अद्यतन 5

मुझे आशा है कि मैं नीचे एक खरगोश की मांद नहीं जा रहा हूँ, लेकिन मैं czmq बाइंडिंग में देखा और सी में मेरी अजगर उदाहरण दौड़ा लिया हैपरिणाम एक जैसे हैं, इसलिए मुझे लगता है कि यह बाइंडिंग के साथ कोई समस्या नहीं है, लेकिन libzmq के साथ।

मैं भी पुष्टि की कि 2 ग्राहक संदेश सदस्यता ले भेज रहा है और वास्तव में मैं तार पर यह देख सकते हैं:

पहले सदस्यता ली है:

0000 02 00 00 00 45 00 00 3f 98 be 40 00 40 06 00 00 ....E..? [email protected]@... 
0010 7f 00 00 01 7f 00 00 01 fa e5 15 b6 34 f0 51 c3 ........ ....4.Q. 
0020 05 e4 8b 77 80 18 31 d4 fe 33 00 00 01 01 08 0a ...w..1. .3...... 
0030 2a aa d1 d2 2a aa cd e9 00 09 01 6d 79 2d 74 6f *...*... ...my-to 
0040 70 69 63           pic    

2 (ऊपर करने के लिए) अंतर के साथ संदेश सदस्यता ले चिह्नित और समझाया। सब्सक्राइब फ्रेम में वही डेटा भेजा जाता है।

       identification 
           v 
0000 02 00 00 00 45 00 00 3f ed be 40 00 40 06 00 00 ....E..? [email protected]@... 
          src port  sequence number 
            v  v v v v 
0010 7f 00 00 01 7f 00 00 01 fa e6 15 b6 17 da 02 e7 ........ ........ 

Acknowledgement number window scaling factor 
     v v v v   v 
0020 71 4b 33 e6 80 18 31 d5 fe 33 00 00 01 01 08 0a qK3...1. .3...... 

timestamp value timestamp echo reply 
      v   v v |<-------- data ------- 
0030 2a aa f8 2c 2a aa f4 45 00 09 01 6d 79 2d 74 6f *..,*..E ...my-to 

     ------>| 
0040 70 69 63           pic    

उत्तर

7

मैं इस समस्या के लिए समाधान मिल गया, और भले ही मैं वापस करने के लिए डॉक्स सामने पढ़ सकते हैं और वापस सामने करने के लिए, मैं इसे नहीं देखा था। कुंजी XPUB_VERBOSE है। बैकएंड initialisation के बाद करने के लिए इस पंक्ति जोड़ें और सब कुछ काम करता है ठीक

backend.setsockopt(zmq.XPUB_VERBOSE, True) 

यहाँ एक उद्धरण from the official documentation:

ZMQ_XPUB_VERBOSE: XPUB सॉकेट पर सभी सदस्यता संदेशों प्रदान नई सदस्यता और पर XPUB सॉकेट व्यवहार सेट unsubscriptions। 0 का मान डिफ़ॉल्ट है और अपस्ट्रीम के लिए केवल नए सदस्यता संदेश पास करता है। 1 का मान सभी सदस्यता संदेश अपस्ट्रीम पास करता है।

विकल्प मूल्य प्रकार int विकल्प मूल्य इकाई 0, 1 डिफ़ॉल्ट मान 0 लागू सॉकेट प्रकार ZMQ_XPUB

पीटर हिंटजेन्स इस in his blog पर कुछ अधिक जानकारी है।

कुछ महीने पहले हम XPUB सॉकेट कि डुप्लिकेट सदस्यता के अपने छानने को निष्क्रिय करने के लिए एक साफ थोड़ा विकल्प (ZMQ_XPUB_VERBOSE) कहा: यह प्रासंगिक अनुभाग है। अब यह किसी भी संख्या के ग्राहकों के लिए काम करता है। हम इसका उपयोग इस प्रकार है: इस पद्धति के रूप में नहीं तो काम नहीं करेगा

void *publisher = zsocket_new (ctx, ZMQ_XPUB); 
zsocket_set_xpub_verbose (publisher, 1); 
zsocket_bind (publisher, "tcp://*:6001"); 

LVC पैटर्न विवरण, इस सेटिंग को प्रतिबिंबित करने के लिए अद्यतन किया जाना चाहिए।

संबंधित मुद्दे