V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
steins2628
V2EX  ›  Rust

两次 move 以后怎样保证 tokio channel 有足够长的生命周期?

  •  
  •   steins2628 · 263 天前 · 1114 次点击
    这是一个创建于 263 天前的主题,其中的信息可能已经有所发展或是发生改变。

    如题,在尝试把 notify 的 debouncer—watcher 改成 async 模式, 主要是为了把 watcher 顺利放结构体里然后内部调用,实验发现不改 async 的话整个程序会死等事件

    提了个问题 Is there some way to make notify debounce watcher async?,照着说法改着改着发现对于 tx channel 两次 move 以后已经没办法保证其生命周期了

    这是不是说明我这样的改法不行,或者有没有别的方法可以来让 debouncer-watcher 变成 async 的模式?

    具体代码及报错如下, 这里 main loop 是为了测试

    use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error};
    use std::{path::Path, time::Duration};
    use chrono::prelude::*;
    use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent};
    use tokio::sync::mpsc::Receiver;
    
    pub struct NotifyHandler {
        pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>,
        pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>>
    }
    
    impl NotifyHandler {    
        pub async fn initialize_notify_scheduler(&mut self) {
            let (tx, rx) = tokio::sync::mpsc::channel(1);
        
            let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
                tokio::spawn(async move {
                    if let Err(e) = tx.send(result).await {
                        println!("Error sending event result: {:?}", e);
                    }
                });
            });
        
            match debouncer {
                Ok(watcher)=> {
                    println!("Initialize notify watcher success");
                    self.notify_watcher = Some(watcher);
    
                    self.receiver = Some(rx);
                },
                Err(error) => {
                    println!("{:?}", error);
                }
            }
        }
    
        
        pub async fn watch(&mut self, path: &str) -> notify::Result<()> {
            let watch_path = Path::new(path);
    
            if watch_path.exists() {
                let is_file = watch_path.is_file();
                println!("Valid path {} is file {}", path, is_file);
            } else {
                println!("watch path {:?} not exists", watch_path);
            }
    
            if let Some(mut watcher) = self.notify_watcher.take() {
                watcher
                    .watcher()
                    .watch(watch_path, RecursiveMode::Recursive)?;         
    
                watcher
                    .cache()
                    .add_root(watch_path, RecursiveMode::Recursive);     
    
                if let Some(mut rx) = self.receiver.take() {
                    tokio::spawn(async move {
                        while let Some(res) = rx.recv().await {
                            match res {
                                Ok(events) => {
                                    println!("events: {:?}", events);
                                },
                                Err(errors) => {
                                    println!("errors: {:?}", errors)
                                }
                            }
                        }                    
                    });  
                }
            }
    
            Ok(())
        }
    }
    
    #[tokio::main]
    async fn main() {
        let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None, receiver: None };
    
        notifier.initialize_notify_scheduler().await;
        notifier.watch("D:\\TEMP\\TestNote.txt").await.unwrap();
    
        loop {
            tokio::time::sleep(Duration::from_secs(3)).await;
    
            let time: DateTime<Local> = Local::now();
    
            println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string());
        }
    }
    
    expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
    required for `[closure@src\main.rs:16:69: 16:103]` to implement `DebounceEventHandler`rustcClick for full compiler diagnostic
    main.rs(16, 69): the requirement to implement `FnMut` derives from here
    main.rs(18, 33): closure is `FnOnce` because it moves the variable `tx` out of its environment
    main.rs(16, 25): required by a bound introduced by this call
    lib.rs(634, 25): required by a bound in `new_debouncer`
    
    8 条回复    2023-08-12 22:09:38 +08:00
    araraloren
        1
    araraloren  
       262 天前
    You should clone the `tx` before pass it to async block.

    ```
    let tx = tx.clone();
    tokio::spawn(async move {
    ...
    ```
    steins2628
        2
    steins2628  
    OP
       262 天前
    @araraloren 感谢回答,倒是能通过编译了,但是 watcher 的 event 完全不会触发

    ```rust
    let sapwn_tx = tx.clone();
    let test_ts = tx.clone();
    self.sender = Som(test_ts);

    tokio::spawn(async move {
    if let Err(e) = sapwn_tx.send(result).await {
    println!("Error sending event result: {:?}", e);
    }
    })

    ...

    loop {
    ...
    if let Some(tx) = notifier.sender.take() {
    tx.send(Err(notify::error(notify::ErrorKind::PathNotfoundError))).await;
    }
    }
    ```

    然后我尝试把代码改成这样子,在 main loop 里手动发 error event 是正常接收的,但 watcher 的 event 是完全没有的,
    我查了下相关问题比如 [Alternative to cloning tokio channel's sender for futures' closures]( https://stackoverflow.com/questions/54552165/alternative-to-cloning-tokio-channels-sender-for-futures-closures), 理论上 sender clone 不会对原对象有什么影响,这是不是能说明 debouncer_watcher 就是不能这么改?
    fakeshadow
        3
    fakeshadow  
       261 天前
    你好,建议朋友附上`Cargo.tmol`以及最小化例子,这样能提高获得答案的几率。我把你的例子缩小了一下,并且给了出一个解决方案。
    ```
    use notify::{RecursiveMode, Watcher};
    use notify_debouncer_full::{new_debouncer, DebounceEventResult};
    use std::{path::Path, time::Duration};

    #[tokio::main]
    async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    let mut watcher = new_debouncer(
    Duration::from_secs(3),
    None,
    move |result: DebounceEventResult| {
    let _ = tx.send(result);
    },
    )
    .unwrap();

    let path = Path::new("D:\\temp\\TestNote.txt");

    watcher
    .watcher()
    .watch(&path, RecursiveMode::Recursive)
    .unwrap();

    watcher.cache().add_root(&path, RecursiveMode::Recursive);

    while let Some(events) = rx.recv().await.transpose().unwrap() {
    println!("{events:?}");
    }
    }

    ```
    你的问题本质看上去像是同步和异步的协同问题。我们一般可以采用`unbounded_channel`来让同步端无阻塞唤醒异步端,这个和你的代码基本上是等效的,这个方法有一个缺点就是异步端如果消费不及时,可能造成内存溢出。另一种方法是使用`tokio::sync::mpsc::Sender::blocking_send`方法,堵塞你的 debounder 线程来唤醒异步端,这个方法虽不会有内存泄漏的问题但消费端不及时会造成 debouncer 线程的阻塞。
    fakeshadow
        4
    fakeshadow  
       261 天前
    还有就是如果只需要单体观测的话可以使用 tokio::sync::watch::channel.
    araraloren
        5
    araraloren  
       261 天前
    @steins2628 The code you provide has a lot of problems.
    You take the `notify_watcher` from self, the `watcher` will drop end of `if` block.
    The tokio::spawn can not called from handler provide for `new_debouncer`, you should using `Handler::spawn` instead.
    steins2628
        6
    steins2628  
    OP
       261 天前
    @fakeshadow 我分类尝试了十种情况,发现只要是把 wacher 放结构体里,然后初始化和 rx 分开处理,就不会有 events 发出来,感觉和 notify 本体关系可能更大一点
    [Test template for rust notify]( https://gist.github.com/Hellager/6bc77d610ff20932ccec379e20599083)


    @araraloren 感谢回答,handler::spawn 是 std::thread::spawn 吗?我最开始是 std handle block send 的,就像这样,但这样也是不行的,具体过程在 StackOverflow 那个问题里

    ```rust

    fn get_runtime_handle() -> (Handle, Option<Runtime>) {
    match Handle::try_current() {
    Ok(h) => (h, None),
    Err(_) => {
    let rt = Runtime::new().unwrap();
    (rt.handle().clone(), Some(rt))
    }
    }
    }

    ...
    let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
    let (handle, _rt) = get_runtime_handle();
    handle.block_on(async {
    tx.send(result).await.unwrap();
    })
    });
    ...
    ```
    steins2628
        8
    steins2628  
    OP
       259 天前
    @araraloren Great thanks for your help, now it works!
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1419 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 17:11 · PVG 01:11 · LAX 10:11 · JFK 13:11
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.