2011-06-23 11 views
7

का उपयोग कर उपभोग करने के बाद हॉर्नेटक्यू संदेश अभी भी कतार में शेष हैं, मैं हॉर्नेटक्यू में नया हूं इसलिए कृपया मेरे साथ भालू। मुझे सबसे पहले आपको अपनी आवश्यकताओं को बताने दें:कोर एपीआई

मुझे मिडलवेयर कतार में एक संदेश की आवश्यकता है जो कम विलंबता और दृढ़ता के साथ विभिन्न प्रक्रियाओं के बीच संदेशों को लगभग 1k तक पास कर सकता है (यानी यह सिस्टम क्रैश से बचना चाहिए)। मेरे पास एक ही कतार में कई प्रक्रियाओं को पढ़ने और समान कतार से पढ़ने के लिए कई प्रक्रियाएं होंगी।

इसके लिए मैंने हॉर्नेटक चुना क्योंकि यह दृढ़ता से गुज़रने वाले संदेश के लिए सबसे अच्छी रेटिंग है।

मैं वर्तमान के रूप में खड़े अकेले सर्वरHornetq v2.2.2Final usung कर रहा हूँ।
मैं सफलतापूर्वक टिकाऊ/गैर-टिकाऊ कोर एपीआई(ClientSession) का उपयोग कर कतारों बनाने के लिए, और सफलतापूर्वक संदेश पोस्ट क़तार में (ClientProducer) कर रहा हूँ।
इसी प्रकार मैं कोर एपीआई (क्लाइंटकंस्यूमर) का उपयोग कर कतार से संदेशों को पढ़ने में सक्षम हूं।

ग्राहक के संदेश को पढ़ने के बाद समस्या तब आती है, संदेश अभी भी कतार में रहता है, यानी कतार में संदेशों की संख्या निरंतर बनी हुई है। हो सकता है कि मुझे यह गलत लगे लेकिन मैं इस छाप के तहत था कि एक बार संदेश (पढ़ें + एएके), इसे कतार से हटा दिया गया है। लेकिन यह मेरे मामले में नहीं हो रहा है, और वही संदेश हो रहा है बार-बार पढ़ें।

इसके अलावा, मैं यह बताना चाहता हूं कि मैंने गैर-टिकाऊ संदेशों के साथ गैर-टिकाऊ कतारों का उपयोग करने का प्रयास किया है। लेकिन समस्या बनी हुई है। निर्माता के लिए

कोड है कि मैं उपयोग कर रहा हूँ:

public class HQProducer implements Runnable { 

    private ClientProducer producer; 
    private boolean killme; 
    private ClientSession session; 
    private boolean durableMsg; 

    public HQProducer(String host, int port, String address, String queueName, 
      boolean deleteQ, boolean durable, boolean durableMsg, int pRate) { 
     this.durableMsg = durableMsg; 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      if (queueExists(queueName)) { 
       if (deleteQ) { 
        System.out.println("Deleting existing queue :: " + queueName); 
        session.deleteQueue(queueName); 
        System.out.println("Creating queue :: " + queueName); 
        session.createQueue(address, queueName, true); 
       } 
      } else { 
       System.out.println("Creating new queue :: " + queueName); 
       session.createQueue(address, queueName, durable); 
      } 
      producer = session.createProducer(SimpleString.toSimpleString(address), pRate); 

      killme = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killme) { 
      try { 
       ClientMessage message = session.createMessage(durableMsg); 

       message.getBodyBuffer().writeString("Hello world"); 

       producer.send(message); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("Producer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killme) { 
     this.killme = killme; 
    } 

    private boolean queueExists(String qname) { 
     boolean res = false; 
     try { 
      //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname)); 
      QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname)); 
      if (queueQuery.isExists()) { 
       res = true; 
      } 
     } catch (HornetQException ex) { 
      res = false; 
     } 
     return res; 
    } 
} 

इसके अलावा उपभोक्ता के लिए कोड है:

public class HQConsumer implements Runnable { 

    private ClientSession session; 
    private ClientConsumer consumer; 
    private boolean killMe; 

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) { 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      session.start(); 

      consumer = session.createConsumer(queueName, "",0,-1,browseOnly); 

      killMe = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killMe) { 
      try { 
       ClientMessage msgReceived = consumer.receive(); 
       msgReceived.acknowledge(); 
       //System.out.println("message = " + msgReceived.getBodyBuffer().readString()); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("ConSumer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killMe) { 
     this.killMe = killMe; 
    } 
} 

HornetQ सर्वर config ::

