Python TCP Payload Duplication - 同时将数据传递到多个端点

标签 python sockets tcp python-multithreading

这是我在这里的第一篇文章!

我的目标是复制单向 TCP 流的有效负载并将此有效负载同时发送到多个端点。我有一个用 Python 编写的工作原型(prototype),但是我是 Python 和 Socket 编程的新手。理想情况下,该解决方案能够在 Windows 和 *nix 环境中运行。

此原型(prototype)有效,但它会为每个缓冲区长度(当前设置为 4096 字节)创建一个新的发送 TCP 连接。这样做的主要问题是我最终会用完本地端口来发送,理想情况下我希望数据从每个传入的 TCP 流传递到一个 TCP 流出(对于每个端点)。传入的数据可以从不到 1024 字节到数百兆字节不等。

目前每 4096 字节启动一个新的传出 TCP 流。我不确定问题是否出在我的线程实现中,或者我是否遗漏了其他非常明显的问题。

在我的研究中,我发现 select() 可以提供帮助,但我不确定它是否合适,因为我可能需要处理一些传入数据并在将来的某些情况下响应发送客户端。

这是我到目前为止的代码(我尝试过的一些代码变体被注释掉了):

#!/usr/bin/python
#One way TCP payload duplication
import sys
import threading
from socket import *
bufsize = 4096
host= ''

# Methods:  
#handles sending the data to the endpoints  
def send(endpoint,port,data):
    sendSocket = socket(AF_INET, SOCK_STREAM)
    #sendSocket.setblocking(1)
    sendSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    #sendport = sendSocket.getsockname
    #print sendport
    try:
        sendSocket.connect((endpoint, port))
        sendSocket.sendall(data)
    except IOError as msg:
        print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1]
        sys.exit()  

#handles threading for sending data to endpoints
def forward(service, ENDPOINT_LIST, port, data):
    #for each endpoint in the endpoint list start a new send thread
    for endpoint in ENDPOINT_LIST:
        print "Forwarding data for %s from %s:%s to %s:%s" % (service,host,port,endpoint,port)
        #send(endpoint,port,data)
        ethread = threading.Thread(target=send, args=(endpoint,port,data))
        ethread.start()

#handles threading for incoming clients 
def clientthread(conn,service,ENDPOINT_LIST,port):
    while True:
        #receive data form client
        data = conn.recv(bufsize)
        if not data:
            break
        cthread = threading.Thread(target=forward, args=(service, ENDPOINT_LIST, port, data))
        cthread.start()
    #no data? then close the connection
    conn.close()

#handles listening to sockets for incoming connections
def listen(service, ENDPOINT_LIST, port):
    #create the socket
    listenSocket = socket(AF_INET, SOCK_STREAM)
    #Allow reusing addresses - I think this is important to stop local ports getting eaten up by never-ending tcp streams that don't close
    listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    #try to bind the socket to host and port
    try:
        listenSocket.bind((host, port))
    #display an error message if you can't
    except IOError as msg:
        print "Bind Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1]
        sys.exit()
    #start listening on the socket
    listenSocket.listen(10)
    print "Service %s on port %s is listening" %(service,port)
    while True:
        #wait to accept a connection
        conn, addr = listenSocket.accept()
        print 'Connected to ' + addr[0] + ':' + str(addr[1]) + ' on port ' + str(port)
        #start new thread for each connection
        lthread = threading.Thread(target=clientthread , args=(conn,service,ENDPOINT_LIST,port))
        lthread.start()
    #If no data close the connection
    listenSocket.close()

service = "Dumb-one-way-tcp-service-name1"  
ENDPOINT_LIST = ["192.168.1.100","192.168.1.200"]
port = 55551    
listen(service,ENDPOINT_LIST,port)

我研究了其他库以尝试实现我的目标,包括使用:

  • 扭曲
  • 异步
  • 斯皮皮

但是,对于我的适度需求和编程技能水平而言,我发现它们相当复杂。

如果有人对我如何改进我的方法或实现此目标的任何其他方式有任何建议,请告诉我!

最佳答案

简而言之,您的问题是端口不够,对吗?看来你发送后没有关闭套接字。在 send() 中试试这个:

...
except IOError as msg:
    print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1]
    sys.exit() 
finally:
    sendSocket.close()

关于Python TCP Payload Duplication - 同时将数据传递到多个端点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26292909/

相关文章:

python - 如何获取列表的最后一个索引?

.net - 在 onconnect 例程中调用 winform 时出错

C语言。 TCP 服务器-客户端,字符串传递错误

scalability - 构建 "realtime bidding"广告交易平台需要什么样的软件架构?

基于日期示例过滤 DataFrame 的 Pythonic 方法

python - Python 是否保留对在列表中打开的文件的引用?

Python 3.3 C-API 和 UTF-8 字符串

c++ - MinGW 中的套接字

spring - 是否有任何 spring 集成 tcp-inbound-channel-adapter 示例?

c - C连接值重新分配中的TCP/IP