我正在做一个小项目来使用 SNMP 映射网络(仅限路由器)。为了加快速度,除了由主线程完成的第一项工作外,我试图让一个线程池负责完成我需要的工作。
此时我有两个工作,一个接受参数,另一个不接受:
-
UpdateDeviceInfo(NetworkDevice nd)
-
UpdateLinks()
*尚未定义
我想要实现的是让那些正在等待工作的工作线程
出现在Queue<Action>
等待它是空的。主线程将添加第一个作业,然后等待所有可能添加更多作业的工作人员完成,然后再开始添加第二个作业并唤醒休眠线程。
我的问题/疑问是:
如何定义
Queue<Actions>
这样我就可以插入方法和参数(如果有的话)。如果不可能,我可以让所有函数都接受相同的参数。如何无限期地启动工作线程。我不确定应该在哪里创建
for(;;)
.
到目前为止,这是我的代码:
public enum DatabaseState
{
Empty = 0,
Learning = 1,
Updating = 2,
Stable = 3,
Exiting = 4
};
public class NetworkDB
{
public Dictionary<string, NetworkDevice> database;
private Queue<Action<NetworkDevice>> jobs;
private string _community;
private string _ipaddress;
private Object _statelock = new Object();
private DatabaseState _state = DatabaseState.Empty;
private readonly int workers = 4;
private Object _threadswaitinglock = new Object();
private int _threadswaiting = 0;
public Dictionary<string, NetworkDevice> Database { get => database; set => database = value; }
public NetworkDB(string community, string ipaddress)
{
_community = community;
_ipaddress = ipaddress;
database = new Dictionary<string, NetworkDevice>();
jobs = new Queue<Action<NetworkDevice>>();
}
public void Start()
{
NetworkDevice nd = SNMP.GetDeviceInfo(new IpAddress(_ipaddress), _community);
if (nd.Status > NetworkDeviceStatus.Unknown)
{
database.Add(nd.Id, nd);
_state = DatabaseState.Learning;
nd.Update(this); // The first job is done by the main thread
for (int i = 0; i < workers; i++)
{
Thread t = new Thread(JobRemove);
t.Start();
}
lock (_statelock)
{
if (_state == DatabaseState.Learning)
{
Monitor.Wait(_statelock);
}
}
lock (_statelock)
{
if (_state == DatabaseState.Updating)
{
Monitor.Wait(_statelock);
}
}
foreach (KeyValuePair<string, NetworkDevice> n in database)
{
using (System.IO.StreamWriter file = new System.IO.StreamWriter(n.Value.Name + ".txt")
{
file.WriteLine(n);
}
}
}
}
public void JobInsert(Action<NetworkDevice> func, NetworkDevice nd)
{
lock (jobs)
{
jobs.Enqueue(item);
if (jobs.Count == 1)
{
// wake up any blocked dequeue
Monitor.Pulse(jobs);
}
}
}
public void JobRemove()
{
Action<NetworkDevice> item;
lock (jobs)
{
while (jobs.Count == 0)
{
lock (_threadswaitinglock)
{
_threadswaiting += 1;
if (_threadswaiting == workers)
Monitor.Pulse(_statelock);
}
Monitor.Wait(jobs);
}
lock (_threadswaitinglock)
{
_threadswaiting -= 1;
}
item = jobs.Dequeue();
item.Invoke();
}
}
public bool NetworkDeviceExists(NetworkDevice nd)
{
try
{
Monitor.Enter(database);
if (database.ContainsKey(nd.Id))
{
return true;
}
else
{
database.Add(nd.Id, nd);
Action<NetworkDevice> action = new Action<NetworkDevice>(UpdateDeviceInfo);
jobs.Enqueue(action);
return false;
}
}
finally
{
Monitor.Exit(database);
}
}
//Job1 - Learning -> Update device info
public void UpdateDeviceInfo(NetworkDevice nd)
{
nd.Update(this);
try
{
Monitor.Enter(database);
nd.Status = NetworkDeviceStatus.Self;
}
finally
{
Monitor.Exit(database);
}
}
//Job2 - Updating -> After Learning, create links between neighbours
private void UpdateLinks()
{
}
}
最佳答案
您最好的选择似乎是使用 BlockingCollection 而不是 Queue 类。它们在 FIFO 方面的行为实际上相同,但 BlockingCollection 会让您的每个线程阻塞,直到可以通过调用 GetConsumingEnumerable 或 Take 获取一个项目。这是一个完整的例子。
http://mikehadlow.blogspot.com/2012/11/using-blockingcollection-to-communicate.html?m=1
至于包含参数,您似乎可以使用闭包来封闭 NetworkDevice 本身,然后将 Action 而不是 Action<>
关于c# - 线程监控 Queue<Actions>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45772021/