apache-storm - 如何关闭由 Storm Trident 拓扑中的 IBackingMap 实现打开的数据库连接?

标签 apache-storm trident

我正在为我的 Trident 拓扑实现 IBackingMap 以将元组存储到 ElasticSearch(我知道 GitHub 上已经存在几种 Trident/ElasticSearch 集成实现,但是我决定实现一个更适合我的任务的自定义实现)。

所以我的实现是一个带有工厂的经典实现:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) {

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        // TODO add a possibility to close the client
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }

    // the actual implementation is left out
}

您会看到它获取主机/端口/集群名称作为输入参数,并创建一个 ElasticSearch 客户端作为该类的成员,但它从不关闭客户端。

然后以非常熟悉的方式在拓扑中使用它:

tridentTopology.newStream("spout", spout)
            // ...some processing steps here...
            .groupBy(aggregationFields)
            .persistentAggregate(
                    ElasticSearchBackingMap.getFactoryFor(
                            ElasticSearchConfig.ES_HOST,
                            ElasticSearchConfig.ES_PORT,
                            ElasticSearchConfig.ES_CLUSTER_NAME
                    ),
                    new Fields(FieldNames.OUTCOME),
                    new BatchAggregator(),
                    new Fields(FieldNames.AGGREGATED));

这个拓扑被包装在一些公共(public)静态 void main 中,打包在一个 jar 中并发送到 Storm 执行。

问题是,我应该担心关闭 ElasticSearch 连接还是 Storm 自己的事情?如果 Storm 没有做到这一点,在拓扑的生命周期中我应该如何以及何时做到这一点?

提前致谢!

最佳答案

好的,回答我自己的问题。

首先,再次感谢@dedek 的建议并在 Storm 的 Jira 中恢复票证。

最后,由于没有官方的方法可以做到这一点,我决定使用 Trident 过滤器的 cleanup() 方法。到目前为止,我已经验证了以下内容(针对 Storm v. 0.9.4):

使用 LocalCluster

  • cleanup() 在集群关闭时被调用
  • cleanup() 在终止拓扑时不会被调用,这不应该是一场悲剧,很可能无论如何都不会使用 LocalCluster 进行实际部署

  • 带真实集群
  • 它在拓扑被终止时以及当使用 pkill -TERM -u Storm -f 'backtype.storm.daemon.worker'
  • 停止工作人员时被调用
  • 如果 worker 被 kill -9 杀死或当它崩溃或 - 可悲的是 - 当 worker 因异常而死亡时,它不会被调用

  • 总体而言,这或多或少地保证了 cleanup() 被调用,前提是您要小心处理异常(无论如何,我倾向于在我的每个 Trident 原语中添加“thundercatch”)。

    我的代码:

    public class CloseFilter implements Filter {
    
        private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);
    
        private final Closeable[] closeables;
    
        public CloseFilter(Closeable... closeables) {
            this.closeables = closeables;
        }
    
        @Override
        public boolean isKeep(TridentTuple tuple) {
            return true;
        }
    
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
    
        }
    
        @Override
        public void cleanup() {
            for (Closeable c : closeables) {
                try {
                    c.close();
                } catch (Exception e) {
                    LOG.warn("Failed to close an instance of {}", c.getClass(), e);
                }
            }
        }
    }
    

    但是,如果有一天用于关闭连接的钩子(Hook)成为 API 的一部分,那就太好了。

    关于apache-storm - 如何关闭由 Storm Trident 拓扑中的 IBackingMap 实现打开的数据库连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28743434/

    相关文章:

    java - 如何在 Apache Storm Trident 中确定事务是否成功提交

    apache-storm - Storm的这个调试日志是什么意思?

    distributed-computing - 流处理引擎的并行行为

    clojure - clojure 中的未绑定(bind)函数 - 如何绑定(bind)它们?

    global - 我应该使用 Trident 来计算 Storm 中元组的全局平均值吗?

    java - 比较不透明或事务性 Kafka spout

    java - Storm 中的多个字段分组

    java - 如何识别所有发射是否到达 bolt ?

    stream - 创建一个每 X 秒发出一次元组的 Apache Storm spout

    java - 在 Storm TrackedTopology 单元测试中运行 Trident 拓扑