python - 如果 bool 结果为 true,则 Pandas 返回 CIDR

标签 python pandas

我正在使用几个 pandas 数据框。 df1 具有 IP 地址范围,df2 具有 IP 地址。此代码使用 bool 结果正确标记 df 列中的任何 IP 是否与 df 列中的任何 CIDR 匹配。我在获取 CIDR 范围而不是返回 bool 结果(如果为真)时遇到了问题。

import pandas as pd
import netaddr
from netaddr import *

创建范围数据框

a = {'StartAddress': ['65.14.88.64', '148.77.37.88', '65.14.41.128','65.14.40.0', '208.252.49.240','12.9.27.48','107.135.41.16','47.44.167.240'],
 'EndAddress': ['65.14.88.95', '148.77.37.95','65.14.41.135','65.14.40.255', '208.252.49.247','12.9.27.63','107.135.41.23','47.44.167.247']}

df1 = pd.DataFrame(data=a)
#Convert range to netaddr cidr format
def rangetocidr(row):
    return netaddr.iprange_to_cidrs(row.StartAddress, row.EndAddress)    

df1["CIDR"] = df1.apply(rangetocidr, axis=1)
df1["CIDR"].iloc[0]

创建 ip 数据帧

b = {'IP': ['65.13.88.64','148.65.37.88','65.14.88.65','148.77.37.93','66.15.41.132', '208.252.49.247','208.252.49.248','12.9.27.49']}
df2 = pd.DataFrame(data=b)

#Convert ip to netaddr format
def iptonetaddrformat (row):
    return netaddr.IPAddress(row.IP)

df2["IP_Format"] = df2.apply(iptonetaddrformat, axis=1)
df2["IP_Format"].iloc[0]

ip = pd.DataFrame(df2.IP.str.rsplit('.', 1, expand=True))
ip.columns = ['IP_init', 'IP_last']

start = pd.DataFrame(df1.StartAddress.str.rsplit('.', 1, expand=True))
start.columns = ['start_init', 'start_last']

end = pd.DataFrame(df1.EndAddress.str.rsplit('.', 1, expand=True))
end.columns = ['end_init', 'end_last']

df = pd.concat([ip, start, end], axis=1)

index = []
for idx, val in enumerate(df.itertuples()):
    for i in range(df.start_init.count()):
        if df.loc[idx, 'IP_init'] == df.loc[i, 'start_init']:            
            if df.loc[idx, 'IP_last'] >= df.loc[i, 'start_last'] and df.loc[idx, 'IP_last'] <= df.loc[i, 'end_last']:
                index.append(idx)
                break

df2['IN_CIDR'] = False

df2.loc[index, 'IN_CIDR'] = True

这可以正确标记来自 df2 的 IP 是否属于 df1 中的范围,并使用 bool 值 True 或 False。

    IP             IP_Format      IN_CIDR
0   65.13.88.64    65.13.88.64    False
1   148.65.37.88   148.65.37.88   False
2   65.14.88.65    65.14.88.65    True
3   148.77.37.93   148.77.37.93   True
4   66.15.41.132   66.15.41.132   False
5   208.252.49.247 208.252.49.247 True
6   208.252.49.248 208.252.49.248 False
7   12.9.27.49     12.9.27.49     True

我想更换 True df2 中具有正确 CIDR 的条目列 IP范围2 df2列。所需输出的示例如下:

    IP             IP_Format      IN_CIDR
0   65.13.88.64    65.13.88.64    False
1   148.65.37.88   148.65.37.88   False
2   65.14.88.65    65.14.88.65    [65.14.88.64/27]
3   148.77.37.93   148.77.37.93   [148.77.37.88/29]
4   66.15.41.132   66.15.41.132   False
5   208.252.49.247 208.252.49.247 [208.252.49.240/29]
6   208.252.49.248 208.252.49.248 False
7   12.9.27.49     12.9.27.49     [12.9.27.48/28] 

我已经尝试过df2.loc[index, 'IN_CIDR'] = df1.loc[index,'CIDR']但这只是给我索引位置 df1 的 CIDR,而不是将其与 CIDR 范围内的 ip 匹配。

最佳答案

我正在使用这种方式:

a = {'StartAddress': ['65.14.88.64', '148.77.37.88', '65.14.41.128', '65.14.40.0', '208.252.49.240', '12.9.27.48',
                      '107.135.41.16', '47.44.167.240'],
     'EndAddress': ['65.14.88.95', '148.77.37.95', '65.14.41.135', '65.14.40.255', '208.252.49.247', '12.9.27.63',
                    '107.135.41.23', '47.44.167.247']}
df1 = pd.DataFrame(data=a)

# Convert range to netaddr cidr format
def rangetocidr(row):
    return netaddr.iprange_to_cidrs(row.StartAddress, row.EndAddress)

df1["CIDR"] = df1.apply(rangetocidr, axis=1)

b = {'IP': ['65.13.88.64', '148.65.37.88', '65.14.88.65', '148.77.37.93', '66.15.41.132', '208.252.49.247', '208.252.49.248', '12.9.27.49']}
df2 = pd.DataFrame(data=b)

