python - 如何对 PySpark 程序进行单元测试?

标签 python unit-testing apache-spark pyspark

我当前的 Java/Spark 单元测试方法通过使用“本地”实例化 SparkContext 并使用 JUnit 运行单元测试来工作(详细 here)。

必须组织代码在一个函数中执行 I/O,然后使用多个 RDD 调用另一个函数。

这很好用。我有一个用 Java + Spark 编写的经过高度测试的数据转换。

我可以用 Python 做同样的事情吗?

如何使用 Python 运行 Spark 单元测试?

最佳答案

我也建议使用 py.test。 py.test 可以轻松创建可重用的 SparkContext 测试装置并使用它来编写简洁的测试函数。您还可以专门化夹具(例如创建 StreamingContext)并在测试中使用其中的一个或多个。

我在 Medium 上写了一篇关于这个主题的博文:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

这里是帖子的一个片段:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
    """ test word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

关于python - 如何对 PySpark 程序进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33811882/

相关文章:

Python:从 IP 地址列表生成 IP 范围

python - Pandas 用特定的阈值计算每一列

python - 支持类的 .send() 吗?

c++ - 创建一个接口(interface)来模拟 C++ 对象

c#单元测试重写方法

python-3.x - PySpark:随机化数据框中的行

apache-spark - AWS错误请求(400) Spark

python - 检查字符串是否有效 MySQL UTF8?

powershell - 如何在 Pester 中使用 'get arguments for calls made on' 模拟(或以其他方式生成包含实际值和预期值的有用消息)?

elasticsearch - 从Kafka到Spark的流式传输到 Elasticsearch 索引