MassTransit 中的 InMemoryTransportCache 怎么了?现在是否还有办法测试两个内存中总线控件(bus control)之间的连接?
最佳答案
您是想测试两个不同的主机通过内存中的总线进行通信的场景,对吗?
您可以让这些服务共享同一个 IBusControl 实例,并使用“ConnectReceiveEndpoint”方法注册每个使用者。
说明“ConnectReceiveEndpoint”用法的简短示例:
/// <summary>Simple message type carrying a single text payload.</summary>
public class TestMessage
{
    /// <summary>Text carried by the message.</summary>
    public string Text { get; set; }
}
/// <summary>
/// Publishes one <see cref="TestMessage"/> on an in-memory bus and checks that both receive
/// endpoints handle it: "consumer1" is declared while the bus is being configured, and
/// "consumer2" is connected through the captured host afterwards.
/// </summary>
[Test]
public async Task Can_publish_and_receive_message()
{
    // Each handler releases its semaphore once, letting the test wait for delivery.
    var consumer1Semaphore = new SemaphoreSlim(0);
    var consumer2Semaphore = new SemaphoreSlim(0);
    IInMemoryHost host = null;

    var busControl = MassTransit.Bus.Factory.CreateUsingInMemory(inMemoryBusFactoryConfigurator =>
    {
        // Capture the host so a second endpoint can be connected outside this callback.
        host = inMemoryBusFactoryConfigurator.Host;
        inMemoryBusFactoryConfigurator.ReceiveEndpoint("consumer1", ep =>
        {
            ep.Handler<TestMessage>(context =>
            {
                context.Message.Text.Should().Be("Hi");
                consumer1Semaphore.Release();
                return Task.CompletedTask;
            });
        });
    });

    host.ConnectReceiveEndpoint("consumer2", inMemoryReceiveEndpointConfigurator =>
    {
        inMemoryReceiveEndpointConfigurator.Handler<TestMessage>(context =>
        {
            context.Message.Text.Should().Be("Hi");
            consumer2Semaphore.Release();
            return Task.CompletedTask;
        });
    });

    await busControl.StartAsync();
    try
    {
        await busControl.Publish(new TestMessage { Text = "Hi" });

        (await consumer1Semaphore.WaitAsync(2.Seconds())).Should().BeTrue();
        (await consumer2Semaphore.WaitAsync(2.Seconds())).Should().BeTrue();
    }
    finally
    {
        // Stop the bus even when publishing or an assertion throws, so a failing
        // test does not leave a started bus behind.
        await busControl.StopAsync();
    }
}
下面是测试两个虚构服务 ServiceA 和 ServiceB 的一种简单方法(在真实环境中它们并不托管在一起)。下面的 IBusControlFactory 用于在测试中把 Azure 服务总线传输替换为内存中传输。
/// <summary>
/// Abstraction over bus creation so that the transport behind the returned
/// <see cref="IBusControl"/> can be swapped by supplying a different implementation.
/// </summary>
public interface IBusControlFactory
{
    /// <summary>Returns a bus control, optionally given a queue name and an endpoint configuration callback.</summary>
    /// <param name="queueName">Optional queue name; defaults to <c>null</c>.</param>
    /// <param name="configurator">Optional receive endpoint configuration callback; defaults to <c>null</c>.</param>
    /// <returns>The bus control produced by the implementation.</returns>
    IBusControl Create(
        string queueName = null,
        Action<IReceiveEndpointConfigurator> configurator = null);
}
/// <summary>
/// Test replacement for a factory that normally creates an Azure Service Bus bus instance.
/// Every call to <see cref="Create"/> hands back the same in-memory <see cref="BusControl"/>.
/// </summary>
public class Harness : IBusControlFactory, IDisposable
{
    /// <summary>The single in-memory bus returned by every <see cref="Create"/> call.</summary>
    public readonly IBusControl BusControl;

    // Assigned inside the bus configuration callback, so it cannot be declared readonly.
    private IInMemoryHost _host;

    /// <summary>Creates the in-memory bus and captures its host; does not start the bus.</summary>
    public Harness() => BusControl = MassTransit.Bus.Factory.CreateUsingInMemory(configure: cfg => _host = cfg.Host);

    /// <summary>
    /// Creates a harness and starts its bus. Previously the bus was only created here and
    /// never started, even though <see cref="Dispose"/> stops it.
    /// </summary>
    /// <returns>A harness whose bus has been started.</returns>
    public static Harness Start()
    {
        var harness = new Harness();
        harness.BusControl.Start();
        return harness;
    }

