c# - 线程监控 Queue<Actions>

标签 c# multithreading snmp

我正在做一个小项目来使用 SNMP 映射网络(仅限路由器)。为了加快速度,除了由主线程完成的第一项工作外,我试图让一个线程池负责完成我需要的工作。

此时我有两个工作,一个接受参数,另一个不接受:

  • UpdateDeviceInfo(NetworkDevice nd)
  • UpdateLinks() *尚未定义

我想要实现的是让那些正在等待工作的工作线程 出现在Queue<Action>等待它是空的。主线程将添加第一个作业,然后等待所有可能添加更多作业的工作人员完成,然后再开始添加第二个作业并唤醒休眠线程。

我的问题/疑问是:

  • 如何定义 Queue<Actions>这样我就可以插入方法和参数(如果有的话)。如果不可能,我可以让所有函数都接受相同的参数。

  • 如何无限期地启动工作线程。我不确定应该在哪里创建 for(;;) .

到目前为止,这是我的代码:

public enum DatabaseState
{
    Empty = 0,
    Learning = 1,
    Updating = 2,
    Stable = 3,
    Exiting = 4
};

public class NetworkDB
{
    public Dictionary<string, NetworkDevice> database;
    private Queue<Action<NetworkDevice>> jobs;
    private string _community;
    private string _ipaddress;

    private Object _statelock = new Object();
    private DatabaseState _state = DatabaseState.Empty;

    private readonly int workers = 4;
    private Object _threadswaitinglock = new Object();
    private int _threadswaiting = 0;

    public Dictionary<string, NetworkDevice> Database { get => database; set => database = value; }

    public NetworkDB(string community, string ipaddress)
    {
        _community = community;
        _ipaddress = ipaddress;
        database = new Dictionary<string, NetworkDevice>();
        jobs = new Queue<Action<NetworkDevice>>();
    }

    public void Start()
    {
        NetworkDevice nd = SNMP.GetDeviceInfo(new IpAddress(_ipaddress), _community);
        if (nd.Status > NetworkDeviceStatus.Unknown)
        {
            database.Add(nd.Id, nd);
            _state = DatabaseState.Learning;
            nd.Update(this); // The first job is done by the main thread 

            for (int i = 0; i < workers; i++)
            {
                Thread t = new Thread(JobRemove);
                t.Start();
            }

            lock (_statelock)
            {
                if (_state == DatabaseState.Learning)
                {
                    Monitor.Wait(_statelock);
                }
            }

            lock (_statelock)
            {
                if (_state == DatabaseState.Updating)
                {
                    Monitor.Wait(_statelock);
                }
            }

            foreach (KeyValuePair<string, NetworkDevice> n in database)
            {
                using (System.IO.StreamWriter file = new System.IO.StreamWriter(n.Value.Name + ".txt")
                {
                    file.WriteLine(n);

                }
            }
        }
    }

    public void JobInsert(Action<NetworkDevice> func, NetworkDevice nd)
    {
        lock (jobs)
        {
            jobs.Enqueue(item);
            if (jobs.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.Pulse(jobs);
            }
        }
    }

    public void JobRemove()
    {
        Action<NetworkDevice> item;
        lock (jobs)
        {
            while (jobs.Count == 0)
            {
                lock (_threadswaitinglock)
                {
                    _threadswaiting += 1;
                    if (_threadswaiting == workers)
                        Monitor.Pulse(_statelock);
                }
                Monitor.Wait(jobs);
            }

            lock (_threadswaitinglock)
            {
                _threadswaiting -= 1;
            }

            item = jobs.Dequeue();
            item.Invoke();
        }
    }

    public bool NetworkDeviceExists(NetworkDevice nd)
    {
        try
        {
            Monitor.Enter(database);
            if (database.ContainsKey(nd.Id))
            {
                return true;
            }
            else
            {
                database.Add(nd.Id, nd);
                Action<NetworkDevice> action = new Action<NetworkDevice>(UpdateDeviceInfo);
                jobs.Enqueue(action);
                return false;
            }
        }
        finally
        {

            Monitor.Exit(database);
        }
    }

    //Job1 - Learning -> Update device info
    public void UpdateDeviceInfo(NetworkDevice nd)
    {
        nd.Update(this);
        try
        {
            Monitor.Enter(database);
            nd.Status = NetworkDeviceStatus.Self;
        }
        finally
        {
            Monitor.Exit(database);
        }
    }

    //Job2 - Updating -> After Learning, create links between neighbours
    private void UpdateLinks()
    {

    }
}

最佳答案

您最好的选择似乎是使用 BlockingCollection 而不是 Queue 类。它们在 FIFO 方面的行为实际上相同,但 BlockingCollection 会让您的每个线程阻塞,直到可以通过调用 GetConsumingEnumerable 或 Take 获取一个项目。这是一个完整的例子。

http://mikehadlow.blogspot.com/2012/11/using-blockingcollection-to-communicate.html?m=1

至于包含参数,您似乎可以使用闭包来封闭 NetworkDevice 本身,然后将 Action 而不是 Action<>

关于c# - 线程监控 Queue<Actions>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45772021/

相关文章:

c# - Quartz.Net在Mono中启动线程卡住系统

c# - SNMP:创建自定义 OID

c# - 使用多边形注释在图表中绘制

multithreading - 互斥量在忙碌时被销毁

c# - 在 WCF 中使用非 WCF 自定义类

ios - 在主线程 Swift 上调用时 UI 不更新

mysql - 将 SNMP GET 输出存储到 MySQL 数据库

windows - 我如何从 windows box 监视 linux 服务器的资源?

c# - 在 .Net 中阅读 PDF 文档

c# - 是否可以从第一个元素以外的元素开始迭代使用 foreach?