我当前的 Java/Spark 单元测试方法通过使用“本地”实例化 SparkContext 并使用 JUnit 运行单元测试来工作(详细 here)。
必须组织代码在一个函数中执行 I/O,然后使用多个 RDD 调用另一个函数。
这很好用。我有一个用 Java + Spark 编写的经过高度测试的数据转换。
我可以用 Python 做同样的事情吗?
如何使用 Python 运行 Spark 单元测试?
最佳答案
我也建议使用 py.test。 py.test 可以轻松创建可重用的 SparkContext 测试装置并使用它来编写简洁的测试函数。您还可以专门化夹具(例如创建 StreamingContext)并在测试中使用其中的一个或多个。
我在 Medium 上写了一篇关于这个主题的博文:
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
这里是帖子的一个片段:
pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
""" test word couting
Args:
spark_context: test fixture SparkContext
"""
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_word_counts(input_rdd)
expected_results = {'hello':2, 'spark':3, 'again':1}
assert results == expected_results
关于python - 如何对 PySpark 程序进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33811882/