    /// <summary>
    /// Returns the shared bus. When both arguments are supplied, a receive endpoint named
    /// <paramref name="queueName"/> is first connected to the in-memory host.
    /// </summary>
    /// <param name="queueName">Queue name for the receive endpoint, or <c>null</c> to connect none.</param>
    /// <param name="configurator">Endpoint configuration callback, or <c>null</c> to connect none.</param>
    /// <returns>The shared <see cref="BusControl"/>.</returns>
    public IBusControl Create(string queueName = null, Action<IReceiveEndpointConfigurator> configurator = null)
    {
        // An endpoint is only connected when both the name and the configuration are given.
        if (queueName != null && configurator != null)
        {
            _host.ConnectReceiveEndpoint(queueName, configurator);
        }

        return BusControl;
    }

    /// <summary>Stops the bus.</summary>
    public void Dispose() => BusControl.Stop();
}
/// <summary>Service that publishes a greeting on the bus obtained from the factory.</summary>
public class ServiceA
{
    private readonly IBus _bus;

    /// <summary>Obtains the bus from <paramref name="factory"/> without requesting a receive endpoint.</summary>
    /// <param name="factory">Factory that supplies the bus.</param>
    public ServiceA(IBusControlFactory factory)
    {
        _bus = factory.Create();
    }

    /// <summary>Publishes a <see cref="TestMessage"/> whose text is "Hi".</summary>
    /// <returns>The task returned by the publish call.</returns>
    public Task SayHi()
    {
        var greeting = new TestMessage { Text = "Hi" };
        return _bus.Publish(greeting);
    }
}
/// <summary>Service that registers a <see cref="TestMessage"/> handler on the "consumer2" endpoint.</summary>
public class ServiceB
{
    /// <summary>Asks the factory for a bus with a "consumer2" receive endpoint configured by this service.</summary>
    /// <param name="factory">Factory that supplies the bus and accepts the endpoint configuration.</param>
    public ServiceB(IBusControlFactory factory)
    {
        factory.Create("consumer2", ConfigureEndpoint);
    }

    // Registers the handler that asserts the received text is "Hi".
    private static void ConfigureEndpoint(IReceiveEndpointConfigurator endpoint)
    {
        endpoint.Handler<TestMessage>(context =>
        {
            var received = context.Message;
            received.Text.Should().Be("Hi");
            return Task.CompletedTask;
        });
    }
}
/// <summary>
/// Wires <see cref="ServiceA"/> and <see cref="ServiceB"/> to the same harness, has ServiceA
/// publish its greeting, and waits for the consume observer to report a consumed message.
/// </summary>
[Test]
public async Task Can_publish_and_receive_message_take_2()
{
    using (var harness = Harness.Start())
    {
        var publisher = new ServiceA(harness);

        // Constructed only for its side effect: the constructor registers the "consumer2" endpoint.
        var subscriber = new ServiceB(harness);

        var consumeSpy = new Spy();
        harness.BusControl.ConnectConsumeMessageObserver(consumeSpy);

        await publisher.SayHi();

        // WaitOne asserts that a message was consumed within the timeout.
        await consumeSpy.WaitOne(2.Seconds());
    }
}
/// <summary>
/// Consume observer for <see cref="TestMessage"/> that lets a caller wait until a message
/// has passed through <see cref="PostConsume"/>.
/// </summary>
public class Spy : IConsumeMessageObserver<TestMessage>
{
    // Released once per PostConsume call; WaitOne takes one permit.
    private readonly SemaphoreSlim _consumed = new SemaphoreSlim(0);

    /// <summary>No-op; returns a completed task.</summary>
    public Task PreConsume(ConsumeContext<TestMessage> context)
    {
        return Task.CompletedTask;
    }

    /// <summary>Signals one waiting <see cref="WaitOne"/> call.</summary>
    public Task PostConsume(ConsumeContext<TestMessage> context)
    {
        _consumed.Release();
        return Task.CompletedTask;
    }

    /// <summary>No-op; returns a completed task.</summary>
    public Task ConsumeFault(ConsumeContext<TestMessage> context, Exception exception)
    {
        return Task.CompletedTask;
    }

    /// <summary>Waits up to <paramref name="timeout"/> for a signal and asserts that it arrived.</summary>
    /// <param name="timeout">Maximum time to wait for the signal.</param>
    public async Task WaitOne(TimeSpan timeout)
    {
        var signalled = await _consumed.WaitAsync(timeout);
        signalled.Should().BeTrue();
    }
}
关于 .net-core - MassTransit 中的 InMemoryTransportCache 怎么了?,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/54563622/