我一直在尝试在 C# 中实现分布式深度优先搜索。我已经成功到一定程度,但出现同步错误。我无法纠正错误。我想做的是让每个节点使用任务并行数据流相互通信,从而在 DFS 中实现并行。下面是我的代码:
/// <summary>
/// Experimental multi-threaded depth-first search over a graph read from a text file.
/// Each input line is "vertex neighbour neighbour ..." separated by single spaces.
/// One worker thread is created per input line (starting at index 1) and the workers
/// hand the traversal state to each other through a shared <see cref="BufferBlock{T}"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the thread hand-off protocol below is kept exactly as originally written.
/// It has known synchronization problems (see the notes inside <see cref="thread_run"/>),
/// so it should be treated as work in progress rather than a correct algorithm.
/// </remarks>
public class DFS
{
    // Vertices that have already been visited.
    // NOTE(review): read and written by worker threads without any synchronization.
    static List<string> traversedList = new List<string>();

    // Parent of each vertex added by dfs(); parallel to the entries dfs() adds to traversedList.
    static List<string> parentList = new List<string>();

    // One slot per input line; slot 0 is never populated because the start loop begins at 1.
    static Thread[] thread_array;

    // Shared untyped queue. State is always posted as four consecutive items:
    // parent (string), adjacency table (string[][]), point (int), id (int).
    static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

    /// <summary>
    /// Entry point. The second-to-last argument is the graph file path and the last
    /// argument is the start vertex number.
    /// </summary>
    /// <param name="args">Command-line arguments; at least two are required.</param>
    public static void Main(string[] args)
    {
        // Both trailing arguments are indexed below; fail with a message instead of an
        // IndexOutOfRangeException when they are missing.
        if (args.Length < 2)
        {
            Console.Error.WriteLine("usage: DFS <graph-file> <start-node>");
            return;
        }

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        // Dispose the reader as soon as the file has been read.
        string text;
        using (StreamReader file = new StreamReader(args[args.Length - 2]))
        {
            text = file.ReadToEnd();
        }

        // Split on '\n' and trim each line (this also removes a trailing '\r').
        string[] lines = text.Split('\n');
        string[][] array1 = new string[lines.Length][];
        for (int i = 0; i < lines.Length; i++)
        {
            lines[i] = lines[i].Trim();
            array1[i] = lines[i].Split(' ');
        }

        // Dump every edge as "vertex:neighbour". The using block guarantees the writer
        // is flushed and closed; previously it was never disposed, so buffered output
        // could be lost.
        using (StreamWriter sr = new StreamWriter("E:\\Newtext1.txt"))
        {
            for (int i = 0; i < array1.Length; i++)
            {
                // Column 0 is the vertex itself; neighbours start at column 1.
                for (int j = 1; j < array1[i].Length; j++)
                {
                    string edge = array1[i][0] + ":" + array1[i][j];
                    sr.WriteLine(edge);
                    Console.WriteLine(edge);
                }
            }
        }

        int start_no = Convert.ToInt32(args[args.Length - 1]);
        thread_array = new Thread[lines.Length];

        // Seed the buffer with the initial state for the start vertex.
        string first_message = "root";
        buffer1.Post(first_message);
        buffer1.Post(array1);
        buffer1.Post(start_no);
        buffer1.Post(1);

        for (int t = 1; t < lines.Length; t++)
        {
            Console.WriteLine("thread" + t);
            thread_array[t] = new Thread(new ThreadStart(thread_run));
            // The worker compares its own name with the vertex number it receives.
            thread_array[t].Name = t.ToString();

            // NOTE(review): each thread is started and immediately joined, so the workers
            // run one after another rather than in parallel. The lock is taken on the
            // Thread object itself and is held by Main for the worker's whole lifetime.
            lock (thread_array[t])
            {
                Console.WriteLine("working");
                thread_array[t].Start();
                thread_array[t].Join();
            }
        }

        stopwatch.Stop();
        Console.WriteLine(stopwatch.Elapsed);
        Console.ReadLine();
    }

    /// <summary>
    /// Sequential recursive depth-first search. Visits every not-yet-traversed neighbour
    /// of <paramref name="point"/>, recording it in traversedList and its parent in parentList.
    /// </summary>
    /// <param name="array">Adjacency table; row index is the vertex, columns 1.. are neighbours.</param>
    /// <param name="point">Row index of the vertex to expand.</param>
    private static void dfs(string[][] array, int point)
    {
        for (int z = 1; z < array[point].Length; z++)
        {
            string neighbour = array[point][z];
            if (!traversedList.Contains(neighbour))
            {
                traversedList.Add(neighbour);
                parentList.Add(point.ToString());
                dfs(array, int.Parse(neighbour));
            }
        }
    }

