c# - 从不同线程访问后释放 BrokeredMessage

标签 c# multithreading azure servicebus

这可能是 this question 的重复项但这与批处理数据库更新的讨论相混淆,并且仍然没有正确的答案。

在使用 Azure 服务总线队列的简单示例中,我无法在将 BrokeredMessage 放入队列后访问它;如果我从另一个线程读取队列,它总是被处理。

示例代码:

class Program {
    private static string _serviceBusConnectionString = "XXX";

    private static BlockingCollection<BrokeredMessage> _incomingMessages = new BlockingCollection<BrokeredMessage>();
    private static CancellationTokenSource _cancelToken = new CancellationTokenSource();

    private static QueueClient _client;

    static void Main(string[] args) {

        // Set up a few listeners on different threads
        Task.Run(async () => {
            while (!_cancelToken.IsCancellationRequested) {
                var msg = _incomingMessages.Take(_cancelToken.Token);
                if (msg != null) {
                    try {
                        await msg.CompleteAsync();
                        Console.WriteLine($"Completed Message Id: {msg.MessageId}");
                    } catch (ObjectDisposedException) {
                        Console.WriteLine("Message was disposed!?");
                    }
                }
            }
        });


        // Now set up our service bus reader
        _client = GetQueueClient("test");

        _client.OnMessageAsync(async (message) => {
            await Task.Run(() => _incomingMessages.Add(message));
        },
        new OnMessageOptions() {
            AutoComplete = false
        });

        // Now start sending
        Task.Run(async () => {
            int sent = 0;
            while (!_cancelToken.IsCancellationRequested) {
                var msg = new BrokeredMessage();
                await _client.SendAsync(msg);
                Console.WriteLine($"Sent {++sent}");
                await Task.Delay(1000);
            }
        });

        Console.ReadKey();
        _cancelToken.Cancel();

    }

    private static QueueClient GetQueueClient(string queueName) {

        var namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceBusConnectionString);
        if (!namespaceManager.QueueExists(queueName)) {
            var settings = new QueueDescription(queueName);
            settings.MaxDeliveryCount = 10;
            settings.LockDuration = TimeSpan.FromSeconds(5);
            settings.EnableExpress = true;
            settings.EnablePartitioning = true;
            namespaceManager.CreateQueue(settings);
        }

        var factory = MessagingFactory.CreateFromConnectionString(_serviceBusConnectionString);
        factory.RetryPolicy = new RetryExponential(minBackoff: TimeSpan.FromSeconds(0.1), maxBackoff: TimeSpan.FromSeconds(30), maxRetryCount: 100);
        var queueClient = factory.CreateQueueClient(queueName);

        return queueClient;
    }
}

我尝试过设置,但无法使其正常工作。有什么想法吗?

最佳答案

通过 Serkant Karaca @ Microsoft 的回复回答我自己的问题 here :

Very basic rule and I am not sure if this is documented. The received message needs to be processed in the callback function's life time. In your case, messages will be disposed when async callback completes, this is why your complete attempts are failing with ObjectDisposedException in another thread.

I don't really see how queuing messages for further processing helps on the throughput. This will add more burden to client for sure. Try processing the message in the async callback, that should be performant enough.

SCSS 。

关于c# - 从不同线程访问后释放 BrokeredMessage,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40739063/

相关文章:

java - 如何在 run 方法之外发送对象?

python - 分页异步迭代器协议(protocol)不可用(适用于 Python 的 Azure SDK)

c# - 如何从 XML 文档读取值以构建组合框?

c# - 等待第三方应用程序窗口加载

c# - 不是表单时使用 InvokeRequired

python - 收到错误代码 : SubscriptionNotFound Message: The subscription not found when trying to get data about Azure Virtual Machine?

Azure 调度程序 : first working day of every month

c# - &lt;script runat ="server"> 与代码隐藏文件

c# - WinForms TreeView 检查/取消检查层次结构

c# - 将值从主窗口传递到子窗口,然后从子窗口返回多个值到主窗口