在下面的应用中,
Producer
方法将消息添加到blocking
集合。- 在
Consumer
方法中,我正在使用blocking
集合并将messages
添加到list
以及大小>= 240,将该列表写入 json 文件。
有时我在 blocking
集合中没有任何新消息,但在 Consumer
中,我有一个消息列表,其大小不大于等于 240 ,那么在这种情况下,应用无法写入新的 JSON 文件(其余数据)。
我怎样才能让消费者
知道没有新消息出现,把你剩下的任何东西写到一个新文件中?
这可能吗?假设 Consumer
将等待 1 分钟,如果没有新消息,则将剩余的内容写入新文件?
这是代码(这里我添加了 11 条消息。直到 9 条消息,批处理大小为 240,它生成了一个文件,但是第 10 条和第 11 条消息无法写入新文件),
class Program
{
private static List<Batch> batchList = new List<Batch>();
private static BlockingCollection<Message> messages = new BlockingCollection<Message>();
private static int maxbatchsize = 240;
private static int currentsize = 0;
private static void Producer()
{
int ctr = 1;
while (ctr <= 11)
{
messages.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
foreach (var message in messages.GetConsumingEnumerable())
{
var msg = JsonConvert.SerializeObject(message);
Console.WriteLine(msg);
if (currentsize + msg.Length >= maxbatchsize)
{
WriteToFile(batchList);
}
batchList.Add(new Batch { Message = message });
currentsize += msg.Length;
}
}
private static void WriteToFile(List<Batch> batchList)
{
using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
outFile.Write(JsonConvert.SerializeObject(batchList));
}
batchList.Clear();
currentsize = 0;
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}
}
支持类,
public class Message
{
public int Id { get; set; }
public string Name { get; set; }
}
public class Batch
{
public Message Message { get; set; }
}
最佳答案
更新:
class Program
{
private static readonly List<Batch> BatchList = new List<Batch>();
private static readonly BlockingCollection<Message> Messages = new BlockingCollection<Message>();
private const int Maxbatchsize = 240;
private static int _currentsize;
private static void Producer()
{
int ctr = 1;
while (ctr <= 11)
{
Messages.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
Messages.CompleteAdding();
}
private static void Consumer()
{
foreach (var message in Messages.GetConsumingEnumerable())
{
if (_currentsize >= Maxbatchsize)
{
var listToWrite = new Batch[BatchList.Count];
BatchList.CopyTo(listToWrite);
BatchList.Clear();
_currentsize = 0;
WriteToFile(listToWrite.ToList());
}
else
{
Thread.Sleep(1000);
if (Messages.IsAddingCompleted)
{
var remainSize = Messages.Select(JsonConvert.SerializeObject).Sum(x => x.Length);
if (remainSize == 0)
{
var lastMsg = JsonConvert.SerializeObject(message);
BatchList.Add(new Batch { Message = message });
_currentsize += lastMsg.Length;
Console.WriteLine(lastMsg);
var additionListToWrite = new Batch[BatchList.Count];
BatchList.CopyTo(additionListToWrite);
BatchList.Clear();
_currentsize = 0;
WriteToFile(additionListToWrite.ToList());
break;
}
}
}
var msg = JsonConvert.SerializeObject(message);
BatchList.Add(new Batch { Message = message });
_currentsize += msg.Length;
Console.WriteLine(msg);
}
}
private static void WriteToFile(List<Batch> listToWrite)
{
using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
outFile.Write(JsonConvert.SerializeObject(listToWrite));
}
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}
关于c# - 如何使用消息批处理大小不大于等于 240 的重命名消息创建最后一个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54095455/