c# - 线程锁内的多线程

标签 c# multithreading elasticsearch .net-core nest

我正在努力加快某些过程的执行速度,这些过程将大量记录(大多数是几百万个)发布到Elasticsearch。在我的C#代码中,我已经使用Dataflow实现了一个多线程解决方案,如下所示:

var fetchRecords = new TransformBlock<?, ?>(() => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));

fetchRecords.LinkTo(sendRecordsToElastic, { PropogateCompletion = true });

fetchRecords.Post("Start");

然后我要实现的发送批量请求调用:
public IBulkResponse sendBulkRequest(List<?> records)
{
    lock(SomeStaticObject)
    {
       // Execute several new threads to send records in bulk
    }
}

我的问题是在锁中执行附加线程的实用性,该锁是数据流管道的一部分。

这个可以吗?我可以在性能,执行,缓存/内存丢失等方面看到任何潜在的问题吗?

任何见识都会很高兴地被接受。

最佳答案

您可能需要在此处使用BulkAll,该代码实现了可观察的模式,以向Elasticsearch发出并发批量请求。这是一个例子

void Main()
{   
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);

    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    if (client.IndexExists(indexName).Exists)
        client.DeleteIndex(indexName);

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);

    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex => 
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () => 
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable          
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this  
    countdownEvent.Wait();

    if (exception != null) 
    {
        throw exception;
    }
}

// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < DocumentCount; i++)
        yield return new DeviceStatus(i); 
}

private const int DocumentCount = 20000;

public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;
    public int Id {get;set;}
}

如果您不需要在观察者中做任何特别的事情,可以在可观察对象上使用.Wait()方法
var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
    .Index(indexName)
    .MaxDegreeOfParallelism(4)
    .RefreshOnCompleted()
    .Size(size)
)
.Wait(
    TimeSpan.FromHours(1), 
    response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
);
BulkAllScrollAllReindex有一些可观察到的方法(尽管有ReindexOnServer在Elasticsearch中重新编制索引并映射到the Reindex API-Reindex方法早于此)

关于c# - 线程锁内的多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48202652/

相关文章:

c# - QueryString 真的不区分大小写吗?

multithreading - 英特尔和AMD使用哪种缓存一致性协议(protocol)?

c# - 如何从 C# 中的字符串 XSD 获取 XmlSchema 对象?

C# 线程机制

java - 在多线程环境中使用 apache HttpClients 的最佳方式

python - 使用pyqtgraph使用外部数据进行绘图

python - Elasticsearch 与 python : query specific field

json - 如何从Json文件中使用Logstash获取TimeStamp? JSON中有多个日期字段

elasticsearch - 每次连续显示多次时,如何将具有相同字段值的结果限制为X个文档

c# - 使用 mvvm-light 消息在非单例 View 模型的多个实例之间进行通信?