如何执行线程安全IO和缓存到rust中的文件?

问题描述

上下文:

我正在编写一个Web服务器,在其中处理不同的段。我想将这些不同的段缓存在不同的文件中(这些段的最大大小为10MB)。像这样:

pub async fn segment_handler(segment: String) {
   if is_cached(&segment) {
       return get_from_cache(segment)
   }
   // Do some computation to get the result.
   let result = do_some_large_computation(segment);
   // Cache this result to a file.
   let file_name = &format!("./cache/{}",&segment);
   fs::create(file_name);
   fs::write(file_name,result).expect("Unable to write file");
   result
}

现在,由于segment_handler可以由具有不同segment的多个线程调用,因此fs::write线程安全吗?否则,我们将无法使用互斥锁,因为每个呼叫的segment: String可能会有所不同,并且使用互斥锁会使性能变差。我需要类似互斥的内容,但仅在segment: String上。解决此问题的方法是什么?

环境:

解决方法

您发布的代码无法编译,因为没有fs::create之类的东西,但是幸运的是您根本不需要它。 fs::write函数为您创建文件。

至少在Linux上,从几个不同的线程在同一路径上同时调用fs::write会导致该文件包含传递给fs::write调用之一的内容。请注意,如果您使用文件的存在来确定是否需要从缓存中读取或重新计算,则可能会导致多个线程重新计算相同的值,然后所有线程将其写入文件。


您应该注意,由于您使用的是异步/等待,因此不允许使用std::fs模块,因为它会阻塞线程。您应该像这样使用tokio::fs::write

pub async fn segment_handler(segment: String) {
    if is_cached {
         return get_from_cache(segment)
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(segment);
    // Cache this result to a file.
    let file_name = &format!("./cache/{}",&segment);
    tokio::fs::write(file_name,result).await.expect("Unable to write file");
    result
}

另一个正确的选择是像这样使用spawn_blocking

pub async fn segment_handler(segment: String) {
    if is_cached {
        return get_from_cache(segment)
    }
    tokio::task::spawn_blocking(move || {
        // Do some computation to get the result.
        let result = do_some_large_computation(segment);
        // Cache this result to a file.
        let file_name = &format!("./cache/{}",&segment);
        tokio::fs::write(file_name,result).await.expect("Unable to write file");
        result
    }).await.unwrap("Panic in spawn_blocking")
}

您可以在CPU-bound tasks and blocking code中从Tokio的文档中了解有关为何必须正确处理阻塞的更多信息。

通过在每个线程上反复交换当前正在运行的任务,Tokio能够在几个线程上同时运行许多任务。但是,这种交换只能在.await点进行,因此花费很长时间而未到达.await的代码将阻止其他任务运行。为了解决这个问题,Tokio提供了两种线程:核心线程和阻塞线程。核心线程是运行所有异步代码的位置,默认情况下,Tokio将为每个CPU核心生成一个。阻塞线程是按需生成的,可用于运行阻塞代码,否则将阻止其他任务运行。

要生成阻止任务,应使用spawn_blocking函数。

请注意,我已经链接到Tokio 0.2的文档,因为warp尚不支持Tokio 0.3。


如果在第一次调用完成之前多次调用该函数,为防止多次计算该值,可以使用基于存储在互斥锁后面的HashMap的技术,如下所示:

use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::broadcast;

pub struct Cache {
    inner: Mutex<Inner>,}
struct Inner {
    cached: HashMap<String,CachedType>,pending: HashMap<String,broadcast::Sender<CachedType>>,}

pub enum TryCached {
    Exists(CachedType),Pending(broadcast::Receiver<CachedType>),New(),}

impl Cache {
    pub fn try_get(&self,key: &str) -> TryCached {
        let mut inner = self.inner.lock().unwrap();
        if let Some(value) = inner.cached.get(key) {
            // To avoid clone,use HashMap<String,Arc<CachedType>> and clone anyway.
            TryCached::Exists(value.clone())
        } else if let Some(pending) = inner.pending.get(key) {
            TryCached::Pending(pending.subscribe())
        } else {
            let (channel,_) = broadcast::channel(1);
            inner.pending.insert(key.to_string(),channel);
            TryCached::New()
        }
    }
    pub fn put_computed(&self,key: String,value: CachedType) {
        let mut inner = self.inner.lock().unwrap();
        if let Some(chan) = inner.pending.remove(&key) {
            chan.send(value.clone());
        }
        inner.cached.insert(key,value);
    }
}

然后可以将该方法实现为对try_get的调用,该调用根据返回的枚举的值执行不同的操作。

pub async fn segment_handler(cache: &Cache,segment: String) -> CachedType {
    match cache.try_get(&segment) {
        TryCached::Exists(value) => value,TryCached::Pending(mut chan) => chan.recv().await.expect("Sender dropped without sending"),TryCached::New() => {
            let (segment,value) = tokio::task::spawn_blocking(move || {
                // Do some computation to get the result.
                let result = do_some_large_computation(&segment);
                // Cache this result to a file.
                let file_name = &format!("./cache/{}",&segment);
                std::fs::write(file_name,result.to_slice()).expect("Unable to write file");
                (segment,result)
            })
            .await
            .expect("Panic in spawn_blocking");

            cache.put_computed(segment,value.clone());
            value
        }
    }
}

完整的示例可以在the playground上找到。

由于互斥,此方法是完全线程安全的。请注意,这使用同步互斥锁而不是异步互斥锁。要了解更多有关为什么可以这样做的信息,请参阅Tokio教程中的the shared state chapter