上一篇说的主要是同步多进程/线程/. 这篇学学异步编程.
使用第三方库tokio.
仅作为个人学习笔记.
参考资料:https://course.rs/advance-practice/overview.html

这本书里通过模仿redis来学习tokio.这篇笔记更多会记一些细节和rust语法, 不记全貌.
因为学一个库,要再开个项目,真搞心态啊

async/await

rust的async/await只是一个关键词, 需要配合具体的异步运行时框架才有效,比如tokio.
示例代码

1
2
3
4
5
6
7
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379") // io::Result<TcpListener>
.await // a Future
.unwrap();
}

这里io::Result实现了Future trait,进而可以.await:
pub trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

}

一个Future表示一个异步函数, 有两个状态:pending和ready.
tokio负责管理一个future的状态.
poll()负责检查future的状态, 返回Ready或Pending.

然后为每个连接生成一个任务:

1
2
3
4
5
6
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move { // socket转移所有权
process(socket).await;
});
}

一个 Tokio 任务是一个异步的绿色线程,它们通过 tokio::spawn 进行创建,该函数会返回一个 JoinHandle 类型的句柄,调用者可以使用该句柄跟创建的任务进行交互。
绿色线程是由程序自己调度的, 操作系统不清楚其存在. goroutine也是一种绿色线程.

send/sync

Send trait意为可以在线程间安全地移动.
实现了Send的类型可以在线程阻塞或被中断时保存上下文,并在恢复线程的时候恢复上下文.
例如, Arc实现了Send trait, 而Rc没有, 所以当执行以下代码时会报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use std::rc::Rc;
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
let v = Rc::new(String::from("asd"));
loop {
let (socket, _) = listener.accept().await.unwrap();
let s = Rc::clone(&v);
tokio::spawn(async move {
process(socket).await;
println!("{}", s);
});
}
}

rust给出了提示:

1
2
3
4
5
6
7
8
9
10
11
error: future cannot be sent between threads safely
--> src/main.rs:12:22
|
12 | tokio::spawn(async move {
| ______________________^
13 | | process(socket).await;
14 | | println!("{}", s);
15 | | });
| |_________^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:12:22: 15:10}`, the trait `Send` is not implemented for `Rc<String>`, which is required by `{async block@src/main.rs:12:22: 15:10}: Send`

Tokio的Mutex

引用原文:

Tokio 提供的异步锁只应该在跨多个 .await调用时使用

锁如果在多个 .await 过程中持有,应该使用 Tokio 提供的锁,
原因是 .await的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,
还没使用完就因为 .await 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁
锁竞争不多的情况下,使用 std::sync::Mutex
锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 parking_lot::Mutex

分片锁

可以对HashMap分段, 让锁尽可能变小, 减少竞争.
type ShardedDB = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

消息传递

tokio提供了以下通道,适用于异步场景:

  • mpsc 多生产者,单消费者
  • oneshot 1c1p
  • broadcast NcNp. 每条消息会被广播到所有消费者
  • watch 1pNc.

oneshot也有.send()和.resp(), 不过send不需要await.

async io

read
1
2
3
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
let n = f.read(&mut buffer[..]).await?;

read_to_file接收一个&mut Vec<u8>

read_frame 返回一个完整的帧,tcpstream为read_frame实现了一个缓冲区, 每次读取一个帧多余的部分会存到缓冲区里.

write

write()接收一个&[u8], 返回写入的字节数.

不能同时读写一个socket, 需要分离为reader和writer:
TcpStream::split需要rd和wr在一个任务中,否则会引发所有权问题.
相应的,into_split没有这个限制,但是因为是用Arc实现的,有点性能开销.

1
2
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
Buf和BufMut trait

两种trait, 分别用于读取和写入的缓冲区.

select

同时等待多个计算操作, 当其中一个操作完成时就退出等待.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async {
let _ = tx1.send("one");
});

tokio::spawn(async {
let _ = tx2.send("two");
});

tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}

// 任何一个 select 分支结束后,都会继续执行接下来的代码
}

写到这里,我意识我了解到的到对tokio的用法应该已经足够用来写异步io处理了,因此本篇暂时结束.