python - 如何从 PySpark 中的不同线程在一个 Sparkcontext 中运行多个作业?

标签 python multithreading apache-spark pyspark

从 Spark 文档中了解到 Scheduling Within an Application :

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."

我在 Scala 和 Java 中找不到相同的示例代码。 有人可以举例说明如何使用 PySpark 实现吗?

最佳答案

我遇到了同样的问题,所以我创建了一个独立的小例子。我使用 python 的线程模块创建多个线程并同时提交多个 spark 作业。

请注意,默认情况下,spark 将以先进先出 (FIFO) 的方式运行作业:http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application .在下面的示例中,我将其更改为 FAIR 调度

# Prereqs:
# set 
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
  spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  print sc.parallelize(range(i*10000)).count()

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print 'spark task', i, 'has started'


run_multiple_jobs()

输出:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000

关于python - 如何从 PySpark 中的不同线程在一个 Sparkcontext 中运行多个作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30214474/

相关文章:

python - 使用 urllib urllib2 python 向 SciHub 发送表单请求不再有效

python - 回滚事务不适用于 py.test 和 Flask

c# - 在 Unity iOS 上发出 HTTP 请求的方法?

java - 无法从连接到 EC2 上 Cassandra 的 EMR 运行 Spark 作业

scala - Spark 分区 Hive 表

python - 在带有字符串的for循环中的每个其他字母之前添加一个空格

python - 选择一列的一些元素并找到它们的最大值,在一个大文件上重复。使用 python

c# - 长时间运行的进程暂停

multithreading - 保护两个线程的字符串缓冲区?

scala - Spark Scala 列出目录中的文件夹