2015-05-19 7 views
5

मैं एक्सएमएल स्प्रिंग AMQP config से एक जावा एक आधारित एनोटेशन को विस्थापित करने के लिए है, क्योंकि यह है "सरल" कोशिश कर रहा हूँ कोशिश कर रहा है। यकीन नहीं है कि मैं क्या कर रहा हूं एक्सएमएल कॉन्फ़िगरेशन ठीक काम करता है लेकिन जावा @ कॉन्फ़िगर करने योग्य "कारण: java.net.SocketException: कनेक्शन रीसेट" अपवाद के कारण फेंकता है।एक जावा @Configurable को <rabbit:> एक्सएमएल नाम अंतरिक्ष कॉन्फिग से अधिक विस्थापित करने के लिए दोहराने नहीं कर सकते

एक्सएमएल config (पूरी तरह से काम करता है):

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
     xmlns:context="http://www.springframework.org/schema/context" 
     xmlns:util="http://www.springframework.org/schema/util" 
     xsi:schemaLocation="http://www.springframework.org/schema/rabbit 
      http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
      http://www.springframework.org/schema/context 
      http://www.springframework.org/schema/context/spring-context.xsd 
      http://www.springframework.org/schema/util 
      http://www.springframework.org/schema/util/spring-util.xsd 
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <!-- define which properties files will be used --> 
    <context:property-placeholder location="classpath:*.properties" /> 

    <rabbit:connection-factory id="connectionFactory" 
           addresses='${rabbitmq.hostname}' 
           username='${rabbitmq.username}' 
           password='${rabbitmq.password}' 
           virtual-host='${rabbitmq.virtual_host}' 
           cache-mode='${rabbitmq.cache_mode}'        
           channel-cache-size='${rabbitmq.channel_cache_size}'/> 

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
     <property name="corePoolSize" value="3"/> 
     <property name="maxPoolSize" value="5"/> 
     <property name="queueCapacity" value="15"/>       
    </bean>        


    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> 
    <rabbit:admin connection-factory="connectionFactory"/> 
     <rabbit:queue name="${rabbitmq.queue_name}" /> 
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}"> 
    <rabbit:bindings> 
     <rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/> 
    </rabbit:bindings> 
</rabbit:topic-exchange> 

    <bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/> 


    <rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor"> 
      <rabbit:listener ref="listener" queues="notification.main" /> 

    </rabbit:listener-container> 
</beans> 

जावा config:

@Configurable 
@PropertySource("classpath:rabbitmq.properties") 
public class RabbitMQConfig { 

@Value("${rabbitmq.hostname}") 
private String hostname; 

@Value("${rabbitmq.port}") 
private String port; 

@Value("${rabbitmq.username}") 
private String username; 

@Value("${rabbitmq.password}") 
private String password; 

@Value("${rabbitmq.virtual_host}") 
private String virtualHost; 

//@Value("${rabbitmq.cache_mode}") 
//private String cacheMode; 

@Value("${rabbitmq.channel_cache_size}") 
private String channelCacheSize; 

@Value("${rabbitmq.topic_exchange_name}") 
private String topicExchangeName; 

@Value("${rabbitmq.topic_exchange_pattern}") 
private String topicExchangePattern; 

@Value("${rabbitmq.queue_name}") 
private String queueName; 

@Autowired 
private ConnectionFactory cachingConnectionFactory; 

@Bean(name="cachingConnectionFactory") 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port)); 

    connectionFactory.setUsername(username); 
    connectionFactory.setPassword(password); 

    //connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode)); 
    connectionFactory.setChannelCacheSize(Integer.valueOf(channelCacheSize)); 

    return connectionFactory; 
} 

@Bean(name="taskExecutor") 
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { 
    ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor(); 
    tpte.setCorePoolSize(3); 
    tpte.setMaxPoolSize(5); 
    tpte.setQueueCapacity(15); 
    return tpte; 
} 

@Bean 
public AmqpTemplate AmqpTemplate() { 
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 

    return template; 
} 


@Bean 
public AmqpAdmin amqpAdmin() { 
    RabbitAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory); 

    return amqpAdmin; 
} 

@Bean 
public Queue queue() { 
    return new Queue(queueName); 
} 

@Bean 
public TopicExchange topicExchange() { 
    TopicExchange topicExchange = new TopicExchange(topicExchangeName); 
    return topicExchange; 
} 

@Bean 
public Binding dataBinding(TopicExchange topicExchange, Queue queue) { 
    return BindingBuilder.bind(queue).to(topicExchange).with(topicExchangePattern); 
} 

@Bean 
public DefaultMessageListener defaultMessageListener() { 
    return new DefaultMessageListener(); 
} 

@Bean 
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(cachingConnectionFactory); 
    container.setQueueNames(queueName); 
    container.setAutoStartup(true); 
    container.setMessageListener(defaultMessageListener); 
    //container.setTaskExecutor(taskExecutor); 
    return container; 
} 

