कम से कम ActiveMQ में क्या आप चाहते हैं पूरी तरह से समर्थित है, उसका नाम VirtualTopic
है अवधारणा है:
- आप बनाने के एक VirtualTopic (सीधे शब्दों में उपसर्ग
VirtualTopic.
का उपयोग कर एक विषय बनाने) जैसे । VirtualTopic.Color
- इस वर्चुअलटॉपिक इस पैटर्न से मेल खाने वाला उपभोक्ता
Consumer.<clientName>.VirtualTopic.<topicName>
उदाहरण के लिए सदस्यता लेने वाला उपभोक्ता बनाएं। Consumer.client1.VirtualTopic.Color
, यह कर रही है, ActiveMQ उस नाम का कोई कतार पैदा करेगा और उस कतार, VirtualTopic.Color
तो इस आभासी विषय पर प्रकाशित वाले प्रत्येक संदेश में client1 कतार वितरित किया जाएगा करने के लिए सदस्यता ध्यान दें कि यह RabbitMQ एक्सचेंजों की तरह काम करता होगा।
- आप इस बिंदु पर, किया जाता है अब आप उपभोग कर सकते हैं client1 कतार हर पंक्ति की तरह कई उपभोक्ताओं, DLQ, अनुकूलित पुनर्वितरण नीति, आदि
- साथ, मुझे लगता है कि आप समझ गया कि आप बना सकते हैं client2, client3 और कितने ग्राहकों आप चाहते हैं, उन सभी को संदेश
VirtualTopic.Color
यहाँ कोड को प्रकाशित की एक प्रति प्राप्त करेंगे
+०१२३५१६४१०६१
@Component
public class ColorReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
@Autowired
private JmsTemplate jmsTemplate;
// simply generating data to the topic
long id=0;
@Scheduled(fixedDelay = 500)
public void postMail() throws JMSException, IOException {
final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
final Color color = new Color(++id, colorName.getName());
final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
message.setObject(color);
message.setProperty("color", color.getName());
LOGGER.info("status=color-post, color={}", color);
jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
selector = "color <> 'RED'"
)
public void genericReceiveMessage(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver, color={}", color);
}
/**
* Listen only red colors messages
*
* the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
* the containers clientId need to be different between each other
*/
@JmsListener(
// destination = "Consumer.redColorContainer.VirtualTopic.color",
destination = "Consumer.client1.VirtualTopic.color",
containerFactory = "redColorContainer", selector = "color='RED'"
)
public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
)
public void genericReceiveMessage2(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver-2, color={}", color);
}
}
@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {
/**
* Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
* clientIds per consumer pool (as two @JmsListener above, or two application instances)
*
*/
@Bean
public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-5");
configurer.configure(factory, connectionFactory);
// container.setClientId("aId..."); lets spring generate a random ID
return factory;
}
@Bean
public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
// necessary when post serializable objects (you can set it at application.properties)
connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-2");
configurer.configure(factory, connectionFactory);
return factory;
}
}
public class Color implements Serializable {
public static final Color WHITE = new Color("WHITE");
public static final Color BLUE = new Color("BLUE");
public static final Color RED = new Color("RED");
private String name;
private long id;
// CONSTRUCTORS, GETTERS AND SETTERS
}
क्या आप स्पष्टीकरण दे सकते हैं? जब मैं "एकाधिक संदेश श्रोताओं को समसामयिक रूप से विषय से लगातार संदेश संभालने के लिए देखता हूं" मुझे लगता है कि इसका मतलब है कि आप नहीं चाहते हैं कि श्रोताओं को प्रत्येक को एक ही संदेश की एक प्रति प्राप्त हो, बल्कि एक ही विषय पर एक दूसरे के खिलाफ संदेश के लिए प्रतिस्पर्धा करने के लिए। क्या वो सही है? –
यह उपयोगी लग रहा है: http://bsnyderblog.blogspot.com/2010/05/tuning-jms-message-consumption-in.html – skaffman