scala - 从 Spark 服务器执行 SFTP 时,大型机服务器上的记录级别数据截断

标签 scala apache-spark sftp jsch mainframe

请完整阅读本文。

我正在努力通过 SFTP 从 scala 开发的 Spark 应用程序发送 csv 文件到大型机服务器。我正在使用jsch(java安全通道)包版本0.1.53版本来完成从spark服务器到大型机服务器的SFTP连接。我面临的问题是,在大型机服务器上,csv 文件被截断为每条记录行 1024 字节。

经过研究,我发现在大型机上,我们可以使用“lrecl”和“recfm”等选项来控制文件中每条记录的长度以及该记录的格式。但我无法在 scala 上集成这些选项。我发现this stackoverflow 上的答案,旨在用 Java 实现。当我在 scala 上使用相同的逻辑时,出现以下错误:

EDC5129I No such file or directory., file: /+recfm=fb,lrecl=3000 at
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2846)
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2198)
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2215)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1565)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1526)

使用jsch库建立SFTP连接和传输文件的Scala代码块如下:

session = jsch.getSession(username, host, port)
session.setConfig("PreferredAuthentication","publickey")
session.setConfig("MaxAuthTries",2)
System.out.println("Created SFTP Session")

val sftpSessionConfig: Properties = new Properties()
sftpSessionConfig.put("StrictHostKeyChecking","no")
session.setConfig(sftpSessionConfig)
session.connect() //Connect to session
System.out.println("Connected to SFTP Session")
      
val channel = session.openChannel("sftp")
channel.connect()
val sftpChannel = channel.asInstanceOf[ChannelSftp]
sftpChannel.ls("/+recfm=fb,lrecl=3000") //set lrecl and recfm ---> THROWING ERROR HERE

sftpChannel.put(sourceFile, destinationPath,ChannelSftp.APPEND) //Push file from local to mainframe

有什么方法可以使用 jsch 库将这些选项设置为我的 scala 代码中的配置吗?我还尝试使用 spring-ml 的spark-sftp 包。但这个包也存在主机服务器上数据截断的问题。

请提供帮助,因为这个问题已成为我的项目的非常关键的障碍。

编辑:使用 scala 代码块更新问题

最佳答案

摘自本演示文稿Dovetail SFTP Webinar第 21 张幻灯片:

ls /+recfm=fb,lrecl=80

在我看来,您的代码中的“/”太多了。

从错误消息来看,我认为SFTP服务器具有UNIX文件系统中的当前路径。您没有为数据集设置数据集高级限定符(HLQ),是吗?我在代码中看不到它。再次从上面的演示中,在 ls 之前执行 cd:

cd //your-hlq-of-choice

这会做两件事:

  1. 将当前工作目录更改为MVS数据集端。
  2. 设置要使用的 HLQ。

抱歉,我无法测试自己;我不知道scala。

关于scala - 从 Spark 服务器执行 SFTP 时,大型机服务器上的记录级别数据截断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67474859/

相关文章:

java - 使用 hasNext() 和 next() 遍历异步生成的元素流

scala - 如何在 Scala 准引号中拼接类型和默认值?

scala - 如何从字符串列中提取数字?

elasticsearch - sbt 无法解析来自 sonatype 快照的依赖关系

amazon-web-services - SFTP Chroot 用户到挂载的 S3 存储桶

ruby - Ruby 中使用 net-sftp 进行基于密码 key 的身份验证

scala - scala 中是否推荐使用枚举(相对于包括特征/案例对象的替代方案)?

java - ( Play 2.1.3) @select 返回(String, String) 对

hadoop - Spark自定义序列化程序导致ClassNotFound

sftp - Jsch错误-无法发送 channel 请求