python - Error when writing a stream in console format with pyspark

Tags: python apache-spark pyspark spark-structured-streaming

Using Windows 10, Pyspark==3.0.1.

My code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, FloatType, TimestampType

spark = SparkSession.builder.appName("app").getOrCreate()
s = (StructType()
     .add("loggedPersonID", StringType(), True)
     .add("latitude", FloatType(), True)
     .add("longitude", FloatType(), True)
     .add("t_from", TimestampType(), True)
     .add("t_to", TimestampType(), True))
json_sdf = spark.readStream.schema(s).json(filepath)
json_sdf.isStreaming
json_sdf.printSchema()
json_sdf.createOrReplaceTempView("snapshot")
d = json_sdf.select("latitude").where("loggedPersonID = 6")
d.writeStream.format("console").start()

Error:

ERROR StreamMetadata: Error writing stream metadata StreamMetadata(320b32f9-c938-410f-8acc-020630570676) to file:/C:/Users/***/AppData/Local/Temp/temporary-baf30601-3b5c-41aa-a211-cfe8cd2fe729/metadata
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
        at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
        at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
        at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
        at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
        at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
        at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:596)
        at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
        at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
        at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:310)
        at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
        at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
        at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:316)
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$streamMetadata$1(StreamExecution.scala:177)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:49)
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:317)
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
[2021-01-24 23:48:57,585] ERROR in app: Exception on /place_matching [POST]
Traceback (most recent call last):
  File "C:\Users\***\anaconda3\lib\site-packages\flask\app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\***\anaconda3\lib\site-packages\flask\app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\***\anaconda3\lib\site-packages\flask\app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "C:\Users\***\anaconda3\lib\site-packages\flask\_compat.py", line 39, in reraise
    raise value
  File "C:\Users\***\anaconda3\lib\site-packages\flask\app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\***\anaconda3\lib\site-packages\flask\app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "E:\*****\app.py", line 136, in algorithm
    d.writeStream.format("console").start()
  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\streaming.py", line 1211, in start
    return self._sq(self._jwrite.start())
  File "C:\Users\***\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\utils.py", line 128, in deco
    return f(*a, **kw)
  File "C:\Users\***\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o39.start.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
        at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
        at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
        at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
        at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
        at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
        at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:596)
        at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
        at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
        at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:310)
        at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
        at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
        at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:316)
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$streamMetadata$1(StreamExecution.scala:177)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:175)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:49)
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:317)
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)

I have already tried the answers to all of these questions:

  1. Why does starting a streaming query lead to "ExitCodeException exitCode=-1073741515"?
  2. Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

Best Answer

See my answer to the same problem here: link. Using fully compatible versions solved this issue for me.
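On Windows, this `UnsatisfiedLinkError` on `NativeIO$Windows` typically means that the Hadoop native binaries (`winutils.exe` and `hadoop.dll`) are missing from `HADOOP_HOME\bin`, or were built for a Hadoop version different from the one bundled with Spark (hadoop 2.7.x for spark-3.0.1-bin-hadoop2.7). A minimal sketch of pointing the environment at a matching winutils install before creating the SparkSession (the `C:\hadoop` path is a placeholder, not from the question):

```python
import os

# Hypothetical install location -- adjust to where you unpacked a winutils
# build that matches your Spark distribution's Hadoop version (2.7.x here).
os.environ["HADOOP_HOME"] = r"C:\hadoop"
# hadoop.dll and winutils.exe must be on PATH for NativeIO to load.
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]
```

These variables must be set before the JVM is started (i.e. before `SparkSession.builder...getOrCreate()` is called), otherwise the native library lookup has already failed.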

Regarding "python - Error when writing a stream in console format with pyspark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65874601/
