从 Spark 服务器执行 SFTP 时在大型机服务器上截断记录级数据

问题描述

请完整阅读。

我正在通过 SFTP 将 csv 文件从在 Scala 中开发的 spark 应用程序发送到大型机服务器。我正在使用 jsch(java 安全通道)包版本 0.1.53 版本来完成从 Spark 服务器到大型机服务器的 SFTP 连接。我面临的问题是,在大型机服务器上,csv 文件被截断为 每条记录行 1024 字节。

经过研究,我发现在大型机上,我们可以选择使用“lrecl”和“recfm”来控制文件中每条记录的长度和该记录的格式。但我无法在 Scala 上集成这些选项。我在 stackoverflow 上找到了 this 答案,该答案用于在 Java 中实现。当我在 Scala 上使用相同的逻辑时,出现以下错误

EDC5129I No such file or directory.,file: /+recfm=fb,lrecl=3000 at
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2846)
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2198)
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2215)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1565)
    at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1526)

使用jsch库建立SFTP连接和传输文件的Scala代码块如下:

session = jsch.getSession(username,host,port)
session.setConfig("PreferredAuthentication","publickey")
session.setConfig("MaxAuthTries",2)
System.out.println("Created SFTP Session")

val sftpSessionConfig: Properties = new Properties()
sftpSessionConfig.put("StrictHostKeyChecking","no")
session.setConfig(sftpSessionConfig)
session.connect() //Connect to session
System.out.println("Connected to SFTP Session")
      
val channel = session.openChannel("sftp")
channel.connect()
val sftpChannel = channel.asInstanceOf[ChannelSftp]
sftpChannel.ls("/+recfm=fb,lrecl=3000") //set lrecl and recfm ---> THROWING ERROR HERE

sftpChannel.put(sourceFile,destinationPath,ChannelSftp.APPEND) //Push file from local to mainframe

有什么方法可以使用 jsch 库将这些选项设置为我的 Scala 代码中的配置吗?我也尝试使用 spring-ml 的 spark-sftp 包。但是这个包在大型机服务器上也存在数据截断的问题。

请帮忙,因为这个问题已成为我项目的非常重要的障碍。

编辑:使用 Scala 代码块更新问题

解决方法

来自幻灯片 21 上的演示文稿 Dovetail SFTP Webinar

ls /+recfm=fb,lrecl=80

在我看来,您的代码中有一个“/”太多了。

从错误信息来看,我认为SFTP服务器有UNIX文件系统中的当前路径。您没有为数据集设置数据集高级限定符 (HLQ),对吗?我在代码中看不到它。再次从上面的演示文稿中,在 cd 之前执行 ls

cd //your-hlq-of-choice

这将做两件事:

  1. 将当前工作目录更改为 MVS 数据集端。
  2. 设置要使用的 HLQ。

对不起,我无法测试自己;我不知道 Scala。

,

首先,z/OS 上运行的是什么 SFTP 服务器?如果它是随 z/OS(不是 Dovetail)提供的,则不支持您正在执行的命令,您将收到类似 Can't ls: "/+recfm=fb,lrecl=80" not found 的消息。这将是有效的,因为那不是有效的文件。 / 右侧的所有内容都将被视为文件名的一部分。

我将您的代码转换为 Java,因为我不熟悉 Scala 并且没有时间学习它。这是我使用的代码示例。

import com.jcraft.jsch.JSch;
import java.util.Properties;
import java.util.Vector;

class sftptest {
  static public void main(String[] args) {

    String username = "ibmuser";
    String host = "localhost";
    int port = 10022;              // Note,my z/OS is running in a docker container so I map 10022 to 22
    JSch jsch = new JSch(); 
    String sourceFile = "/";
    String destinationPath ="/";
    String privateKey = "myPrivateKey";


    try {
      jsch.addIdentity(privateKey);                   //add private key path and file
      com.jcraft.jsch.Session session = jsch.getSession(username,host,port);
      session.setConfig("PreferredAuthentication","password");
      session.setConfig("MaxAuthTries","2");
      System.out.println("Created SFTP Session");

      Properties sftpSessionConfig = new Properties();
      sftpSessionConfig.put("StrictHostKeyChecking","no");
      session.setConfig(sftpSessionConfig);
      session.connect(); //Connect to session
      System.out.println("Connected to SFTP Session");
      
      com.jcraft.jsch.ChannelSftp channel = (com.jcraft.jsch.ChannelSftp) session.openChannel("sftp");
      channel.connect();
      // com.jcraft.jsch.Channel sftpChannel = (ChannelSftp) channel;
      //    channel.ls("/+recfm=fb,lrecl=3000"); //set lrecl and recfm ---> THROWING ERROR HERE
      //    channel.ls("/"); //set lrecl and recfm ---> THROWING ERROR HERE
      Vector filelist = channel.ls("/");
      for(int i=0; i<filelist.size();i++){
          System.out.println(filelist.get(i).toString());
      }


    //  channel.put(sourceFile,destinationPath,com.jcraft.jsch.ChannelSftp.APPEND);  //Push file from local to mainframe
    } catch (Exception e) {
      System.out.println("Exception "+e.getMessage());
    }
  }
}

就我而言,我确实使用了 ssh 密钥而不是密码。您的 ls 方法的输出是:

Created SFTP Session
Connected to SFTP Session
Exception No such file

+ 和所有内容放到右侧:

Created SFTP Session
Connected to SFTP Session
drwxr-xr-x    2 OMVSKERN SYS1         8192 May 13 01:18 .
drwxr-xr-x    7 OMVSKERN SYS1         8192 May 13 01:18 ..
-rw-r--r--    1 OMVSKERN SYS1            0 May 13 01:18 file 1
-rw-r--r--    1 OMVSKERN SYS1            0 May 13 01:18 file 2

主要问题是 z/OS 似乎不支持您使用的语法,该语法由 Dovetail 的特定 SFTP 实现提供。

如果您没有 Dovetail,我建议您发送通常长度可变的 CSV 文件,将它们作为 USS 文件发送,以便正确翻译行并具有可变长度。将它们传输到 USS(z/OS 上的常规 Unix),然后将它们复制到具有 VB RECFM 的 MVS 文件。假设文件已分配,您可以执行 cp myuploadedFile.csv "//'MY.MVS.FILE'"