我目前正在构建一个爬虫。多个爬行 worker 访问同一个 PostgreSQL 数据库。遗憾的是,我遇到了此处介绍的主要事务的问题:
BEGIN ISOLATION LEVEL SERIALIZABLE;
UPDATE webpages
SET locked = TRUE
WHERE url IN
(
SELECT DISTINCT ON (source) url
FROM webpages
WHERE
(
last IS NULL
OR
last < refreshFrequency
)
AND
locked = FALSE
LIMIT limit
)
RETURNING *;
COMMIT;
url
是一个 URL(字符串)source
是域名(字符串)last
是上次抓取页面的时间(日期)locked
是一个 bool 值,设置为指示当前正在抓取网页( bool 值)
我尝试了两种不同的事务隔离级别:
隔离级别可串行化
,我收到诸如由于并发更新而无法序列化访问
之类的错误隔离级别读取已提交
,我从并发事务中收到重复的url
,因为数据从事务首次提交时就被“卡住”(我认为)
总的来说,我对 PostgreSQL 和 SQL 还很陌生,所以我真的不确定如何解决这个问题。
更新:
PostgreSQL 版本为 9.2.x。
网页
表定义:
CREATE TABLE webpages (
last timestamp with time zone,
locked boolean DEFAULT false,
url text NOT NULL,
source character varying(255) PRIMARY KEY
);
最佳答案
澄清
这个问题留下了解释的空间。这是我对任务的理解:
锁定最多 limit
个满足某些条件且尚未锁定的 URL。为了分散源上的负载,每个 URL 都应该来自不同的源。
数据库设计
假设有一个单独的表源
:这使得工作更快更容易。如果您没有这样的表,请创建它,无论如何它都是正确的设计:
CREATE TABLE source (
source_id serial NOT NULL PRIMARY KEY
, source text NOT NULL
);
CREATE TABLE webpage (
source_id int NOT NULL REFERENCES source
url text NOT NULL PRIMARY KEY
locked boolean NOT NULL DEFAULT false, -- may not be needed
last timestamp NOT NULL DEFAULT '-infinity' -- makes query simpler
);
或者,您可以有效地使用递归 CTE:
带有咨询锁的基本解决方案
我正在使用advisory locks即使在默认的已提交读
隔离级别下,也可以使其安全且廉价:
UPDATE webpage w
SET locked = TRUE
FROM (
SELECT (SELECT url
FROM webpage
WHERE source_id = s.source_id
AND (last >= refreshFrequency) IS NOT TRUE
AND locked = FALSE
AND pg_try_advisory_xact_lock(url) -- only true is free
LIMIT 1 -- get 1 URL per source
) AS url
FROM (
SELECT source_id -- the FK column in webpage
FROM source
ORDER BY random()
LIMIT limit -- random selection of "limit" sources
) s
FOR UPDATE
) l
WHERE w.url = l.url
RETURNING *;
或者,您可以仅使用咨询锁,而根本不使用表列locked
。基本上只需运行 SELECT
语句即可。锁将一直保留到事务结束。您可以使用 pg_try_advisory_lock() 来保持锁定直到 session 结束。完成后仅UPDATE
一次设置last
(并可能释放咨询锁)。
其他要点
在 Postgres 9.3 或更高版本中,您将使用
LATERAL
连接而不是相关子查询。我选择了
pg_try_advisory_xact_lock()
,因为锁可以(并且应该)在事务结束时释放。咨询锁详细解释:如果某些来源没有更多可供抓取的网址,您将获得少于
limit
行。随机选择来源是我的疯狂但有根据的猜测,因为信息不可用。如果您的
源
表很大,还有更快的方法:refreshFrequency
实际上应该被称为lastest_last
之类的东西,因为它不是“频率”,而是一个timestamp
或日期
。
递归替代
要获取完整的限制行数如果可用,请使用RECURSIVE
CTE并迭代所有源,直到找到足够的或没有更多的行可以找到。
正如我上面提到的,您可能根本不需要列locked
,并且仅使用咨询锁进行操作(更便宜)。只需在交易结束时设置 last
,然后再开始下一轮。
WITH RECURSIVE s AS (
SELECT source_id, row_number() OVER (ORDER BY random()) AS rn
FROM source -- you might exclude "empty" sources early ...
)
, page(source_id, rn, ct, url) AS (
SELECT 0, 0, 0, ''::text -- dummy init row
UNION ALL
SELECT s.source_id, s.rn
, CASE WHEN t.url <> ''
THEN p.ct + 1
ELSE p.ct END -- only inc. if url found last round
, (SELECT url
FROM webpage
WHERE source_id = t.source_id
AND (last >= refreshFrequency) IS NOT TRUE
AND locked = FALSE -- may not be needed
AND pg_try_advisory_xact_lock(url) -- only true is free
LIMIT 1 -- get 1 URL per source
) AS url -- try, may come up empty
FROM page p
JOIN s ON s.rn = p.rn + 1
WHERE CASE WHEN p.url <> ''
THEN p.ct + 1
ELSE p.ct END < limit -- your limit here
)
SELECT url
FROM page
WHERE url <> ''; -- exclude '' and NULL
或者,如果您还需要管理锁定
,请将此查询与上述UPDATE
结合使用。
进一步阅读
您一定会喜欢即将推出的 Postgres 9.5 中的SKIP LOCKED
:
相关:
关于sql - PostgreSQL 并发事务问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29807033/