c# - 如何使用消息批处理大小不大于等于 240 的重命名消息创建最后一个文件

标签 c#

在下面的应用中,

  1. Producer 方法将消息添加到 blocking 集合。
  2. Consumer 方法中,我正在使用 blocking 集合并将 messages 添加到 list 以及大小>= 240,将该列表写入 json 文件。

有时我在 blocking 集合中没有任何新消息,但在 Consumer 中,我有一个消息列表,其大小不大于等于 240 ,那么在这种情况下,应用无法写入新的 JSON 文件(其余数据)。

我怎样才能让消费者知道没有新消息出现,把你剩下的任何东西写到一个新文件中?

这可能吗?假设 Consumer 将等待 1 分钟,如果没有新消息,则将剩余的内容写入新文件?

这是代码(这里我添加了 11 条消息。直到 9 条消息,批处理大小为 240,它生成了一个文件,但是第 10 条和第 11 条消息无法写入新文件),

class Program
{
    private static List<Batch> batchList = new List<Batch>();
    private static BlockingCollection<Message> messages = new BlockingCollection<Message>();

    private static int maxbatchsize = 240;
    private static int currentsize = 0;
    private static void Producer()
    {
        int ctr = 1;
        while (ctr <= 11)
        {
            messages.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
            Thread.Sleep(1000);
            ctr++;
        }
    }

    private static void Consumer()
    {
        foreach (var message in messages.GetConsumingEnumerable())
        {
            var msg = JsonConvert.SerializeObject(message);

            Console.WriteLine(msg);

            if (currentsize + msg.Length >= maxbatchsize)
            {
                WriteToFile(batchList);
            }

            batchList.Add(new Batch { Message = message });
            currentsize += msg.Length;
        }
    }

    private static void WriteToFile(List<Batch> batchList)
    {
        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(batchList));
        }

        batchList.Clear();
        currentsize = 0;
    }

    static void Main(string[] args)
    {
        var producer = Task.Factory.StartNew(() => Producer());
        var consumer = Task.Factory.StartNew(() => Consumer());
        Console.Read();
    }
}
}

支持类,

 public class Message
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public class Batch
{
    public Message Message { get; set; }
}

最佳答案

更新:

class Program
    {
        private static readonly List<Batch> BatchList = new List<Batch>();
        private static readonly BlockingCollection<Message> Messages = new BlockingCollection<Message>();

        private const int Maxbatchsize = 240;
        private static int _currentsize;

        private static void Producer()
        {
            int ctr = 1;
            while (ctr <= 11)
            {
                Messages.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
                Thread.Sleep(1000);
                ctr++;
            }
            Messages.CompleteAdding();
        }

        private static void Consumer()
        {
            foreach (var message in Messages.GetConsumingEnumerable())
            {
                if (_currentsize >= Maxbatchsize)
                {
                    var listToWrite = new Batch[BatchList.Count];
                    BatchList.CopyTo(listToWrite);
                    BatchList.Clear();
                    _currentsize = 0;
                    WriteToFile(listToWrite.ToList());
                }
                else
                {
                    Thread.Sleep(1000);
                    if (Messages.IsAddingCompleted)
                    {
                        var remainSize = Messages.Select(JsonConvert.SerializeObject).Sum(x => x.Length);
                        if (remainSize == 0)
                        {
                            var lastMsg = JsonConvert.SerializeObject(message);
                            BatchList.Add(new Batch { Message = message });
                            _currentsize += lastMsg.Length;
                            Console.WriteLine(lastMsg);
                            var additionListToWrite = new Batch[BatchList.Count];
                            BatchList.CopyTo(additionListToWrite);
                            BatchList.Clear();
                            _currentsize = 0;
                            WriteToFile(additionListToWrite.ToList());
                            break;
                        }
                    }
                }

                var msg = JsonConvert.SerializeObject(message);
                BatchList.Add(new Batch { Message = message });
                _currentsize += msg.Length;
                Console.WriteLine(msg);
            }
        }

        private static void WriteToFile(List<Batch> listToWrite)
        {
            using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
            {
                outFile.Write(JsonConvert.SerializeObject(listToWrite));
            }
        }

        static void Main(string[] args)
        {
            var producer = Task.Factory.StartNew(() => Producer());
            var consumer = Task.Factory.StartNew(() => Consumer());
            Console.Read();
        }
    }

关于c# - 如何使用消息批处理大小不大于等于 240 的重命名消息创建最后一个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54095455/

相关文章:

c# - 从 Outlook 获取签名并检测正文格式

c# - 如何忽略第一个小数点后的第一个值(如果它是 0),否则如果大于 0,则取它?

c# - 如何在 C# 中为类的静态默认属性分配默认值?

c# - 将 SQL 查询映射到 Nhibernate 中的业务对象

c# - CLR 中的托管堆

c# - 从动态类型使方法泛型

c# - 输出到标签时有没有办法保留格式?

c# - C#/VB.NET 中的缩略图

c# - 作用范围参数

c# - UPS 地址街道级别验证