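Upload files from Azure Blob Storage to an SFTP location using Databricks?

Problem description

I have a scenario where I need to copy files from Azure Blob Storage to an SFTP location from within Databricks.

Is there a way to implement this using PySpark or Scala?

Solution

For this problem, please refer to the following steps (I use Scala).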

  1. Mount the Azure Blob Storage container to DBFS
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mountPoint = "/mnt/blob",
  extraConfigs = Map("fs.azure.account.key.<storage-account-name>.blob.core.windows.net" -> "<key>"))

dbutils.fs.ls("/mnt/blob")


  2. Copy these files to the cluster's local file system
%sh

cp -R /dbfs/mnt/blob  /databricks/driver
ls -R /databricks/driver/blob


  3. Code. Before running the code, add the library com.jcraft.jsch to Databricks via Maven
import java.io.File
import com.jcraft.jsch._

// Returns every file and directory below f, descending into subdirectories
def recursiveListFiles(f: File): Array[File] = {
  val these = f.listFiles
  these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}

val jsch = new JSch()
val session = jsch.getSession("<username>","<host>",<port>) // Set your username, host, and port
session.setPassword("<password>") // Set your password
val config = new java.util.Properties()
// Disables host key verification; convenient for a quick test only
config.put("StrictHostKeyChecking","no")
session.setConfig(config)
session.connect()
val channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
channelSftp.connect()

val files = recursiveListFiles(new File("/databricks/driver/blob"))

// Upload every regular file into the remote directory
files.foreach(file => {
  if (file.isFile()) {
    println(file.getPath())
    channelSftp.put(file.getPath(), "/home/testqw/upload")
  }
})
channelSftp.disconnect()
session.disconnect()
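
One caveat about recursiveListFiles above: java.io.File.listFiles returns null when the path is not a directory or cannot be read, so the helper can throw a NullPointerException. A minimal null-safe sketch follows; the name listFilesRecursively and the temporary demo directory are my own, not part of the original answer.

```scala
import java.io.File
import java.nio.file.Files

// Null-safe variant: File.listFiles returns null when the path is not a
// directory or cannot be read, so wrap the result in Option first.
def listFilesRecursively(dir: File): Array[File] = {
  val children = Option(dir.listFiles).getOrElse(Array.empty[File])
  children ++ children.filter(_.isDirectory).flatMap(d => listFilesRecursively(d))
}

// Usage: build a small temporary tree (hypothetical demo data) and keep
// only the regular files, as the upload loop does.
val root = Files.createTempDirectory("blob-demo").toFile
val sub = new File(root, "sub")
sub.mkdir()
new File(root, "a.txt").createNewFile()
new File(sub, "b.txt").createNewFile()

val regularFiles = listFilesRecursively(root).filter(_.isFile)
println(regularFiles.map(_.getName).sorted.mkString(", "))
```

In the upload code, this function could be swapped in for recursiveListFiles without changing the rest of the loop.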


  4. Check the result using FileZilla

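Update

After mounting the Azure Blob container, we can access the files directly through the /dbfs mount path and upload them, without first copying them to the driver's local file system.

For example: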

import java.io.File
import com.jcraft.jsch._

// Returns every file and directory below f, descending into subdirectories
def recursiveListFiles(f: File): Array[File] = {
  val these = f.listFiles
  these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}

val jsch = new JSch()
val session = jsch.getSession("","",22) // Set your username and host
session.setPassword("") // Set your password
val config = new java.util.Properties()
// Disables host key verification; convenient for a quick test only
config.put("StrictHostKeyChecking","no")
session.setConfig(config)
session.connect()
val channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
channelSftp.connect()
val home = channelSftp.getHome() // Remote home directory (not used below)

// Read the files straight from the mounted container
val files = recursiveListFiles(new File("/dbfs/mnt/blob"))

// Upload every regular file into the remote directory
files.foreach(file => {
  if (file.isFile()) {
    println(file.getPath())
    channelSftp.put(file.getPath(), "/home/testqw/upload")
  }
})
channelSftp.disconnect()
session.disconnect()
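
Note that the loop above puts every file into the same remote directory, so files with the same name in different subfolders would overwrite one another. If the folder layout matters, one option is to compute a remote path per file. The sketch below is only an illustration: remotePathFor is a hypothetical helper of mine, and the example paths are assumptions based on the ones used above.

```scala
import java.nio.file.Paths

// Hypothetical helper: maps a local file to a remote path that keeps the
// file's position relative to localRoot. Remote SFTP paths always use "/".
def remotePathFor(localRoot: String, localFile: String, remoteRoot: String): String = {
  val relative = Paths.get(localRoot).relativize(Paths.get(localFile))
  val parts = (0 until relative.getNameCount).map(i => relative.getName(i).toString)
  (remoteRoot.stripSuffix("/") +: parts).mkString("/")
}

// Usage with the paths from the example above
println(remotePathFor("/dbfs/mnt/blob", "/dbfs/mnt/blob/2020/data.csv", "/home/testqw/upload"))
// prints /home/testqw/upload/2020/data.csv
```

The result could be passed as the second argument to channelSftp.put. The remote subdirectories would have to exist beforehand, for example by creating them with channelSftp.mkdir.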
