以下是设置:我希望能够通过tcp连接将消息(jsons转换为bytestrings)从发布者流式传输到远程服务器订阅者.
理想情况下,发布者将是一个接收内部消息,排队然后然后将它们流式传输到订阅者服务器的参与者,如果当然有出色的需求.据我所知,这需要扩展ActorPublisher类,以便在需要时将消息扩展到onNext().
我的问题是,到目前为止,我只能向服务器发送(接收和解码)一次性消息,每次都打开一个新的连接.我没有设法绕过akka doc并能够使用ActorPublisher设置正确的tcp Flow.
以下是发布商的代码:
理想情况下,发布者将是一个接收内部消息,排队然后然后将它们流式传输到订阅者服务器的参与者,如果当然有出色的需求.据我所知,这需要扩展ActorPublisher类,以便在需要时将消息扩展到onNext().
我的问题是,到目前为止,我只能向服务器发送(接收和解码)一次性消息,每次都打开一个新的连接.我没有设法绕过akka doc并能够使用ActorPublisher设置正确的tcp Flow.
以下是发布商的代码:
def send(message: Message): Unit = { val system = Akka.system() implicit val sys = system import system.dispatcher implicit val materializer = ActorMaterializer() val address = Play.current.configuration.getString("eventservice.location").getOrElse("localhost") val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000) /*** Try with actorPublisher ***/ //val result = Source.actorPublisher[Message] (Props[EventActor]).via(Flow[Message].map(Json.toJson(_).toString.map(ByteString(_)))) /*** Try with actorRef ***/ /*val source = Source.actorRef[Message](0,OverflowStrategy.fail).map( m => { Logger.info(s"Sending message: ${m.toString}") ByteString(Json.toJson(m).toString) } ) val ref = Flow[ByteString].via(Tcp().outgoingConnection(address,port)).to(Sink.ignore).runWith(source)*/ val result = Source(Json.toJson(message).toString.map(ByteString(_))). via(Tcp().outgoingConnection(address,port)). runFold(ByteString.empty) { (acc,in) ⇒ acc ++ in }//Handle the future }
以及最终非常标准的演员代码:
import akka.actor.Actor import akka.stream.actor.ActorSubscriberMessage.{OnComplete,OnError} import akka.stream.actor.{ActorPublisherMessage,ActorPublisher} import models.events.Message import play.api.Logger import scala.collection.mutable class EventActor extends Actor with ActorPublisher[Message] { import ActorPublisherMessage._ var queue: mutable.Queue[Message] = mutable.Queue.empty def receive = { case m: Message => Logger.info(s"EventActor - message received and queued: ${m.toString}") queue.enqueue(m) publish() case Request => publish() case Cancel => Logger.info("EventActor - cancel message received") context.stop(self) case OnError(err: Exception) => Logger.info("EventActor - error message received") onError(err) context.stop(self) case OnComplete => Logger.info("EventActor - onComplete message received") onComplete() context.stop(self) } def publish() = { while (queue.nonEmpty && isActive && totalDemand > 0) { Logger.info("EventActor - message published") onNext(queue.dequeue()) } }
如有必要,我可以提供订阅者的代码:
def connect(system: ActorSystem,address: String,port: Int): Unit = { implicit val sys = system import system.dispatcher implicit val materializer = ActorMaterializer() val handler = Sink.foreach[Tcp.IncomingConnection] { conn => Logger.info("Event server connected to: " + conn.remoteAddress) // Get the ByteString flow and reconstruct the msg for handling and then output it back // that is how handleWith work apparently conn.handleWith( Flow[ByteString].fold(ByteString.empty)((acc,b) => acc ++ b). map(b => handleIncomingMessages(system,b.utf8String)). map(ByteString(_)) ) } val connections = Tcp().bind(address,port) val binding = connections.to(handler).run() binding.onComplete { case Success(b) => Logger.info("Event server started,listening on: " + b.localAddress) case Failure(e) => Logger.info(s"Event server could not bind to $address:$port: ${e.getMessage}") system.terminate() } }
提前感谢提示.
解决方法
我的第一个建议是不要编写自己的队列逻辑. Akka提供了这种开箱即用的功能.您也不需要编写自己的Actor,Akka Streams也可以提供它.
首先,我们可以创建Flow,通过Tcp将您的发布者连接到您的订阅者.在您的发布商代码中,您只需创建一次ActorSystem并连接到外部服务器一次:
//this code is at top level of your application implicit val actorSystem = ActorSystem() implicit val actorMaterializer = ActorMaterializer() import actorSystem.dispatcher val host = Play.current.configuration.getString("eventservice.location").getOrElse("localhost") val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000) val publishFlow = Tcp().outgoingConnection(host,port)
publishFlow是一个Flow
,它将输入您要发送给外部订阅者的ByteString数据,并输出来自订阅者的ByteString数据:
// data to subscriber ----> publishFlow ----> data returned from subscriber
下一步是发布者来源.您可以使用Source.actorRef
到“materialize”将Stream转换为ActorRef,而不是编写自己的Actor.基本上,Stream将成为我们稍后使用的ActorRef:
//these values control the buffer val bufferSize = 1024 val overflowStrategy = akka.stream.OverflowStrategy.dropHead val messageSource = Source.actorRef[Message](bufferSize,overflowStrategy)
我们还需要一个Flow将Messages转换为ByteString
val marshalFlow = Flow[Message].map(message => ByteString(Json.toJson(message).toString))
最后,我们可以连接所有部分.由于您没有从外部订户接收任何数据,我们将忽略来自连接的任何数据:
val subscriberRef : ActorRef = messageSource.via(marshalFlow) .via(publishFlow) .runWith(Sink.ignore)
我们现在可以将此流视为一个Actor:
val message1 : Message = ??? subscriberRef ! message1 val message2 : Message = ??? subscriberRef ! message2