python - 将计算与 Python 中的套接字工作分开

标签 python multithreading sockets concurrency

我正在序列化列数据,然后通过套接字连接发送它。 像这样的东西:

import array, struct, socket

## Socket setup
s = socket.create_connection((ip, addr))

## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)

for i in range(num_of_chunks):
    ## Binarize data
    columns['col1'] = array.array('i', range(10000))
    columns['col2'] = array.array('f', [float(num) for num in range(10000)])
    .
    .
    .

    ## Send away
    chunk = b''.join(columns[col_name] for col_name in ordered_col_list]
    s.sendall(chunk)
    s.recv(1000)      #get confirmation

我希望将计算与发送分开,将它们放在不同的线程或进程中,这样我就可以在发送数据的同时继续进行计算。

我将二值化部分作为生成器函数,然后将生成器发送到一个单独的线程,然后通过队列生成二进制 block 。

我从主线程收集数据并发送出去。像这样的东西:

import array, struct, socket
from time import sleep
try:
    import  thread
    from Queue import Queue
except:
    import _thread as thread
    from queue import Queue


## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()


def binarize(num_of_chunks):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)

    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        .
        .

        yield b''.join((columns[col_name] for col_name in ordered_col_list))


def chunk_yielder(queue):
    ''' Generate binary chunks and put them on a queue. To be used from a thread '''

    while True:   
        try:
            data_gen = queue.get_nowait()
        except:
            sleep(0.1)
            continue
        else:    
            for chunk in data_gen:
                queue.put(chunk)


## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)


## Get data back and send away
while True:
   try:
        binary_chunk = queue.get_nowait()
    except:
        sleep(0.1)
        continue
    else:    
        socket.sendall(binary_chunk)
        socket.recv(1000) #Get confirmation

但是,我没有看到性能改进 - 它并没有运行得更快。

我不太了解线程/进程,我的问题是是否有可能(在 Python 中)从这种类型的分离中获益,以及什么是解决它的好方法,要么使用线程或进程(或任何其他方式 - 异步等)。

编辑:

据我所知-

  1. Multirpocessing 需要序列化所有发送的数据,因此我会双重发送每个计算数据。
  2. 通过 socket.send() 发送应该释放 GIL

因此我认为(如果我弄错了请纠正我)线程解决方案是正确的方法。但是我不确定如何正确地做到这一点。

我知道 cython 可以从线程中释放 GIL,但由于其中之一只是 socket.send/recv,我的理解是它没有必要。

最佳答案

在 Python 中并行运行有两种选择,要么使用 multiprocessing ( docs ) 库,要么在 cython 中编写并行代码并发布吉尔。后者的工作量要多得多,而且一般来说不太适用。

Python 线程受全局解释器锁 ( GIL ) 的限制,我不会在这里详细介绍,因为您可以在网上找到足够多的信息。简而言之,顾名思义,GIL 是 CPython 解释器中的全局锁,可确保多个线程不会同时修改位于所述解释器范围内的对象。这就是为什么,例如,cython 程序可以并行运行代码,因为它们可以存在于 GIL 之外


关于您的代码,一个问题是您在 GIL 中同时运行数字运算 (binarize) 和 socket.send,这将运行它们严格连续。 queue 连接也很奇怪,有一个 NameError 但让我们把这些放在一边。

考虑到 Jeremy Friesner 已经指出的注意事项,我建议您按以下方式重新构建代码:您有两个进程(不是线程),一个用于二进制化数据,另一个用于发送数据。除此之外,还有启动两个子进程的父进程,以及连接子进程 1 和子进程 2 的队列。

  • Subprocess-1 进行数字运算并将运算后的数据放入队列中
  • Subprocess-2 使用队列中的数据并执行 socket.send

在代码中设置看起来像

from multiprocessing import Process, Queue

work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()

binarize 可以保持代码中的原样,除了在末尾不是 yield,而是将元素添加到队列中

def binarize(num_of_chunks, q):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        data = b''.join((columns[col_name] for col_name in ordered_col_list))
        q.put(data)

send_data 应该只是代码底部的 while 循环,具有连接打开/关闭功能

def send_data(ip, addr, q):
     s = socket.create_connection((ip, addr))
     while True:
         try:
             binary_chunk = q.get(False)
         except:
             sleep(0.1)
             continue
         else:    
             socket.sendall(binary_chunk)
             socket.recv(1000) # Get confirmation
    # maybe remember to close the socket before killing the process

现在您有两个(如果算上父进程,实际上是三个)独立处理数据的进程。您可以通过将队列的 max_size 设置为单个元素来强制两个进程同步它们的操作。这两个独立进程的运行也很容易从你电脑上的进程管理器top(Linux),Activity Monitor(OsX)监控,不记得是什么了在 Windows 下调用。


最后,Python 3 提供了使用协同例程的选项,它们既不是进程也不是线程,而是完全不同的东西。从 CS 的角度来看,协程非常酷,但一开始有点让人头疼。虽然有很多资源可以学习,比如 this在 Medium 和 this 上发帖David Beazley 的演讲。


更一般地说,如果您还不熟悉生产者/消费者模式,您可能想要了解它。

关于python - 将计算与 Python 中的套接字工作分开,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48143565/

相关文章:

python - 在 Python 中查找元音的第一次出现

python - 如何正确广播 NumPy 数组的数组索引

linux - 如何在 Haskell 中为多播套接字指定本地绑定(bind)接口(interface)?

c - 带 c 的双向套接字(非阻塞)

c# - 使用.net套接字的文件,传输问题

python - Opencv:使用 python 导入 highgui

python - Flask:获取模块中每个 View 类的 URL

Java线程和run方法

Java线程生命周期

Java : Out Of Memory Error when my application runs for longer time