python - 从整数列 PySpark 中删除字母

标签 python pyspark

我有一个非常大的表来表示点(>3000 万点)。 它可以有两个或树形列代表 x,y,z

不幸的是,其中一些列可以包含字符串('nan'、'nulo'、'vazio' 等) 它们可以在不同的文件中改变,但在表中是不变的

我需要一种方法来删除此字符串并用空值替换它们或删除行

我所做的是在图片和下面的代码中,有更好的原因吗?更灵活?(此代码仅适用于 3d)

def import_file(self,file_path:str,sep:str=',',null_values:str=''):  
 
 #read table
 table =  self.spark.read.load(path=file_path, \
 format='csv', \
 sep=sep, \
 header=False).toDF('x','y','z')
 
 #change the letters to ''
 table.withColumn('x',regexp_replace('x','[a-z]',''))
 table.withColumn('y',regexp_replace('z','[a-z]',''))
 table.withColumn('z',regexp_replace('z','[a-z]',''))

 #replace '' for nulls or TODO:remove columns
 table.replace('',None)

 return table

最佳答案

另一种方法可以使用 UDF 来标记字符串,并且进一步基于您想要删除的列中的任何行组合,您可以轻松地做到这一点

import pyspark.sql.functions as F
import pandas as pd
import numpy as np

@F.udf(returnType=BooleanType())
def mark_strings(inp):

  #### Check if inp is string or not , assuming here you can have numeric rows as well which are to be returned as is

  if isinstance(inp,str) and not pd.isnull(inp):
    if inp.isalpha():
       return True
  
  return False


@F.udf(returnType=StringType())
def replace_strings(inp):

  #### Check if inp is string or not , assuming here you can have numeric rows as well which are to be returned as is

  if isinstance(inp,str) and not pd.isnull(inp):
    if inp.isalpha():
       return np.nan
  
  return inp

删除数据行

table = table.withColumn('x_str_bool',mark_strings(F.col('x')))
table = table.withColumn('y_str_bool',mark_strings(F.col('y')))
table = table.withColumn('z_str_bool',mark_strings(F.col('z')))

##### Assuming if you only want to remove string data rows based on a combination of x and y.

table_filter = table.filter((F.col('x_str_bool') == False) &
(F.col('y_str_bool') == False))

替换数据行

table = table.withColumn('x',replace_strings(F.col('x')))
table = table.withColumn('y',replace_strings(F.col('y')))
table = table.withColumn('z',replace_strings(F.col('z')))

关于python - 从整数列 PySpark 中删除字母,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67846590/

相关文章:

python - 如何将所有列相互相乘

java - 从pyspark手动调用spark的垃圾回收

scala - 如何在 PySpark 中压缩两个 RDD?

apache-spark - 为什么 SparkContext.parallelize 使用驱动程序的内存?

arrays - PySpark - RDD 到 JSON

java.lang.内存不足错误: Java heap space collecting a lot of elements from an rdd in pyspark

python - 在单元测试中将 errno 与 assertRaises 结合使用

python - 使用 Jinja 按嵌套字典值过滤

python - Python 中字符串的受限排列

python - 将函数应用于 3D numpy 数组的每个 2D 切片的有效方法