如何解析 akka 流中的 avro byteString 流

问题描述

我在 s3 存储桶中有 Avro 文件并尝试流式传输和解析为案例类。 我有要解析的架构,但不知道如何处理。

我使用 s3.download 从 s3 存储桶下载并流式传输文件,然后将其转换为 utf8string。

请协助,我如何解析我们所拥有的架构,并考虑到我获得的输入流。

解决方法

我将根据您要求使用架构来(反)使用 Avro 序列化消息来回答这个问题。

我有要解析的架构,但不知道如何处理。

并假设您已经从 s3.buckets 下载了消息。然后我将使用我的例子,我在 PostgreSQL 上持久化消息,只是为了一个有效的例子。但是您可以假设您的 s3.bucket 连接。

我正在使用 com.sksamuel.avro4s 库来创建我的 Avro 序列化程序。以下是放入 build.sbt 的必要库:

  val akkaVersion = "2.6.10"
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,"com.typesafe.akka" %% "akka-persistence" % akkaVersion,"com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4","org.xerial.snappy" % "snappy-java" % "1.1.8.2","org.postgresql" % "postgresql" % "42.2.2","com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0",

然后创建序列化程序,在我的例子中是 MyFirstAvroSerializer,扩展 akka.serialization.Serializer。它有一个模式,在我的例子中是案例类 CompanyRegistry。基本上,您必须实现方法 identifier 必须具有唯一 ID、toBinaryfromBinary 来转换消息,以及 includeManifest 是错误的,因为我不不需要清单。

import akka.serialization.Serializer
import com.sksamuel.avro4s.{AvroInputStream,AvroOutputStream,AvroSchema}
import com.typesafe.config.ConfigFactory

import java.io.{ByteArrayInputStream,ByteArrayOutputStream}

case class BankAccount(iban: String,bankCode: String,amount: Double,currency: String)
case class CompanyRegistry(name: String,accounts: Seq[BankAccount],activityCode: String,marketCap: Double)

class MyFirstAvroSerializer extends Serializer {
  val schema = AvroSchema[CompanyRegistry]
  override def identifier: Int = 454874
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case c: CompanyRegistry =>
      val baos = new ByteArrayOutputStream()
      val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build() // schema
      avroOutputStream.write(c)
      avroOutputStream.flush()
      avroOutputStream.close()
      baos.toByteArray
    case _ => throw new IllegalArgumentException(s"we only support CompanyRegistry for Avro")
  }
  override def fromBinary(bytes: Array[Byte],manifest: Option[Class[_]]): AnyRef = {
    val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
    val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator
    val companyRegistry = companyRegistryIterator.next()
    avroInputStream.close()
    companyRegistry
  }
  override def includeManifest: Boolean = false
}

然后您必须配置您的项目以在参与者之间交换 akka 消息时调用此序列化程序。通过添加特定配置在 application.conf 上配置它。在我的情况下是 avroSerializable。您在 MyFirstAvroSerializer 范围下设置 serializers,在 serialization-bindings 范围下设置案例类。我也配置了 Akka-remote 但你可以忽略它。

avroSerializable {
  akka {
    actor {
      provider = remote
      #allow-java-serialization = off
      serializers {
        java = "akka.serialization.JavaSerializer"
        avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
      }
      serialization-bindings {
        "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
        "java.io.Serializable" = java
      }
    }
    remote {
      artery {
        enabled = on
        transport = aeron-udp
        canonical.hostname = "localhost"
      }
    }
  }
}

就像我一开始说的,我使用的是 PostgreSQL。但在您的情况下,它将是 s3 存储桶存储配置。为了完整起见,我将离开这里,因为我在创建 actor 系统时调用了这个配置。

postgresStore {
  akka.persistence.journal.plugin = "jdbc-journal"
  akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
  akka.actor.allow-java-serialization = on
  # create JDBC configuration to Akka persistence
  akka-persistence-jdbc {
    shared-databases {
      slick {
        profile = "slick.jdbc.PostgresProfile$"
        db {
          numThreads = 10
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/rtjvm"
          user = "docker"
          password = "docker"
        }
      }
    }
  }
  # dbinding the JDBC plugins with the configureation created above
  jdbc-journal {
    use-shared-db = "slick"
  }
  jdbc-snapshot-store {
    use-shared-db = "slick"
  }
}

现在是创建角色系统和简单角色 SimplePersistentActor 并通过网络发送消息的时候了。 SimplePersistentActor 只是一个非常简单的 actor,它接受我发送的消息,没什么特别的。

object AvroSerialization_Persistence {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load().getConfig("postgresStore")
      .withFallback(ConfigFactory.load("avroSerializable"))
    val system = ActorSystem("postgresStoreSystem",config)
    val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"),"personAvroActor")
    val companyRegistryMsg = CompanyRegistry(
      "Google",Seq(
        BankAccount("US-1234","google-bank",4.3,"gazillion dollars"),BankAccount("GB-4321",0.5,"trillion pounds")
      ),"ads",523895
    )
    simplePersistentActor ! companyRegistryMsg
  }
}