java - Netflix Hystrix - HystrixObservableCommand 异步运行

标签 java asynchronous spring-boot netflix hystrix

我有一些基本项目,其中有四个对某些外部资源的调用,在当前版本中同步运行。我想要实现的是将调用包装到 HystrixObservableCommand 中,然后异步调用它。

据我所知,在 HystrixObservableCommand 对象调用 .observe() 后,应该立即异步调用包装的逻辑。然而我做错了,因为它是同步工作的。

在示例代码中,输出为 Void,因为我对输出不感兴趣(目前)。这也是为什么我没有将 Observable 分配给任何对象,只是调用了constructor.observe()。

@Component
public class LoggerProducer {

    private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class);

    @Autowired
    SimpMessagingTemplate template;

    private void push(Iterable<Message> messages, String topic) throws Exception {
        template.convertAndSend("/messages/"+topic, messages);
    }

    public void splitAndPush(Iterable<Message> messages) {

        Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true)
                .collect(Collectors.groupingBy(Message::getType));

        //should be async - it's not 
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO),
                MessageTypeEnum.INFO.toString().toLowerCase()).observe();
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN),
                MessageTypeEnum.WARN.toString().toLowerCase()).observe();
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR),
                MessageTypeEnum.ERROR.toString().toLowerCase()).observe();

    }

    class CommandPushToBrowser extends HystrixObservableCommand<Void> {

        private Iterable<Message> messages;
        private String messageTypeName;

        public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) {
            super(HystrixCommandGroupKey.Factory.asKey("Messages"));
            this.messageTypeName = messageTypeName;
            this.messages = messages;
        }

        @Override
        protected Observable<Void> construct() {
            return Observable.create(new Observable.OnSubscribe<Void>() {

                @Override
                public void call(Subscriber<? super Void> observer) {
                    try {
                        for (int i = 0 ; i < 50 ; i ++ ) {
                            LOGGER.info("Count: " + i + " messageType " + messageTypeName);
                        }
                        if (null != messages) {
                            push(messages, messageTypeName);
                            LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages);
                        }
                        if (!observer.isUnsubscribed()) {
                            observer.onCompleted();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        observer.onError(e);
                    }
                }
            });
        }
    }
}

那里有一些纯粹的“测试”代码片段,因为我试图找出问题,所以忽略逻辑,主要焦点是使其与 .observe() 异步运行。我确实知道我可以使用标准 HystrixCommand 来实现这一目标,但这不是目标。

希望有人帮忙:) 问候,

最佳答案

找到答案:

"Observables do not add concurrency automatically. If you are modeling synchronous, blocking execution with an Observable, then they will execute synchronously.

You can easily make it asynchronous by scheduling on a thread using subscribeOn(Schedulers.io()). Here is a simply example for wrapping a blocking call with an Observable: https://speakerdeck.com/benjchristensen/applying-reactive-programming-with-rxjava-at-goto-chicago-2015?slide=33

However, if you are wrapping blocking calls, you should just stick with using HystrixCommand as that’s what it’s built for and it defaults to running everything in a separate thread. Using HystrixCommand.observe() will give you the concurrent, async composition you’re looking for.

HystrixObservableCommand is intended for wrapping around async, non-blocking Observables that don’t need extra threads."

-- Ben Christensen - Netflix Edge Engineering

来源:https://groups.google.com/forum/#!topic/hystrixoss/g7ZLIudE8Rs

关于java - Netflix Hystrix - HystrixObservableCommand 异步运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39291689/

相关文章:

java - 如何使用 Mockito 将模拟注入(inject)抽象父类中的 @Autowired 字段

java - MyObjectBox 为红色/未找到,但 Java 应用程序仍然有效

database - 在哪里将 -ifNotExists 标志添加到 h2 start 命令?

java - LazyToOne 和 Spring LoadTimeWeaver

python - 我怎样才能杀死后台python进程?

java - 在 Spring Boot 应用程序中调用 Apache CXF 客户端时无法创建安全的 XMLInputFactory

java - ArrayList 和 Tuple 的 scala 版本是什么?

java - 通用数据库方法

c++ - 如何使用 casablanca (PPL) http_client 返回的 XmlLite 处理 XML?

javascript - Node.js 在函数之间异步传递变量