上一篇说的主要是同步多进程/线程/. 这篇学学异步编程.
使用第三方库tokio.
仅作为个人学习笔记.
参考资料:https://course.rs/advance-practice/overview.html
这本书里通过模仿redis来学习tokio.这篇笔记更多会记一些细节和rust语法, 不记全貌.因为学一个库,要再开个项目,真搞心态啊
async/await
rust的async/await只是一个关键词, 需要配合具体的异步运行时框架才有效,比如tokio.
示例代码
1 | use tokio::net::TcpListener; |
这里io::Result实现了Future trait,进而可以.await:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
一个Future表示一个异步函数, 有两个状态:pending和ready.
tokio负责管理一个future的状态.
poll()负责检查future的状态, 返回Ready或Pending.
然后为每个连接生成一个任务:
1 | loop { |
一个 Tokio 任务是一个异步的绿色线程,它们通过 tokio::spawn 进行创建,该函数会返回一个 JoinHandle 类型的句柄,调用者可以使用该句柄跟创建的任务进行交互。
绿色线程是由程序自己调度的, 操作系统不清楚其存在. goroutine也是一种绿色线程.
send/sync
Send trait意为可以在线程间安全地移动.
实现了Send的类型可以在线程阻塞或被中断时保存上下文,并在恢复线程的时候恢复上下文.
例如, Arc实现了Send trait, 而Rc没有, 所以当执行以下代码时会报错:
1 | use std::rc::Rc; |
rust给出了提示:
1 | error: future cannot be sent between threads safely |
Tokio的Mutex
引用原文:
Tokio 提供的异步锁只应该在跨多个 .await调用时使用
锁如果在多个 .await 过程中持有,应该使用 Tokio 提供的锁,
原因是 .await的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,
还没使用完就因为 .await 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁
锁竞争不多的情况下,使用 std::sync::Mutex
锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 parking_lot::Mutex
分片锁
可以对HashMap分段, 让锁尽可能变小, 减少竞争.type ShardedDB = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
消息传递
tokio提供了以下通道,适用于异步场景:
- mpsc 多生产者,单消费者
- oneshot 1c1p
- broadcast NcNp. 每条消息会被广播到所有消费者
- watch 1pNc.
oneshot也有.send()和.resp(), 不过send不需要await.
async io
read
1 | let mut f = File::open("foo.txt").await?; |
read_to_file接收一个&mut Vec<u8>
read_frame 返回一个完整的帧,tcpstream为read_frame实现了一个缓冲区, 每次读取一个帧多余的部分会存到缓冲区里.
write
write()接收一个&[u8], 返回写入的字节数.
不能同时读写一个socket, 需要分离为reader和writer:
TcpStream::split需要rd和wr在一个任务中,否则会引发所有权问题.
相应的,into_split没有这个限制,但是因为是用Arc实现的,有点性能开销.
1 | let socket = TcpStream::connect("127.0.0.1:6142").await?; |
Buf和BufMut trait
两种trait, 分别用于读取和写入的缓冲区.
select
同时等待多个计算操作, 当其中一个操作完成时就退出等待.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
let _ = tx1.send("one");
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
// 任何一个 select 分支结束后,都会继续执行接下来的代码
}
写到这里,我意识我了解到的到对tokio的用法应该已经足够用来写异步io处理了,因此本篇暂时结束.