java - Spring Integration poller sometimes runs once and then stops

Tags: java spring spring-integration

I have the following annotation-based configuration in a Spring Integration application:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller(@Value("${poll-interval}") int pollInterval) {
    return Pollers.fixedRate(pollInterval).maxMessagesPerPoll(1).get();
}

@Bean
@InboundChannelAdapter(value = "httpRequestChannel", poller = @Poller(PollerMetadata.DEFAULT_POLLER),
    autoStartup = "${auto-start:true}")
public MessageSource<String> httpRequestTrigger() {
    Expression expression = new SpelExpressionParser().parseExpression(
        "#{@timeService.getLastQueryTime()}",
        new TemplateParserContext("#{", "}"));
    return new ExpressionEvaluatingMessageSource<>(expression, String.class);
}

@Bean
@Qualifier("httpRequestChannel")
public MessageChannel httpRequestChannel() {
    return new RendezvousChannel();
}

@Bean
@ServiceActivator(inputChannel = "httpRequestChannel", poller = @Poller(fixedRate="100"))
public MessageHandler httpRequestExecutingMessageHandler(
    URI backendURI,
    @Qualifier("httpReplyChannel") MessageChannel httpReplyChannel,
    RestTemplate restTemplate,
    @Qualifier("filterExpression") Expression filterExpression
) {
    HttpRequestExecutingMessageHandler messageHandler = new HttpRequestExecutingMessageHandler(
        backendURI.toString() + "?filter={filter}", restTemplate);
    messageHandler.setHttpMethod(HttpMethod.GET);
    messageHandler.setExpectedResponseType(String.class);
    messageHandler.setOutputChannel(httpReplyChannel);
    messageHandler.setExpectReply(true);
    messageHandler.setSendTimeout(1000);
    Map<String, Expression> uriVariableExpressions = new HashMap<>();
    uriVariableExpressions.put("filter", filterExpression);
    messageHandler.setUriVariableExpressions(uriVariableExpressions);
    return messageHandler;
}

@Bean
@Qualifier("httpReplyChannel")
public MessageChannel httpReplyChannel() {
    return new DirectChannel();
}

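In my application.properties file, auto-start is set to true and poll-interval is set to 2000.

Sometimes, when the application starts, the poller works as expected and fires every poll-interval milliseconds. Just as often, however, the poller fires once on startup and never fires again. Is there an error in my configuration? How can I get the poller to keep running?

Edit: I took a thread dump while the poller was hung: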

"task-scheduler-1@4820" prio=5 tid=0x13 nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
      at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
      at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:877)
      at org.springframework.integration.channel.QueueChannel.doSend(QueueChannel.java:93)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
      at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
      at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
      at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
      at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
      at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-ClientPoller-1@5021" daemon prio=5 tid=0x18 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      - locked <0x13b0> (a sun.nio.ch.EPollSelectorImpl)
      - locked <0x13b1> (a java.util.Collections$UnmodifiableSet)
      - locked <0x13b2> (a sun.nio.ch.Util$3)
      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:790)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-ClientPoller-0@5017" daemon prio=5 tid=0x17 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      - locked <0x13b3> (a sun.nio.ch.EPollSelectorImpl)
      - locked <0x13b4> (a java.util.Collections$UnmodifiableSet)
      - locked <0x13b5> (a sun.nio.ch.Util$3)
      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:790)
      at java.lang.Thread.run(Thread.java:745)

"NioBlockingSelector.BlockPoller-1@5010" daemon prio=5 tid=0x16 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      - locked <0x13b6> (a sun.nio.ch.EPollSelectorImpl)
      - locked <0x13b7> (a java.util.Collections$UnmodifiableSet)
      - locked <0x13b8> (a sun.nio.ch.Util$3)
      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      at org.apache.tomcat.util.net.NioBlockingSelector$BlockPoller.run(NioBlockingSelector.java:339)

"task-scheduler-3@4847" prio=5 tid=0x15 nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
      at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-Acceptor-0@5022" daemon prio=5 tid=0x19 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.ServerSocketChannelImpl.accept0(ServerSocketChannelImpl.java:-1)
      at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
      at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
      - locked <0x13af> (a java.lang.Object)
      at org.apache.tomcat.util.net.NioEndpoint$Acceptor.run(NioEndpoint.java:456)
      at java.lang.Thread.run(Thread.java:745)

"task-scheduler-2@4837" prio=5 tid=0x14 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
      at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.net.SocketInputStream.read(SocketInputStream.java:170)
      at java.net.SocketInputStream.read(SocketInputStream.java:141)
      at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
      at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
      at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
      - locked <0x13b9> (a java.io.BufferedInputStream)
      at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
      at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
      at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1569)
      - locked <0x13ba> (a sun.net.www.protocol.http.HttpURLConnection)
      at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
      at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
      at org.springframework.http.client.SimpleBufferingClientHttpRequest.executeInternal(SimpleBufferingClientHttpRequest.java:84)
      at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
      at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
      at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
      at org.howdoevenexist.AuthInterceptor.intercept(AuthInterceptor.java:24)
      at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:85)
      at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:69)
      at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
      at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
      at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:619)
      at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:595)
      at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:516)
      at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.handleRequestMessage(HttpRequestExecutingMessageHandler.java:382)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
      at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:129)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
      at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
      at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
      at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-AsyncTimeout@5025" daemon prio=5 tid=0x1a nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Thread.java:-1)
      at org.apache.coyote.AbstractProtocol$AsyncTimeout.run(AbstractProtocol.java:1137)
      at java.lang.Thread.run(Thread.java:745)

"container-0@4216" prio=5 tid=0x12 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Thread.java:-1)
      at org.apache.catalina.core.StandardServer.await(StandardServer.java:427)
      at org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer$1.run(TomcatEmbeddedServletContainer.java:166)

"ContainerBackgroundProcessor[StandardEngine[Tomcat]]@4204" daemon prio=5 tid=0x11 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Thread.java:-1)
      at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.run(ContainerBase.java:1339)
      at java.lang.Thread.run(Thread.java:745)

"Finalizer@5037" daemon prio=8 tid=0x3 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
      at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler@5038" daemon prio=10 tid=0x2 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
      at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"DestroyJavaVM@5035" prio=5 tid=0x1c nid=NA runnable
  java.lang.Thread.State: RUNNABLE

"Signal Dispatcher@5036" daemon prio=9 tid=0x4 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
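
A note on reading the dump: task-scheduler-1 is parked in SynchronousQueue.put, reached through QueueChannel.doSend. A SynchronousQueue has no capacity, so put() returns only once another thread takes the element. The sketch below reproduces that behaviour with the JDK alone. HandoffDemo and its methods are illustrative names, not part of the configuration above, and the timed offer is shown only as a contrast to put(), not as a recommended fix.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class HandoffDemo {

    /** Tries to hand one element to a consumer, giving up after timeoutMillis. */
    static boolean offerWithTimeout(SynchronousQueue<String> queue, long timeoutMillis) {
        try {
            // Unlike put(), the timed offer returns false instead of parking forever.
            return queue.offer("message", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Same hand-off, but with a consumer thread waiting in take(). */
    static boolean offerWithConsumer(long timeoutMillis) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        return offerWithTimeout(queue, timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("no consumer:   " + offerWithTimeout(new SynchronousQueue<>(), 200));
        System.out.println("with consumer: " + offerWithConsumer(2000));
    }
}
```

With no consumer the timed offer gives up, whereas a plain put() in the same situation would keep the calling thread parked, which matches the WAITING state of task-scheduler-1.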

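Best answer

See Configuring the Task Scheduler bean: by default there are only 10 threads.

With a mix of queue channels, each polling thread is suspended for the receiveTimeout (1 second by default). If you have many queue channels, you may run into thread starvation.

By the way, you generally don't need many queue channels; a single thread handoff in a flow is usually sufficient.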
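
The starvation described above can be reproduced without Spring. In the sketch below a plain ScheduledExecutorService stands in for the task scheduler, and tasks that park on a latch stand in for endpoints stuck on a blocking send or a slow remote call. StarvationDemo, pollerRuns and the pool sizes are hypothetical and chosen only for illustration; none of this is Spring Integration API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class StarvationDemo {

    /**
     * Submits `blockers` tasks that park until released, then one periodic "poller",
     * and returns how many times the poller ran within observeMillis.
     */
    static int pollerRuns(int poolSize, int blockers, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            for (int i = 0; i < blockers; i++) {
                // Each blocker occupies one pool thread until the latch is released.
                scheduler.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The poller can only run if a pool thread is still free.
            scheduler.scheduleAtFixedRate(runs::incrementAndGet, 0, 20, TimeUnit.MILLISECONDS);
            Thread.sleep(observeMillis);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return runs.get();
        } finally {
            release.countDown();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool=2, blockers=2 -> poller runs: " + pollerRuns(2, 2, 300));
        System.out.println("pool=3, blockers=2 -> poller runs: " + pollerRuns(3, 2, 300));
    }
}
```

When every pool thread is occupied by a blocked task, the periodic task never gets a turn; with one spare thread it runs normally. In a real flow the equivalent choices are a larger scheduler pool or fewer blocking handoffs.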

Source: the original Stack Overflow question, "java - Spring Integration poller sometimes runs once and then stops": https://stackoverflow.com/questions/42723061/