# Convert ip to netaddr format
def iptonetaddrformat(row):
    return netaddr.IPAddress(row.IP)

df2["IP_Format"] = df2.apply(iptonetaddrformat, axis=1)


df2['IN_CIDR'] = False

for i, row in df2.iterrows():
    ip = row['IP']
    for j, r in df1.iterrows():
        subnet = str(r['CIDR'][0])
        if ip_in_subnetwork(ip, subnet):
            df2.loc[i, 'IN_CIDR'] = '['+ subnet + ']'
print(df2)

输出:

               IP       IP_Format                           IN_CIDR
0     65.13.88.64     65.13.88.64                             False
1    148.65.37.88    148.65.37.88                             False
2     65.14.88.65     65.14.88.65                  [65.14.88.64/27]
3    148.77.37.93    148.77.37.93                 [148.77.37.88/29]
4    66.15.41.132    66.15.41.132                             False
5  208.252.49.247  208.252.49.247               [208.252.49.240/29]
6  208.252.49.248  208.252.49.248                             False
7      12.9.27.49      12.9.27.49                   [12.9.27.48/28]

这是我调用的函数来了解 IP 是否在子网中:

import netaddr as netaddr
import socket
import binascii

def ip_in_subnetwork(ip_address, subnetwork):

    """
    Returns True if the given IP address belongs to the
    subnetwork expressed in CIDR notation, otherwise False.
    Both parameters are strings.

    Both IPv4 addresses/subnetworks (e.g. "192.168.1.1"
    and "192.168.1.0/24") and IPv6 addresses/subnetworks (e.g.
    "2a02:a448:ddb0::" and "2a02:a448:ddb0::/44") are accepted.
    """

    (ip_integer, version1) = ip_to_integer(ip_address)
    (ip_lower, ip_upper, version2) = subnetwork_to_ip_range(subnetwork)

    if version1 != version2:
        raise ValueError("incompatible IP versions")

    return (ip_lower <= ip_integer <= ip_upper)

def ip_to_integer(ip_address):

    """
    Converts an IP address expressed as a string to its
    representation as an integer value and returns a tuple
    (ip_integer, version), with version being the IP version
    (either 4 or 6).

    Both IPv4 addresses (e.g. "192.168.1.1") and IPv6 addresses
    (e.g. "2a02:a448:ddb0::") are accepted.
    """

    # try parsing the IP address first as IPv4, then as IPv6
    for version in (socket.AF_INET, socket.AF_INET6):

        try:
            ip_hex = socket.inet_pton(version, ip_address)
            ip_integer = int(binascii.hexlify(ip_hex), 16)

            return (ip_integer, 4 if version == socket.AF_INET else 6)
        except:
            pass

    raise ValueError("invalid IP address")

def subnetwork_to_ip_range(subnetwork):

    """
    Returns a tuple (ip_lower, ip_upper, version) containing the
    integer values of the lower and upper IP addresses respectively
    in a subnetwork expressed in CIDR notation (as a string), with
    version being the subnetwork IP version (either 4 or 6).

    Both IPv4 subnetworks (e.g. "192.168.1.0/24") and IPv6
    subnetworks (e.g. "2a02:a448:ddb0::/44") are accepted.
    """

    try:
        fragments = subnetwork.split('/')
        network_prefix = fragments[0]
        netmask_len = int(fragments[1])

        # try parsing the subnetwork first as IPv4, then as IPv6
        for version in (socket.AF_INET, socket.AF_INET6):

            ip_len = 32 if version == socket.AF_INET else 128

            try:
                suffix_mask = (1 << (ip_len - netmask_len)) - 1
                netmask = ((1 << ip_len) - 1) - suffix_mask
                ip_hex = socket.inet_pton(version, network_prefix)
                ip_lower = int(binascii.hexlify(ip_hex), 16) & netmask
                ip_upper = ip_lower + suffix_mask

                return (ip_lower,
                        ip_upper,
                        4 if version == socket.AF_INET else 6)
            except:
                pass
    except:
        pass

    raise ValueError("invalid subnetwork")

关于python - 如果 bool 结果为 true,则 Pandas 返回 CIDR,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54948888/

相关文章:

python - Matplotlib 从 Pandas 数据框 groupby 中绘制图表

python - 如何使用 QGIS/Python 更改具有特定值的 Shapefile 中的属性

python - 请求 - 如何流式上传 - 部分文件

python - Pandas 数据框的切片列在从该列创建的新对象中不断提及原始列名称

python - 将包含 pandas 对象(系列/数据帧)的 python 元组、列表、字典转换为 json

python - 根据条件在 DataFrame 中设置值

python - 是否可以在 Python 的函数装饰器中获取对对象实例的引用?

python - 检测两张图片之间的相似点然后将它们叠加(Python)

python - 传递一个变量以从 Python 中的 JSON 字符串中提取?

python - Pandas:DataFrame describe返回的count是 float 的情况有哪些