java - spring 集成中的自定义并发 TcpOutboundGateway

标签 java spring multithreading tcp spring-integration

我正在尝试使用 TcpOutboundGateway 和客户端 TcpConnectionFactory 与外部 TCP 服务器通信。在我的场景中,每个连接都应该与不同的线程相关联(线程上的每个连接可能用于多个请求/响应)。

所以我使用了这个主题中的 ThreadAffinityClientConnectionFactory:Spring Integration tcp client multiple connections

它工作正常,直到我尝试打开超过 4 个并发连接,第五个(及以上)连接因超时而失败。 我发现 org.springframework.integration.ip.tcp.TcpOutboundGatewayhandleRequestMessage 方法中使用信号量来获取连接,所以我覆盖了 TcpOuboundGateway像这样:

public class NoSemaphoreTcpOutboundGateway extends TcpOutboundGateway {

    private volatile AbstractClientConnectionFactory connectionFactory;
    private final Map<String, NoSemaphoreTcpOutboundGateway.AsyncReply> pendingReplies = new ConcurrentHashMap();

    @Override
    public boolean onMessage(Message<?> message) {
        String connectionId = (String)message.getHeaders().get("ip_connectionId");
        if(connectionId == null) {
            this.logger.error("Cannot correlate response - no connection id");
            this.publishNoConnectionEvent(message, (String)null, "Cannot correlate response - no connection id");
            return false;
        }

        if(this.logger.isTraceEnabled()) {
            this.logger.trace("onMessage: " + connectionId + "(" + message + ")");
        }

        NoSemaphoreTcpOutboundGateway.AsyncReply reply = (NoSemaphoreTcpOutboundGateway.AsyncReply)this.pendingReplies.get(connectionId);
        if(reply == null) {
            if(message instanceof ErrorMessage) {
                return false;
            } else {
                String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
                this.logger.error(errorMessage);
                this.publishNoConnectionEvent(message, connectionId, errorMessage);
                return false;
            }
        } else {
            reply.setReply(message);
            return false;
        }

    }

    @Override
    protected Message handleRequestMessage(Message<?> requestMessage) {
        connectionFactory = (AbstractClientConnectionFactory) this.getConnectionFactory();
        Assert.notNull(this.getConnectionFactory(), this.getClass().getName() + " requires a client connection factory");

        TcpConnection connection = null;
        String connectionId = null;

        Message var7;
        try {
            /*if(!this.isSingleUse()) {
                this.logger.debug("trying semaphore");
                if(!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
                    throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
                }

                haveSemaphore = true;
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("got semaphore");
                }
            }*/

            connection = this.getConnectionFactory().getConnection();
            NoSemaphoreTcpOutboundGateway.AsyncReply e = new NoSemaphoreTcpOutboundGateway.AsyncReply(10000);
            connectionId = connection.getConnectionId();
            this.pendingReplies.put(connectionId, e);
            if(this.logger.isDebugEnabled()) {
                this.logger.debug("Added pending reply " + connectionId);
            }

            connection.send(requestMessage);

            //connection may be closed after send (in interceptor) if its disconnect message
            if (!connection.isOpen())
                return null;

            Message replyMessage = e.getReply();
            if(replyMessage == null) {
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("Remote Timeout on " + connectionId);
                }

                this.connectionFactory.forceClose(connection);
                throw new MessageTimeoutException(requestMessage, "Timed out waiting for response");
            }

            if(this.logger.isDebugEnabled()) {
                this.logger.debug("Response " + replyMessage);
            }

            var7 = replyMessage;
        } catch (Exception var11) {
            this.logger.error("Tcp Gateway exception", var11);
            if(var11 instanceof MessagingException) {
                throw (MessagingException)var11;
            }

            throw new MessagingException("Failed to send or receive", var11);
        } finally {
            if(connectionId != null) {
                this.pendingReplies.remove(connectionId);
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("Removed pending reply " + connectionId);
                }
            }
        }
        return var7;
    }

    private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
        ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
        if(applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage)));
        }
    }

    private final class AsyncReply {
        private final CountDownLatch latch;
        private final CountDownLatch secondChanceLatch;
        private final long remoteTimeout;
        private volatile Message<?> reply;

        private AsyncReply(long remoteTimeout) {
            this.latch = new CountDownLatch(1);
            this.secondChanceLatch = new CountDownLatch(1);
            this.remoteTimeout = remoteTimeout;
        }

        public Message<?> getReply() throws Exception {
            try {
                if(!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            } catch (InterruptedException var2) {
                Thread.currentThread().interrupt();
            }
            for(boolean waitForMessageAfterError = true; this.reply instanceof ErrorMessage; waitForMessageAfterError = false) {
                if(!waitForMessageAfterError) {
                    if(this.reply.getPayload() instanceof MessagingException) {
                        throw (MessagingException)this.reply.getPayload();
                    }

                    throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload());
                }
                NoSemaphoreTcpOutboundGateway.this.logger.debug("second chance");
                this.secondChanceLatch.await(2L, TimeUnit.SECONDS);
            }
            return this.reply;
        }

        public void setReply(Message<?> reply) {
            if(this.reply == null) {
                this.reply = reply;
                this.latch.countDown();
            } else if(this.reply instanceof ErrorMessage) {
                this.reply = reply;
                this.secondChanceLatch.countDown();
            }
        }
    }
}

