Python 分布式计算(有效)

标签 python sockets multiprocessing pickle distributed-computing

我借用这个旧帖子来发布新的代码,它试图解决同一个问题。怎样的 pickle 才算安全?像下面这样吗?

sock.py

from socket import socket
from socket import AF_INET
from socket import SOCK_STREAM
from socket import gethostbyname
from socket import gethostname

class SocketServer:
  """Minimal TCP server that hands one pre-packed message to each client.

  The socket is created in the constructor but is only bound and put into
  listening mode when listen() is called.
  """
  def __init__(self, port):
    """Create the TCP socket and remember the port listen() will bind to."""
    self.sock = socket(AF_INET, SOCK_STREAM)
    self.port = port
  def listen(self, data):
    """Serve every entry of `data` to its own client connection, then return.

    data -- list of (size_header, payload) byte-string pairs. Entries are
            popped from the end of the list, so the caller's list is empty
            when this method returns.

    Each accepted connection receives the size header followed by the
    payload and is then closed.
    """
    self.sock.bind(("127.0.0.1", self.port))
    self.sock.listen(len(data))
    while data:
      s = self.sock.accept()[0]
      try:
        siz, dat = data.pop()
        # send() may write only part of the buffer; sendall() keeps going
        # until every byte is written or an error is raised.
        s.sendall(siz)
        s.sendall(dat)
      finally:
        # Release the client connection even if sending failed.
        s.close()

class Socket:
  """Client-side TCP connection with a read-exactly-N-bytes recv()."""
  def __init__(self, host, port):
    """Open a TCP connection to (host, port)."""
    self.sock = socket(AF_INET, SOCK_STREAM)
    try:
      self.sock.connect((host, port))
    except Exception:
      # Do not leak the descriptor when the connection cannot be made.
      self.sock.close()
      raise
  def recv(self, size):
    """Return `size` bytes read from the connection.

    A single socket recv() may return fewer bytes than requested, so this
    keeps reading until `size` bytes have arrived. Fewer bytes are returned
    only when the peer closes the connection first.
    """
    chunks = []
    remaining = size
    while remaining > 0:
      chunk = self.sock.recv(remaining)
      if not chunk:
        # Empty read: the peer closed the connection.
        break
      chunks.append(chunk)
      remaining -= len(chunk)
    return b"".join(chunks)

pack.py

#http://stackoverflow.com/questions/6234586/we-need-to-pickle-any-sort-of-callable
from marshal import dumps as marshal_dumps
from pickle import dumps as pickle_dumps
from struct import pack as struct_pack

class packer:
  """Serialises plain Python functions into a length-prefixed byte payload."""
  def __init__(self):
    # Holds the payload produced by the most recent successful pack() call.
    self.f = []
  def pack(self, what):
    """Serialise the function `what`.

    Returns a (size_header, payload) tuple, or None when `what` is not a
    plain Python function. `payload` is a pickled list of four byte
    strings: the marshalled code object and the pickled name, default
    arguments and closure. `size_header` is len(payload) packed with
    struct format 'Q' (native byte order and size, so the reader has to
    use the same format).
    """
    if not isinstance(what, type(lambda: None)):
      return None
    # The dunder attributes exist on Python 2.6+ and on Python 3; the
    # func_* aliases used previously were removed in Python 3.
    parts = [
      marshal_dumps(what.__code__),
      pickle_dumps(what.__name__),
      pickle_dumps(what.__defaults__),
      pickle_dumps(what.__closure__),
    ]
    self.f = pickle_dumps(parts)
    return (struct_pack('Q', len(self.f)), self.f)

unpack.py

from types import FunctionType
from pickle import loads as pickle_loads
from marshal import loads as marshal_loads
from struct import unpack as struct_unpack
from struct import calcsize

#http://stackoverflow.com/questions/6234586/we-need-to-pickle-any-sort-of-callable

