Reading a FASTQ file into an AWS Glue job script

Problem

I need to read a FASTQ file into an AWS Glue job script, but I get this error:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 59, in <module>
    runpy.run_path(script, run_name='__main__')
  File "/usr/lib64/python3.7/runpy.py", line 261, in run_path
    code, fname = _get_code_from_file(run_name, path_name)
  File "/usr/lib64/python3.7/runpy.py", line 236, in _get_code_from_file
    code = compile(f.read(), fname, 'exec')
  File "/tmp/test20200930", line 24
    datasource0 = spark.createDataset(sc.textFile("s3://sample-genes-data/fastq/S_Sonnei_short_reads_1.fastq").sliding(4,4).map {
    ^
SyntaxError: invalid syntax

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 92, in <module>
    while "runpy.py" in new_stack.tb_frame.f_code.co_filename:
AttributeError: 'NoneType' object has no attribute 'tb_frame'

Here is my code:

import org.apache.spark.mllib.rdd.RDDFunctions._

datasource0 = spark.createDataset(sc.textFile("s3://sample-genes-data/fastq/S_Sonnei_short_reads_1.fastq").sliding(4,4).map {
  case Array(id,seq,_,qual) => (id,qual)
 }).toDF("identifier","sequence","quality")
datasource1 = DynamicFrame.fromDF(datasource0,glueContext,"nullv")

I followed this link: Read FASTQ file into a Spark dataframe

Solution

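I was able to run the code by wrapping it in a GlueApp object. The traceback shows the script being compiled by Python (runpy.py), so this presumably also requires the job to be created as a Scala job rather than a Python one. You can use the following code after substituting your own S3 path.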

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.mllib.rdd.RDDFunctions._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    // Brings in the encoders needed by createDataset and toDF
    import sparkSession.implicits._
    // Group the lines into blocks of four (one FASTQ record each) and
    // keep the identifier, sequence, and quality lines; the "+" line is dropped
    val datasource0 = sparkSession.createDataset(
      spark.textFile("s3://<s3path>").sliding(4, 4).map {
        case Array(id, seq, _, qual) => (id, seq, qual)
      }
    ).toDF("identifier", "sequence", "quality")
    // Convert the DataFrame into a Glue DynamicFrame
    val datasource1 = DynamicFrame(datasource0, glueContext)
    datasource1.show()
    datasource1.printSchema()
    Job.commit()
  }
}

With the input:

@seq1
AGTCAGTCGAC
+
?@@FFBFFDDH
@seq2
CCAGCGTCTCG
+
?88ADA?BDF8

Output:

{"identifier": "@seq1","sequence": "AGTCAGTCGAC","quality": "?@@FFBFFDDH"}
{"identifier": "@seq2","sequence": "CCAGCGTCTCG","quality": "?88ADA?BDF8"}
