c# - 在 C# 中使用线程构建分布式 DFS

标签 c# multithreading distributed-computing depth-first-search tpl-dataflow

我一直在尝试在 C# 中实现分布式深度优先搜索。我已经成功到一定程度,但出现同步错误。我无法纠正错误。我想做的是让每个节点使用任务并行数据流相互通信,从而在 DFS 中实现并行。下面是我的代码:

public class DFS
{
static List<string> traversedList = new List<string>();

static List<string> parentList = new List<string>();
static Thread[] thread_array;
static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

public static void Main(string[] args)
{

    int N = 100;
    int M = N * 4;
    int P = N * 16;

    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();

    List<string> global_list = new List<string>();

    StreamReader file = new StreamReader(args[args.Length - 2]);


    string text = file.ReadToEnd();

    string[] lines = text.Split('\n');



    string[][] array1 = new string[lines.Length][];

    for (int i = 0; i < lines.Length; i++)
    {
        lines[i] = lines[i].Trim();
        string[] words = lines[i].Split(' ');

        array1[i] = new string[words.Length];

        for (int j = 0; j < words.Length; j++)
        {
            array1[i][j] = words[j];
        }
    }

    StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

    for (int i = 0; i < array1.Length; i++)
    {
        for (int j = 0; j < array1[i].Length; j++)
        {
            if (j != 0)
            {
                sr.Write(array1[i][0] + ":" + array1[i][j]);
                Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                sr.Write(sr.NewLine);
            }
        }

    }
    int start_no = Convert.ToInt32(args[args.Length - 1]);
    thread_array = new Thread[lines.Length];
    string first_message = "root";
    buffer1.Post(first_message);
    buffer1.Post(array1);
    buffer1.Post(start_no);
    buffer1.Post(1);

    for (int t = 1; t < lines.Length; t++)
    {
        Console.WriteLine("thread" + t);
        thread_array[t] = new Thread(new ThreadStart(thread_run));
        thread_array[t].Name = t.ToString();
        lock (thread_array[t])
        {
            Console.WriteLine("working");
            thread_array[t].Start();
            thread_array[t].Join();
        }

    }
    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed);
    Console.ReadLine();
}

private static void dfs(string[][] array, int point)
{
    for (int z = 1; z < array[point].Length; z++)
    {
        if ((!traversedList.Contains(array[point][z])))
        {
            traversedList.Add(array[point][z]);
            parentList.Add(point.ToString());
            dfs(array, int.Parse(array[point][z]));
        }

    }
    return;


}
public static void thread_run()
{
    try
    {
        string parent;
        string[][] array1;
        int point;
        int id;
        parent = (string)buffer1.Receive();
        array1 = (string[][])buffer1.Receive();
        point = (int)buffer1.Receive();
        id = (int)buffer1.Receive();
        object value;
        Console.WriteLine("times");

        if (Thread.CurrentThread.Name.Equals(point.ToString()))
        {
            if (!traversedList.Contains(point.ToString()))
            {
                Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id);
                traversedList.Add(point.ToString());
                parent = point.ToString();
                for (int x = 1; x < array1[point].Length; x++)
                {
                    Console.WriteLine("times");
                    if (buffer1.TryReceive(out value))
                    {
                        array1 = (string[][])value;
                    }
                    if (buffer1.TryReceive(out value))
                    {
                        id = (int)buffer1.Receive();
                    }
                    id++;
                    buffer1.Post(parent);
                    buffer1.Post(array1);
                    buffer1.Post(x);
                    buffer1.Post(id);
                    Console.WriteLine("times");
                    Monitor.PulseAll(Thread.CurrentThread);
                }

                //return;
            }
            else
            {
                buffer1.Post(parent);
                buffer1.Post(array1);
                buffer1.Post(point);
                buffer1.Post(id);
                Console.WriteLine("working 1");
                Monitor.PulseAll(Thread.CurrentThread);
            }
        }
        else
        {
            Console.WriteLine("working 2");
            Monitor.Wait(Thread.CurrentThread);
        }
        //Console.WriteLine(parent);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

}

}

enter image description here

最佳答案

您的代码存在各种问题。

锁定和“接触”来自多个线程的 traversedList 的错误使用是最明显的问题。

更重要的是,您的代码并没有真正使用 Dataflow,它以类似于 ConcurrentQueue 或任何其他并发集合的方式使用 BufferBlock。数据流的重点是使用 ActionBlocks而不是线程来简化处理。默认情况下,操作 block 将仅使用单个线程进行处理,但您可以通过 DataflowBlockOptions 指定任意数量的线程。类(class)。

ActionBlocks 有自己的输入和输出缓冲区,所以你不必为了缓冲而添加额外的 BufferBlocks。

将多个相关值传递给 block 是另一个问题,因为它会导致错误并使代码困惑。创建一个数据结构来保存所有值不需要任何成本。

假设您使用此类来保存处理消息:

    public class PointMessage
    {
        public string Message { get; set; }
        public string[][] Lines{get;set;}
        public int Point { get; set; }
        public int ID { get; set; }
    }

您可以像这样创建一个 ActionBlock 来处理这些消息:

static ActionBlock<PointMessage> _block;
...
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded };
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options);

然后像这样处理每条消息:

    private static void ProcessMessage(PointMessage arg)
    {
        if (...)
        {
            ...
            arg.ID++;
            _block.Post(arg);
        }
        else
        {
             ...
            _block.Post(arg);
        }
    }

如果您的函数返回一个值,您可以使用 TransformBlock而不是 ActionBlock。

我不明白你的代码做了什么,所以我不会尝试使用 DataFlow 重写它。如果稍微清理一下,就会更容易提供帮助。

关于c# - 在 C# 中使用线程构建分布式 DFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10891047/

相关文章:

java - 跨多个进程均匀分布数据库记录

java - 在分布式服务架构中使用 System.currentTime?

multithreading - 最终 Web 开发框架的功能要求?

c# - 向 ASP.NET Identity 中的 ApplicationUser 类添加关系(数据库优先)

c# - 将 C# 结构转换为 F#

c# - 运行时 C# 中 Lambda 表达式中的多个条件

java - 不必要的 Java 上下文切换

distributed-computing - CAP 定理是转移注意力吗?

c++ - 我应该在调用 pdpotri() 之前设置屏障吗?

c# - 关于SQL函数GETDATE()的问题