我正在序列化列数据,然后通过套接字连接发送它。 像这样的东西:
import array, struct, socket
## Socket setup
s = socket.create_connection((ip, addr))
## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
## Binarize data
columns['col1'] = array.array('i', range(10000))
columns['col2'] = array.array('f', [float(num) for num in range(10000)])
.
.
.
## Send away
chunk = b''.join(columns[col_name] for col_name in ordered_col_list]
s.sendall(chunk)
s.recv(1000) #get confirmation
我希望将计算与发送分开,将它们放在不同的线程或进程中,这样我就可以在发送数据的同时继续进行计算。
我将二值化部分作为生成器函数,然后将生成器发送到一个单独的线程,然后通过队列生成二进制 block 。
我从主线程收集数据并发送出去。像这样的东西:
import array, struct, socket
from time import sleep
try:
import thread
from Queue import Queue
except:
import _thread as thread
from queue import Queue
## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()
def binarize(num_of_chunks):
''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
columns['col1'] = array.array('i', range(10000)).tostring()
columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
.
.
yield b''.join((columns[col_name] for col_name in ordered_col_list))
def chunk_yielder(queue):
''' Generate binary chunks and put them on a queue. To be used from a thread '''
while True:
try:
data_gen = queue.get_nowait()
except:
sleep(0.1)
continue
else:
for chunk in data_gen:
queue.put(chunk)
## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)
## Get data back and send away
while True:
try:
binary_chunk = queue.get_nowait()
except:
sleep(0.1)
continue
else:
socket.sendall(binary_chunk)
socket.recv(1000) #Get confirmation
但是,我没有看到性能改进 - 它并没有运行得更快。
我不太了解线程/进程,我的问题是是否有可能(在 Python 中)从这种类型的分离中获益,以及什么是解决它的好方法,要么使用线程或进程(或任何其他方式 - 异步等)。
编辑:
据我所知-
- Multirpocessing 需要序列化所有发送的数据,因此我会双重发送每个计算数据。
- 通过
socket.send()
发送应该释放 GIL
因此我认为(如果我弄错了请纠正我)线程解决方案是正确的方法。但是我不确定如何正确地做到这一点。
我知道 cython 可以从线程中释放 GIL,但由于其中之一只是 socket.send/recv,我的理解是它没有必要。
最佳答案
在 Python 中并行运行有两种选择,要么使用 multiprocessing
( docs ) 库,要么在 cython
中编写并行代码并发布吉尔。后者的工作量要多得多,而且一般来说不太适用。
Python 线程受全局解释器锁 ( GIL ) 的限制,我不会在这里详细介绍,因为您可以在网上找到足够多的信息。简而言之,顾名思义,GIL 是 CPython 解释器中的全局锁,可确保多个线程不会同时修改位于所述解释器范围内的对象。这就是为什么,例如,cython
程序可以并行运行代码,因为它们可以存在于 GIL 之外。
关于您的代码,一个问题是您在 GIL 中同时运行数字运算 (binarize
) 和 socket.send
,这将运行它们严格连续。 queue
连接也很奇怪,有一个 NameError
但让我们把这些放在一边。
考虑到 Jeremy Friesner 已经指出的注意事项,我建议您按以下方式重新构建代码:您有两个进程(不是线程),一个用于二进制化数据,另一个用于发送数据。除此之外,还有启动两个子进程的父进程,以及连接子进程 1 和子进程 2 的队列。
- Subprocess-1 进行数字运算并将运算后的数据放入队列中
- Subprocess-2 使用队列中的数据并执行
socket.send
在代码中设置看起来像
from multiprocessing import Process, Queue
work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()
binarize
可以保持代码中的原样,除了在末尾不是 yield
,而是将元素添加到队列中
def binarize(num_of_chunks, q):
''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
columns['col1'] = array.array('i', range(10000)).tostring()
columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
data = b''.join((columns[col_name] for col_name in ordered_col_list))
q.put(data)
send_data
应该只是代码底部的 while
循环,具有连接打开/关闭功能
def send_data(ip, addr, q):
s = socket.create_connection((ip, addr))
while True:
try:
binary_chunk = q.get(False)
except:
sleep(0.1)
continue
else:
socket.sendall(binary_chunk)
socket.recv(1000) # Get confirmation
# maybe remember to close the socket before killing the process
现在您有两个(如果算上父进程,实际上是三个)独立处理数据的进程。您可以通过将队列的 max_size
设置为单个元素来强制两个进程同步它们的操作。这两个独立进程的运行也很容易从你电脑上的进程管理器top
(Linux),Activity Monitor
(OsX)监控,不记得是什么了在 Windows 下调用。
最后,Python 3 提供了使用协同例程的选项,它们既不是进程也不是线程,而是完全不同的东西。从 CS 的角度来看,协程非常酷,但一开始有点让人头疼。虽然有很多资源可以学习,比如 this在 Medium 和 this 上发帖David Beazley 的演讲。
更一般地说,如果您还不熟悉生产者/消费者模式,您可能想要了解它。
关于python - 将计算与 Python 中的套接字工作分开,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48143565/