我有一个多线程网络爬虫,可以下载网站并将其存储在数据库中(大约需要 4 分钟)。为了加快爬取速度,我使用了 node.js 的 cluster 模块。但我遇到一个问题:我希望在所有线程都完成各自的处理之后,再进入 while 循环的下一次迭代,而不是在它们刚启动时就立即进入。如何确保所有线程都已结束,然后再继续?
这是主while循环中的相关代码:
// Main crawl loop: keeps running passes for as long as indexSize differs from indexSizeLimit.
while (indexSize !== indexSizeLimit) {
const queueLength = queue.length;
// Never use more workers than there are queued items or than numberOfCPUs allows.
const numberOfThreads = Math.min(numberOfCPUs, queueLength);
// threadAllocations[i] counts how many queue items worker i receives in this pass.
const threadAllocations = Array(numberOfThreads).fill(0);
// Running total of queue items handed out so far.
let queuesAllocated = 0;
// One slice of the queue per worker; populated by fillQueueChunks().
const queueChunks = [];
/**
 * Splits `queue` into one contiguous chunk per worker and appends the chunks
 * to `queueChunks`.
 *
 * Items are dealt out round-robin, so the chunk sizes differ by at most one
 * and the earliest workers receive the extra items. The function works on the
 * enclosing scope's variables: it reads `queue`, `queueLength` and
 * `numberOfThreads`, and updates `threadAllocations`, `queuesAllocated` and
 * `queueChunks`.
 *
 * An empty queue (which makes `numberOfThreads` 0) leaves `queueChunks`
 * untouched instead of spinning forever.
 */
function fillQueueChunks() {
    // Without any worker there is nobody to allocate to; skipping the loop
    // is what prevents the endless spin on an empty queue.
    if (numberOfThreads > 0) {
        // `<` rather than `===` so the loop also terminates when the counter
        // is already at or past queueLength.
        for (let i = 0; queuesAllocated < queueLength; i = (i + 1) % numberOfThreads) {
            threadAllocations[i] += 1;
            queuesAllocated += 1;
        }
    }
    // Turn the per-worker counts into consecutive slices of the queue.
    let start = 0;
    for (const threadAllocation of threadAllocations) {
        const end = start + threadAllocation;
        queueChunks.push(queue.slice(start, end));
        start = end;
    }
}
// Build this pass's per-worker chunks before branching on the process role.
fillQueueChunks();
// Find out how to make multithreading finish, and then move on with the loop.
if (cluster.isMaster) {
// Master: fork one worker per chunk. Nothing in this branch waits for the
// workers, so the while loop moves on as soon as they have been forked.
for (let i = 0; i < numberOfThreads; i++) {
cluster.fork();
};
} else {
// Worker: take the chunk at index (worker id - 1).
// NOTE(review): presumably this maps 1-based worker ids onto array indices — confirm.
const chunk = queueChunks[cluster.worker.id - 1];
// Start all URLs of the chunk at once and wait until every one has been handled.
await Promise.all(chunk.map(function (url) {
return new Promise(async function (resolve) {
const webcode = await request(url);
// NOTE(review): request() appears to signal failure by returning the string "Failure" — confirm.
if (webcode !== "Failure") {
indexSize += 1;
const document = new Document(url, webcode);
const hrefs = document.hrefs();
const hrefsQuery = Query(hrefs);
// Also make sure it is not included in indexed webpages.
const hrefIndividualized = hrefsQuery.individualize();
// Placeholder expression statement: the value is not used yet (see the notes below).
hrefIndividualized;
// Do something with hrefIndividualized in regards to maintaining a queue in the database.
// And in adding a nextQueue which to replace the queue in code with.
await document.save();
};
// Resolved for every URL, including those whose webcode was "Failure".
resolve("Written");
});
}));
// End the worker process once its whole chunk has been handled.
process.exit(0);
};
};
最佳答案
将线程包装在一个 promise 中。您可以在父线程中监听 disconnect(断开连接)事件,当断开连接的次数等于线程数时,就可以 resolve 这个 promise。
这是我的代码:
// Main crawl loop: keeps running passes for as long as indexSize differs from indexSizeLimit.
while (indexSize !== indexSizeLimit) {
// Collects the queue entries discovered during this pass; it replaces `queue` at the end of the pass.
let nextQueue = [];
const queueLength = queue.length;
// Never use more workers than there are queued items or than numberOfCPUs allows.
const numberOfThreads = Math.min(numberOfCPUs, queueLength);
// threadAllocations[i] counts how many queue items worker i receives in this pass.
const threadAllocations = Array(numberOfThreads).fill(0);
// Running total of queue items handed out so far.
let queuesAllocated = 0;
// queueChunks: [[{_id: ..., ...}], [...], ...]
const queueChunks = [];
/**
 * Splits `queue` into one contiguous chunk per worker and appends the chunks
 * to `queueChunks`.
 *
 * Items are dealt out round-robin, so the chunk sizes differ by at most one
 * and the earliest workers receive the extra items. The function works on the
 * enclosing scope's variables: it reads `queue`, `queueLength` and
 * `numberOfThreads`, and updates `threadAllocations`, `queuesAllocated` and
 * `queueChunks`.
 *
 * An empty queue (which makes `numberOfThreads` 0) leaves `queueChunks`
 * untouched instead of spinning forever.
 */
function fillQueueChunks() {
    // Without any worker there is nobody to allocate to; skipping the loop
    // is what prevents the endless spin on an empty queue.
    if (numberOfThreads > 0) {
        // `<` rather than `===` so the loop also terminates when the counter
        // is already at or past queueLength.
        for (let i = 0; queuesAllocated < queueLength; i = (i + 1) % numberOfThreads) {
            threadAllocations[i] += 1;
            queuesAllocated += 1;
        }
    }
    // Turn the per-worker counts into consecutive slices of the queue.
    let start = 0;
    for (const threadAllocation of threadAllocations) {
        const end = start + threadAllocation;
        queueChunks.push(queue.slice(start, end));
        start = end;
    }
}
fillQueueChunks();
await new Promise(async function (resolve) {
if (cluster.isMaster) {
let threadsDone = 0;
for (let i = 0; i < numberOfThreads; i++) {
cluster.fork();
};
cluster.on("disconnect", function (_) {
threadsDone += 1;
if (threadsDone === numberOfThreads) {
resolve("Queue Processed");
};
});
} else {
const queueJob = queueChunks[cluster.id - 1];
await Promise.all(queueJob.map(function (queueItem) {
return new Promise(async function (resolve) {
const url = queueItem._id;
const webcode = await request(url);
if (webcode !== "Failure") {
const document = Document(url, webcode);
let hrefs = document.hrefs();
const hrefsQuery = Query(hrefs);
await document.save();
indexSize += 1;
hrefs = hrefsQuery.individualize();
const hrefIncidences = Promise.all(hrefs.map(function (href) {
return new Promise(async function (resolve) {
const incidences = await Site.countDocuments({
url: href
});
resolve(incidences);
});
}));
hrefs = hrefs.filter(function (_, i) {
return hrefIncidences[i] === 0;
}).map(function (href) {
return {
_id: href
};
});
await Queue.insertMany(hrefs);
nextQueue = nextQueue.concat(hrefs);
};
await Queue.deleteOne({
_id: url
});
resolve("Success");
});
}));
process.exit(0);
};
});
queue = nextQueue;
};
关于javascript - 如何在所有线程运行完毕后运行代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66218951/