@Bean 
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() { 
    return new PropertySourcesPlaceholderConfigurer(); 
} 

जावा config त्रुटि:

INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container. 
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s). 
org.springframework.amqp.AmqpIOException: java.io.IOException 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217) 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004) 
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208) 
    ... 12 more 
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348) 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 
    ... 16 more 
Caused by: java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(SocketInputStream.java:209) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) 
    ... 1 more 

मैं वसंत amqp कोड में कदम रखा और अपराधी RabbitAdmin # getQueueProperties विधि है। एक्सएमएल कॉन्फ़िगरेशन में यह ठीक निष्पादित करता है ... लेकिन जैसे ही यह जावा कॉन्फ़िगरेशन के साथ निष्पादित करता है, यह ऊपर अपवाद फेंकता है? मैं जो कर रहा हूं वह अलग है? दोनों विन्यास मेरे लिए समान दिखते हैं।

package org.springframework.amqp.rabbit.core; 

public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean { 
//... 
    @Override 
    public Properties getQueueProperties(final String queueName) { 
     Assert.hasText(queueName, "'queueName' cannot be null or empty"); 
     return this.rabbitTemplate.execute(new ChannelCallback<Properties>() { 
      @Override 
      public Properties doInRabbit(Channel channel) throws Exception { 
       try { 
        DeclareOk declareOk = channel.queueDeclarePassive(queueName); 
        Properties props = new Properties(); 
        props.put(QUEUE_NAME, declareOk.getQueue()); 
        props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount()); 
        props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount()); 
        return props; 
       } 
       catch (Exception e) { 
        if (logger.isDebugEnabled()) { 
         logger.debug("Queue '" + queueName + "' does not exist"); 
        } 
        return null; 
       } 
      } 
     }); 
    } 
} 

दोनों कॉन्फ़िगरेशन classpath पर ठीक उसी rabbitmq.properties फ़ाइल का उपयोग करें। मैं भी दोनों कॉन्फ़िगरेशन के लिए कार्यावधि में RabbitAdmin और RabbitTemplate कक्षाओं की विशेषताओं की जाँच कर रहा हूँ और वे ठीक उसी देखो ...

उत्तर

1

मैं '/' रूट वर्चुअल होस्ट का उपयोग नहीं कर रहा हूं। वर्चुअल_होस्ट के लिए मेरा अपना कस्टम मान था। हालांकि, मैंने अपनी जावा कॉन्फ़िगरेशन में स्पेल के माध्यम से इस संपत्ति को इंजेक्ट किया था, मैंने इसे स्पष्ट रूप से कनेक्शन पर सेट नहीं किया था। फैक्टरी।

connectionFactory.setVirtualHost(virtualHost); 

शूट करने में मेरी सहायता के लिए @ गैरी रसेल के लिए धन्यवाद।

@Bean(name="cachingConnectionFactory") 
public ConnectionFactory connectionFactory() { 

     CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port)); 

     connectionFactory.setUsername(username); 
     connectionFactory.setPassword(password); 

     connectionFactory.setVirtualHost(virtualHost); 
     connectionFactory.setChannelCacheSize(Integer.valueOf(channelCacheSize)); 

     return connectionFactory; 
    } 
4

आप @Configuration@Configurable का उपयोग करना चाहिए, नहीं।

संपादित करें:

लगता RabbitMQ-सर्वर की तरह कनेक्शन बंद हो रहा है:

Caused by: java.net.SocketException: Connection reset 

सर्वर लॉग में देखो; अगर वह मदद नहीं करता है; कहीं भी org.springframework के लिए एक पूर्ण DEBUG लॉग पोस्ट करें (शायद यहां के लिए बहुत बड़ा)।

EDIT2:

आप एक प्रमाणीकरण समस्या है ...

{handshake_error,opening,0, 
      {amqp_error,access_refused, 
         "access to vhost '/' refused for user 'gggdw'", 
         'connection.open'}} 

... अपने उपयोगकर्ता नाम और pasword (और vhost) की जाँच करें।

+0

निष्पक्ष बिंदु लेकिन मैं वसंत AMQP कोड में कदम रखा और कहा कि RabbitAdmin # getQueueProperties विधि है अभी भी (मेरे लिए) एक ही त्रुटि – Selwyn

+0

रसेल http://pastebin.com/p0KzJv3g पूर्ण स्टैक बहुत खुलासा नहीं है हो रही है जो जावा कॉन्फ़िगरेशन में अपवाद फेंकता है लेकिन एक्सएमएल कॉन्फ़िगरेशन – Selwyn

+1

में ठीक निष्पादित करता है क्रेडेंशियल के साथ कुछ समस्या। –

संबंधित मुद्दे