amazon-web-services - How to convert an RDD to a DataFrame in Spark 2.4.5 (Python)

Tags: amazon-web-services azure apache-spark databricks azure-databricks



I am completely new to Databricks and Spark. I am using Databricks Community Edition with a Spark 2.4.5 cluster. I am trying to port code written for Spark 1.6.2 to Spark 2.4.5, because the Community Edition does not allow creating a cluster with Spark 1.6.2. Can someone help me convert an RDD object to a DataFrame in Spark 2.4.5?


Code

summary = data.select("OrderMonthYear", "SaleAmount").groupBy("OrderMonthYear").sum().orderBy("OrderMonthYear") #.toDF("OrderMonthYear","SaleAmount")
# Convert OrderMonthYear to integer type
#results = summary.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount)).toDF(["OrderMonthYear","SaleAmount"])
rddData = summary.rdd.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount))
#rddData1 = rddData.flatMap(lambda x : [(k, x) for k in x.keys()])

# assuming the Spark environment is set and sc is spark.sparkContext
schemaPeople = spark.createDataFrame(rddData)
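
For reference, before mapping over the RDD it can help to check what the intermediate rows actually contain, since the lambda assumes OrderMonthYear behaves like a string (a minimal debugging sketch, not part of the original question, assuming the same summary variable as above):

# Inspect the aggregated DataFrame's schema: if OrderMonthYear is a date or
# timestamp column, r.OrderMonthYear inside the lambda is a datetime object,
# not a str, so a str-style .replace('-', '') call will fail.
summary.printSchema()

# Look at a few raw RDD elements to confirm the Python types.
print(summary.rdd.take(3))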

The code that ran in Spark 1.6.2 was:

#results = summary.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount)).toDF(["OrderMonthYear","SaleAmount"])

My modified code, which does not work, is:

rddData = summary.rdd.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount))
schemaPeople = spark.createDataFrame(rddData) 


The error occurs on the following line, when trying to convert the RDD to a DataFrame:

schemaPeople = spark.createDataFrame(rddData) 



Thank you.

Full error details:

SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 2821, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:57)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2890)
    at org.apache.spark.sql.Dataset$$anonfun$collectResult$1.apply(Dataset.scala:2881)
    at org.apache.spark.sql.Dataset$$anonfun$collectResult$1.apply(Dataset.scala:2880)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3492)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3487)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:242)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:172)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3487)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2880)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:149)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:54)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBufferInternal$1.apply(PythonDriverLocal.scala:984)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBufferInternal$1.apply(PythonDriverLocal.scala:931)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:876)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:931)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:492)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.com$databricks$backend$daemon$driver$PythonDriverLocal$$outputSuccess(PythonDriverLocal.scala:918)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$6.apply(PythonDriverLocal.scala:364)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$6.apply(PythonDriverLocal.scala:351)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:876)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:351)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
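
For what it's worth, the TypeError above most likely comes from the .replace('-','') call itself (this is an inference from the traceback, not something stated in the post): if OrderMonthYear is a date column, each row returns a Python datetime.date, whose replace() method expects integer year/month/day arguments, so passing strings raises exactly "an integer is required (got type str)" on Python 3. A minimal standalone illustration:

import datetime

d = datetime.date(2014, 1, 1)
try:
    # datetime.date.replace(year, month, day) expects integers, so the
    # string arguments reproduce the worker-side error.
    d.replace('-', '')
except TypeError as e:
    print(e)  # an integer is required (got type str)

# On a plain string the same call works, which is why the accepted answer
# below converts the column to a string with date_format() first.
print('2014-01-01'.replace('-', ''))  # 20140101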

Best Answer

I finally got it working with the code below.

After 8 days of digging, and with the help of a kind person on Stack Overflow (https://stackoverflow.com/users/2451763/sparker0i), we came up with the following code, which does work with Spark 2.4.5 on Databricks.

Aggregation and conversion

from pyspark.sql.functions import *

# Format the date as a string first so that .replace() later works on a str, then aggregate.
dataF = data.select("OrderMonthYear", date_format("OrderMonthYear", 'yyyy-MM-dd').alias("dt_format"), "SaleAmount").groupBy("dt_format").sum().orderBy("dt_format").toDF("dt_format", "SaleAmount")

dataF.show()

# Strip the dashes from the formatted date, convert it to an int, and build a DataFrame.
results = dataF.rdd.map(lambda r: (int(r.dt_format.replace('-','')), r.SaleAmount))
df = spark.createDataFrame(results, ("dt_format", "SaleAmount"))
display(df)
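
As an aside, a similar result can be obtained without converting to an RDD at all, by doing the string cleanup and cast with DataFrame functions (a sketch that assumes the same data DataFrame and column names as above; this is not part of the original answer):

from pyspark.sql.functions import date_format, regexp_replace, sum as sum_

df2 = (data
       .select(date_format("OrderMonthYear", "yyyy-MM-dd").alias("dt_format"), "SaleAmount")
       .groupBy("dt_format")
       .agg(sum_("SaleAmount").alias("SaleAmount"))
       # strip the dashes and cast to int entirely within the DataFrame API
       .withColumn("dt_format", regexp_replace("dt_format", "-", "").cast("int"))
       .orderBy("dt_format"))

display(df2)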


Converting the DataFrame to features and labels

#convenience for specifying schema 

from pyspark.mllib.regression import LabeledPoint

meta = df.select("dt_format", "SaleAmount").toDF("dt_format", "SaleAmount")
meta.show()

# LabeledPoint(label, features): SaleAmount is the label, dt_format the single feature.
rdd1 = meta.rdd.map(lambda r: LabeledPoint(r[1], [r[0]]))

rddtodf = spark.createDataFrame(rdd1, ("dt_format", "SaleAmount"))
display(rddtodf)
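
As a side note, since everything else here is DataFrame-based, the newer pyspark.ml API can build the features column directly on the DataFrame instead of going through the RDD-based mllib LabeledPoint (a sketch assuming the df variable and column names above; not from the original answer):

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Pack the numeric dt_format column into a "features" vector and expose
# SaleAmount as the "label" column expected by pyspark.ml estimators.
assembler = VectorAssembler(inputCols=["dt_format"], outputCol="features")
mlData = assembler.transform(df).select(col("features"), col("SaleAmount").alias("label"))
display(mlData)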

Hope this helps.

Regarding "amazon-web-services - How to convert an RDD to a DataFrame in Spark 2.4.5 (Python)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61546721/
