假设有一条调用有状态 bean 的路由:
<camel:route id="Concurrently-called-route">
<camel:from uri="direct:concurrentlyCalledRoute"/>
<camel:bean ref="statefullBean" method="setSomeState"/>
<camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>
消息可以沿着该路径同时发送,即从并发线程调用 ProducerTemplate
的 requestBody
方法。因此,如果正在进行两次交换,并且在一次交换期间调用 setSomeState
和在另一次交换期间执行的 setSomeState
和 getSomeDataDependingOnState
调用之间,就会出现问题。我看到有两种方法可以解决这个问题,每种方法都有一个缺点。
使用SEDA
<camel:route id="Councurrently-called-route">
<camel:from uri="direct:concurrentlyCalledRoute"/>
<camel:to uri="seda:sedaRoute"/>
</camel:route>
<camel:route id="SEDA-route">
<camel:from uri="seda:sedaRoute"/>
<camel:bean ref="statefullBean" method="setSomeState"/>
<camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>
在这种情况下,从不同线程发送的消息会聚集在 SEDA 端点的队列中。来自该队列的消息在沿着 SEDA-route
行进时在一个线程中进行处理。因此一条消息的处理不会干扰另一条消息的处理。但是,如果有许多线程向 concurrentlyCalledRoute
发送消息,SEDA-route
将成为瓶颈。如果使用多个线程来处理 seda 队列中的项目,则会再次出现并发调用有状态 bean 的问题。
另一种方法 - 使用自定义范围。
自定义范围
Spring 框架允许实现自定义范围。因此,我们能够实现一个范围,为每次交换存储一个单独的 bean 实例。
public class ExchangeScope implements Scope {
private Map<String, Map<String,Object>> instances = new ConcurrentHashMap<>();
private Map<String,Runnable> destructionCallbacks = new ConcurrentHashMap<>();
private final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();
public void activate(String exchangeId) {
if (!this.instances.containsKey(exchangeId)) {
Map<String, Object> instancesInCurrentExchangeScope = new ConcurrentHashMap<>();
this.instances.put(exchangeId, instancesInCurrentExchangeScope);
}
this.currentExchangeId.set(exchangeId);
}
public void destroy() {
String currentExchangeId = this.currentExchangeId.get();
Map<String,Object> instancesInCurrentExchangeScope = instances.get(currentExchangeId);
if (instancesInCurrentExchangeScope == null)
throw new RuntimeException("ExchangeScope with id = " + currentExchangeId + " doesn't exist");
for (String name : instancesInCurrentExchangeScope.keySet()) {
this.remove(name);
}
instances.remove(currentExchangeId);
this.currentExchangeId.set(null);
}
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
// selects by name a bean instance from a map storing instances for current exchange
// creates a new bean instance if necessary
}
@Override
public Object remove(String name) {
// removes a bean instance
}
@Override
public void registerDestructionCallback(String name, Runnable callback) {
this.destructionCallbacks.put(name, callback);
}
@Override
public Object resolveContextualObject(String name) {
String currentExchangeId = this.currentExchangeId.get();
if (currentExchangeId == null)
return null;
Map<String,Object> instancesInCurrentExchangeScope = this.instances.get(currentExchangeId);
if (instancesInCurrentExchangeScope == null)
return null;
return instancesInCurrentExchangeScope.get(name);
}
@Override
public String getConversationId() {
return this.currentExchangeId.get();
}
}
现在我们可以注册这个自定义作用域并将 statefullBean
声明为交换作用域:
<bean id="exchangeScope" class="org.my.ExchangeScope"/>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="ExchangeScope" value-ref="exchangeScope"/>
</map>
</property>
</bean>
<bean id="statefullBean" class="org.my.StatefullBean" scope="ExchangeScope"/>
要使用交换范围,我们应该在发送消息之前调用 ExchangeScope
的 activate
方法,然后调用 destroy
:
this.exchangeScope.activate(exchangeId);
this.producerTemplate.requestBody(request);
this.exchangeScope.destroy(exchangeId);
通过此实现,交换作用域实际上是线程作用域。这是一个缺点。例如,如果在路由中使用多线程拆分器,则它将无法从拆分器创建的线程中调用交换作用域 bean,因为对 bean 的调用将在与启动交换的线程不同的线程中执行。
有什么想法可以解决这些缺点吗?在并发交换期间是否有完全不同的方法来隔离有状态 bean?
最佳答案
另一个需要考虑的替代方案是不要让你的 beans 有状态。您可以将状态数据存储在消息本身而不是 bean 中,因此您的方法将类似于:
public class StatefulBean {
public StateInfo setSomeState(Message msg) {...}
public void getSomeDataDependingOnState(StateInfo stateinfo) {...}
}
关于java - 在并发交换期间隔离有状态bean的最正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26013925/