python - 使用 UDF 通过使用数据框中的两列来返回列表

标签 python arrays dataframe pyspark user-defined-functions

我有 dns(字符串)和 ip 地址(字符串)的数据框。我想使用 UDF 来应用我创建的 python 函数,该函数搜索不同/唯一的 dns 并将其与它匹配的 ips 数量相关联。最后,它会将这些信息输出到一个列表中。最终结果是 UDF 获取一个数据框并返回一个列表。

#creating sample data
from pyspark.sql import Row
l = [('pipe.skype.com','172.25.132.26'),('management.azure.com','172.25.24.57'),('pipe.skype.com','172.11.128.10'),('management.azure.com','172.16.12.22'),('www.google.com','172.26.51.144'),('collector.exceptionless.io','172.22.2.21')]
rdd = sc.parallelize(l)
data = rdd.map(lambda x: Row(dns_host=x[0], src_ipv4=x[1]))
data_df = sqlContext.createDataFrame(data)

def beaconing_aggreagte(df):
  """Loops through unique hostnames and correlates them to unique src ip. If an individual hostname has less than 5 unique source ip connection, moves to the next step"""
  dns_host = df.select("dns_host").distinct().rdd.flatMap(lambda x: x).collect()
  HIT_THRESHOLD = 5
  data = []
  for dns in dns_host:
    dns_data =[]
    testing = df.where((f.col("dns_host") == dns)).select("src_ipv4").distinct().rdd.flatMap(lambda x: x).collect()
    if 0 < len(testing) <= 5: #must have less than 5 unique src ip for significance 
      dns_data.append(dns)
      data.append([testing,dns_data])
      print([testing,dns_data])
  return data

我认为我的架构可能不正确

#Expected return from function: [[['172.25.24.57','172.16.12.22'],[management.azure.com]],..]
array_schema = StructType([
    StructField('ip', ArrayType(StringType()), nullable=False),
    StructField('hostname', ArrayType(StringType()), nullable=False)
]) 

testing_udf_beaconing_aggreagte = udf(lambda z: beaconing_aggreagte(z), array_schema)
df_testing = testing_df.select('*',testing_udf_beaconing_aggreagte(array('dns_host','src_ipv4')))
df_testing.show()

此错误输出到:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1248.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1248.0 (TID 3846823, 10.139.64.23, executor 13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

我的最终目标是获取 df 并返回格式为 [[[ips 列表], [dns_host]],...] 的列表。我正在尝试使用 UDF 来帮助并行化集群上的操作,而不是使用一个执行程序。

最佳答案

group by 应该能够实现这一点。使用聚合收集所有 IP,然后统计列表的大小。 然后,您可以过滤掉大小 > 5

的行
from pyspark.sql.functions import *
from pyspark.sql import Row
l = [('pipe.skype.com','172.25.132.26'),('management.azure.com','172.25.24.57'),('pipe.skype.com','172.11.128.10'),('management.azure.com','172.16.12.22'),('www.google.com','172.26.51.144'),('collector.exceptionless.io','172.22.2.21')]
rdd = sc.parallelize(l)
data = rdd.map(lambda x: Row(dns_host=x[0], src_ipv4=x[1]))
data_df = sqlContext.createDataFrame(data)

data_df2 = data_df.groupby("dns_host").agg(F.collect_list("src_ipv4").alias("src_ipv4_list"))\
                  .withColumn("ip_count",F.size("src_ipv4_list"))\
                  .filter(F.col("ip_count") <= 5)\
                  .drop("ip_count")
data_df2.show(20,False)

输出:

+--------------------------+------------------------------+
|dns_host                  |src_ipv4_list                 |
+--------------------------+------------------------------+
|pipe.skype.com            |[172.25.132.26, 172.11.128.10]|
|collector.exceptionless.io|[172.22.2.21]                 |
|www.google.com            |[172.26.51.144]               |
|management.azure.com      |[172.25.24.57, 172.16.12.22]  |
+--------------------------+------------------------------+

关于python - 使用 UDF 通过使用数据框中的两列来返回列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53873501/

相关文章:

python - 为什么 K 最近点到原点的堆比排序慢?

javascript - 解析数组的数组并返回公共(public)交集

c++ - 如何在 C++ 中搜索字符串数组

python - 按月对数据帧进行排序,并找到每个月每列中的第一个非零值

python - TensorFlow:是否有一种方法可以在变量范围内初始化变量(如果未初始化),并重用它们(如果已初始化)?

php - 使用 PHP 变量执行 Python 脚本

arrays - Cantor 的 ZigZag 函数 : Find the nth cell

python - 将数据帧的索引设置为字典中的单个键

database - 关系数据库与 R/Python 数据框架

python - 如何在 Python 中处理非常长的字符串?