Skip to content

Tokio 学习文档

1. 什么是 Tokio

Tokio 是 Rust 生态中最流行的异步运行时,提供:

  • 异步 I/O:基于 epoll/kqueue/IOCP 的高性能非阻塞 I/O
  • 任务调度器:多线程工作窃取调度器
  • 异步原语:通道、互斥锁、信号量等
  • 定时器:高精度时间驱动功能
应用代码
   │
   ▼
Tokio Runtime
   ├── 任务调度器(多线程)
   ├── I/O 事件驱动(epoll/kqueue)
   └── 定时器轮

2. 环境准备

Cargo.toml

[dependencies]
tokio = { version = "1", features = ["full"] }

# 常用搭配库
tokio-util = "0.7"
futures = "0.3"

常用 feature flags:

Feature 说明
rt 基础运行时
rt-multi-thread 多线程运行时
macros #[tokio::main]
net TCP/UDP/Unix socket
time 定时器功能
sync 同步原语
fs 异步文件系统
io-util 异步 I/O 工具
full 以上全部

最小示例

#[tokio::main]
async fn main() {
    println!("Hello, Tokio!");
}

#[tokio::main] 展开后等价于:

fn main() {
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async {
            println!("Hello, Tokio!");
        });
}

3. 异步编程基础

async / await

use tokio::time::{sleep, Duration};

async fn fetch_data() -> String {
    sleep(Duration::from_millis(100)).await;
    "data".to_string()
}

#[tokio::main]
async fn main() {
    let result = fetch_data().await;
    println!("{}", result);
}

核心概念:

  • async fn 返回一个实现了 Future trait 的类型
  • .await 暂停当前任务,让出执行权,等待 Future 完成
  • Future 是惰性的:不 .await 就不会执行

并发执行:tokio::join!

use tokio::time::{sleep, Duration};

async fn task_a() -> &'static str {
    sleep(Duration::from_millis(200)).await;
    "A"
}

async fn task_b() -> &'static str {
    sleep(Duration::from_millis(100)).await;
    "B"
}

#[tokio::main]
async fn main() {
    // 并发执行,总耗时约 200ms(不是 300ms)
    let (a, b) = tokio::join!(task_a(), task_b());
    println!("{} {}", a, b); // A B
}

竞速执行:tokio::select!

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_millis(100)) => {
            println!("100ms 定时器先触发");
        }
        _ = sleep(Duration::from_millis(200)) => {
            println!("200ms 定时器先触发");
        }
    }
}

select! 只执行第一个完成的分支,其余分支被取消。


4. 任务(Tasks)

spawn:创建独立任务

use tokio::task;

#[tokio::main]
async fn main() {
    let handle = task::spawn(async {
        println!("在独立任务中运行");
        42
    });

    let result = handle.await.unwrap();
    println!("任务返回: {}", result);
}

任务与线程的区别

维度 线程 Tokio 任务
创建开销 高(MB 级栈) 低(KB 级)
切换方式 抢占式(OS 调度) 协作式(await 点)
并发量 数千 数百万
阻塞影响 只阻塞当前线程 阻塞整个线程

在任务中共享数据

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0u32));

    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            let mut lock = counter.lock().await;
            *lock += 1;
        }));
    }

    for h in handles {
        h.await.unwrap();
    }

    println!("最终计数: {}", *counter.lock().await); // 10
}

spawn_blocking:运行阻塞代码

use tokio::task;

#[tokio::main]
async fn main() {
    // 将阻塞操作放到专用线程池,避免阻塞异步运行时
    let result = task::spawn_blocking(|| {
        // 可以放耗时的 CPU 计算或同步 I/O
        std::fs::read_to_string("/etc/hosts").unwrap()
    })
    .await
    .unwrap();

    println!("文件内容长度: {}", result.len());
}

5. 通道(Channels)

mpsc:多生产者单消费者

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<i32>(32); // 缓冲区大小 32

    // 克隆 sender 实现多生产者
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send(3).await.unwrap();
    });

    drop(tx2); // 显式 drop(此处 tx2 已 move,仅示意)

    // 当所有 sender drop 后,recv() 返回 None
    while let Some(val) = rx.recv().await {
        println!("收到: {}", val);
    }
}

