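Problem description
I want to send requests over websockets and wait for the client's response. I am trying to do this with channels, but waiting for the response on the channel blocks the thread, and that same thread is also responsible for reading data from the stream. I cannot make the Handler function asynchronous simply by adding the async prefix to it.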
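To make the symptom concrete, here is a minimal std-only sketch of the same situation. It does not use actix: the `Event` enum, the queue, and `run_single_threaded` are hypothetical stand-ins for the actor mailbox and the websocket stream, assuming both are serviced by one thread. The request handler waits on the channel, but the event that would feed the channel sits behind it in the queue.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::time::Duration;

/// Hypothetical stand-ins for the two kinds of work one thread must do.
enum Event {
    Request,            // plays the role of Handler<Request>
    ClientText(String), // plays the role of ws::Message::Text
}

/// Processes events one at a time on the calling thread and records what happened.
fn run_single_threaded(timeout: Duration) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let mut queue: VecDeque<Event> = VecDeque::new();
    queue.push_back(Event::Request);
    queue.push_back(Event::ClientText("pong".to_string()));

    let mut log = Vec::new();
    while let Some(event) = queue.pop_front() {
        match event {
            Event::Request => {
                // Blocks the only thread. The ClientText event that would call
                // tx.send is still queued, so this can only time out.
                match rx.recv_timeout(timeout) {
                    Ok(text) => log.push(format!("response: {}", text)),
                    Err(mpsc::RecvTimeoutError::Timeout) => log.push("timeout".to_string()),
                    Err(mpsc::RecvTimeoutError::Disconnected) => {
                        log.push("disconnected".to_string())
                    }
                }
            }
            Event::ClientText(text) => {
                // Runs only after the request handler has already given up.
                tx.send(text.clone()).unwrap();
                log.push(format!("client text handled: {}", text));
            }
        }
    }
    log
}

fn main() {
    for line in run_single_threaded(Duration::from_millis(50)) {
        println!("{}", line);
    }
}
```

In this sketch the request always ends in a timeout and the client's text is handled only afterwards, which mirrors the behavior described above.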
session.rs
use actix::prelude::*;
use actix_web_actors::ws;
use std::sync::mpsc;
use std::time::{Duration, Instant};

use super::Response;

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

type ChannelTuple<T> = (mpsc::Sender<T>, mpsc::Receiver<T>);

pub struct WebsocketSession {
    last_heartbeat: Instant,
    pub response_channel: ChannelTuple<Response>,
}

impl WebsocketSession {
    pub fn new() -> Self {
        Self {
            response_channel: mpsc::channel(),
            last_heartbeat: Instant::now(),
        }
    }
}

impl Actor for WebsocketSession {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        // Periodically check that the client is still alive, otherwise ping it.
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            let expired = Instant::now().duration_since(act.last_heartbeat) > CLIENT_TIMEOUT;
            if expired {
                log::info!("Websocket client disconnected because ping has not been received");
                ctx.stop();
            } else {
                ctx.ping(&[]);
            }
        });
    }
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebsocketSession {
    fn handle(
        &mut self,
        msg: Result<ws::Message, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        println!("WS: {:?}", msg);
        match msg {
            Ok(ws::Message::Ping(msg)) => {
                self.last_heartbeat = Instant::now();
                ctx.pong(&msg);
            }
            Ok(ws::Message::Pong(_)) => {
                self.last_heartbeat = Instant::now();
            }
            Ok(ws::Message::Text(text)) => {
                // Forward the client's text to whoever is waiting on the channel.
                let (tx, _) = &self.response_channel;
                tx.send(Response(text.to_string())).unwrap();
                ctx.text(format!("Response: {}", text));
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => ctx.stop(),
        }
    }
}
execute.rs
use std::sync::mpsc;
use std::time::Duration;

use actix::dev::*;

use super::session::WebsocketSession;

const EXECUTE_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Debug)]
pub enum ExecuteError {
    Timeout,
}

impl From<mpsc::RecvTimeoutError> for ExecuteError {
    fn from(err: mpsc::RecvTimeoutError) -> Self {
        // Both failure modes are reported as a timeout.
        match err {
            mpsc::RecvTimeoutError::Disconnected => ExecuteError::Timeout,
            mpsc::RecvTimeoutError::Timeout => ExecuteError::Timeout,
        }
    }
}

#[derive(MessageResponse, Debug)]
pub struct Response(pub String);

#[derive(Message, Debug)]
#[rtype(result = "Result<Response,ExecuteError>")]
pub struct Request(pub String);

impl Handler<Request> for WebsocketSession {
    type Result = Result<Response, ExecuteError>;

    fn handle(&mut self, _req: Request, ctx: &mut Self::Context) -> Self::Result {
        ctx.text("Send me response");
        let (_, rx) = &self.response_channel;
        // This is the blocking wait: while it runs, the StreamHandler that
        // would call tx.send cannot run on the same thread.
        let response = rx.recv_timeout(EXECUTE_TIMEOUT)?;
        Ok(response)
    }
}
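One common way around a blocking wait like this is to avoid waiting inside the handler at all: the request carries a reply handle, the handler only stores it, and the code path that later receives the client's text completes it. The following is a std-only sketch of that idea, not actix code. `Event`, `Session`, `pending`, and `run_without_blocking` are hypothetical names, and `mpsc::Sender` stands in for a one-shot reply channel. In an actix actor the same shape would presumably use an async oneshot channel together with a future-returning handler result, but that part is an assumption and is not shown here.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::time::Duration;

/// Hypothetical events handled one at a time by a single thread.
enum Event {
    /// A request carrying the handle used to deliver the reply later.
    Request(mpsc::Sender<String>),
    /// Text arriving from the websocket client.
    ClientText(String),
}

/// Hypothetical session state: at most one request is awaiting a reply.
struct Session {
    pending: Option<mpsc::Sender<String>>,
}

impl Session {
    fn handle(&mut self, event: Event) {
        match event {
            // Store the reply handle and return immediately; nothing blocks.
            Event::Request(reply) => self.pending = Some(reply),
            // Complete the pending request, if any, with the client's text.
            Event::ClientText(text) => {
                if let Some(reply) = self.pending.take() {
                    // The requester may have given up already; ignore that case.
                    let _ = reply.send(text);
                }
            }
        }
    }
}

/// Drives both events on one thread, then reads the reply as the requester would.
fn run_without_blocking() -> Result<String, mpsc::RecvTimeoutError> {
    let (reply_tx, reply_rx) = mpsc::channel();
    let mut queue = VecDeque::new();
    queue.push_back(Event::Request(reply_tx));
    queue.push_back(Event::ClientText("pong".to_string()));

    let mut session = Session { pending: None };
    while let Some(event) = queue.pop_front() {
        session.handle(event);
    }
    // The timeout now lives on the requester's side, outside the session.
    reply_rx.recv_timeout(Duration::from_millis(50))
}

fn main() {
    println!("{:?}", run_without_blocking());
}
```

The design point is that the timeout moves to the caller, so the session's thread stays free to read the stream.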
Handler for the /ws path. I put every connection into a HashMap so that I can find the connection by its ID later.
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_web_actors::ws;

mod execute;
mod session;

pub use execute::Request as ExecuteRequest;
pub use execute::Response;
pub use session::WebsocketSession;

#[get("/ws")]
pub async fn index(
    request: HttpRequest,
    stream: web::Payload,
    app_state: web::Data<super::AppState>,
) -> actix_web::Result<HttpResponse> {
    println!("Received new connection at /ws");
    let id = uuid::Uuid::new_v4().to_string();
    let session = WebsocketSession::new();
    let (addr, res) = ws::start_with_addr(session, &request, stream)?;

    // Remember the actor address so the session can be looked up by ID later.
    let mut sessions = app_state.sessions.lock().unwrap();
    sessions.insert(id.clone(), addr);
    if !sessions.contains_key(&id) {
        println!("Failed");
    }
    println!("Websocket session started with ID: {}", id);
    Ok(res)
}
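The lookup-by-ID part can be sketched on its own with the standard library. This is an illustration only: `Registry`, `register`, and `lookup` are hypothetical names, and `mpsc::Sender<String>` stands in for the actor address that the real application state would hold.

```rust
use std::collections::HashMap;
use std::sync::{mpsc, Mutex};

/// Hypothetical stand-in for the shared application state.
struct Registry {
    sessions: Mutex<HashMap<String, mpsc::Sender<String>>>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            sessions: Mutex::new(HashMap::new()),
        }
    }

    /// Stores the handle under the given session ID.
    fn register(&self, id: &str, addr: mpsc::Sender<String>) {
        self.sessions.lock().unwrap().insert(id.to_string(), addr);
    }

    /// Returns a clone of the handle so the lock is not held while sending.
    fn lookup(&self, id: &str) -> Option<mpsc::Sender<String>> {
        self.sessions.lock().unwrap().get(id).cloned()
    }
}

fn main() {
    let registry = Registry::new();
    let (tx, rx) = mpsc::channel();
    registry.register("session-1", tx);

    if let Some(addr) = registry.lookup("session-1") {
        addr.send("Send me response".to_string()).unwrap();
    }
    println!("{:?}", rx.try_recv());
    println!("unknown id found: {}", registry.lookup("missing").is_some());
}
```

Cloning the handle out of the map keeps the mutex held only for the lookup itself, so a slow send does not block other connections from registering.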