class unpacker:
  """Rebuilds a function from a length-prefixed payload read off a socket."""
  def __init__(self):
    # Parts list of the most recently received payload.
    self.f = []
    # Most recently rebuilt function (a no-op until unpack() is called).
    self.fcompiled = lambda:None
    # Byte length of the 'Q' size header that precedes every payload.
    self.sizeofsize = calcsize('Q')
  def _recv_exact(self, sock, count):
    """Read exactly `count` bytes from `sock`; raise EOFError if it ends early."""
    chunks = []
    remaining = count
    while remaining > 0:
      chunk = sock.recv(remaining)
      if not chunk:
        raise EOFError("connection closed with %d byte(s) still expected" % remaining)
      chunks.append(chunk)
      remaining -= len(chunk)
    return b"".join(chunks)
  def unpack(self, sock):
    """Read one packed function from `sock` and return it as a callable.

    sock -- any object with a recv(n) method that returns byte strings.

    The stream must contain a 'Q' size header followed by a pickled list of
    four byte strings: marshalled code object, pickled name, pickled
    defaults and pickled closure.

    Raises EOFError when the stream ends before a full message arrives.
    """
    # NOTE(security): unpickling and unmarshalling data from the network
    # lets the sender execute arbitrary code here; only use this with
    # trusted peers.
    # recv() may return short reads, so both parts are read in a loop.
    size = struct_unpack('Q', self._recv_exact(sock, self.sizeofsize))[0]
    self.f = pickle_loads(self._recv_exact(sock, size))
    code = marshal_loads(self.f[0])
    # The rebuilt function resolves global names in THIS module's namespace.
    namespace = globals()
    name = pickle_loads(self.f[1])
    defaults = pickle_loads(self.f[2])
    closure = pickle_loads(self.f[3])
    self.fcompiled = FunctionType(code, namespace, name, defaults, closure)
    return self.fcompiled

test.py

from unpack import unpacker
from pack import packer
from sock import SocketServer
from sock import Socket
from threading import Thread
from time import sleep

# Number of packed functions the server prepares and the client fetches.
count = 2
# TCP port shared by the demo server and its clients.
port = 4446

def f():
  """Sample function that is shipped over the socket; prints 42."""
  # Call syntax gives the same output on Python 2 and Python 3.
  print(42)

def server():
  """Pack `f` once per expected client and serve the results on `port`."""
  listener = SocketServer(port)
  serializer = packer()
  payloads = []
  for _ in range(count):
    # Each entry is the (size_header, payload) pair for one connection.
    payloads.append(serializer.pack(f))
  listener.listen(payloads)

if __name__ == "__main__":
  # Run the server in a background thread of this same process.
  Thread(target=server).start()
  # Crude synchronisation: give the server thread time to bind and listen.
  sleep(1)
  unpack = unpacker()
  for nothing in range(count):
    conn = Socket("127.0.0.1", port)
    try:
      # Call syntax prints the same text on Python 2 and Python 3.
      print(unpack.unpack(conn))
    finally:
      # Close each client connection instead of leaving it to the GC.
      conn.sock.close()

输出:

<function f at 0x12917d0>
<function f at 0x12915f0>

最佳答案

我认为 Process 对象并不是为通过网络发送而设计的。请查看 multiprocessing/process.py 的第 256 行。

# We subclass bytes to avoid accidental transmission of auth keys over network.

我觉得这样做是有充分理由的。如果你想做分布式计算,也许应该看看专门为此设计的库。

关于Python 分布式计算(有效),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6212326/

相关文章:

c# - C++ 到 C# 桥接多个进程

python - 如何使用Keras TimeseriesGenerator为每n个训练样本获取一个验证样本?

python - youtube_dl 使用 Python 获取音频链接

java - Jeromq/ZeroMQ 服务器 (windows) - 客户端 (android) 测试

c++ - 如何设置 Winsock UDP 套接字?

python - 多重处理似乎不起作用

python - 在 sklearn LabelEncoder 中返回标签及其编码值

python - 检查使用 twistd 启动的 Twisted Server 是否成功启动

java - TCP 套接字写入错误

python - Python-调用函数和继续的主程序