问题描述
我必须将Socket应用程序转换为akka,但似乎没有响应。我是akka的新手,修改示例似乎无效。
使用普通套接字建立与设备的连接,并且在将HEX cmd传递给设备后将返回HEX数据。 这是套接字线程的原始代码:
BlockingQueue<byte[]> incomingResults = new LinkedBlockingQueue<>();
clientSocket = new Socket(ipAddress,port);
clientSocket.setSoTimeout(5000);
out = new DataOutputStream(clientSocket.getoutputStream());
in = clientSocket.getInputStream();
byte[] read = new byte[512];
int len;
out.write(Hex.decodeHex("F000001A501101007F1201F7"));
Logger.debug("Connected and requested data on " + ipAddress + ":" + port);
while ((len = in.read(read)) > -1) {
incomingResults.add(Arrays.copyOfRange(read,len));
if (new Date().getTime() - lastFe.getTime() > 8000) { // The data flow for approx 10 seconds,so need to re-request.
out.write(Hex.decodeHex("F000001A501101007F1201F7"));
lastFe = new Date();
}
}
此输入结果然后由另一个线程处理。 我正在使用Play!使用akka和使用原始线程的框架不能很好地工作。 但是我似乎无法弄清楚如何将它们放在一起,建立连接,然后能够发送cmd并继续处理传入的数据(并偶尔发出刷新cmd)。
如果可以立即异步处理数据,那么最好写一个单独的akka流,我认为可以用Materializer完成,但是我还没走那么远。
这是我的第一次尝试:
String requestCmd = "F000001A501101007F1201F7";
final Flow<ByteString,ByteString,CompletionStage<Tcp.OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection(ipAddress,port);
final Flow<String,NotUsed> replParser =
Flow.<String>create()
.takeWhile(elem -> !elem.equals("q"))
.concat(Source.single("BYE")) // will run after the original flow completes
.map(elem -> ByteString.fromString(elem + "\n"));
final Flow<ByteString,NotUsed> repl =
Flow.of(ByteString.class)
.map(ByteString::utf8String)
.map(elem -> requestCmd)
.via(replParser)
.via(Framing.delimiter(ByteString.fromString("\n"),256,FramingTruncation.disALLOW))
.map(
text -> {
Logger.debug("Server: " + text.decodeString(ByteString.UTF_8()));
return "next";
}).via(replParser);
但这只会返回我发出的命令。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)