sql - PostgreSQL 并发事务问题

标签 sql postgresql concurrency transactions locking

我目前正在构建一个爬虫。多个爬行 worker 访问同一个 PostgreSQL 数据库。遗憾的是,我遇到了此处介绍的主要事务的问题:

BEGIN ISOLATION LEVEL SERIALIZABLE;
    UPDATE webpages
    SET locked = TRUE
    WHERE url IN 
        (
            SELECT DISTINCT ON (source) url
            FROM webpages
            WHERE
                (
                    last IS NULL
                    OR
                    last < refreshFrequency
                )
                AND
                locked = FALSE
            LIMIT limit
        )
    RETURNING *;
COMMIT;
  • url 是一个 URL(字符串)
  • source 是域名(字符串)
  • last 是上次抓取页面的时间(日期)
  • locked 是一个 bool 值,设置为指示当前正在抓取网页( bool 值)

我尝试了两种不同的事务隔离级别:

  • 隔离级别可串行化,我收到诸如由于并发更新而无法序列化访问之类的错误
  • 隔离级别读取已提交,我从并发事务中收到重复的url,因为数据从事务首次提交时就被“卡住”(我认为)

总的来说,我对 PostgreSQL 和 SQL 还很陌生,所以我真的不确定如何解决这个问题。

更新:
PostgreSQL 版本为 9.2.x。
网页表定义:

CREATE TABLE webpages (
  last timestamp with time zone,
  locked boolean DEFAULT false,
  url text NOT NULL,
  source character varying(255) PRIMARY KEY
);

最佳答案

澄清

这个问题留下了解释的空间。这是我对任务的理解:

锁定最多 limit 个满足某些条件且尚未锁定的 URL。为了分散源上的负载,每个 URL 都应该来自不同的源。

数据库设计

假设有一个单独的表:这使得工作更快更容易。如果您没有这样的表,请创建它,无论如何它都是正确的设计:

CREATE TABLE source (
  source_id serial NOT NULL PRIMARY KEY
, source    text NOT NULL
);

CREATE TABLE webpage (
  source_id int NOT NULL REFERENCES source
  url       text NOT NULL PRIMARY KEY
  locked    boolean NOT NULL DEFAULT false,        -- may not be needed
  last      timestamp NOT NULL DEFAULT '-infinity' -- makes query simpler
);

或者,您可以有效地使用递归 CTE:

带有咨询锁的基本解决方案

我正在使用advisory locks即使在默认的已提交读隔离级别下,也可以使其安全且廉价:

UPDATE webpage w
SET    locked = TRUE
FROM  (
   SELECT (SELECT url
           FROM   webpage
           WHERE  source_id = s.source_id
           AND   (last >= refreshFrequency) IS NOT TRUE
           AND    locked = FALSE
           AND    pg_try_advisory_xact_lock(url)  -- only true is free
           LIMIT  1     -- get 1 URL per source
          ) AS url
   FROM  (
      SELECT source_id  -- the FK column in webpage
      FROM   source
      ORDER  BY random()
      LIMIT  limit      --  random selection of "limit" sources
      ) s
   FOR    UPDATE
   ) l
WHERE  w.url = l.url
RETURNING *;

或者,您可以使用咨询锁,而根本不使用表列locked。基本上只需运行 SELECT 语句即可。锁将一直保留到事务结束。您可以使用 pg_try_advisory_lock() 来保持锁定直到 session 结束。完成后仅UPDATE一次设置last(并可能释放咨询锁)。

其他要点

  • 在 Postgres 9.3 或更高版本中,您将使用 LATERAL 连接而不是相关子查询。

  • 我选择了pg_try_advisory_xact_lock(),因为锁可以(并且应该)在事务结束时释放。咨询锁详细解释:

  • 如果某些来源没有更多可供抓取的网址,您将获得少于limit 行。

  • 随机选择来源是我的疯狂但有根据的猜测,因为信息不可用。如果您的表很大,还有更快的方法:

  • refreshFrequency 实际上应该被称为 lastest_last 之类的东西,因为它不是“频率”,而是一个 timestamp日期

递归替代

要获取完整的限制行数如果可用,请使用RECURSIVE CTE并迭代所有源,直到找到足够的或没有更多的行可以找到。

正如我上面提到的,您可能根本不需要列locked,并且仅使用咨询锁进行操作(更便宜)。只需在交易结束时设置 last,然后再开始下一轮。

WITH RECURSIVE s AS (
   SELECT source_id, row_number() OVER (ORDER BY random()) AS rn
   FROM source  -- you might exclude "empty" sources early ...
   )
, page(source_id, rn, ct, url) AS (
   SELECT 0, 0, 0, ''::text   -- dummy init row
   UNION ALL
   SELECT s.source_id, s.rn
        , CASE WHEN t.url <> ''
               THEN p.ct + 1
               ELSE p.ct END  -- only inc. if url found last round
        , (SELECT url
           FROM   webpage
           WHERE  source_id = t.source_id
           AND   (last >= refreshFrequency) IS NOT TRUE
           AND    locked = FALSE  -- may not be needed
           AND    pg_try_advisory_xact_lock(url)  -- only true is free
           LIMIT  1           -- get 1 URL per source
          ) AS url            -- try, may come up empty
   FROM   page p
   JOIN   s ON s.rn = p.rn + 1
   WHERE  CASE WHEN p.url <> ''
               THEN p.ct + 1
               ELSE p.ct END < limit  -- your limit here
   )
SELECT url
FROM   page
WHERE  url <> '';             -- exclude '' and NULL

或者,如果您还需要管理锁定,请将此查询与上述UPDATE结合使用。

进一步阅读

您一定会喜欢即将推出的 Postgres 9.5 中的SKIP LOCKED:

相关:

关于sql - PostgreSQL 并发事务问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29807033/

相关文章:

sql - Ruby on Rails : Finding the sum of a column in a returned search query

按日期查询销售报告的SQL

sql - 如何加速pg_trgm?

c# - 添加到通用字典会导致 IndexOutOfRangeException

sql - 如何检查类型属性是否已经存在

MySQL:源错误 2?找不到文件

sql - 我可以避免在不必要的时间内锁定行吗 [在 Django 中]?

sql - postgreSQL 内部连接位

java - Camel Split EIP 和池中线程的使用似乎没有超过最小线程

Java volatile 和/或 synchronized