Skip to content

Redis

Redis 是一个开源的内存数据结构存储,可用作数据库、缓存和消息代理。Rust 生态通过 redis crate 提供异步/同步客户端,配合 deadpool-redis 实现生产级连接池管理。

核心特点:

特点 说明
内存存储 所有数据存储在内存中,读写速度极快(微秒级)
丰富数据结构 String、List、Hash、Set、ZSet、Stream、Bitmap 等
持久化 RDB 快照 + AOF 日志,支持数据恢复
发布/订阅 内置 Pub/Sub 消息模式
原子操作 单命令原子性,支持 Lua 脚本和事务
集群支持 主从复制、哨兵、Cluster 模式

安装与项目初始化

# Cargo.toml
[dependencies]
redis           = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
deadpool-redis  = "0.18"
tokio           = { version = "1", features = ["full"] }
serde           = { version = "1", features = ["derive"] }
serde_json      = "1"
anyhow          = "1"

快速连接

use redis::AsyncCommands;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    conn.set("hello", "world").await?;
    let val: String = conn.get("hello").await?;
    println!("{val}"); // world

    Ok(())
}

连接池

生产环境使用 deadpool-redis 管理连接池,避免频繁建立/销毁连接。

use deadpool_redis::{Config, Pool, Runtime};

pub fn create_pool(url: &str) -> Pool {
    let cfg = Config::from_url(url);
    cfg.create_pool(Some(Runtime::Tokio1)).expect("创建 Redis 连接池失败")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = create_pool("redis://127.0.0.1:6379");
    let mut conn = pool.get().await?;

    redis::cmd("SET")
        .arg("key")
        .arg("value")
        .query_async::<()>(&mut *conn)
        .await?;

    Ok(())
}

带密码和数据库编号

// redis://:password@host:port/db
let pool = create_pool("redis://:mypassword@127.0.0.1:6379/1");

字符串(String)

最基础的数据类型,可存储文本、数字或二进制数据(上限 512 MB)。

use redis::AsyncCommands;

async fn string_ops(conn: &mut impl AsyncCommands) -> anyhow::Result<()> {
    // SET / GET
    conn.set("name", "Alice").await?;
    let name: String = conn.get("name").await?;

    // 设置过期时间(秒)
    conn.set_ex("token", "abc123", 3600).await?;

    // 仅当键不存在时设置(分布式锁常用)
    let ok: bool = conn.set_nx("lock", "1").await?;

    // 自增 / 自减
    conn.set("counter", 10).await?;
    let n: i64 = conn.incr("counter", 1).await?;  // 11
    let n: i64 = conn.decr("counter", 3).await?;  // 8

    // 批量操作
    conn.mset(&[("k1", "v1"), ("k2", "v2")]).await?;
    let vals: Vec<String> = conn.mget(&["k1", "k2"]).await?;

    // 获取并更新 TTL
    let ttl: i64 = conn.ttl("token").await?;      // 剩余秒数,-1 永不过期,-2 不存在
    conn.expire("name", 60).await?;

    Ok(())
}

哈希(Hash)

适合存储对象字段,类似关系型数据库中的一行记录。

use redis::AsyncCommands;

async fn hash_ops(conn: &mut impl AsyncCommands) -> anyhow::Result<()> {
    // HSET 单字段 / 多字段
    conn.hset("user:1", "name", "Alice").await?;
    conn.hset_multiple("user:1", &[
        ("name", "Alice"),
        ("age", "30"),
        ("email", "alice@example.com"),
    ]).await?;

    // HGET / HMGET
    let name: String = conn.hget("user:1", "name").await?;
    let fields: Vec<String> = conn.hmget("user:1", &["name", "age"]).await?;

    // HGETALL → HashMap
    use std::collections::HashMap;
    let user: HashMap<String, String> = conn.hgetall("user:1").await?;

    // HDEL / HEXISTS
    conn.hdel("user:1", "email").await?;
    let exists: bool = conn.hexists("user:1", "age").await?;

    // HINCRBY
    conn.hincr("user:1", "login_count", 1).await?;

    Ok(())
}

序列化结构体到 Hash

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct User {
    id: u64,
    name: String,
    email: String,
}

async fn save_user(conn: &mut impl AsyncCommands, user: &User) -> anyhow::Result<()> {
    let key = format!("user:{}", user.id);
    let json = serde_json::to_string(user)?;
    conn.set_ex(key, json, 86400).await?;
    Ok(())
}

