我正在使用几个 pandas 数据框。 df1 具有 IP 地址范围,df2 具有 IP 地址。此代码使用 bool 结果正确标记 df 列中的任何 IP 是否与 df 列中的任何 CIDR 匹配。我在获取 CIDR 范围而不是返回 bool 结果(如果为真)时遇到了问题。
import pandas as pd
import netaddr
from netaddr import *
创建范围数据框
a = {'StartAddress': ['65.14.88.64', '148.77.37.88', '65.14.41.128','65.14.40.0', '208.252.49.240','12.9.27.48','107.135.41.16','47.44.167.240'],
'EndAddress': ['65.14.88.95', '148.77.37.95','65.14.41.135','65.14.40.255', '208.252.49.247','12.9.27.63','107.135.41.23','47.44.167.247']}
df1 = pd.DataFrame(data=a)
#Convert range to netaddr cidr format
def rangetocidr(row):
return netaddr.iprange_to_cidrs(row.StartAddress, row.EndAddress)
df1["CIDR"] = df1.apply(rangetocidr, axis=1)
df1["CIDR"].iloc[0]
创建 ip 数据帧
b = {'IP': ['65.13.88.64','148.65.37.88','65.14.88.65','148.77.37.93','66.15.41.132', '208.252.49.247','208.252.49.248','12.9.27.49']}
df2 = pd.DataFrame(data=b)
#Convert ip to netaddr format
def iptonetaddrformat (row):
return netaddr.IPAddress(row.IP)
df2["IP_Format"] = df2.apply(iptonetaddrformat, axis=1)
df2["IP_Format"].iloc[0]
ip = pd.DataFrame(df2.IP.str.rsplit('.', 1, expand=True))
ip.columns = ['IP_init', 'IP_last']
start = pd.DataFrame(df1.StartAddress.str.rsplit('.', 1, expand=True))
start.columns = ['start_init', 'start_last']
end = pd.DataFrame(df1.EndAddress.str.rsplit('.', 1, expand=True))
end.columns = ['end_init', 'end_last']
df = pd.concat([ip, start, end], axis=1)
index = []
for idx, val in enumerate(df.itertuples()):
for i in range(df.start_init.count()):
if df.loc[idx, 'IP_init'] == df.loc[i, 'start_init']:
if df.loc[idx, 'IP_last'] >= df.loc[i, 'start_last'] and df.loc[idx, 'IP_last'] <= df.loc[i, 'end_last']:
index.append(idx)
break
df2['IN_CIDR'] = False
df2.loc[index, 'IN_CIDR'] = True
这可以正确标记来自 df2 的 IP 是否属于 df1 中的范围,并使用 bool 值 True 或 False。
IP IP_Format IN_CIDR
0 65.13.88.64 65.13.88.64 False
1 148.65.37.88 148.65.37.88 False
2 65.14.88.65 65.14.88.65 True
3 148.77.37.93 148.77.37.93 True
4 66.15.41.132 66.15.41.132 False
5 208.252.49.247 208.252.49.247 True
6 208.252.49.248 208.252.49.248 False
7 12.9.27.49 12.9.27.49 True
我想更换 True
df2
中具有正确 CIDR 的条目列或 IP范围2 df2
列。所需输出的示例如下:
IP IP_Format IN_CIDR
0 65.13.88.64 65.13.88.64 False
1 148.65.37.88 148.65.37.88 False
2 65.14.88.65 65.14.88.65 [65.14.88.64/27]
3 148.77.37.93 148.77.37.93 [148.77.37.88/29]
4 66.15.41.132 66.15.41.132 False
5 208.252.49.247 208.252.49.247 [208.252.49.240/29]
6 208.252.49.248 208.252.49.248 False
7 12.9.27.49 12.9.27.49 [12.9.27.48/28]
我已经尝试过df2.loc[index, 'IN_CIDR'] = df1.loc[index,'CIDR']
但这只是给我索引位置 df1 的 CIDR,而不是将其与 CIDR 范围内的 ip 匹配。
最佳答案
我正在使用这种方式:
a = {'StartAddress': ['65.14.88.64', '148.77.37.88', '65.14.41.128', '65.14.40.0', '208.252.49.240', '12.9.27.48',
'107.135.41.16', '47.44.167.240'],
'EndAddress': ['65.14.88.95', '148.77.37.95', '65.14.41.135', '65.14.40.255', '208.252.49.247', '12.9.27.63',
'107.135.41.23', '47.44.167.247']}
df1 = pd.DataFrame(data=a)
# Convert range to netaddr cidr format
def rangetocidr(row):
return netaddr.iprange_to_cidrs(row.StartAddress, row.EndAddress)
df1["CIDR"] = df1.apply(rangetocidr, axis=1)
b = {'IP': ['65.13.88.64', '148.65.37.88', '65.14.88.65', '148.77.37.93', '66.15.41.132', '208.252.49.247', '208.252.49.248', '12.9.27.49']}
df2 = pd.DataFrame(data=b)
# Convert ip to netaddr format
def iptonetaddrformat(row):
return netaddr.IPAddress(row.IP)
df2["IP_Format"] = df2.apply(iptonetaddrformat, axis=1)
df2['IN_CIDR'] = False
for i, row in df2.iterrows():
ip = row['IP']
for j, r in df1.iterrows():
subnet = str(r['CIDR'][0])
if ip_in_subnetwork(ip, subnet):
df2.loc[i, 'IN_CIDR'] = '['+ subnet + ']'
print(df2)
输出:
IP IP_Format IN_CIDR
0 65.13.88.64 65.13.88.64 False
1 148.65.37.88 148.65.37.88 False
2 65.14.88.65 65.14.88.65 [65.14.88.64/27]
3 148.77.37.93 148.77.37.93 [148.77.37.88/29]
4 66.15.41.132 66.15.41.132 False
5 208.252.49.247 208.252.49.247 [208.252.49.240/29]
6 208.252.49.248 208.252.49.248 False
7 12.9.27.49 12.9.27.49 [12.9.27.48/28]
这是我调用的函数来了解 IP 是否在子网中:
import netaddr as netaddr
import socket
import binascii
def ip_in_subnetwork(ip_address, subnetwork):
"""
Returns True if the given IP address belongs to the
subnetwork expressed in CIDR notation, otherwise False.
Both parameters are strings.
Both IPv4 addresses/subnetworks (e.g. "192.168.1.1"
and "192.168.1.0/24") and IPv6 addresses/subnetworks (e.g.
"2a02:a448:ddb0::" and "2a02:a448:ddb0::/44") are accepted.
"""
(ip_integer, version1) = ip_to_integer(ip_address)
(ip_lower, ip_upper, version2) = subnetwork_to_ip_range(subnetwork)
if version1 != version2:
raise ValueError("incompatible IP versions")
return (ip_lower <= ip_integer <= ip_upper)
def ip_to_integer(ip_address):
"""
Converts an IP address expressed as a string to its
representation as an integer value and returns a tuple
(ip_integer, version), with version being the IP version
(either 4 or 6).
Both IPv4 addresses (e.g. "192.168.1.1") and IPv6 addresses
(e.g. "2a02:a448:ddb0::") are accepted.
"""
# try parsing the IP address first as IPv4, then as IPv6
for version in (socket.AF_INET, socket.AF_INET6):
try:
ip_hex = socket.inet_pton(version, ip_address)
ip_integer = int(binascii.hexlify(ip_hex), 16)
return (ip_integer, 4 if version == socket.AF_INET else 6)
except:
pass
raise ValueError("invalid IP address")
def subnetwork_to_ip_range(subnetwork):
"""
Returns a tuple (ip_lower, ip_upper, version) containing the
integer values of the lower and upper IP addresses respectively
in a subnetwork expressed in CIDR notation (as a string), with
version being the subnetwork IP version (either 4 or 6).
Both IPv4 subnetworks (e.g. "192.168.1.0/24") and IPv6
subnetworks (e.g. "2a02:a448:ddb0::/44") are accepted.
"""
try:
fragments = subnetwork.split('/')
network_prefix = fragments[0]
netmask_len = int(fragments[1])
# try parsing the subnetwork first as IPv4, then as IPv6
for version in (socket.AF_INET, socket.AF_INET6):
ip_len = 32 if version == socket.AF_INET else 128
try:
suffix_mask = (1 << (ip_len - netmask_len)) - 1
netmask = ((1 << ip_len) - 1) - suffix_mask
ip_hex = socket.inet_pton(version, network_prefix)
ip_lower = int(binascii.hexlify(ip_hex), 16) & netmask
ip_upper = ip_lower + suffix_mask
return (ip_lower,
ip_upper,
4 if version == socket.AF_INET else 6)
except:
pass
except:
pass
raise ValueError("invalid subnetwork")
关于python - 如果 bool 结果为 true,则 Pandas 返回 CIDR,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54948888/