当我在控制台上打印出收到的消息时,显示的消息全都乱七八糟,每条消息都包含 5 个字符串子消息,这些消息在控制恢复到传入消息回调之前打印在控制台上。我强烈认为这是因为在 Booksleeve 中传入的消息事件是异步引发的?
我引用了以下帖子,How does PubSub work in BookSleeve/ Redis? ,其中作者 Marc Gravell 指出了通过将完成模式设置为“PreserveOrder”来强制同步接收的能力。我已经做到了,在连接客户端之前和之后都尝试过。两者似乎都不起作用。
关于如何接收消息并按照发送消息的确切顺序在控制台上打印它们有什么想法吗?在这种情况下,我只有一个发布者。
谢谢
编辑:
下面是一些代码片段,展示了我如何发送消息和我快速编写的 Booksleeve 包装器。
这里是客户端(我有一个类似的 Client2
,它接收消息并检查顺序,但我省略了它,因为它看起来微不足道)。
class Client1
{
const string ClientId = "Client1";
private static Messaging Client { get; set; }
private static void Main(string[] args)
{
var settings = new MessagingSettings("127.0.0.1", 6379, -1, 60, 5000, 1000);
Client = new Messaging(ClientId, settings, ReceiveMessage);
Client.Connect();
Console.WriteLine("Press key to start sending messages...");
Console.ReadLine();
for (int index = 1; index <= 100; index++)
{
//I turned this off because I want to preserve
//the order even if messages are sent in rapit succession
//Thread.Sleep(5);
var msg = new MessageEnvelope("Client1", "Client2", index.ToString());
Client.SendOneWayMessage(msg);
}
Console.WriteLine("Press key to exit....");
Console.ReadLine();
Client.Disconnect();
}
private static void ReceiveMessage(MessageEnvelope msg)
{
Console.WriteLine("Message Received");
}
}
这里是库的相关代码片段:
public void Connect()
{
RequestForReplyMessageIds = new ConcurrentBag<string>();
Connection = new RedisConnection(Settings.HostName, Settings.Port, Settings.IoTimeOut);
Connection.Closed += OnConnectionClosed;
Connection.CompletionMode = ResultCompletionMode.PreserveOrder;
Connection.SetKeepAlive(Settings.PingAliveSeconds);
try
{
if (Connection.Open().Wait(Settings.RequestTimeOutMilliseconds))
{
//Subscribe to own ClientId Channel ID
SubscribeToChannel(ClientId);
}
else
{
throw new Exception("Could not connect Redis client to server");
}
}
catch
{
throw new Exception("Could not connect Redis Client to Server");
}
}
public void SendOneWayMessage(MessageEnvelope message)
{
SendMessage(message);
}
private void SendMessage(MessageEnvelope msg)
{
//Connection.Publish(msg.To, msg.GetByteArray());
Connection.Publish(msg.To, msg.GetByteArray()).Wait();
}
private void IncomingChannelSubscriptionMessage(string channel, byte[] body)
{
var msg = MessageEnvelope.GetMessageEnvelope(body);
//forward received message
ReceivedMessageCallback(msg);
//release requestMessage if returned msgId matches
string msgId = msg.MessageId;
if (RequestForReplyMessageIds.Contains(msgId))
{
RequestForReplyMessageIds.TryTake(out msgId);
}
}
public void SubscribeToChannel(string channelName)
{
if (!ChannelSubscriptions.Contains(channelName))
{
var subscriberChannel = Connection.GetOpenSubscriberChannel();
subscriberChannel.Subscribe(channelName, IncomingChannelSubscriptionMessage).Wait();
ChannelSubscriptions.Add(channelName);
}
}
最佳答案
没有看到确切你是如何检查这个的,很难评论,但我可以说的是任何线程异常都很难跟踪下载并修复,因此不太可能在 BookSleeve 中得到解决,given that it has been succeeded .然而!它将绝对在 StackExchange.Redis 中进行检查。这是我在 SE.Redis 中组装的一个装备(令人尴尬的是,它确实突出了一个小错误,在下一个版本中修复,所以 .222 或更高版本);先输出:
Subscribing...
Sending (preserved order)...
Allowing time for delivery etc...
Checking...
Received: 500 in 2993ms
Out of order: 0
Sending (any order)...
Allowing time for delivery etc...
Checking...
Received: 500 in 341ms
Out of order: 306
(请记住,500 x 5 毫秒是 2500,因此我们不应该对 2993 毫秒或 341 毫秒的数字感到惊讶 - 这主要是我们添加到的 Thread.Sleep
的成本插入线程池使其重叠;如果我们删除它,两个循环都需要 0 毫秒,这很棒 - 但我们无法如此令人信服地看到重叠问题)
如您所见,第一次运行有正确的顺序输出;第二次运行有混合顺序,但它快十倍。那就是做琐事的时候;对于 真正的 工作,它会更加引人注目。一如既往,这是一种权衡。
这是测试装置:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using StackExchange.Redis;
static class Program
{
static void Main()
{
using (var conn = ConnectionMultiplexer.Connect("localhost"))
{
var sub = conn.GetSubscriber();
var received = new List<int>();
Console.WriteLine("Subscribing...");
const int COUNT = 500;
sub.Subscribe("foo", (channel, message) =>
{
lock (received)
{
received.Add((int)message);
if (received.Count == COUNT)
Monitor.PulseAll(received); // wake the test rig
}
Thread.Sleep(5); // you kinda need to be slow, otherwise
// the pool will end up doing everything on one thread
});
SendAndCheck(conn, received, COUNT, true);
SendAndCheck(conn, received, COUNT, false);
}
Console.WriteLine("Press any key");
Console.ReadLine();
}
static void SendAndCheck(ConnectionMultiplexer conn, List<int> received, int quantity, bool preserveAsyncOrder)
{
conn.PreserveAsyncOrder = preserveAsyncOrder;
var sub = conn.GetSubscriber();
Console.WriteLine();
Console.WriteLine("Sending ({0})...", (preserveAsyncOrder ? "preserved order" : "any order"));
lock (received)
{
received.Clear();
// we'll also use received as a wait-detection mechanism; sneaky
// note: this does not do any cheating;
// it all goes to the server and back
for (int i = 0; i < quantity; i++)
{
sub.Publish("foo", i);
}
Console.WriteLine("Allowing time for delivery etc...");
var watch = Stopwatch.StartNew();
if (!Monitor.Wait(received, 10000))
{
Console.WriteLine("Timed out; expect less data");
}
watch.Stop();
Console.WriteLine("Checking...");
lock (received)
{
Console.WriteLine("Received: {0} in {1}ms", received.Count, watch.ElapsedMilliseconds);
int wrongOrder = 0;
for (int i = 0; i < Math.Min(quantity, received.Count); i++)
{
if (received[i] != i) wrongOrder++;
}
Console.WriteLine("Out of order: " + wrongOrder);
}
}
}
}
关于c# - Redis Booksleeve 客户端,ResultCompletionMode.PreserveOrder 不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22629508/