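java - SocketException when sending a message: (Connection reset, AmqpIOException: java.io.IOException)

Tags: java spring spring-integration spring-amqp

I am working on a Spring Integration AMQP RabbitMQ project. I am trying to have <rabbit:admin> create a queue that does not yet exist, attach it to an existing topic exchange, and bind the two with a routing key. I then try to send a message to the queue from the console.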

<rabbit:template id="amqpTemplate" 
                 connection-factory="connectionFactory" 
                 exchange="e.products.official" 
                 routing-key="kevin.test.routing.key"
                 queue="q.Kevin.Spring.Rabbit" reply-timeout="100000" />

<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

<rabbit:connection-factory id="connectionFactory" 
                           host="MyHost"
                           username="MyUserName"
                           password="MyPassword"
                           virtual-host="/nids" 
                           port="5672"/> 

<int-stream:stdin-channel-adapter id="consoleIn" channel="toRabbit">
    <int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-stream:stdin-channel-adapter>

<int:channel id="toRabbit" />

<int-amqp:outbound-channel-adapter 
    channel="toRabbit"
    exchange-name="e.products.official"
    routing-key="kevin.test.routing.key"
    amqp-template="amqpTemplate" />

<rabbit:topic-exchange name="e.products.official" declared-by="rabbitAdmin" >
    <rabbit:bindings>
        <rabbit:binding queue="q.Kevin.Spring.Rabbit" pattern="kevin.test.routing.key" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:queue name="q.Kevin.Spring.Rabbit" auto-delete="true" declared-by="rabbitAdmin" />

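When I run the program, everything starts up, but when I type something into the console and press Enter, this is the error I get. I'm sure I'm missing something. Can someone help me fill in the gaps? Sorry for the huge stack trace: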

.........
2015-02-03 15:55:38 DEBUG DefaultLifecycleProcessor:170 - Starting bean 'consoleIn' of type [class org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean]
2015-02-03 15:55:38 DEBUG DefaultListableBeanFactory:248 - Returning cached instance of singleton bean 'taskScheduler'
2015-02-03 15:55:38 INFO  SourcePollingChannelAdapter:97 - started consoleIn
2015-02-03 15:55:38 DEBUG DefaultLifecycleProcessor:179 - Successfully started bean 'consoleIn'
2015-02-03 15:55:38 DEBUG SourcePollingChannelAdapter:208 - Received no Message during the poll, returning 'false'
2015-02-03 15:55:38 DEBUG DefaultListableBeanFactory:248 - Returning cached instance of singleton bean 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0'
2015-02-03 15:55:38 DEBUG DefaultListableBeanFactory:248 - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
2015-02-03 15:55:38 DEBUG PropertySourcesPropertyResolver:81 - Searching for key 'spring.liveBeansView.mbeanDomain' in [systemProperties]
2015-02-03 15:55:38 DEBUG PropertySourcesPropertyResolver:81 - Searching for key 'spring.liveBeansView.mbeanDomain' in [systemEnvironment]
2015-02-03 15:55:38 DEBUG PropertySourcesPropertyResolver:103 - Could not find key 'spring.liveBeansView.mbeanDomain' in any property source. Returning [null]
2015-02-03 15:55:39 DEBUG SourcePollingChannelAdapter:208 - Received no Message during the poll, returning 'false'
2015-02-03 15:55:40 DEBUG SourcePollingChannelAdapter:208 - Received no Message during the poll, returning 'false'
2015-02-03 15:55:41 DEBUG SourcePollingChannelAdapter:208 - Received no Message during the poll, returning 'false'
asdf
2015-02-03 15:55:42 DEBUG SourcePollingChannelAdapter:214 - Poll resulted in Message: GenericMessage [payload=asdf, headers={id=908efc3c-fd13-052c-357b-e3ccba5ef973, timestamp=1423000542576}]
2015-02-03 15:55:42 DEBUG AmqpOutboundEndpoint:72 - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=asdf, headers={id=908efc3c-fd13-052c-357b-e3ccba5ef973, timestamp=1423000542576}]
2015-02-03 15:55:45 DEBUG DefaultListableBeanFactory:248 - Returning cached instance of singleton bean 'errorChannel'
2015-02-03 15:55:45 DEBUG LoggingHandler:72 - (inner bean)#77888435 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException, headers={id=10cf2fee-6132-ce5c-6531-db67dfc037ad, timestamp=1423000545683}]
2015-02-03 15:55:45 ERROR LoggingHandler:145 - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:130)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:207)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:441)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1036)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1029)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:541)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:636)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 28 more
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:617)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:651)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:203)
    ... 40 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 44 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:189)
    at java.net.SocketInputStream.read(SocketInputStream.java:121)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
    ... 1 more

2015-02-03 15:55:45 DEBUG PublishSubscribeChannel:383 - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException, headers={id=10cf2fee-6132-ce5c-6531-db67dfc037ad, timestamp=1423000545683}]

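Now, I'm no expert, but I think this has to do with the admin not being able to declare the queue and bind it to the exchange. I say this because I was able to create an <int-amqp:inbound-channel-adapter> with the queue name, and it creates the queue but does not bind it to any exchange.

Best answer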

Caused by: java.net.SocketException: Connection reset

This means the server closed the connection for some unknown reason.

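If this were a "simple" queue declaration problem, you would see that as the cause of the ShutdownSignalException; in this case, the cause is a network-level connection reset.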
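To make that reading of the trace concrete, here is a minimal sketch that walks a chain of `Caused by:` entries down to the innermost exception. The class name `RootCause` and the helper `rootCause` are hypothetical, and the nested exceptions built in `main` are plain JDK stand-ins for the Spring AMQP and RabbitMQ client types in the trace, so the example runs without those libraries.

```java
import java.io.IOException;
import java.net.SocketException;

public class RootCause {

    // Follow getCause() until there is no deeper cause (hypothetical helper).
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins mirroring the chain in the question's stack trace:
        // AmqpIOException -> IOException -> ShutdownSignalException -> SocketException
        Throwable reset = new SocketException("Connection reset");
        Throwable shutdown = new IllegalStateException("connection error", reset);
        Throwable io = new IOException(shutdown);
        Throwable wrapped = new RuntimeException("java.io.IOException", io);

        Throwable root = rootCause(wrapped);
        // prints "java.net.SocketException: Connection reset"
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

If the innermost exception is a socket-level error like this one, the failure happened below the AMQP layer, which is why the queue declaration is unlikely to be the culprit.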

I suggest you look at the server logs to see whether they give more clues as to why the server closed the connection without providing a reason.
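Alongside the broker log, a raw socket probe can help narrow things down from the client side. The sketch below is not part of the original answer: it assumes the broker speaks AMQP 0-9-1, sends the 8-byte protocol header, and looks at the first byte of the reply. The names `AmqpProbe`, `probe`, and `classify` and the result strings are hypothetical; the host and port come from the question's connection factory. It does not exercise credentials or the virtual host, so a successful probe only shows that the port answers AMQP at all.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class AmqpProbe {

    // AMQP 0-9-1 protocol header: "AMQP" followed by 0, 0, 9, 1.
    static final byte[] HEADER = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};

    // Interpret the first byte the peer sends back (hypothetical labels).
    static String classify(int firstByte) {
        if (firstByte == 1) {
            return "HANDSHAKE_STARTED";   // method frame, i.e. Connection.Start
        }
        if (firstByte == 'A') {
            return "PROTOCOL_MISMATCH";   // peer answered with its own protocol header
        }
        if (firstByte == -1) {
            return "CLOSED_BY_PEER";      // orderly close before any reply
        }
        return "UNEXPECTED_BYTE_" + firstByte;
    }

    static String probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(HEADER);
            out.flush();
            return classify(socket.getInputStream().read());
        } catch (IOException e) {
            // Includes "Connection reset", timeouts, and unknown hosts.
            return "IO_ERROR: " + e;
        }
    }

    public static void main(String[] args) {
        // Host and port as in the question's <rabbit:connection-factory>.
        System.out.println(probe("MyHost", 5672, 3000));
    }
}
```

An `IO_ERROR` mentioning a reset here would point toward something between the client and the broker (a firewall, a proxy, or a port that expects TLS) rather than toward the Spring configuration.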

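Edit:

I just tested with a configuration similar to yours and had no problems.

Edit #2:

By using an invalid vhost on the connection factory, I was able to get a similar (but slightly different) error...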

Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 44 more
Caused by: java.io.EOFException

Regarding java - SocketException when sending a message: (Connection reset, AmqpIOException: java.io.IOException), a similar question was found on Stack Overflow: https://stackoverflow.com/questions/28309644/
