我正在为我的 Trident 拓扑实现 IBackingMap 以将元组存储到 ElasticSearch(我知道 GitHub 上已经存在几种 Trident/ElasticSearch 集成实现,但是我决定实现一个更适合我的任务的自定义实现)。
所以我的实现是一个带有工厂的经典实现:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName).build();
// TODO add a possibility to close the client
client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(host, port));
}
// the actual implementation is left out
}
您会看到它获取主机/端口/集群名称作为输入参数,并创建一个 ElasticSearch 客户端作为该类的成员,但它从不关闭客户端。
然后以非常熟悉的方式在拓扑中使用它:
tridentTopology.newStream("spout", spout)
// ...some processing steps here...
.groupBy(aggregationFields)
.persistentAggregate(
ElasticSearchBackingMap.getFactoryFor(
ElasticSearchConfig.ES_HOST,
ElasticSearchConfig.ES_PORT,
ElasticSearchConfig.ES_CLUSTER_NAME
),
new Fields(FieldNames.OUTCOME),
new BatchAggregator(),
new Fields(FieldNames.AGGREGATED));
这个拓扑被包装在一些公共(public)静态 void main 中,打包在一个 jar 中并发送到 Storm 执行。
问题是,我应该担心关闭 ElasticSearch 连接还是 Storm 自己的事情?如果 Storm 没有做到这一点,在拓扑的生命周期中我应该如何以及何时做到这一点?
提前致谢!
最佳答案
好的,回答我自己的问题。
首先,再次感谢@dedek 的建议并在 Storm 的 Jira 中恢复票证。
最后,由于没有官方的方法可以做到这一点,我决定使用 Trident 过滤器的 cleanup() 方法。到目前为止,我已经验证了以下内容(针对 Storm v. 0.9.4):
使用 LocalCluster
带真实集群
总体而言,这或多或少地保证了 cleanup() 被调用,前提是您要小心处理异常(无论如何,我倾向于在我的每个 Trident 原语中添加“thundercatch”)。
我的代码:
public class CloseFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);
private final Closeable[] closeables;
public CloseFilter(Closeable... closeables) {
this.closeables = closeables;
}
@Override
public boolean isKeep(TridentTuple tuple) {
return true;
}
@Override
public void prepare(Map conf, TridentOperationContext context) {
}
@Override
public void cleanup() {
for (Closeable c : closeables) {
try {
c.close();
} catch (Exception e) {
LOG.warn("Failed to close an instance of {}", c.getClass(), e);
}
}
}
}
但是,如果有一天用于关闭连接的钩子(Hook)成为 API 的一部分,那就太好了。
关于apache-storm - 如何关闭由 Storm Trident 拓扑中的 IBackingMap 实现打开的数据库连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28743434/