का उपयोग कर उपभोग करने के बाद हॉर्नेटक्यू संदेश अभी भी कतार में शेष हैं, मैं हॉर्नेटक्यू में नया हूं इसलिए कृपया मेरे साथ भालू। मुझे सबसे पहले आपको अपनी आवश्यकताओं को बताने दें:कोर एपीआई
मुझे मिडलवेयर कतार में एक संदेश की आवश्यकता है जो कम विलंबता और दृढ़ता के साथ विभिन्न प्रक्रियाओं के बीच संदेशों को लगभग 1k तक पास कर सकता है (यानी यह सिस्टम क्रैश से बचना चाहिए)। मेरे पास एक ही कतार में कई प्रक्रियाओं को पढ़ने और समान कतार से पढ़ने के लिए कई प्रक्रियाएं होंगी।
इसके लिए मैंने हॉर्नेटक चुना क्योंकि यह दृढ़ता से गुज़रने वाले संदेश के लिए सबसे अच्छी रेटिंग है।
मैं वर्तमान के रूप में खड़े अकेले सर्वरHornetq v2.2.2Final usung कर रहा हूँ।
मैं सफलतापूर्वक टिकाऊ/गैर-टिकाऊ कोर एपीआई(ClientSession) का उपयोग कर कतारों बनाने के लिए, और सफलतापूर्वक संदेश पोस्ट क़तार में (ClientProducer) कर रहा हूँ।
इसी प्रकार मैं कोर एपीआई (क्लाइंटकंस्यूमर) का उपयोग कर कतार से संदेशों को पढ़ने में सक्षम हूं।
ग्राहक के संदेश को पढ़ने के बाद समस्या तब आती है, संदेश अभी भी कतार में रहता है, यानी कतार में संदेशों की संख्या निरंतर बनी हुई है। हो सकता है कि मुझे यह गलत लगे लेकिन मैं इस छाप के तहत था कि एक बार संदेश (पढ़ें + एएके), इसे कतार से हटा दिया गया है। लेकिन यह मेरे मामले में नहीं हो रहा है, और वही संदेश हो रहा है बार-बार पढ़ें।
इसके अलावा, मैं यह बताना चाहता हूं कि मैंने गैर-टिकाऊ संदेशों के साथ गैर-टिकाऊ कतारों का उपयोग करने का प्रयास किया है। लेकिन समस्या बनी हुई है। निर्माता के लिए
कोड है कि मैं उपयोग कर रहा हूँ:
public class HQProducer implements Runnable {
private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;
public HQProducer(String host, int port, String address, String queueName,
boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
if (queueExists(queueName)) {
if (deleteQ) {
System.out.println("Deleting existing queue :: " + queueName);
session.deleteQueue(queueName);
System.out.println("Creating queue :: " + queueName);
session.createQueue(address, queueName, true);
}
} else {
System.out.println("Creating new queue :: " + queueName);
session.createQueue(address, queueName, durable);
}
producer = session.createProducer(SimpleString.toSimpleString(address), pRate);
killme = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killme) {
try {
ClientMessage message = session.createMessage(durableMsg);
message.getBodyBuffer().writeString("Hello world");
producer.send(message);
cnt++;
timediff = ((System.currentTimeMillis() - time)/1000);
if (timediff >= 1) {
System.out.println("Producer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killme) {
this.killme = killme;
}
private boolean queueExists(String qname) {
boolean res = false;
try {
//ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
if (queueQuery.isExists()) {
res = true;
}
} catch (HornetQException ex) {
res = false;
}
return res;
}
}
इसके अलावा उपभोक्ता के लिए कोड है:
public class HQConsumer implements Runnable {
private ClientSession session;
private ClientConsumer consumer;
private boolean killMe;
public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
session.start();
consumer = session.createConsumer(queueName, "",0,-1,browseOnly);
killMe = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killMe) {
try {
ClientMessage msgReceived = consumer.receive();
msgReceived.acknowledge();
//System.out.println("message = " + msgReceived.getBodyBuffer().readString());
cnt++;
timediff = ((System.currentTimeMillis() - time)/1000);
if (timediff >= 1) {
System.out.println("ConSumer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killMe) {
this.killMe = killMe;
}
}
HornetQ सर्वर config ::
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
एक/ग के लिए [इस] (http के लिए धन्यवाद: // docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354) प्रसंस्करण के बाद आपको संदेश स्वीकार करने की आवश्यकता है क्या आप वही कर रहे हैं? –