我正在使用 Python 实现嗅探器。 我需要所有数据包详细信息(VLAN 等),所以我使用的是 RAW 套接字。 在我所有的以太网接口(interface)上都有几个嗅探器作为恶魔工作,所以每个都在不同的线程中。
除了我添加数据包的全局变量(如以下代码所示)之外,是否有更好的方法在我的主线程中获取结果?
我尝试了 Queue,但除了增加了复杂性之外没有发现任何特别的好处。
import socket, threading
def ReceivePackets():
soc = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
while 1:
packet = soc.recv(2000)
buffer.append(packet)
def ReadSniffer():
result = list(buffer)
#Clear buffer to start sniffing from scratch after reading
del buffer[:]
return result
我在程序开始时在所有以太网接口(interface)上启动嗅探器,并在需要数据时读取相应的全局缓冲区。
buffer = []
t = threading.Thread(target = ReceivePackets)
t.daemon = True
t.start()
...
packets = ReadSniffer()
最佳答案
对于套接字,我总是得到不一致的结果,因为我有各种数据包大小不同的数据包。套接字可能会捕获我发送几次的所有数据包,但最终它会丢失一些数据包。
我将 pcapy
接口(interface)转移到 libcap
库而不是原始套接字,它可以完美且非常可靠地工作。
我将 Sniffer 类实现为 threading.Thread
的子类,因此它在一个单独的线程中启动。实现是这样的:
class Sniffer(threading.Thread):
def __init__(self, port):
...
threading.Thread.__init__(self)
#Packets will be stored here
self.result = []
#Not worry about closing the sniffer thread
self.daemon = True
#Invoked when I start my thread
def run(self):
max_bytes = 16000
#Important to capture broken packets as I need them
promiscuous = True
read_timeout = 100
pc = pcapy.open_live(str(self.port), max_bytes, promiscuous, read_timeout)
pc.loop(-1, self.recv_pkts)
#Do when the packet arrives
def recv_pkts(self, hdr, packet):
packetHex = binascii.hexlify(packet)
self.result.append(packetHex)
return self.result
要与主程序并行启动嗅探器线程:
sniffer1 = Sniffer(eth1)
#Start separate thread
sniffer1.start()
关于python - 多线程Python程序中高效的抓包方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22102189/