我有一个持续运行的 Azure Webjob,它基于队列触发器触发。该队列包含需要写入我的 lucene 索引的项目列表。目前,我的队列中有很多项目(超过 500k 行项目),我正在寻找最有效的方法来处理它。当我尝试“扩展”网络作业时,我不断收到 IndexWriter Lock 异常。
当前设置:
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);
host.RunAndBlock();
网络作业功能
public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
var azureDirectory = new AzureDirectory(CloudStorageAccount.Parse(ConfigurationManager.ConnectionStrings["StorageConnectionString"].ConnectionString), "megadata");
var findexExists = IndexReader.IndexExists(azureDirectory);
var count = items.Count;
IndexWriter indexWriter = null;
int errors = 0;
while (indexWriter == null && errors < 10)
{
try
{
indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
}
catch (LockObtainFailedException)
{
log.WriteLine("Lock is taken, Hit 'Y' to clear the lock, or anything else to try again");
errors++;
}
};
if (errors >= 10)
{
azureDirectory.ClearLock("write.lock");
indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
log.WriteLine("IndexWriter lock obtained, this process has exclusive write access to index");
indexWriter.SetRAMBufferSizeMB(10.0);
// Parallel.ForEach(items, (itm) =>
//{
foreach (var itm in items)
{
AddtoIndex(itm, indexWriter);
}
//});
}
更新索引项的方法基本上如下所示:
private static void AddtoIndex(ListingItem item, IndexWriter indexWriter)
{
var doc = new Document();
doc.Add(new Field("id", item.URL, Field.Store.NO, Field.Index.NOT_ANALYZED, Field.TermVector.NO));
var title = new Field("Title", item.Title, Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.YES);
indexWriter.UpdateDocument(new Term("id", item.URL), doc);
}
我尝试过的事情:
- 将 azure 配置批量大小设置为最大 32
- 使方法异步并使用 Task.WhenAll
- 使用并行 for 循环
当我尝试上述操作时,通常会失败:
Lucene.Net.Store.LockObtainFailedException: Lucene.Net.Store.LockObtainFailedException: Lock obtain timed out: <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="ffbe858a8d9ab3909c94bf888d968b9ad193909c94" rel="noreferrer noopener nofollow">[email protected]</a>.
at Lucene.Net.Store.Lock.Obtain(Int64 lockWaitTimeout) in d:\Lucene.Net\FullRepo\trunk\src\core\Store\Lock.cs:line 97
at Lucene.Net.Index.IndexWriter.Init(Directory d, Analyzer
关于如何在架构上设置此 Web 作业,以便它可以处理队列中的更多项目,而不是一项一项地执行,有什么建议吗?他们需要写入相同的索引? 谢谢
最佳答案
当多个进程尝试同时写入 Lucene 索引时,您会遇到 Lucene 语义问题。扩展 azure 应用程序、使用任务或并行 for 循环只会导致问题,因为当时只有一个进程应该写入 Lucene 索引。
架构上这是你应该做的。
- 确保任何时候只有一个 Webjobs 实例在运行 – 即使是 Web 应用程序是否可扩展(例如通过自动缩放)
- 使用最大网络作业批量大小 (32)
- 在每个批处理后提交 Lucene 索引以最小化 I/O
通过将 settings.job 文件添加到 webjob 项目,确保只能完成一个 webjob 实例。将构建操作设置为内容并复制到输出目录。将以下 JSON 添加到文件中
{ "is_singleton": true }
将 webjob 批处理站点配置为最大
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);
host.RunAndBlock();
每批后提交 Lucene 索引
public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
...
indexWriter = new IndexWriter(azureDirectory, …);
foreach (var itm in items)
{
AddtoIndex(itm, indexWriter);
}
indexWriter.Commit();
}
这只会在提交 Lucene 索引时写入存储帐户,从而加快索引过程。此外,Webjob 批处理还将加快消息处理速度(随时间推移处理的消息数量,而不是单个消息处理时间)。
您可以添加检查以查看 Lucene 索引是否已锁定(write.lock 文件存在)并在批处理开始时解锁索引。这不应该发生,但一切都可能发生,所以我会添加它以确保。
您可以通过使用更大的 Web 应用实例(里程可能会有所不同)并使用 Azure 高级存储等更快的存储来进一步加快索引过程。
您可以阅读有关 internals of Lucene indexes on Azure on my blog 的更多信息.
关于c# - lucene.net IndexWriter 和 Azure WebJob,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34145840/