我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个Sink.
该程序应该打开服务器套接字,侦听传入的连接,然后使用tcp流.我们的发件人不希望/接受我们的回复,所以我们永远不会发回任何东西 – 我们只是消费流.在构建tcp流之后,我们需要将字节转换为更有用的字节并将其发送到接收器.
该程序应该打开服务器套接字,侦听传入的连接,然后使用tcp流.我们的发件人不希望/接受我们的回复,所以我们永远不会发回任何东西 – 我们只是消费流.在构建tcp流之后,我们需要将字节转换为更有用的字节并将其发送到接收器.
到目前为止我尝试了以下但是我特别努力解决了如何不将tcp数据包发送回发送方并正确连接接收器的部分.
import scala.util.Failure import scala.util.Success import akka.actor.ActorSystem import akka.event.Logging import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Tcp import akka.stream.scaladsl.Framing import akka.util.ByteString import java.nio.ByteOrder import akka.stream.scaladsl.Flow object TcpConsumeOnlyStreamToSink { implicit val system = ActorSystem("stream-system") private val log = Logging(system,getClass.getName) //The Sink //In reality this is of course a real Sink doing some useful things :-) //The Sink accept types of "SomethingMySinkUnderstand" val mySink = Sink.ignore; def main(args: Array[String]): Unit = { //our sender is not interested in getting replies from us //so we just want to consume the tcp stream and never send back anything to the sender val (address,port) = ("127.0.0.1",6000) server(system,address,port) } def server(system: ActorSystem,address: String,port: Int): Unit = { implicit val sys = system import system.dispatcher implicit val materializer = ActorMaterializer() val handler = Sink.foreach[Tcp.IncomingConnection] { conn => println("Client connected from: " + conn.remoteAddress) conn handleWith Flow[ByteString] //this is neccessary since we use a self developed tcp wire protocol .via(Framing.lengthField(4,65532,ByteOrder.BIG_ENDIAN)) //here we want to map the raw bytes into something our Sink understands .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) //here we like to connect our Sink to the Tcp Source .to(mySink) //<------ NOT COMPILING } val tcpsource = Tcp().bind(address,port) val binding = tcpsource.to(handler).run() binding.onComplete { case Success(b) => println("Server started,listening on: " + b.localAddress) case Failure(e) => println(s"Server Could not bind to $address:$port: ${e.getMessage}") system.terminate() } } class SomethingMySinkUnderstand(x:String) { } }
libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
解决方法
handleWith期望Flow,即具有未连接入口和未连接插座的盒子.您可以有效地提供Source,因为您使用to操作将Flow连接到Sink.
我想你可以做到以下几点:
conn.handleWith( Flow[ByteString] .via(Framing.lengthField(4,ByteOrder.BIG_ENDIAN)) .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) .alsoto(mySink) .map(_ => ByteString.empty) .filter(_ => false) // Prevents sending anything back )