python - 如何对PySpark程序进行单元测试?

原文 标签 python unit-testing apache-spark pyspark

我当前的Java / Spark单元测试方法通过使用“本地”实例化SparkContext并使用JUnit运行单元测试来工作(详细信息here)。

必须组织代码以在一个功能中进行I / O,然后使用多个RDD调用另一个功能。

这很好。我有一个用Java + Spark编写的经过高度测试的数据转换。

我可以用Python做同样的事情吗?

如何使用Python运行Spark单元测试?

最佳答案

我建议也使用py.test。 py.test使得创建可重复使用的SparkContext测试装置变得容易,并使用它编写简洁的测试功能。您还可以专门测试夹具(例如,创建一个StreamingContext)并在测试中使用其中的一个或多个。

我写了一篇有关Medium的博客文章,主题是:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

这是该帖子的摘录:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
    """ test word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

相关文章:

python - 使用硬链接(hard link)的Python写时复制

python - 如何在数据帧末尾添加某列的总和

ios - [[NSRunLoop currentRunLoop] runMode:* beforeDate:*]中的BAD EXC;

hadoop - Spark 流是否同时适用于 “cp”和 “mv”

apache-spark - Spark Sql抛出PermGen空间错误

python - 如何定义曲线拟合中的函数?

python - 模拟Flask before_first_request

apache-spark - 以字节数组为键的ReduceByKey

php - PHP中的测试驱动开发