async fn load_user(conn: &mut impl AsyncCommands, id: u64) -> anyhow::Result<Option<User>> {
    let key = format!("user:{id}");
    let json: Option<String> = conn.get(key).await?;
    Ok(json.as_deref().map(serde_json::from_str).transpose()?)
}

列表(List)

有序双向链表,支持从两端插入/弹出,适合消息队列、最新记录等场景。

use redis::AsyncCommands;

async fn list_ops(conn: &mut impl AsyncCommands) -> anyhow::Result<()> {
    // LPUSH / RPUSH(返回列表长度)
    let len: i64 = conn.lpush("queue", "task1").await?;
    conn.rpush("queue", &["task2", "task3"]).await?;

    // LPOP / RPOP
    let task: Option<String> = conn.lpop("queue", None).await?;

    // 阻塞弹出(超时秒数,0 = 永久阻塞)
    let item: Option<(String, String)> = conn.blpop(&["queue"], 5.0).await?;

    // LRANGE(含两端,-1 表示末尾)
    let items: Vec<String> = conn.lrange("queue", 0, -1).await?;

    // LLEN / LINDEX
    let len: i64 = conn.llen("queue").await?;
    let first: String = conn.lindex("queue", 0).await?;

    // 保留最近 N 条(滑动窗口)
    conn.ltrim("recent_logs", 0, 99).await?;

    Ok(())
}

集合(Set)

无序不重复元素集合,支持交集、并集、差集运算。

use redis::AsyncCommands;

async fn set_ops(conn: &mut impl AsyncCommands) -> anyhow::Result<()> {
    // SADD / SREM
    conn.sadd("tags:post:1", &["rust", "redis", "backend"]).await?;
    conn.srem("tags:post:1", "backend").await?;

    // SMEMBERS / SISMEMBER
    let tags: Vec<String> = conn.smembers("tags:post:1").await?;
    let exists: bool = conn.sismember("tags:post:1", "rust").await?;

    // SCARD(元素数量)
    let count: i64 = conn.scard("tags:post:1").await?;

    // 集合运算
    conn.sadd("tags:post:2", &["rust", "axum"]).await?;
    let inter: Vec<String> = conn.sinter(&["tags:post:1", "tags:post:2"]).await?; // 交集
    let union: Vec<String> = conn.sunion(&["tags:post:1", "tags:post:2"]).await?; // 并集
    let diff:  Vec<String> = conn.sdiff(&["tags:post:1", "tags:post:2"]).await?;  // 差集

    Ok(())
}

有序集合(ZSet / Sorted Set)

每个元素关联一个浮点分数,按分数自动排序,适合排行榜、优先级队列。

use redis::AsyncCommands;

async fn zset_ops(conn: &mut impl AsyncCommands) -> anyhow::Result<()> {
    // ZADD(分数, 成员)
    conn.zadd("leaderboard", "Alice", 1500.0).await?;
    conn.zadd_multiple("leaderboard", &[
        (1200.0, "Bob"),
        (1800.0, "Charlie"),
        (900.0,  "Dave"),
    ]).await?;

    // ZRANGE(按分数升序,withscores 返回分数)
    let top: Vec<(String, f64)> = conn.zrange_withscores("leaderboard", 0, -1).await?;

    // ZREVRANGE(降序,取前 N 名)
    let top3: Vec<String> = conn.zrevrange("leaderboard", 0, 2).await?;

    // ZSCORE / ZRANK
    let score: f64 = conn.zscore("leaderboard", "Alice").await?;
    let rank: i64  = conn.zrevrank("leaderboard", "Alice").await?; // 从0开始的降序排名

    // ZINCRBY(增加分数)
    conn.zincr("leaderboard", "Bob", 100.0).await?;

    // 按分数范围查询
    let mid: Vec<String> = conn.zrangebyscore("leaderboard", 1000.0, 1600.0).await?;

    // ZCARD(成员总数)
    let total: i64 = conn.zcard("leaderboard").await?;

    Ok(())
}

过期与键管理

use redis::AsyncCommands;

