Python 多线程和 PostgreSQL

标签 python multithreading postgresql psycopg2

我想加快我的一项任务,我写了一个小程序:

import psycopg2 
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def write_sim_to_db(all_ids2):
    if all_ids1[i] != all_ids2:
        c.execute("""SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s""", (all_ids1[i], all_ids2,))
        count = c.fetchone()
        if count[0] == 0:
            sim_sum = random.random()
            c.execute("""INSERT INTO similarity(prod_id1, prod_id2, sim_sum) 
                    VALUES(%s, %s, %s)""", (all_ids1[i], all_ids2, sim_sum,))
            conn.commit()

conn = psycopg2.connect("dbname='db' user='user' host='localhost' password='pass'")
c = conn.cursor()

all_ids1 = list(n for n in range(1000))
all_ids2_list = list(n for n in range(1000))

for i in range(len(all_ids1)):
    with ThreadPoolExecutor(max_workers=5) as pool:
        results = [pool.submit(write_sim_to_db, i) for i in all_ids2_list]

有一段时间,程序运行正常。但后来我得到一个错误:

Segmentation fault (core dumped)

或者

*** Error in `python3': double free or corruption (out): 0x00007fe574002270 ***
Aborted (core dumped)

如果我在一个线程中运行这个程序,效果会很好。

with ThreadPoolExecutor(max_workers=1) as pool:

Postgresql 似乎没有时间处理事务。但我不确定。在日志文件中有任何错误。

我不知道如何找到错误。 帮助。

最佳答案

我不得不使用连接池。

import psycopg2 
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from psycopg2.pool import ThreadedConnectionPool

def write_sim_to_db(all_ids2):
    if all_ids1[i] != all_ids2:
        conn = tcp.getconn()
        c = conn.cursor()
        c.execute("""SELECT count(*) FROM similarity WHERE prod_id1 = %s AND prod_id2 = %s""", (all_ids1[i], all_ids2,))
        count = c.fetchone()
        if count[0] == 0:
            sim_sum = random.random()
            c.execute("""INSERT INTO similarity(prod_id1, prod_id2, sim_sum) 
                    VALUES(%s, %s, %s)""", (all_ids1[i], all_ids2, sim_sum,))
            conn.commit()
        tcp.putconn(conn)

DSN = "postgresql://user:pass@localhost/db"
tcp = ThreadedConnectionPool(1, 10, DSN)

all_ids1 = list(n for n in range(1000))
all_ids2_list = list(n for n in range(1000))

for i in range(len(all_ids1)):
    with ThreadPoolExecutor(max_workers=2) as pool:
        results = [pool.submit(write_sim_to_db, i) for i in all_ids2_list]

关于Python 多线程和 PostgreSQL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34815650/

相关文章:

python - Python 实现中的 Sha-3

java - while(true) ServerSocket Listen 的效率

等待用户 IO ('getchar()' 的 c++ 线程在主进程中挂起 'Py_Initialize()'

java - 用线程获取素数。如何划分区间?

sql - 用于检测重叠时间范围的 PostgreSQL 查询

postgresql - 存储过程 : how to check if NULL is present in postgresql array or not?

sql - 生成 SQL 以更新主键

python - 如何在 Tkinter 文本小部件中将文本居中?

python - 在上下文管理器中运行子命令

python - 自动sklearn安装错误