oneshot:一次性通道

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        let response = "任务完成".to_string();
        tx.send(response).unwrap();
    });

    match rx.await {
        Ok(msg) => println!("收到: {}", msg),
        Err(_) => println!("发送端已关闭"),
    }
}

broadcast:广播通道

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<String>(16);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("广播消息".to_string()).unwrap();

    println!("rx1: {}", rx1.recv().await.unwrap());
    println!("rx2: {}", rx2.recv().await.unwrap());
}

watch:状态监听通道

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("初始状态");

    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        tx.send("更新状态").unwrap();
    });

    // 等待值变化
    rx.changed().await.unwrap();
    println!("状态变为: {}", *rx.borrow());
}

6. I/O 操作

TCP 客户端

use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

    stream.write_all(b"Hello, Server!").await?;

    let mut buf = vec![0u8; 1024];
    let n = stream.read(&mut buf).await?;
    println!("收到: {}", String::from_utf8_lossy(&buf[..n]));

    Ok(())
}

异步文件操作

use tokio::fs;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 写文件
    let mut file = fs::File::create("hello.txt").await?;
    file.write_all(b"Hello, File!").await?;

    // 读文件
    let content = fs::read_to_string("hello.txt").await?;
    println!("{}", content);

    // 删除文件
    fs::remove_file("hello.txt").await?;

    Ok(())
}

使用 BufReader 提升性能

use tokio::io::{BufReader, AsyncBufReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("large_file.txt").await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await? {
        println!("{}", line);
    }

    Ok(())
}

7. 定时器

use tokio::time::{sleep, timeout, interval, Duration, Instant};

#[tokio::main]
async fn main() {
    // 延迟
    sleep(Duration::from_secs(1)).await;

    // 超时控制
    let result = timeout(Duration::from_millis(100), async {
        sleep(Duration::from_millis(200)).await;
        "完成"
    })
    .await;

    match result {
        Ok(val) => println!("成功: {}", val),
        Err(_) => println!("超时!"),
    }

    // 定期执行(类似 ticker)
    let mut ticker = interval(Duration::from_millis(500));
    for i in 0..3 {
        ticker.tick().await;
        println!("第 {} 次 tick,时间: {:?}", i, Instant::now());
    }
}

8. 同步原语

Mutex(异步互斥锁)

use tokio::sync::Mutex;
use std::sync::Arc;

// 注意:持有锁时不要 .await 其他 Future(可能导致死锁)
// 如需跨 await 持有锁,使用 tokio::sync::Mutex(而非 std::sync::Mutex)
let mutex = Arc::new(Mutex::new(vec![]));
let m = Arc::clone(&mutex);

tokio::spawn(async move {
    let mut lock = m.lock().await;
    lock.push(1);
    // lock 在此 drop
});

RwLock(读写锁)

use tokio::sync::RwLock;

let lock = RwLock::new(5u32);

// 多个读锁可以同时持有
let r1 = lock.read().await;
let r2 = lock.read().await;
println!("r1={} r2={}", *r1, *r2);
drop((r1, r2));

// 写锁是独占的
let mut w = lock.write().await;
*w += 1;

Semaphore(信号量)

use tokio::sync::Semaphore;
use std::sync::Arc;

// 限制并发请求数量
let sem = Arc::new(Semaphore::new(3)); // 最多 3 个并发

let mut handles = vec![];
for i in 0..10 {
    let sem = Arc::clone(&sem);
    handles.push(tokio::spawn(async move {
        let _permit = sem.acquire().await.unwrap();
        println!("任务 {} 正在执行", i);
        tokio::time::sleep(Duration::from_millis(100)).await;
        // _permit drop 后,释放信号量
    }));
}

Notify(通知)

use tokio::sync::Notify;
use std::sync::Arc;

let notify = Arc::new(Notify::new());
let n = Arc::clone(&notify);

tokio::spawn(async move {
    tokio::time::sleep(Duration::from_millis(100)).await;
    n.notify_one();
});

notify.notified().await;
println!("收到通知!");

9. 错误处理

使用 anyhow

[dependencies]
anyhow = "1"
use anyhow::{Context, Result};

async fn read_config(path: &str) -> Result<String> {
    tokio::fs::read_to_string(path)
        .await
        .with_context(|| format!("读取配置文件失败: {}", path))
}

