我想修复我的函数,以便 filter
中的表达式将根据传递的参数而有所不同。例如,如果我们传递 eq_cols = {"a": "aaa", "b": "bbb"}
,那么应该创建两个表达式,如果没有(默认),我们不会传递这些表达式。 isin_col
相同和select_cols
.
import polars as pl
from typing import Dict, List
def query_parquet(
file_path: str,
eq_cols: Dict[str, str] = None,
isin_cols: Dict[str, List[str]] = None,
select_cols: List[str] = None,
) -> pl.DataFrame:
query = (
pl.scan_parquet(file_path)
.filter(
pl.col("A") == "aaa",
# pl.col("B") == "bbb",
# pl.col("C").is_in(["ccc1", "ccc2"])
# pl.col("D").is_in(["ddd1", "ddd2"])
)
.select(["F", "G", "H"])
.collect()
)
return query
你们能告诉我该怎么做吗?我希望我足够好地解释了我的问题。
最佳答案
表达式是 Python 对象,在创建它们时不会对其进行求值,因此您可以进行类似的理解
pl.scan_parquet(file_path)
.filter(
pl.all_horizontal(
pl.all_horizontal(pl.col(k).eq(v) for k,v in eq_cols.items()),
pl.all_horizontal(pl.col(k).is_in(v) for k,v in isin_cols.items()),
)
)
.select(select_cols)
.collect()
等等,您希望的任何组合逻辑,而不仅仅是all_horizontal
。在默认 None
的情况下,您可以将默认的某种类型的always True
表达式或 *
用于 select,或者在以下情况下执行一些 if 条件:在多行上构建查询,而不是像这样只在一行上构建查询:
query = pl.scan_parquet(file_path)
if eq_cols or isin_cols:
filter_exprs = []
if eq_cols:
filter_exprs.append(pl.all_horizontal(pl.col(k).eq(v) for k,v in eq_cols.items())
if isin_cols:
filter_exprs.append(pl.all_horizontal(pl.col(k).is_in(v) for k,v in isin_cols.items())
query = query.filter(pl.all_horizontal(filter_exprs))
query = query.filter(filter_expr)
if select_cols:
query = query.select(select_cols)
query = query.collect() # done here
附录:eq_cols
的另一个选项是在最新版本的极坐标中,filter
变得更加符合人体工程学。 pl.all_horizontal 的逻辑是传入可变位置参数时的默认逻辑(您甚至可以直接将可迭代对象作为一个位置参数传入),并且特定关键字参数等于 pl。 col(k).eq(v)。因此,您可以解压字典(以及可选的列表/生成器,但不需要)来简化上面的 if 语句,就像这样,假设默认值是 []
和 {}
而不是 None
:
if eq_cols or isin_cols:
query = query.filter((pl.col(k).is_in(v) for k,v in isin_cols.items()), **eq_cols)
# can unpack the generator, e.g. *(pl.col(k) ... ), same results
关于python-polars - 如何使用任意表达式过滤惰性极坐标数据帧?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77512892/