javascript - 如何在所有线程运行完毕后运行代码?

标签 javascript node.js multithreading

我有一个多线程网络爬虫,可以下载网站并将其存储在数据库中(大约需要 4 分钟)。为了加快爬行速度,我使用了 node.js 集群模块,但我有一个问题,我想在所有线程完成它们的过程之后迭代到 while 循环的下一段,而不是在它们开始时立即迭代。如何确保我的所有线程都已结束然后继续?

这是主while循环中的相关代码:

while (indexSize !== indexSizeLimit) {
        const queueLength = queue.length;
        const numberOfThreads = Math.min(numberOfCPUs, queueLength);
        const threadAllocations = Array(numberOfThreads).fill(0);
        let queuesAllocated = 0;
        const queueChunks = [];

        function fillQueueChunks() {
          loop: while (true) {
            for (let i = 0; i < numberOfThreads; i++) {
              threadAllocations[i] += 1;
              queuesAllocated += 1;

              if (queuesAllocated === queueLength) {
                break loop;
              };
            };
          };

          let start = 0;

          for (let threadAllocation of threadAllocations) {
            const end = start + threadAllocation;

            queueChunks.push(queue.slice(start, end));

            start = end;
          };
        };

        fillQueueChunks();
        
        // Find out how to make multithreading finish, and then move on with the loop.
        if (cluster.isMaster) {
          for (let i = 0; i < numberOfThreads; i++) {
            cluster.fork();
          };
        } else {
          const chunk = queueChunks[cluster.worker.id - 1];

          await Promise.all(chunk.map(function (url) {
            return new Promise(async function (resolve) {
              const webcode = await request(url);

              if (webcode !== "Failure") {
                indexSize += 1;

                const document = new Document(url, webcode);
                const hrefs = document.hrefs();
                const hrefsQuery = Query(hrefs);
                // Also make sure it is not included in indexed webpages.
                const hrefIndividualized = hrefsQuery.individualize();

                hrefIndividualized;
                
                // Do something with hrefIndividualized in regards to maintaining a queue in the database.
                // And in adding a nextQueue which to replace the queue in code with.
                
                await document.save();
              };

              resolve("Written");
            });
          }));

          process.exit(0);
        };
      };

最佳答案

将线程包装在一个 promise 中。您可以在父线程中检查是否存在断开连接事件,并且如果断开连接的数量等于线程数,那么您可以解决 promise 。

这是我的

while (indexSize !== indexSizeLimit) {
        let nextQueue = [];
        const queueLength = queue.length;
        const numberOfThreads = Math.min(numberOfCPUs, queueLength);
        const threadAllocations = Array(numberOfThreads).fill(0);
        let queuesAllocated = 0;
        // queueChunks: [[{_id: ..., ...}], [...], ...]
        const queueChunks = [];

        function fillQueueChunks() {
          loop: while (true) {
            for (let i = 0; i < numberOfThreads; i++) {
              threadAllocations[i] += 1;
              queuesAllocated += 1;

              if (queuesAllocated === queueLength) {
                break loop;
              };
            };
          };

          let start = 0;

          for (let threadAllocation of threadAllocations) {
            const end = start + threadAllocation;

            queueChunks.push(queue.slice(start, end));

            start = end;
          };
        };

        fillQueueChunks();

        await new Promise(async function (resolve) {
          if (cluster.isMaster) {
            let threadsDone = 0;

            for (let i = 0; i < numberOfThreads; i++) {
              cluster.fork();
            };

            cluster.on("disconnect", function (_) {
              threadsDone += 1;

              if (threadsDone === numberOfThreads) {
                resolve("Queue Processed");
              };
            });
          } else {
            const queueJob = queueChunks[cluster.id - 1];

            await Promise.all(queueJob.map(function (queueItem) {
              return new Promise(async function (resolve) {
                const url = queueItem._id;
                const webcode = await request(url);

                if (webcode !== "Failure") {
                  const document = Document(url, webcode);
                  let hrefs = document.hrefs();
                  const hrefsQuery = Query(hrefs);

                  await document.save();

                  indexSize += 1; 
                  hrefs = hrefsQuery.individualize();

                  const hrefIncidences = Promise.all(hrefs.map(function (href) {
                    return new Promise(async function (resolve) {
                      const incidences = await Site.countDocuments({
                        url: href
                      });

                      resolve(incidences);
                    });
                  }));

                  hrefs = hrefs.filter(function (_, i) {
                    return hrefIncidences[i] === 0;
                  }).map(function (href) {
                    return {
                      _id: href
                    };
                  });

                  await Queue.insertMany(hrefs);

                  nextQueue = nextQueue.concat(hrefs);
                };

                await Queue.deleteOne({
                  _id: url
                }); 

                resolve("Success");
              });
            }));

            process.exit(0);
          };
        });

        queue = nextQueue;
      };

关于javascript - 如何在所有线程运行完毕后运行代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66218951/

相关文章:

javascript - 创建一个 'remove div' 链接

javascript - 如何在表格列中显示一个按钮元素

node.js - 登录时 Passport 反序列化多次

node.js - NodeJS redis 连接 session ID 在应该保持不变时重新生成

android线程管理onPause

JavaFX 多线程

javascript - 显示模式弹出窗口 10 秒或直到设置标志或变量

javascript - 如何更改标签的文本?

javascript - 删除包含对象的数组 MongoDB

java - 如何在多线程中使用spring事务