python - Spark 会立即丢弃临时 rdd 吗?

标签 python caching apache-spark pyspark rdd

一些来源将 RDD 默认描述为“短暂的”(例如 this s/o answer )——这意味着除非我们对它们调用 cache() 或 persist() ,否则它们不会保留在内存中。

假设我们的程序涉及一个临时(用户未显式缓存)RDD,该 RDD 用于导致 RDD 实现的一些操作。 我的问题是:Spark 是否会立即丢弃物化临时 RDD——或者 RDD 是否可能保留在内存中以进行其他操作,即使我们从未要求它这样做被缓存?

此外,如果临时 RDD 保留在内存中,是否总是只是因为某些 LRU 策略尚未将其踢出——或者也可能是因为调度优化?

我尝试用下面的代码来解决这个问题 - 在 4 核机器上使用带有 python 3.5 和 Spark 1.6.0 的 Jupyter 笔记本运行 - 但我希望得到知道的人的回答当然。

import pyspark
sc = pyspark.SparkContext()
N = 1000000   # size of dataset
THRESHOLD = 100  # some constant

def f():
    """ do not chache """
    rdd = sc.parallelize(range(N))
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

def g():
    """ cache """
    rdd = sc.parallelize(range(N)).cache()
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

对于上面的两个函数,f() 不会要求 rdd 持久化 - 但 g() 在开始时会这样做。当我对 foo() 和 boo() 这两个函数进行计时时,我得到了两个函数非常相似的性能,就好像 cache() 调用没有产生任何区别一样。 (事实上​​,使用缓存的速度更慢)。

%%timeit
f()
> 1 loops, best of 3: 2.19 s per loop

%%timeit
g()
> 1 loops, best of 3: 2.7 s per loop

实际上,即使修改 f() 以在 RDD 上调用 unpersist() 也不会改变任何事情。

def ff():
    """ modified f() with explicit call to unpersist() """
  rdd = sc.parallelize(range(N))
  for i in range(10):
    rdd.unpersist()
    print(rdd.filter(lambda x: x > i * THRESHOLD).count())

%%timeit
ff()
> 1 loops, best of 3: 2.25 s per loop

unpersist() 的文档指出,它“将 RDD 标记为非持久性的,并从内存和磁盘中删除它的所有 block ”。 但事实真的如此吗?或者当 Spark 知道它将在以后使用 RDD 时,它会忽略对 unpersist 的调用吗?

最佳答案

这里的缓 stub 本没有值(value)。从范围创建RDD非常便宜(每个分区只需要两个整数即可开始),并且您应用的操作无法真正从缓存中受益。 persist 应用于 Java 对象而不是 Python 对象,并且您的代码在 RDD 创建和第一次转换之间不执行任何工作。

即使您忽略所有这些,这也是一项非常简单的任务,数据量很小。总成本很可能是由日程安排和沟通决定的,而不是其他因素。

如果您想查看缓存的实际效果,请考虑以下示例:

from pyspark import SparkContext
import time

def f(x):
   time.sleep(1)
    return x

sc = SparkContext("local[5]")
rdd = sc.parallelize(range(50), 5).map(f)
rdd.cache()

%time rdd.count()   # First run, no data cached ~10 s
## CPU times: user 16 ms, sys: 4 ms, total: 20 ms
## Wall time: 11.4 s
## 50

%time rdd.count()  # Second time, task results fetched from cache
## CPU times: user 12 ms, sys: 0 ns, total: 12 ms
## Wall time: 114 ms
## 50

rdd.unpersist()  # Data unpersisted

%time rdd.count()  #  Results recomputed ~10s
## CPU times: user 16 ms, sys: 0 ns, total: 16 ms 
## Wall time: 10.1 s
## 50

虽然在像这样的简单情况下,持久行为在一般情况下是可以预测的,但缓存应该被视为一种提示而不是契约。任务输出可能会保留或不保留,具体取决于可用资源,并且可以在没有任何用户干预的情况下从缓存中逐出。

关于python - Spark 会立即丢弃临时 rdd 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36087490/

相关文章:

Javascript 文件未更新(第一次)

algorithm - Spark : What is the time complexity of the connected components algorithm used in GraphX?

python - 如何制作像django这样的基于Python的Web框架?

python - 如何在 matplotlib 中反射(reflect)矩阵?

php - 使用 cURL PHP 与命令行 cURL 清除 Varnish 缓存

docker - kubernetes 无法从 spark master 主机中提取图像

python - 在分布式系统中实现DBSCAN

python - 如何在线程之间共享对象?

python - 力扣 : Problem 23 - Merge K Sorted Lists

caching - 如何在 apollo-server-hapi graphql 上实现缓存