async fn key_ops(conn: &mut impl AsyncCommands) -> anyhow::Result<()> {
    // 检查键是否存在
    let exists: bool = conn.exists("mykey").await?;

    // 设置/查看/删除过期时间
    conn.expire("mykey", 60).await?;
    let ttl: i64 = conn.ttl("mykey").await?;
    conn.persist("mykey").await?; // 移除过期时间

    // 删除键
    conn.del("mykey").await?;
    conn.del(&["k1", "k2", "k3"]).await?;

    // 重命名
    conn.rename("old_key", "new_key").await?;

    // 扫描匹配键(生产环境禁用 KEYS,用 SCAN)
    let mut cursor = 0u64;
    loop {
        let (next, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg("user:*")
            .arg("COUNT")
            .arg(100)
            .query_async(conn).await?;
        for key in keys {
            println!("{key}");
        }
        cursor = next;
        if cursor == 0 { break; }
    }

    Ok(())
}

事务(Multi/Exec)

Redis 事务将多个命令打包为一个原子操作,要么全部执行,要么全部不执行(注意:不支持回滚)。

use redis::AsyncCommands;

async fn transaction_example(conn: &mut redis::aio::MultiplexedConnection) -> anyhow::Result<()> {
    // WATCH + MULTI/EXEC 实现乐观锁
    loop {
        redis::cmd("WATCH").arg("balance").query_async::<()>(conn).await?;

        let balance: i64 = conn.get("balance").await.unwrap_or(0);
        if balance < 100 {
            redis::cmd("UNWATCH").query_async::<()>(conn).await?;
            return Err(anyhow::anyhow!("余额不足"));
        }

        let result: Option<((),)> = redis::pipe()
            .atomic()
            .cmd("DECRBY").arg("balance").arg(100).ignore()
            .query_async(conn)
            .await?;

        if result.is_some() {
            break; // 事务成功
        }
        // result 为 None 说明 WATCH 的键被修改,重试
    }
    Ok(())
}

Pipeline(管道)

批量发送多条命令,减少网络往返次数。

use redis::AsyncCommands;

async fn pipeline_example(conn: &mut redis::aio::MultiplexedConnection) -> anyhow::Result<()> {
    let (v1, v2, v3): (String, i64, bool) = redis::pipe()
        .cmd("SET").arg("p1").arg("hello").ignore()
        .cmd("INCR").arg("counter")
        .cmd("EXISTS").arg("p1")
        .query_async(conn)
        .await?;

    println!("counter={v2}, exists={v3}");
    Ok(())
}

Lua 脚本

Lua 脚本在 Redis 服务端原子执行,适合需要读-改-写的复合操作。

use redis::Script;

async fn lua_example(conn: &mut redis::aio::MultiplexedConnection) -> anyhow::Result<()> {
    // 原子性地检查并消费限流令牌
    let script = Script::new(r#"
        local current = redis.call('GET', KEYS[1])
        if current and tonumber(current) >= tonumber(ARGV[1]) then
            return redis.call('DECRBY', KEYS[1], 1)
        end
        return -1
    "#);

    let remaining: i64 = script
        .key("rate:user:123")
        .arg(1)
        .invoke_async(conn)
        .await?;

    if remaining >= 0 {
        println!("剩余配额: {remaining}");
    } else {
        println!("配额不足");
    }

    Ok(())
}

发布/订阅(Pub/Sub)

use redis::AsyncCommands;

// 发布者
async fn publish(conn: &mut impl AsyncCommands, channel: &str, msg: &str) -> anyhow::Result<()> {
    let receivers: i64 = conn.publish(channel, msg).await?;
    println!("消息发送给 {receivers} 个订阅者");
    Ok(())
}

// 订阅者
async fn subscribe_loop(client: &redis::Client) -> anyhow::Result<()> {
    let conn = client.get_async_pubsub().await?;
    let mut pubsub = conn;
    pubsub.subscribe("notifications").await?;

    use futures_util::StreamExt;
    let mut stream = pubsub.on_message();
    while let Some(msg) = stream.next().await {
        let payload: String = msg.get_payload()?;
        println!("收到消息: {payload}");
    }
    Ok(())
}

常用场景

缓存(Cache-Aside 模式)

use deadpool_redis::Pool;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Article {
    id: u64,
    title: String,
    content: String,
}

async fn get_article(pool: &Pool, id: u64) -> anyhow::Result<Article> {
    let mut conn = pool.get().await?;
    let key = format!("article:{id}");

    // 先查缓存
    if let Ok(json) = redis::cmd("GET").arg(&key).query_async::<String>(&mut *conn).await {
        return Ok(serde_json::from_str(&json)?);
    }

    // 缓存未命中,查数据库(示意)
    let article = Article {
        id,
        title: format!("文章 {id}"),
        content: "内容...".to_string(),
    };

    // 写入缓存,TTL 10 分钟
    let json = serde_json::to_string(&article)?;
    redis::cmd("SET").arg(&key).arg(&json).arg("EX").arg(600)
        .query_async::<()>(&mut *conn).await?;

    Ok(article)
}

分布式限流(滑动窗口)

use redis::Script;

/// 返回 true 表示允许请求,false 表示超出限制
async fn rate_limit(
    conn: &mut redis::aio::MultiplexedConnection,
    key: &str,
    limit: u64,
    window_secs: u64,
) -> anyhow::Result<bool> {
    let script = Script::new(r#"
        local key    = KEYS[1]
        local limit  = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now    = tonumber(ARGV[3])

        redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
        local count = redis.call('ZCARD', key)
        if count < limit then
            redis.call('ZADD', key, now, now)
            redis.call('EXPIRE', key, window)
            return 1
        end
        return 0
    "#);

    use std::time::{SystemTime, UNIX_EPOCH};
    let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;

    let allowed: i64 = script
        .key(key)
        .arg(limit)
        .arg(window_secs)
        .arg(now)
        .invoke_async(conn)
        .await?;

    Ok(allowed == 1)
}

分布式锁

use redis::AsyncCommands;
use std::time::Duration;
use uuid::Uuid;

pub struct RedisLock {
    key: String,
    token: String,
}

impl RedisLock {
    /// 尝试获取锁,ttl_secs 为自动释放时间
    pub async fn acquire(
        conn: &mut impl AsyncCommands,
        key: &str,
        ttl_secs: u64,
    ) -> anyhow::Result<Option<Self>> {
        let token = Uuid::new_v4().to_string();
        let ok: bool = conn.set_options(
            key,
            &token,
            redis::SetOptions::default()
                .conditional_set(redis::ExistenceCheck::NX)
                .with_expiration(redis::SetExpiry::EX(ttl_secs)),
        ).await?;

        if ok {
            Ok(Some(Self { key: key.to_string(), token }))
        } else {
            Ok(None)
        }
    }

    /// 释放锁(Lua 脚本保证原子性:只释放自己持有的锁)
    pub async fn release(&self, conn: &mut redis::aio::MultiplexedConnection) -> anyhow::Result<()> {
        let script = redis::Script::new(r#"
            if redis.call('GET', KEYS[1]) == ARGV[1] then
                return redis.call('DEL', KEYS[1])
            end
            return 0
        "#);
        script.key(&self.key).arg(&self.token).invoke_async::<i64>(conn).await?;
        Ok(())
    }
}

与 Axum 集成

# Cargo.toml
[dependencies]
axum           = "0.7"
tokio          = { version = "1", features = ["full"] }
deadpool-redis = "0.18"
redis          = { version = "0.27", features = ["tokio-comp"] }
serde          = { version = "1", features = ["derive"] }
serde_json     = "1"
use axum::{extract::State, routing::get, Json, Router};
use deadpool_redis::{Config, Pool, Runtime};
use redis::AsyncCommands;
use serde_json::{json, Value};

#[derive(Clone)]
struct AppState {
    redis: Pool,
}

async fn get_counter(State(state): State<AppState>) -> Json<Value> {
    let mut conn = state.redis.get().await.unwrap();
    let count: i64 = conn.incr("visit_count", 1).await.unwrap_or(0);
    Json(json!({ "count": count }))
}

#[tokio::main]
async fn main() {
    let cfg = Config::from_url("redis://127.0.0.1:6379");
    let pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap();

    let state = AppState { redis: pool };
    let app = Router::new()
        .route("/counter", get(get_counter))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

常用命令速查

场景 命令 Rust 方法
字符串读写 SET / GET conn.set() / conn.get()
带过期时间 SET key val EX 60 conn.set_ex()
不存在则设置 SET key val NX conn.set_nx()
自增计数器 INCR / INCRBY conn.incr()
哈希字段 HSET / HGET / HGETALL conn.hset() / conn.hget() / conn.hgetall()
列表推入/弹出 LPUSH / RPOP conn.lpush() / conn.rpop()
阻塞弹出 BLPOP conn.blpop()
集合操作 SADD / SMEMBERS / SINTER conn.sadd() / conn.smembers() / conn.sinter()
有序集合 ZADD / ZREVRANGE / ZSCORE conn.zadd() / conn.zrevrange() / conn.zscore()
发布消息 PUBLISH conn.publish()
删除键 DEL conn.del()
检查存在 EXISTS conn.exists()
剩余过期 TTL conn.ttl()
安全扫描键 SCAN redis::cmd("SCAN")