python - 将 Python 2 代码移植到 Python 3:ICMP Scan with errors

标签 python python-2.7 python-3.x

import random
import socket
import time
import ipaddress
import struct

from threading import Thread


def checksum(source_string):
    """Return a 16-bit checksum of ``source_string``.

    The input is consumed two elements at a time as little-endian 16-bit
    words (``ord`` is applied to every element, so a Python 2 ``str`` is
    expected). The words are summed, the carries are folded back into the
    low 16 bits, the result is bit-inverted and its two bytes are swapped.

    :param source_string: the data to checksum.
    :return: the checksum as an integer in the range 0..0xffff.
    """
    total = 0
    # Relies on Python 2 integer division to round the length down to an
    # even number of elements.
    paired_len = (len(source_string) / 2) * 2
    pos = 0
    while pos < paired_len:
        high = ord(source_string[pos + 1])
        low = ord(source_string[pos])
        total = (total + high * 256 + low) & 0xffffffff
        pos += 2
    if paired_len < len(source_string):
        # Odd length: add the last, unpaired element on its own.
        last = ord(source_string[len(source_string) - 1])
        total = (total + last) & 0xffffffff
    # Fold the upper 16 bits into the lower 16 bits, twice, to absorb carries.
    total = (total >> 16) + (total & 0xffff)
    total += total >> 16
    inverted = ~total & 0xffff
    # Swap the two bytes of the 16-bit result.
    return (inverted >> 8) | ((inverted << 8) & 0xff00)


def create_packet(id):
    """Build an ICMP packet: a packed header followed by a 192-character payload.

    The header is packed with the format 'bbHHh' from the values
    (8, 0, checksum, id, 1); type 8 with code 0 is an ICMP echo request.

    :param id: identifier value packed into the header's fourth field.
    :return: the packed header concatenated with the payload.
    """
    # First pass: header with a zero checksum field, used only as checksum input.
    header = struct.pack('bbHHh', 8, 0, 0, id, 1)
    # NOTE: the payload is a text string, so the concatenations below only
    # work where struct.pack also returns str (Python 2).
    data = 192 * 'Q'
    my_checksum = checksum(header + data)
    # Second pass: repack the header with the computed checksum, passed
    # through htons() first.
    header = struct.pack('bbHHh', 8, 0, socket.htons(my_checksum), id, 1)
    return header + data


def ping(addr, timeout=1):
    """Send one ICMP packet built by ``create_packet`` to ``addr``.

    No reply is awaited here; the function only sends.

    :param addr: destination host, passed to ``socket.connect``.
    :param timeout: not applied to the socket; only ``id(timeout)`` is used,
        multiplied by a random factor, to derive the packet identifier.
    :return: ``None``. If the raw socket cannot be created, the error is
        printed and nothing is sent.
    """
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    except socket.error as e:
        # Without a socket there is nothing to send with: report the error
        # and stop instead of falling through to an unbound ``my_socket``.
        print (e)
        return
    try:
        packet_id = int((id(timeout) * random.random()) % 65535)
        packet = create_packet(packet_id)
        # NOTE(review): the port in the address tuple is presumably
        # irrelevant for a raw ICMP socket - confirm.
        my_socket.connect((addr, 80))
        my_socket.sendall(packet)
    finally:
        # Release the socket even when connect() or sendall() raises.
        my_socket.close()


def rotate(addr, file_name, wait, responses):
    """Ping every address in ``addr``, stop the listener, and save the hosts found.

    :param addr: iterable of addresses; each item is converted with ``str``
        and passed to ``ping``.
    :param file_name: path of the file that receives the list of hosts.
    :param wait: number of seconds to sleep after each ping.
    :param responses: list of 4-byte packed addresses; in this script it is
        the list the listener thread appends to.
    """
    global SIGNAL

    print ("Sending Packets", time.strftime("%X %x %Z"))
    for ip in addr:
        ping(str(ip))
        time.sleep(wait)
    print ("All packets sent", time.strftime("%X %x %Z"))

    print ("Waiting for all responses")
    time.sleep(2)

    # Stop the listener: clear the flag, then send one more ping so that its
    # blocking recv() returns and the loop condition is checked again.
    SIGNAL = False
    ping('127.0.0.1')  # Final ping to trigger the false signal in listen

    print (len(responses), "hosts found!")
    print ("Writing File")
    hosts = []
    for response in sorted(responses):
        octets = struct.unpack('BBBB', response)
        hosts.append(".".join(str(octet) for octet in octets))
    # The context manager guarantees the file is flushed and closed, even if
    # the write fails.
    with open(file_name, 'w') as out:
        out.write(str(hosts))

    print ("Done", time.strftime("%X %x %Z"))


def listen(responses):
    """Collect the source address of every packet received on a raw ICMP socket.

    Runs until the module-level ``SIGNAL`` flag is false. The flag is only
    checked between packets, so one more packet must arrive after it is
    cleared before the loop ends.

    :param responses: list that receives one 4-byte slice per packet.
    """
    global SIGNAL
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        s.bind(('', 1))
        print ("Listening")
        while SIGNAL:
            # Keep the first 20 bytes and take bytes 12..15 of them; this
            # assumes a 20-byte IPv4 header without options, where that
            # field is the source address.
            packet = s.recv(1024)[:20][-8:-4]
            responses.append(packet)
        print ("Stop Listening")
    finally:
        # Close the socket even when bind() or recv() raises.
        s.close()

# Flag read by listen()'s receive loop; rotate() sets it to False to stop it.
SIGNAL = True

# Shared list: listen() appends to it and rotate() reads it at the end.
responses = []

ips = '200.131.0.0/20' # Network to scan, in CIDR notation
wait = 0.002  # Seconds to sleep between pings; lower it on a faster link
file_name = 'log1.txt'

# unicode() is a Python 2 builtin and is not defined in Python 3.
ip_network = ipaddress.ip_network(unicode(ips), strict=False)

# Start the listener before the pinger so it is running when packets go out.
t_server = Thread(target=listen, args=[responses])
t_server.start()

t_ping = Thread(target=rotate, args=[ip_network, file_name, wait, responses])
t_ping.start()

我试过:

ip_network = ipaddress.ip_network(ips, strict=False) 而不是 ip_network = ipaddress.ip_network(unicode(ips), strict=False)

因为原来的写法会报错:“NameError: name 'unicode' is not defined”

之后:

我得到:my_checksum = checksum(header + data) -> TypeError: can't concat bytes to str

所以我尝试了:

data = bytes(192 * 'Q').encode('utf8') 而不是 data = 192 * 'Q'

现在的错误是:“data = bytes(192 * 'Q').encode('utf8') TypeError: string argument without an encoding”

有人能帮我把代码移植到 Python 3 吗?

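最佳答案

移植时主要改动三处:去掉 Python 3 中已不存在的 unicode() 调用;把负载数据改为字节串(192 * b'Q'),使其能与 struct.pack 返回的 bytes 拼接;在 checksum 中去掉 ord(),因为在 Python 3 中对 bytes 取下标直接得到整数。完整代码如下: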

import random
import socket
import time
import ipaddress
import struct

from threading import Thread

def checksum(source_string):
    """Return the 16-bit Internet checksum of a bytes-like object.

    The data is summed as little-endian 16-bit words, with a trailing odd
    byte added on its own. The carries are folded back into the low 16 bits,
    the result is bit-inverted and its two bytes are swapped.

    :param source_string: ``bytes`` or ``bytearray``; indexing must yield
        integers.
    :return: the checksum as an integer in the range 0..0xffff.
    """
    total = 0
    # Floor division rounds the length down to an even number. True division
    # would give a float equal to the full length, so odd-length input would
    # index one past the end.
    count_to = (len(source_string) // 2) * 2
    for count in range(0, count_to, 2):
        this_val = source_string[count + 1] * 256 + source_string[count]
        total = (total + this_val) & 0xffffffff
    if count_to < len(source_string):
        # Odd length: add the last, unpaired byte.
        total = (total + source_string[-1]) & 0xffffffff
    # Fold the upper 16 bits into the lower 16 bits, twice, to absorb carries.
    total = (total >> 16) + (total & 0xffff)
    total += total >> 16
    answer = ~total & 0xffff
    # Swap the two bytes of the 16-bit result.
    return answer >> 8 | (answer << 8 & 0xff00)


def create_packet(id):
    """Build an ICMP packet: a packed header followed by 192 payload bytes.

    The header is packed with the format 'bbHHh' from the values
    (8, 0, checksum, id, 1); type 8 with code 0 is an ICMP echo request.

    :param id: identifier value packed into the header's fourth field.
    :return: the packed header concatenated with the payload, as ``bytes``.
    """
    fmt = 'bbHHh'
    # The checksum is computed over a header whose checksum field is zero.
    blank_header = struct.pack(fmt, 8, 0, 0, id, 1)
    payload = 192 * b'Q'
    digest = checksum(blank_header + payload)
    # Repack the header with the computed checksum, passed through htons().
    final_header = struct.pack(fmt, 8, 0, socket.htons(digest), id, 1)
    return final_header + payload


def ping(addr, timeout=1):
    """Send one ICMP packet built by ``create_packet`` to ``addr``.

    No reply is awaited here; the function only sends.

    :param addr: destination host, passed to ``socket.connect``.
    :param timeout: not applied to the socket; only ``id(timeout)`` is used,
        multiplied by a random factor, to derive the packet identifier.
    :return: ``None``. If the raw socket cannot be created, the error is
        printed and nothing is sent.
    """
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    except OSError as e:
        # Without a socket there is nothing to send with: report the error
        # and stop instead of falling through to an unbound ``my_socket``.
        print (e)
        return
    try:
        packet_id = int((id(timeout) * random.random()) % 65535)
        packet = create_packet(packet_id)
        # NOTE(review): the port in the address tuple is presumably
        # irrelevant for a raw ICMP socket - confirm.
        my_socket.connect((addr, 80))
        my_socket.sendall(packet)
    finally:
        # Release the socket even when connect() or sendall() raises.
        my_socket.close()


def rotate(addr, file_name, wait, responses):
    """Ping every address in ``addr``, stop the listener, and save the hosts found.

    The hosts are de-duplicated, ordered by their packed address and written
    one per line.

    :param addr: iterable of addresses; each item is converted with ``str``
        and passed to ``ping``.
    :param file_name: path of the file that receives the host list.
    :param wait: number of seconds to sleep after each ping.
    :param responses: list of 4-byte packed addresses; in this script it is
        the list the listener thread appends to.
    """
    global SIGNAL

    print("Sending Packets", time.strftime("%X %x %Z"))
    for target in addr:
        ping(str(target))
        time.sleep(wait)
    print("All packets sent", time.strftime("%X %x %Z"))

    print("Waiting for all responses")
    time.sleep(2)

    # Stop the listener: clear the flag, then send one more ping so that its
    # blocking recv() returns and the loop condition is checked again.
    SIGNAL = False
    ping('127.0.0.1')

    print(len(responses), "hosts found!")
    print("Writing File")
    # sorted() works on a copy of the list the listener thread is still
    # appending to.
    hosts = {
        ".".join(str(octet) for octet in struct.unpack('BBBB', raw))
        for raw in sorted(responses)
    }
    with open(file_name, 'w') as out:
        ordered = sorted(hosts, key=socket.inet_aton)
        out.write('\n'.join(ordered))

    print("Done", time.strftime("%X %x %Z"))


def listen(responses):
    """Collect the source address of every packet received on a raw ICMP socket.

    Runs until the module-level ``SIGNAL`` flag is false. The flag is only
    checked between packets, so one more packet must arrive after it is
    cleared before the loop ends.

    :param responses: list that receives one 4-byte ``bytes`` slice per packet.
    """
    global SIGNAL
    # The context manager closes the socket on normal exit and also when
    # bind() or recv() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as s:
        s.bind(('', 1))
        print ("Listening")
        while SIGNAL:
            # Keep the first 20 bytes and take bytes 12..15 of them; this
            # assumes a 20-byte IPv4 header without options, where that
            # field is the source address.
            packet = s.recv(1024)[:20][-8:-4]
            responses.append(packet)
        print ("Stop Listening")

# Flag read by listen()'s receive loop; rotate() sets it to False to stop it.
SIGNAL = True

# Shared list: listen() appends to it and rotate() reads it at the end.
responses = []

ips = '192.168.1.0/28' # Network to scan, in CIDR notation
wait = 0.002  # Seconds to sleep between pings; lower it on a faster link
file_name = 'log1.txt'

# strict=False accepts a network string that has host bits set.
ip_network = ipaddress.ip_network(ips, strict=False)

# Start the listener before the pinger so it is running when packets go out.
t_server = Thread(target=listen, args=[responses])
t_server.start()

t_ping = Thread(target=rotate, args=[ip_network, file_name, wait, responses])
t_ping.start()

关于 python - 将 Python 2 代码移植到 Python 3:ICMP Scan with errors,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/39259570/

相关文章:

python - 为什么我使用 twilio 的脚本在本地运行完美,但在 Python Anywhere 上却抛出错误?

python - 两个函数的 yield 可以相减吗?

Python-子进程

python - Python Pandas 中的时间格式化

python-2.7 - 处理由 Python 代码产生的 C++ 子进程引起的段错误

python - ValueError: too many values to unpack(Scipy)

python - 如何使用 urllib 跟踪重定向?

python - 遍历大文件的一部分的内存高效方法

python - 我如何将 Heroku 数据库转移到我的本地计算机?我有一个 Django 应用程序(不是 Rails)

python - pytest导入错误