python - 如何在pyspark作业中为事件中心添加conf

标签 python apache-spark pyspark

我有一个pyspark作业,该作业使用事件中心将数据推送到azure data lake。但是我无法在event hubs作业中包含pyspark的配置,因为没有相同的库。

以下是示例pyspark作业:

from pyspark.sql.functions import lit
import pyspark.sql.functions as f
from pyspark.sql.functions import UserDefinedFunction,regexp_replace,col
import json
from pyspark.sql import types as T
from pyspark.sql.functions import to_json, struct, when
import time
import logging
import sys
from datetime import datetime
import pytz
from datetime import datetime, timedelta
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, ArrayType, StructField, IntegerType, StringType, FloatType, DoubleType


conf = SparkConf().setAppName("sample1")
sc = SparkContext(conf=conf)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sample1").getOrCreate()


......logic of the job



# event hubs
# **ehWriteConf included topic name and connection 
DF.select("body").write.format("eventhubs").options(**ehWriteConf).save()


我正在使用spark-submit运行我的工作,但在事件中心线遇到错误。

我的Spark版本是2.11

最佳答案

您可以在提交conf作业时指定pyspark,如下所示:

spark-submit --packages com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.1 your_job_name.py



将此用作参考https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md

关于python - 如何在pyspark作业中为事件中心添加conf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60365522/

相关文章:

python - BS4 如何在不使用 .text 的情况下获取文本?

python - 如何向 OpenCV 函数 cv.dft() 输入复数值?

scala - 如何获取 RDD 的子集?

python - 如何在 Azure Databricks 笔记本中调试长时间运行的 python 命令?

python - 如何使用 column=numbers 而不是 OpenPyXL 中的字母来读取单元格范围?

python - 检查 HDF5 Store 对象是否为空?

apache-spark - Spark UI 不断重定向到/null 并返回 500

user-interface - 任务进度条的 Spark UI 浅蓝色部分表示什么?

python - 执行 PySpark 代码时遇到 Py4JJavaError

python - 多列上的 PySpark 数据框过滤器