无法使用mongodb在akka http中将scala.concurrent.impl.Promise $ Transformation转换为scala.collection.immutable.Seq erro?

问题描述

基本上,我尝试使用akka http从数据库中获取数据。如果我直接在api中传递(EmployeeRepo.findAll()),那么它会显示所有数据,但在使用actor时会显示转换错误...。 仅数据获取在这里出现的问题,请解决

这是我的EmployeeRepo -----------------------------------

    package org.repo

import org.data.Employee
import org.db.DbConfig
import org.mongodb.scala.MongoCollection
import org.utils.JsonUtils
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.FindOneAndUpdateOptions
import org.mongodb.scala.model.Updates.{combine,set}

import scala.concurrent.Future

object EmployeeRepo extends JsonUtils{
private val employeeDoc:MongoCollection[Employee]=DbConfig.employees

def createCollection()={
DbConfig.database.createCollection("employee").subscribe(
  (result)=>println(s"$result"),e=>println(e.getLocalizedMessage),()=>println("complete")
)
}


  def insertData(emp:Employee)={
    employeeDoc.insertOne(emp).toFuture()
  }
  def findAll()= employeeDoc.find().toFuture()

  def update(emp:Employee,id:String)=
    employeeDoc
      .findOneAndUpdate(equal("_id",id),setBsonValue(emp),FindOneAndUpdateOptions().upsert(true)).toFuture()

  def delete(id:String)=
    employeeDoc.deleteOne(equal("_id",id)).toFuture()
  private def setBsonValue(emp:Employee)={
    combine(
      set("name",emp.name),set("dateOfBirth",emp.dateOfBirth)
    )
  }
}

============ EMployeeService ------------ 软件包org.service

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.data.Employee
import org.domain.EmployeeRequest
import org.repo.EmployeeRepo

import scala.concurrent.Future

class EmployeeService {

implicit val system=ActorSystem("Employeee")
  implicit val materializer=ActorMaterializer
  import system.dispatcher

def saveEmployeeData= (employeeRequest:EmployeeRequest) => {
val employeeDoc:Employee=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.insertData(employeeDoc)
}
def findAll={
  EmployeeRepo.findAll()
}

  def update(employeeRequest: EmployeeRequest,id:String)={
    val employeeDoc=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.update(emp = employeeDoc,id)
  }
  def delete(id:String)=EmployeeRepo.delete(id)

  private def employeeMapperWithNewId(employee:EmployeeRequest)={
Employee(name=employee.name,dateOfBirth = LocalDate.parse(employee.dateOfBirth,DateTimeFormatter.ISO_DATE),_id=UUID.randomUUID.toString)
  }
}

----------------- EmployeeActor ------

package org.actor

import akka.actor.{Actor,ActorLogging}
import org.actor.EmployeeActor.{Delete,Save,SearchAll,Update}
import org.data.Employee
import org.domain.EmployeeRequest
import org.service.EmployeeService

object EmployeeActor {

  case class Save(emp: EmployeeRequest)

  case object SearchAll

  case class Update(emp: EmployeeRequest,id: String)

  case class Delete(id: String)

}

class
EmployeeActor extends Actor with ActorLogging {
  private val employeeService: EmployeeService = new EmployeeService()

  override def receive: Receive = {
    case Save(emp) =>
      log.info(s"recevied msg saved with employee :$emp")
      sender() ! employeeService.saveEmployeeData(emp)

    case SearchAll =>
      log.info("received msg find ALL")
      sender() ! employeeService.findAll

    case Update(emp,id) =>
      log.info(s"received update msg for id $id and employee $emp")
      sender() ! employeeService.update(emp,id)
    case Delete(id) =>
      log.info(s"received msg for deleting employee of id $id")
      sender() ! employeeService.delete(id)
    case _ =>
      log.debug("Unhandled msg")


  }
}

--------------- EmployeeRoute ----

    package org
    
    import akka.actor.{ActorSystem,Props}
    import akka.http.scaladsl.model.{ContentTypes,HttpEntity,StatusCodes}
    import akka.stream.ActorMaterializer
    import org.actor.EmployeeActor
    import org.utils.{JsonUtils,TimeUtils}
    import akka.http.scaladsl.server.Directives._
    import akka.{NotUsed,util}
    import akka.util.{ByteString,Timeout}
    import org.data.Employee
    import org.domain.EmployeeRequest
    import akka.pattern.{Patterns,ask}
    import akka.stream.scaladsl.Source
    import org.actor.EmployeeActor.{Delete,Update}
    import org.service.EmployeeService
    
    import scala.concurrent.duration._
    import spray.json._
    
    import scala.concurrent.{Await,Future}
    
    class EmployeeRoute extends JsonUtils{
    
      implicit val system=ActorSystem("Employee")
      implicit val materializer=ActorMaterializer
      import system.dispatcher
      val actor=system.actorOf(Props[EmployeeActor],"employeeActor")
      val employeeService=new EmployeeService()
    
      implicit val timeOut=Timeout(5.seconds)
    
      val getRoute={
        pathPrefix("employee"){
    
          (pathEndOrSingleSlash & get){
            complete((actor ? SearchAll).mapTo[Seq[EmployeeRequest]])
  }~
          ( path("update") & put &  parameter("id".as[String])){id=>
            entity(as[EmployeeRequest]){employee=>
              complete((actor ? Update(employee,id)).map(_=>StatusCodes.OK))
            }
          }~
          post{
            entity(as[EmployeeRequest]) { employee =>
              complete((actor ? Save(employee)).map(_ => StatusCodes.OK))
            }
          }~
            delete{
              (path(Segment) |parameter("id".as[String])){id=>
                complete((actor ? Delete(id)).map(_=>StatusCodes.OK))
              }
            }
    
    
    
        }
      }
    }

=================错误==============

[ERROR] [09/09/2020 19:46:48.551] [web-app-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(web-app)] Error during processing of request: 'Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq'. Completing with 500 Internal Server Error response. To change default exception handling behavior,provide a custom ExceptionHandler.
java.lang.ClassCastException: Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq
    at java.base/java.lang.Class.cast(Class.java:3734)
    at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:464)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:430)
    at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:164)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:392)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:299)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:249)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:242)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:615)
    at org.actor.EmployeeActor$$anonfun$receive$1.applyOrElse(EmployeeActor.scala:32)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at org.actor.EmployeeActor.aroundReceive(EmployeeActor.scala:22)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

请帮帮我。.

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)