<configuration xmlns="urn:hornetq" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> 

    <paging-directory>${data.dir:../data}/paging</paging-directory> 

    <bindings-directory>${data.dir:../data}/bindings</bindings-directory> 

    <journal-directory>${data.dir:../data}/journal</journal-directory> 

    <journal-min-files>10</journal-min-files> 

    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> 

    <connectors> 
     <connector name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </connector> 

     <connector name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     </connector> 
    </connectors> 

    <acceptors> 
     <acceptor name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </acceptor> 

     <acceptor name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     <param key="direct-deliver" value="false"/> 
     </acceptor> 
    </acceptors> 

    <security-settings> 
     <security-setting match="#"> 
     <permission type="createNonDurableQueue" roles="guest"/> 
     <permission type="deleteNonDurableQueue" roles="guest"/> 
     <permission type="createDurableQueue" roles="guest"/> 
     <permission type="deleteDurableQueue" roles="guest"/> 
     <permission type="consume" roles="guest"/> 
     <permission type="send" roles="guest"/> 
     </security-setting> 
    </security-settings> 

    <address-settings> 
     <!--default for catch all--> 
     <address-setting match="#"> 
     <dead-letter-address>jms.queue.DLQ</dead-letter-address> 
     <expiry-address>jms.queue.ExpiryQueue</expiry-address> 
     <redelivery-delay>0</redelivery-delay> 
     <max-size-bytes>10485760</max-size-bytes>  
     <message-counter-history-day-limit>10</message-counter-history-day-limit> 
     <address-full-policy>BLOCK</address-full-policy> 
     </address-setting> 
    </address-settings> 

</configuration> 
+0

एक/ग के लिए [इस] (http के लिए धन्यवाद: // docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354) प्रसंस्करण के बाद आपको संदेश स्वीकार करने की आवश्यकता है क्या आप वही कर रहे हैं? –

उत्तर

13

hornetq कोर API के साथ आपको स्पष्ट रूप से एक संदेश देना होगा। मैं नहीं देखता कि यह आपके परीक्षण में कहां हो रहा है।

यदि आप नक़्क़ाशी नहीं कर रहे हैं, तो यही कारण है कि आपके संदेश अवरुद्ध हो रहे हैं। आपको एक पूर्ण उत्तर देने के लिए मुझे अपना पूरा उदाहरण देखना होगा।

इसके अलावा: आप के साथ अपने createSession परिभाषित करना चाहिए: createSession (सच, सच, 0)

कोर एपीआई बैच ACKs का विकल्प होता है। आप एक ट्रांज़ेक्टेड सत्र का उपयोग नहीं कर रहे हैं, इसलिए जब तक आप अपने सर्वर लोकेटर पर कॉन्फ़िगर किए गए ackBatchSize तक नहीं पहुंच जाते, तब तक आप सर्वर पर एएक्स नहीं भेजेंगे। इस जगह के साथ, जैसे ही आप अपने संदेश पर पावती() को कॉल करते हैं, सर्वर पर कोई भी एक भेजा जाएगा।

जिस विकल्प का आप वर्तमान में उपयोग कर रहे हैं वह एक निश्चित DUPS_SIZE के साथ JMS DUPS_OK ​​के बराबर है।

+1

'क्लाइंट मैसेज msgReceived = consumer.receive(); msgReceived.acknowledge(); '.. मैं कोड –

+0

को स्वीकार कर रहा हूं कोर एपीआई में एसीके बैच करने का विकल्प है। आप एक ट्रांज़ेक्टेड सत्र का उपयोग नहीं कर रहे हैं, इसलिए जब तक आप अपने सर्वर लोकेटर पर कॉन्फ़िगर किए गए ackBatchSize तक नहीं पहुंच जाते, तब तक आप सर्वर पर एएक्स नहीं भेजेंगे। आपको अपने निर्माण सत्र को परिभाषित करना चाहिए: create सत्र (सत्य, सत्य, 0); इस जगह के साथ, आपके संदेश –

+1

पर किसी भी एएके को सर्वर पर भेजा जाएगा जैसे ही आप इस धागे पर वापस नहीं आये। तो मुझे लगता है कि आप अपनी समस्या तय कर चुके हैं? –

2

स्थापना ackbatchsize मुझे मदद की समस्या को हल करने (पोस्ट आप के साथ कुछ यात्रा के बाद अपने प्रारंभिक जवाब संपादित) .. मदद

+2

आपको शायद यहां किसी एक जवाब में वोट देना चाहिए। –