.net-core - 大众交通中的 InMemoryTransportCache 发生了什么?

标签 .net-core masstransit

大众运输中的 InMemoryTransportCache 发生了什么?现在是否有任何选项可以测试内存中两个总线控件之间的连接?

最佳答案

您想要测试,因为有 2 个不同的主机通过内存中的总线进行通信,对吗?

您可以让服务共享同一个 IBusControl 实例,并使用“ConnectRecieveEndpoint”方法注册每个使用者。

说明“ConnectReceiveEndpoint”用法的简短示例:

public class TestMessage { public string Text { get; set; } }

[Test]
public async Task Can_publish_and_receive_message()
{
    var consumer1Semaphore = new SemaphoreSlim(0);
    var consumer2Semaphore = new SemaphoreSlim(0);

    IInMemoryHost host = null; 

    var busControl = MassTransit.Bus.Factory.CreateUsingInMemory(inMemoryBusFactoryConfigurator =>
    {
        host = inMemoryBusFactoryConfigurator.Host; 

        inMemoryBusFactoryConfigurator.ReceiveEndpoint("consumer1", ep =>
        {   
            ep.Handler<TestMessage>(context =>
            {
                context.Message.Text.Should().Be("Hi");
                consumer1Semaphore.Release();
                return Task.CompletedTask;
            });
        });
    });

    host.ConnectReceiveEndpoint("consumer2", inMemoryReceiveEndpointConfigurator =>
    {
        inMemoryReceiveEndpointConfigurator.Handler<TestMessage>(context =>
        {
            context.Message.Text.Should().Be("Hi");
            consumer2Semaphore.Release();
            return Task.CompletedTask;
        });

    });

    await busControl.StartAsync();

    await busControl.Publish(new TestMessage{Text = "Hi"});

    (await consumer1Semaphore.WaitAsync(2.Seconds())).Should().BeTrue();
    (await consumer2Semaphore.WaitAsync(2.Seconds())).Should().BeTrue();

    await busControl.StopAsync();
}

测试 2 个虚构服务的简单方法; ServiceA 和 ServiceB(不在真实环境中共同托管)。下面的 IBusControlFactory 用于能够在测试中替换 Azure 服务总线传输以支持内存中传输。

public interface IBusControlFactory
{
    IBusControl Create(string queueName = null, Action<IReceiveEndpointConfigurator> configurator = null);
}

// Test replacement for a factory that normally creates a Azure Service Bus bus instance
public class Harness : IBusControlFactory, IDisposable
{
    public readonly IBusControl BusControl;

    private IInMemoryHost _host;

    public Harness() => BusControl = MassTransit.Bus.Factory.CreateUsingInMemory(configure: cfg => _host = cfg.Host);

    public static Harness Start() => new Harness();

    public IBusControl Create(string queueName = null, Action<IReceiveEndpointConfigurator> configurator = null)
    {
        if(queueName != null && configurator != null)
            _host.ConnectReceiveEndpoint(queueName, configurator);

        return BusControl;
    }

    public void Dispose() => BusControl.Stop();
}

public class ServiceA
{
    private readonly IBus _bus;

    public ServiceA(IBusControlFactory factory) => _bus = factory.Create();

    public Task SayHi() => _bus.Publish(new TestMessage { Text = "Hi" });
}

public class ServiceB
{
    public ServiceB(IBusControlFactory factory)
    {
        factory.Create("consumer2", ep =>
        {
            ep.Handler<TestMessage>(context =>
            {
                context.Message.Text.Should().Be("Hi");
                return Task.CompletedTask;
            });
        });
    }
}

[Test]
public async Task Can_publish_and_receive_message_take_2()
{
    using (var harness = Harness.Start())
    {
        var serviceA = new ServiceA(harness);
        var serviceB = new ServiceB(harness);

        var spy = new Spy();
        harness.BusControl.ConnectConsumeMessageObserver(spy);

        await serviceA.SayHi();

        await spy.WaitOne(2.Seconds());
    }            
}

public class Spy : IConsumeMessageObserver<TestMessage>
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);

    public Task PreConsume(ConsumeContext<TestMessage> context) => Task.CompletedTask;

    public Task PostConsume(ConsumeContext<TestMessage> context)
    {
        _semaphore.Release();
        return Task.CompletedTask;
    }

    public Task ConsumeFault(ConsumeContext<TestMessage> context, Exception exception) => Task.CompletedTask;
    public async Task WaitOne(TimeSpan timeout) { (await _semaphore.WaitAsync(timeout)).Should().BeTrue(); }
}

关于.net-core - 大众交通中的 InMemoryTransportCache 发生了什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54563622/

相关文章:

c# - 如何在有很多用户的 Ldap 服务器上进行分页搜索?

c# - dotnetcore api : Skipping project "/MyDotnetCoreLib/MyDotnetCoreLib.csproj" because it was not found 的 Docker 构建错误

c# - MassTransit Saga 状态到实例状态映射

c# - 如果没有订阅者在一个月内不检索消息,如何自动删除 Azure 服务总线中的队列?

masstransit - 派生类型不会发布给 MassTransit 中的消费者

rabbitmq - 使用 RabbitMQ : When is a message moved to the error queue 的大众运输

nservicebus - CQRS 项目是否需要像 NServiceBus 这样的消息传递框架?

c# - 如何让 JsonNode.ToJsonString(...) 按字母顺序生成属性?

asp.net-core - 如何在.NetCore API中的不同应用程序上验证从一个应用程序发出的AntiForgeryToken?

.net - Visual Studio Team Services 构建包还原失败,project.json netstandard1.0 目标