rabbitmq - Turbine AMQP does not receive Hystrix stream

Tags: rabbitmq spring-cloud netflix hystrix turbine

I had a working Turbine and Hystrix setup, but decided to switch it to Turbine AMQP so that I could aggregate multiple services into one stream/dashboard.

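I have set up a Turbine AMQP server running on localhost:8989, but it does not appear to be getting Hystrix data from the client service. When I open the Turbine server's IP in my browser, I see data: {"type":"Ping"} repeated over and over, even while I am polling the Hystrix URL. If I try to show the Turbine AMQP stream in the Hystrix Dashboard, I get: Unable to connect to Command Metric Stream.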

I have a default install of RabbitMQ running on port 5672.

My client service, which uses Hystrix-AMQP, has an application.yml file that looks like this:

spring:
  application:
    name: policy-service
  rabbitmq:
    addresses: ${vcap.services.${PREFIX:}rabbitmq.credentials.uri:amqp://${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}}
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

The tail end of the startup log looks like this:

2015-09-14 16:31:13.030  INFO 52844 --- [           main] com.netflix.discovery.DiscoveryClient    : Starting heartbeat executor: renew interval is: 30
2015-09-14 16:31:13.047  INFO 52844 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application policy-service with eureka with status UP
2015-09-14 16:31:13.194  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'policy-service:8088.errorChannel' has 1 subscriber(s).
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {filter} as a subscriber to the 'cloudBusOutboundFlow.channel#0' channel
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusOutboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {filter} as a subscriber to the 'cloudBusInboundChannel' channel
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusInboundChannel' has 1 subscriber(s).
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler} as a subscriber to the 'cloudBusInboundFlow.channel#0' channel
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusInboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter} as a subscriber to the 'cloudBusWiretapChannel' channel
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusWiretapChannel' has 1 subscriber(s).
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'cloudBusOutboundChannel' channel
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusOutboundChannel' has 1 subscriber(s).
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge} as a subscriber to the 'cloudBusAmqpInboundFlow.channel#0' channel
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusAmqpInboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'hystrixStream' channel
2015-09-14 16:31:13.199  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.hystrixStream' has 1 subscriber(s).
2015-09-14 16:31:13.199  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#6
2015-09-14 16:31:13.219  INFO 52844 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 1073741823
2015-09-14 16:31:13.219  INFO 52844 --- [           main] ApplicationEventListeningMessageProducer : started org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer#0
2015-09-14 16:31:13.555  INFO 52844 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (4640c1c8-ff8f-45d7-8426-19d1b7a4cdb0) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2015-09-14 16:31:13.572  INFO 52844 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter#0
2015-09-14 16:31:13.573  INFO 52844 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2015-09-14 16:31:13.576  INFO 52844 --- [           main] c.n.h.c.m.e.HystrixMetricsPoller         : Starting HystrixMetricsPoller
2015-09-14 16:31:13.609  INFO 52844 --- [           main] ration$HystrixMetricsPollerConfiguration : Starting poller
2015-09-14 16:31:13.803  INFO 52844 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8088 (http)
2015-09-14 16:31:13.805  INFO 52844 --- [           main] com.ml.springboot.PolicyService     : Started PolicyService in 22.544 seconds (JVM running for 23.564)

So it looks like PolicyService connected to the message broker successfully.

The tail end of the Turbine AMQP server's log:

2015-09-14 16:58:05.887  INFO 51944 --- [           main] i.reactivex.netty.server.AbstractServer  : Rx server started at port: 8989
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'bootstrap:-1.errorChannel' has 1 subscriber(s).
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge} as a subscriber to the 'hystrixStreamAggregatorInboundFlow.channel#0' channel
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'bootstrap:-1.hystrixStreamAggregatorInboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 1073741823
2015-09-14 16:58:06.238  INFO 51944 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (spring.cloud.hystrix.stream) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2015-09-14 16:58:06.289  INFO 51944 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter#0
2015-09-14 16:58:06.290  INFO 51944 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2015-09-14 16:58:06.434  INFO 51944 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): -1 (http)

Any ideas why the Turbine AMQP server is not receiving communication from the Hystrix AMQP client?

EDIT: The Turbine-AMQP main class looks like this:

package com.turbine.amqp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.turbine.amqp.EnableTurbineAmqp;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableAutoConfiguration
@EnableTurbineAmqp
@EnableDiscoveryClient
public class TurbineAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(TurbineAmqpApplication.class, args);
    }
}

And here is its application.yml:

server:
  port: 8989
spring:
  rabbitmq:
    addresses: ${vcap.services.${PREFIX:}rabbitmq.credentials.uri:amqp://${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}}

Hitting http://localhost:8989/turbine.stream produces a repeating stream of data: {"type":"Ping"}

and shows this in the console:

2015-09-15 08:54:37.960  INFO 83480 --- [o-eventloop-3-1] o.s.c.n.t.amqp.TurbineAmqpConfiguration  : SSE Request Received
2015-09-15 08:54:38.025  INFO 83480 --- [o-eventloop-3-1] o.s.c.n.t.amqp.TurbineAmqpConfiguration  : Starting aggregation

EDIT: The following exception is thrown when I stop listening to the Turbine stream, not when I try to listen to it with the dashboard.

2015-09-15 08:56:47.934  INFO 83480 --- [o-eventloop-3-3] o.s.c.n.t.amqp.TurbineAmqpConfiguration  : SSE Request Received
2015-09-15 08:56:47.946  WARN 83480 --- [o-eventloop-3-3] io.netty.channel.DefaultChannelPipeline  : An exception was thrown by a user handler's exceptionCaught() method while handling the following exception:

java.lang.NoSuchMethodError: rx.Observable.collect(Lrx/functions/Func0;Lrx/functions/Action2;)Lrx/Observable;
    at com.netflix.turbine.aggregator.StreamAggregator.lambda$null$36(StreamAggregator.java:89)
    at rx.internal.operators.OnSubscribeMulticastSelector.call(OnSubscribeMulticastSelector.java:60)
    at rx.internal.operators.OnSubscribeMulticastSelector.call(OnSubscribeMulticastSelector.java:40)
    at rx.Observable.unsafeSubscribe(Observable.java:8591)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:190)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:160)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:96)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:173)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
    at rx.subjects.PublishSubject.onNext(PublishSubject.java:101)
    at org.springframework.cloud.netflix.turbine.amqp.Aggregator.handle(Aggregator.java:53)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:112)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:102)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:342)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:101)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: GroupedObservable.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
    ... 58 common frames omitted

My dependencies for turbine-amqp are as follows:

dependencies {
    compile('org.springframework.cloud:spring-cloud-starter-turbine-amqp:1.0.3.RELEASE')
    compile 'org.springframework.boot:spring-boot-starter-web:1.2.5.RELEASE'
    compile 'org.springframework.boot:spring-boot-starter-actuator:1.2.5.RELEASE'

    testCompile("org.springframework.boot:spring-boot-starter-test") 
}

dependencyManagement {
    imports { 
         mavenBom 'org.springframework.cloud:spring-cloud-starter-parent:1.0.2.RELEASE'
    }
}
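
A NoSuchMethodError like the one in the trace above generally means that the class found at runtime (here rx.Observable) lacks a method that the caller (here Turbine's StreamAggregator) was compiled against, which usually points to mismatched library versions on the classpath. Below is a minimal sketch, using only the JDK, for checking which jar a class is really loaded from and which overloads of a method it has. The class name ClassOrigin and its helper methods are hypothetical names chosen for illustration, not part of any library mentioned here.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, if the JVM exposes it. */
    public static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown location)";
        }
        return source.getLocation().toString();
    }

    /** Lists the public methods with the given name that the loaded class really has. */
    public static List<String> signaturesOf(Class<?> type, String methodName) {
        List<String> found = new ArrayList<>();
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                found.add(method.toString());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        try {
            // Looked up by name so this file compiles without RxJava on the classpath.
            Class<?> observable = Class.forName("rx.Observable");
            System.out.println("rx.Observable loaded from: " + originOf(observable));
            for (String signature : signaturesOf(observable, "collect")) {
                System.out.println("  " + signature);
            }
        } catch (ClassNotFoundException | LinkageError e) {
            System.out.println("rx.Observable could not be loaded: " + e);
        }
    }
}
```

To be meaningful, this has to run with the same classpath as the Turbine server, for example from inside that application. If the printed jar is not the RxJava version the Turbine dependency expects, or no collect overload matches the signature in the error, the dependency versions are the first thing to compare.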

Best answer

The solution was hard to find.

I ran into a similar problem using Spring Cloud 2.1.4.RELEASE.

The main cause is that spring-cloud-netflix-hystrix-stream and spring-cloud-starter-netflix-turbine-stream use incompatible RabbitMQ exchange names.

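The fix: start the service component (the one that declares hystrix-stream) and check the name of the exchange it creates.

On the component that declares turbine-stream, set this property to that exchange name:

turbine.stream.destination=

In my case: turbine.stream.destination=hystrixStreamOutput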

A similar question about "rabbitmq - Turbine AMQP does not receive Hystrix stream" can be found on Stack Overflow: https://stackoverflow.com/questions/32573966/