#[tokio::main]
async fn main() -> Result<()> {
    let config = read_config("config.toml").await?;
    println!("{}", config);
    Ok(())
}

JoinHandle 错误处理

use tokio::task::JoinError;

let handle = tokio::spawn(async {
    panic!("任务崩溃了!");
});

match handle.await {
    Ok(result) => println!("任务成功: {:?}", result),
    Err(e) if e.is_panic() => println!("任务 panic: {:?}", e),
    Err(e) if e.is_cancelled() => println!("任务被取消"),
    Err(e) => println!("其他错误: {:?}", e),
}

10. 实战:构建 TCP 服务器

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn handle_client(mut stream: TcpStream) {
    let peer = stream.peer_addr().unwrap();
    println!("新连接: {}", peer);

    let mut buf = vec![0u8; 4096];

    loop {
        match stream.read(&mut buf).await {
            Ok(0) => {
                println!("连接断开: {}", peer);
                return;
            }
            Ok(n) => {
                // 回显服务
                if stream.write_all(&buf[..n]).await.is_err() {
                    return;
                }
            }
            Err(e) => {
                eprintln!("读取错误 {}: {}", peer, e);
                return;
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("服务器监听 127.0.0.1:8080");

    loop {
        let (stream, _) = listener.accept().await?;
        // 每个连接一个任务
        tokio::spawn(handle_client(stream));
    }
}

11. 常见陷阱

陷阱 1:在异步代码中使用阻塞操作

// 错误:会阻塞整个线程
async fn bad() {
    std::thread::sleep(std::time::Duration::from_secs(1)); // 阻塞!
    std::fs::read_to_string("file.txt").unwrap();          // 阻塞!
}

// 正确:使用异步版本
async fn good() {
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    tokio::fs::read_to_string("file.txt").await.unwrap();
}

// 正确:用 spawn_blocking 包装无法避免的阻塞操作
async fn also_good() {
    tokio::task::spawn_blocking(|| {
        std::fs::read_to_string("file.txt").unwrap()
    })
    .await
    .unwrap();
}

陷阱 2:跨 await 持有 std::sync::MutexGuard

use std::sync::Mutex;

// 错误:std::sync::MutexGuard 跨 await 不是 Send
async fn bad(mutex: &Mutex<u32>) {
    let mut guard = mutex.lock().unwrap();
    some_async_fn().await; // 编译错误:MutexGuard 不是 Send
    *guard += 1;
}

// 正确:使用 tokio::sync::Mutex
use tokio::sync::Mutex;

async fn good(mutex: &Mutex<u32>) {
    let mut guard = mutex.lock().await;
    some_async_fn().await; // OK
    *guard += 1;
}

// 或者:在 await 前 drop
async fn also_good(mutex: &std::sync::Mutex<u32>) {
    {
        let mut guard = mutex.lock().unwrap();
        *guard += 1;
    } // guard 在这里 drop
    some_async_fn().await;
}

陷阱 3:select! 的公平性

// select! 随机选择就绪的分支,不是按顺序
// 如果某个分支持续就绪,其他分支可能饥饿
// 使用 tokio::select! 的 biased 修饰符强制顺序检查
tokio::select! {
    biased;                    // 按声明顺序检查
    _ = high_priority() => {} // 优先检查
    _ = low_priority() => {}
}

陷阱 4:任务泄露

// tokio::spawn 返回的 JoinHandle 如果被 drop,任务会继续运行(detached)
// 如果需要在函数退出时取消任务,使用 JoinHandle 并在 Drop 时 abort

struct TaskGuard(tokio::task::JoinHandle<()>);

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.0.abort(); // 函数退出时自动取消任务
    }
}

快速参考

需求 API
并发运行多个 Future tokio::join!
竞速,取第一个完成 tokio::select!
创建独立任务 tokio::spawn
运行阻塞代码 tokio::task::spawn_blocking
延迟 tokio::time::sleep
超时 tokio::time::timeout
定期执行 tokio::time::interval
多生产者单消费者 tokio::sync::mpsc
一次性消息 tokio::sync::oneshot
广播 tokio::sync::broadcast
状态订阅 tokio::sync::watch
异步互斥锁 tokio::sync::Mutex
限制并发 tokio::sync::Semaphore