我使用 memoise
包将查询缓存到 arrow
数据集,但我有时会在哈希中遇到不匹配/“冲突”,因此会返回错误的值。
我已经隔离了问题并将其复制到下面的 MWE 中。
问题在于,首先过滤然后汇总的箭头查询的 rlang::hash()
(memoise
使用)不依赖于过滤器。
我的问题是:这是我可以修复的东西吗(因为我错误地使用了它)或者这是其中一个包中的错误(我很乐意创建问题),如果是这样,是否应该将此报告给arrow
、rlang::hash()
,甚至 R6
?
MWE
例如,下面的所有三个查询都具有相同的哈希值,但它们应该不同(看看结果,结果显然是...)
library(arrow)
library(dplyr)
ds_file <- file.path(tempdir(), "mtcars")
write_dataset(mtcars, ds_file)
ds <- open_dataset(ds_file)
# 1) Create three different queries =======
# Query 1 with mpg > 25 ----
query1 <- ds |>
filter(mpg > 25) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg))
# Query 2 with mpg > 0 ----
query2 <- ds |>
filter(mpg > 0) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg))
# Query 3 with filter on cyl ----
query3 <- ds |>
filter(cyl == 4) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg))
# 2) Lets compare the hashes: the main issue ======
rlang::hash(query1)
#> [1] "f505339fd65df6ef53728fcc4b0e55f7"
rlang::hash(query2)
#> [1] "f505339fd65df6ef53728fcc4b0e55f7"
rlang::hash(query3)
#> [1] "f505339fd65df6ef53728fcc4b0e55f7"
# ERROR HERE: they should be different as the queries are different!
# 3) Lets also compare the results: clearly different =====
query1 |> collect()
#> # A tibble: 2 × 3
#> vs n mean_mpg
#> <dbl> <int> <dbl>
#> 1 1 5 30.9
#> 2 0 1 26
query2 |> collect()
#> # A tibble: 2 × 3
#> vs n mean_mpg
#> <dbl> <int> <dbl>
#> 1 0 18 16.6
#> 2 1 14 24.6
query3 |> collect()
#> # A tibble: 2 × 3
#> vs n mean_mpg
#> <dbl> <int> <dbl>
#> 1 1 10 26.7
#> 2 0 1 26
请注意,当我使用 digest
时会发生同样的错误。
当我打印查询时,它们被打印为好像它们是相同的...(我向箭头报告了这个错误 here)
query1
#> FileSystemDataset (query)
#> vs: double
#> n: int32
#> mean_mpg: double
#>
#> See $.data for the source Arrow object
query2
#> FileSystemDataset (query)
#> vs: double
#> n: int32
#> mean_mpg: double
#>
#> See $.data for the source Arrow object
query3
#> FileSystemDataset (query)
#> vs: double
#> n: int32
#> mean_mpg: double
#>
#> See $.data for the source Arrow object
但是当我查询查询的$.data
参数时,我发现它们实际上是不同的
query1$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#>
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (mpg > 25) #<=========
#> * Grouped by vs
#> See $.data for the source Arrow object
query2$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#>
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (mpg > 0) #<=========
#> * Grouped by vs
#> See $.data for the source Arrow object
query3$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#>
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (cyl == 4) #<=========
#> * Grouped by vs
#> See $.data for the source Arrow object
但是 rlang::hash()
还是找不到区别:
rlang::hash(query1$.data)
#> [1] "b7f743cd635f7dc06356b827a6974df8"
rlang::hash(query2$.data)
#> [1] "b7f743cd635f7dc06356b827a6974df8"
rlang::hash(query3$.data)
#> [1] "b7f743cd635f7dc06356b827a6974df8"
如果有帮助,查询对象是 R6
类 arrow_dplyr_query
的对象(另见其 source code in apache/arrow)
备忘录用例
为了完整起见并正确看待问题,我使用以下内容来缓存结果,它应该返回不同的值(见上文)但实际上没有!
library(arrow)
library(memoise)
library(dplyr)
ds_file <- file.path(tempdir(), "mtcars")
write_dataset(mtcars, ds_file)
ds <- open_dataset(ds_file)
collect_cached <- memoise::memoise(dplyr::collect,
cache = cachem::cache_mem(logfile = stdout()))
# Query 1 with mpg > 25 ----
ds |>
filter(mpg > 25) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg)) |>
collect_cached()
#> [2022-11-25 09:16:28.586] cache_mem get: key "2edd901226498414056dcc54eaa49415"
#> [2022-11-25 09:16:28.586] cache_mem get: key "2edd901226498414056dcc54eaa49415" is missing
#> [2022-11-25 09:16:28.705] cache_mem set: key "2edd901226498414056dcc54eaa49415"
#> [2022-11-25 09:16:28.706] cache_mem prune
#> # A tibble: 2 × 3
#> vs n mean_mpg
#> <dbl> <int> <dbl>
#> 1 1 5 30.9
#> 2 0 1 26
# Query 2 with mpg > 0 ----
# this is wrongly matched to the first query and returns wrong results...
ds |>
filter(mpg > 0) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg)) |>
collect_cached()
#> [2022-11-25 09:16:28.820] cache_mem get: key "2edd901226498414056dcc54eaa49415"
#> [2022-11-25 09:16:28.820] cache_mem get: key "2edd901226498414056dcc54eaa49415" found #< ERROR HERE! as the hash is identical
#> # A tibble: 2 × 3
#> vs n mean_mpg
#> <dbl> <int> <dbl>
#> 1 1 5 30.9
#> 2 0 1 26
请注意,尽管查询不同(但它们的哈希值相同,因此出现了这个问题),但我们得到了相同的结果。
最佳答案
这很像 hack……但也许就足够了?通过捕获 show_query
的输出并将其用作 hash=
的参数,我能够找到足够独特的关于包含其过滤器组件的中间“查询” 备忘录
:
hashfun <- function(x) {
x$x <- capture.output(show_query(x$x))
rlang::hash(x)
}
collect_cached <- memoise::memoise(
dplyr::collect,
cache = cachem::cache_mem(logfile = stdout()),
hash = hashfun)
ds |>
filter(mpg > 25) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg)) |>
collect_cached()
# [2022-11-25 08:14:56.596] cache_mem get: key "e6184e282e05875139e8afd2a071f329"
# [2022-11-25 08:14:56.596] cache_mem get: key "e6184e282e05875139e8afd2a071f329" is missing
# [2022-11-25 08:14:56.616] cache_mem set: key "e6184e282e05875139e8afd2a071f329"
# [2022-11-25 08:14:56.616] cache_mem prune
# # A tibble: 2 x 3
# vs n mean_mpg
# <dbl> <int> <dbl>
# 1 1 5 30.9
# 2 0 1 26
#### different filter, should be a "miss"
ds |>
filter(mpg > 0) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg)) |>
collect_cached()
# [2022-11-25 08:15:06.745] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f"
# [2022-11-25 08:15:06.745] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f" is missing
# [2022-11-25 08:15:06.767] cache_mem set: key "88312b31b29050ff029900f4dfc58a9f"
# [2022-11-25 08:15:06.767] cache_mem prune
# # A tibble: 2 x 3
# vs n mean_mpg
# <dbl> <int> <dbl>
# 1 0 18 16.6
# 2 1 14 24.6
#### repeat of filter `mpg > 0`, should be a "hit"
ds |>
filter(mpg > 0) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg)) |>
collect_cached()
# . + >
# [2022-11-25 08:15:24.825] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f"
# [2022-11-25 08:15:24.825] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f" found
# # A tibble: 2 x 3
# vs n mean_mpg
# <dbl> <int> <dbl>
# 1 0 18 16.6
# 2 1 14 24.6
传递给 hashfun
的对象是一个列表,其中第一个参数似乎是校验和或某种盐(我们将忽略它),以及所有剩余的参数(命名或其他)由缓存函数的形式决定。在我们的例子中,由于我们正在缓存 collect
,它接受 x=
(我们看到)和 ...=
(我们不't):
debugonce(hashfun)
ds |>
filter(mpg > 0) |>
group_by(vs) |>
summarise(n = n(), mean_mpg = mean(mpg)) |>
collect_cached()
# debugging in: encl$`_hash`(c(encl$`_f_hash`, args, lapply(encl$`_additional`,
# function(x) eval(x[[2L]], environment(x)))))
# debug at #1: {
# x$x <- capture.output(show_query(x$x))
# rlang::hash(x)
# }
x
# [[1]]
# [1] "1e4b92a7ebe8b4bcb1afbd44c9a72a72"
#
# $x
# FileSystemDataset (query)
# vs: double
# n: int32
# mean_mpg: double
#
# See $.data for the source Arrow object
show_query(x$x)
# ExecPlan with 6 nodes:
# 5:SinkNode{}
# 4:ProjectNode{projection=[vs, n, mean_mpg]}
# 3:GroupByNode{keys=["vs"], aggregates=[
# hash_sum(n, {skip_nulls=true, min_count=1}),
# hash_mean(mean_mpg, {skip_nulls=false, min_count=0}),
# ]}
# 2:ProjectNode{projection=["n": 1, "mean_mpg": mpg, vs]}
# 1:FilterNode{filter=(mpg > 0)}
# 0:SourceNode{}
只是用 show_query(x$x)
的返回替换 x$x
似乎没有用,因为似乎只有
ed 形式,因此我选择了 rlang::hash
无法打印 capture.output
。
关于rlang::hash 无法区分箭头查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74570604/