java - NiFi - 更新处理器中的 Luwak (Lucene) 索引

标签 java lucene apache-nifi

我正在尝试使用 Luwak 创建自定义处理器Lucene 索引器,因此我可以对传入流文件运行查询。我试图找出更新 Luwak 监视器内部存在的查询索引的最佳方法(下面的示例代码)。

编辑 - 更多使用上下文

通过更新,我的意思是允许外部用户添加/更新/删除针对传入流文件运行的查询。我们将从一组固定的查询开始,但随后希望允许一个或多个用户能够更改针对传入消息执行的查询。这就是挑战,改变正在执行的查询。

我还应该考虑其他选择吗?如果有 10k 个查询,则更新查询似乎需要大约 20 秒。这种情况很可能很少见,但重新加载/启动时间是我正在考虑的事情。

我考虑过的选项:

  1. 使用 UpdateAttribute 并更新每个流文件。不理想,尤其是当有大量查询需要索引时。
  2. 使用http、AWS SQS等发送高优先级流文件进行更新(高于任何其他源)。不可怕,但看起来还是不太对劲。
  3. 使用 NiFi API 在更新时启动/停止处理器。似乎不是执行更新的非常有效的方法,特别是如果更新非常频繁的话。

实例化监视器:

Monitor monitor = new Monitor(new LuceneQueryParser("field"), new TermFilteredPresearcher());

添加查询 - 我正在尝试优化的内容:

        //Add queries to the monitor
        for (Map.Entry<String, String> entry : bucketList.entrySet()) {
            MonitorQuery q = new MonitorQuery(entry.getKey(),entry.getValue());
            monitor.update(q);
        }

最佳答案

当您的处理器启动时,您可以启动一个后台计时器线程,该线程定期构建一个新监视器,然后替换处理器正在使用的监视器。

您可能希望在处理器中创建一个成员变量,例如:

AtomicReference<Monitor> monitorHolder = new AtomicReference<Monitor>();

然后在@OnScheduled中,您可以构建初始监视器并将其设置在支架中。

然后在 onTrigger 中你总是首先获得监视器:

Monitor localMonitor = monitorHolder.get();

然后在后台线程中可以调用monitorHolder.set(newMonitor),这不会影响处理器当前的执行,但会在下次调用onTrigger时生效。

关于java - NiFi - 更新处理器中的 Luwak (Lucene) 索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47005634/

相关文章:

java - Spark下载页面上预构建的hadoop和用户提供的hadoop有什么区别?

c# - Lucene - 搜索数值字段

json - 根据 json 字段路由整个 Json 内容

apache-nifi - Zookeeper 对集群 Nifi 的推荐

java - Android Studio : Unboxing of 'xxx' may produce 'java.lang.NullPointerException'

java - HashTagHelper Creator() 具有私有(private)访问权限吗?

java - 使用 Apache Beam 从 PubSubIO 获取 Pub/Sub 消息的 messageId 字段

search - 每个属性值的Elasticsearch(n)个文档

java - InnoDB 全文搜索

java - 从 Infinispan 缓存读取进入无限循环