java - 在并发交换期间隔离有状态bean的最正确方法

标签 java spring concurrency apache-camel

假设有一条调用有状态 bean 的路由:

<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

消息可以沿着该路径同时发送,即从并发线程调用 ProducerTemplaterequestBody 方法。因此,如果正在进行两次交换,并且在一次交换期间调用 setSomeState 和在另一次交换期间执行的 setSomeStategetSomeDataDependingOnState 调用之间,就会出现问题。我看到有两种方法可以解决这个问题,每种方法都有一个缺点。

使用SEDA

<camel:route id="Councurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:to uri="seda:sedaRoute"/>
</camel:route>

<camel:route id="SEDA-route">
    <camel:from uri="seda:sedaRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

在这种情况下,从不同线程发送的消息会聚集在 SEDA 端点的队列中。来自该队列的消息在沿着 SEDA-route 行进时在一个线程中进行处理。因此一条消息的处理不会干扰另一条消息的处理。但是,如果有许多线程向 concurrentlyCalledRoute 发送消息,SEDA-route 将成为瓶颈。如果使用多个线程来处理 seda 队列中的项目,则会再次出现并发调用有状态 bean 的问题。

另一种方法 - 使用自定义范围。

自定义范围

Spring 框架允许实现自定义范围。因此,我们能够实现一个范围,为每次交换存储一个单独的 bean 实例。

public class ExchangeScope implements Scope {

    private Map<String, Map<String,Object>> instances = new ConcurrentHashMap<>();

    private Map<String,Runnable> destructionCallbacks = new ConcurrentHashMap<>();

    private final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

    public void activate(String exchangeId) {
        if (!this.instances.containsKey(exchangeId)) {
            Map<String, Object> instancesInCurrentExchangeScope = new ConcurrentHashMap<>();
            this.instances.put(exchangeId, instancesInCurrentExchangeScope);
        }
        this.currentExchangeId.set(exchangeId);
    }

    public void destroy() {
        String currentExchangeId = this.currentExchangeId.get();
        Map<String,Object> instancesInCurrentExchangeScope = instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            throw new RuntimeException("ExchangeScope with id = " + currentExchangeId + " doesn't exist");
        for (String name : instancesInCurrentExchangeScope.keySet()) {
            this.remove(name);
        }
        instances.remove(currentExchangeId);
        this.currentExchangeId.set(null);
    }

    @Override
    public Object get(String name, ObjectFactory<?> objectFactory) {
    // selects by name a bean instance from a map storing instances for current exchange
    // creates a new bean instance if necessary
    }

    @Override
    public Object remove(String name) {
    // removes a bean instance
    }

    @Override
    public void registerDestructionCallback(String name, Runnable callback) {
        this.destructionCallbacks.put(name, callback);
    }

    @Override
    public Object resolveContextualObject(String name) {
        String currentExchangeId = this.currentExchangeId.get();
        if (currentExchangeId == null)
            return null;

        Map<String,Object> instancesInCurrentExchangeScope = this.instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            return null;

        return instancesInCurrentExchangeScope.get(name);
    }

    @Override
    public String getConversationId() {
        return this.currentExchangeId.get();
    }
}

现在我们可以注册这个自定义作用域并将 statefullBean 声明为交换作用域:

<bean id="exchangeScope" class="org.my.ExchangeScope"/>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="ExchangeScope" value-ref="exchangeScope"/>
        </map>
    </property>
</bean>

<bean id="statefullBean" class="org.my.StatefullBean" scope="ExchangeScope"/>

要使用交换范围,我们应该在发送消息之前调用 ExchangeScopeactivate 方法,然后调用 destroy :

this.exchangeScope.activate(exchangeId);
this.producerTemplate.requestBody(request);
this.exchangeScope.destroy(exchangeId);

通过此实现,交换作用域实际上是线程作用域。这是一个缺点。例如,如果在路由中使用多线程拆分器,则它将无法从拆分器创建的线程中调用交换作用域 bean,因为对 bean 的调用将在与启动交换的线程不同的线程中执行。

有什么想法可以解决这些缺点吗?在并发交换期间是否有完全不同的方法来隔离有状态 bean?

最佳答案

另一个需要考虑的替代方案是不要让你的 beans 有状态。您可以将状态数据存储在消息本身而不是 bean 中,因此您的方法将类似于:

public class StatefulBean {
    public StateInfo setSomeState(Message msg) {...}

    public void getSomeDataDependingOnState(StateInfo stateinfo) {...}
}

关于java - 在并发交换期间隔离有状态bean的最正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26013925/

相关文章:

java - 每帧通过 OpenGL 渲染两次,不同的库

java - 在 Hibernate 中使用实体作为 DTO

c - Execvp 输出太大时挂起?

java - 向 hashmap 添加值时出现 ConcurrentModificationException

java - OpenGL 1.1 以固定分辨率和高档渲染

java - 将此 "yyyy-MM-dd' T'HH :mm:ss. SSSXXX"格式的字符串转换为 LocalDate

java - Elasticsearch 1.6 将 19 位数字插入 Long 字段 - 最后 3 位四舍五入

java - 从实体访问存储库或服务

java - Spring Boot + Hibernate JPA 在 @Service 层中没有具有实际事务的 EntityManager

java - 在指定的超时后从一个线程执行多个 Runnable