SpringContext的配置是这样的:

@Configuration
@ImportResource("gateway.xml")
public class Conf {

@Bean
@Autowired
@ServiceActivator(inputChannel = "clientOutChannel")
public NoSemaphoreTcpOutboundGateway noSemaphoreTcpOutboundGateway(ThreadAffinityClientConnectionFactory cf, DirectChannel clientInChannel){
    NoSemaphoreTcpOutboundGateway gw = new NoSemaphoreTcpOutboundGateway();
    gw.setConnectionFactory(cf);
    gw.setReplyChannel(clientInChannel);
    gw.setRequestTimeout(10000);
    return gw;
}

 <int-ip:tcp-connection-factory
        id="delegateCF"
        type="client"
        host="${remoteService.host}"
        port="${remoteService.port}"
        single-use="true"
        lookup-host="false"
        ssl-context-support="sslContext"
        deserializer="clientDeserializer"
        serializer="clientSerializer"
        interceptor-factory-chain="clientLoggingTcpConnectionInterceptorFactory"
        using-nio="false"/>

delegateCF 被传递给ThreadAffinityClientConnectionFactory 构造函数

那么,问题是:

  • 就并发性而言,可以将 NoSemaphoreTcpOutboundGatewayThreadAffinityClientConnectionFactory 结合使用吗?

最佳答案

看起来你走对了,但同时我认为你不需要自定义 TcpOutboundGateway信号量 逻辑基于:

if (!this.isSingleUse) {
            logger.debug("trying semaphore");
            if (!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
                throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
            }

同时看看Gary对ThreadAffinityClientConnectionFactory的解决方案:

@Bean
public TcpNetClientConnectionFactory delegateCF() {
    TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234);
    clientCF.setSingleUse(true); // so each thread gets his own connection
    return clientCF;
}

@Bean
public ThreadAffinityClientConnectionFactory affinityCF() {
    return new ThreadAffinityClientConnectionFactory(delegateCF());
}

关注评论。您只需要委托(delegate) isSingleUse()

关于java - spring 集成中的自定义并发 TcpOutboundGateway,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41166718/

相关文章:

java - 如何从Mule批量发送消息到ActiveMQ

java - 使用 AD 和 Apache shiro 从 LDAP 服务器检索全名或显示名称

Spring Boot @RestController,发现不明确的映射

java - Java 中的线程池实际上 stop() 线程吗?

java - 在 JLabel 中追加文本

java - 缓冲的 RandomAccessFile java

@Controller 类中的 Spring @Value 注释未评估属性文件中的值

java - 准备好bean通过jdbc连接MySQL了吗?

iphone - 塔防游戏中的多线程寻路

C++, clang ,系统错误 : thread constructor failed: Resource temporarily unavailable