    /// <summary>
    /// Worker body. Takes one four-item state (parent, table, point, id) from the shared
    /// buffer and, if this thread's name equals the received point, marks the point as
    /// traversed and posts a new state for each neighbour column.
    /// </summary>
    public static void thread_run()
    {
        try
        {
            string parent;
            string[][] array1;
            int point;
            int id;

            // Blocking receives; the four items must arrive in exactly this order.
            parent = (string)buffer1.Receive();
            array1 = (string[][])buffer1.Receive();
            point = (int)buffer1.Receive();
            id = (int)buffer1.Receive();
            object value;
            Console.WriteLine("times");

            if (Thread.CurrentThread.Name.Equals(point.ToString()))
            {
                if (!traversedList.Contains(point.ToString()))
                {
                    Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id);
                    traversedList.Add(point.ToString());
                    parent = point.ToString();
                    for (int x = 1; x < array1[point].Length; x++)
                    {
                        Console.WriteLine("times");
                        // NOTE(review): items are posted in the order parent, table, point, id,
                        // so the first item taken here is not guaranteed to be a table - confirm intent.
                        if (buffer1.TryReceive(out value))
                        {
                            array1 = (string[][])value;
                        }
                        // NOTE(review): the item taken by TryReceive is discarded and a second,
                        // blocking Receive is issued - looks unintended; confirm.
                        if (buffer1.TryReceive(out value))
                        {
                            id = (int)buffer1.Receive();
                        }
                        id++;
                        buffer1.Post(parent);
                        buffer1.Post(array1);
                        // NOTE(review): posts the column index x, not the neighbour stored in
                        // array1[point][x] - presumably the neighbour was intended; verify.
                        buffer1.Post(x);
                        buffer1.Post(id);
                        Console.WriteLine("times");
                        // NOTE(review): this thread never enters a lock on Thread.CurrentThread
                        // (Main holds that monitor while joining). Monitor.PulseAll requires the
                        // caller to own the monitor, so this is expected to throw
                        // SynchronizationLockException, which the catch below reports.
                        Monitor.PulseAll(Thread.CurrentThread);
                    }
                }
                else
                {
                    // Already traversed: put the state back unchanged for another worker.
                    buffer1.Post(parent);
                    buffer1.Post(array1);
                    buffer1.Post(point);
                    buffer1.Post(id);
                    Console.WriteLine("working 1");
                    // NOTE(review): same monitor-ownership concern as above.
                    Monitor.PulseAll(Thread.CurrentThread);
                }
            }
            else
            {
                // NOTE(review): the received state is not posted back here, and Monitor.Wait
                // is called without owning the monitor - same concern as above.
                Console.WriteLine("working 2");
                Monitor.Wait(Thread.CurrentThread);
            }
        }
        catch (Exception ex)
        {
            // Best-effort reporting; the worker simply ends after printing the message.
            Console.WriteLine(ex.Message);
        }
    }
}
最佳答案
您的代码存在各种问题。
最明显的问题是锁的错误使用,以及在没有同步的情况下从多个线程“接触”(读写)traversedList。
更重要的是,您的代码并没有真正使用 Dataflow,而只是把 BufferBlock 当作 ConcurrentQueue 或其他并发集合来使用。Dataflow 的重点是使用 ActionBlock 而不是线程来简化处理。默认情况下,ActionBlock 只使用单个线程进行处理,但您可以通过 ExecutionDataflowBlockOptions 类(的 MaxDegreeOfParallelism 属性)指定任意数量的线程。
ActionBlocks 有自己的输入和输出缓冲区,所以你不必为了缓冲而添加额外的 BufferBlocks。
把多个相关值逐个传递给 block 是另一个问题,因为这样容易出错,也会使代码难以理解。创建一个数据结构来保存所有这些值几乎没有任何成本。
假设您使用此类来保存处理消息:
/// <summary>
/// Bundles the values that travel together as one processing message, so they can be
/// posted to a dataflow block as a single item.
/// </summary>
public class PointMessage
{
    /// <summary>Text carried with the message.</summary>
    public string Message { get; set; }

    /// <summary>Jagged table of strings carried with the message.</summary>
    public string[][] Lines { get; set; }

    /// <summary>Integer point value carried with the message.</summary>
    public int Point { get; set; }

    /// <summary>Integer identifier carried with the message.</summary>
    public int ID { get; set; }
}
您可以像这样创建一个 ActionBlock 来处理这些消息:
// Block that receives every PointMessage; declared as a static field.
static ActionBlock<PointMessage> _block;
...
// Unbounded removes the limit on how many messages the block may process concurrently.
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded };
// Every message posted to the block is passed to ProcessMessage.
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options);
然后像这样处理每条消息:
// Handler sketch for one PointMessage. The condition and the branch bodies are elided
// ("...") in this example; both branches post the message back to _block.
// NOTE(review): the same PointMessage instance is mutated and re-posted - confirm this is
// safe for the chosen degree of parallelism.
private static void ProcessMessage(PointMessage arg)
{
if (...)
{
...
// Bump the message's ID before posting it back to the block.
arg.ID++;
_block.Post(arg);
}
else
{
...
// Post the message back to the block with its ID unchanged.
_block.Post(arg);
}
}
如果您的函数返回一个值,您可以使用 TransformBlock而不是 ActionBlock。
我不明白你的代码做了什么,所以我不会尝试使用 DataFlow 重写它。如果稍微清理一下,就会更容易提供帮助。
关于c# - 在 C# 中使用线程构建分布式 DFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10891047/