Rust + Axum Backend Architecture Design Document

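Table of Contents

  1. Technology Stack Overview
  2. Core Dependencies in Detail
  3. Project Directory Structure
  4. Layered Architecture Design
  5. Database and ORM
  6. Error Handling
  7. Authentication and Authorization
  8. Middleware Design
  9. Configuration Management
  10. Logging and Observability
  11. Async Tasks and Queues
  12. API Response Conventions
  13. Complete Cargo.toml Dependencies
  14. Startup Flow and Code Examples
  15. Docker Containerization
  16. Kubernetes Deployment
  17. CI/CD Pipeline
  18. Testing Strategy
  19. Automatic API Documentation Generation
  20. Prometheus Metrics Monitoring
  21. File Upload and Object Storage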

1. Technology Stack Overview

Overall Architecture Diagram

┌─────────────────────────────────────────────────────┐
│               Clients / API consumers               │
└──────────────────────────┬──────────────────────────┘
                           │ HTTP / WebSocket
┌──────────────────────────▼──────────────────────────┐
│            Reverse proxy (Nginx / Caddy)            │
└──────────────────────────┬──────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────┐
│                   Axum web server                   │
│  ┌────────────┐  ┌──────────────┐  ┌─────────────┐  │
│  │ Middleware │  │   Routing    │  │ Extractors  │  │
│  │ (Tower)    │  │  (Router)    │  │ (Extractor) │  │
│  └────────────┘  └──────────────┘  └─────────────┘  │
│  ┌──────────────────────────────────────────────┐   │
│  │                Handler layer                 │   │
│  └──────────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────────┐   │
│  │                Service layer                 │   │
│  └──────────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────────┐   │
│  │               Repository layer               │   │
│  └──────────────────────────────────────────────┘   │
└──────────────────────────┬──────────────────────────┘
           ┌───────────────┼───────────────┐
           ▼               ▼               ▼
    ┌─────────────┐ ┌──────────┐  ┌──────────────┐
    │ PostgreSQL  │ │  Redis   │  │ Message queue│
    │  (sqlx)     │ │ (deadpool│  │  (RabbitMQ/  │
    └─────────────┘ │  -redis) │  │   Kafka)     │
                    └──────────┘  └──────────────┘

Design Principles

  • Dependency injection: pass application state through Axum's State instead of using global variables
  • Layered decoupling: Handler → Service → Repository, each layer with a clear responsibility
  • Type safety: lean on Rust's type system to catch whole classes of errors at compile time rather than at runtime
  • Async first: async/await end to end, on the Tokio runtime
  • Observability: structured logging + distributed tracing + metrics reporting

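2. Core Dependencies in Detail

2.1 Web Framework Layer

axum: the core web framework

axum = { version = "0.8", features = ["macros", "ws", "multipart"] }

An async web framework from the Tokio team, built on the Tower middleware ecosystem. Key features:
- Macro-friendly handler declarations (#[debug_handler] gives clearer compile errors)
- Low-overhead request extractors (Path, Query, Json, State)
- Built-in support for WebSocket, SSE, and Multipart
- Full compatibility with Tower middleware

tokio: the async runtime

tokio = { version = "1", features = ["full"] }

The de facto standard async runtime in the Rust ecosystem. It provides:
- A multi-threaded work-stealing scheduler
- Async I/O (network, files)
- Timers, channels, and synchronization primitives
- The #[tokio::main] macro to simplify the entry point

tower / tower-http: the middleware ecosystem

tower = { version = "0.5", features = ["full"] }
tower-http = { version = "0.6", features = [
    "cors", "trace", "compression-gzip", "request-id",
    "timeout", "limit", "fs", "auth"
] }

Tower provides the Service and Layer traits for composable middleware. tower-http includes:
- CorsLayer: CORS control
- TraceLayer: automatic request/response tracing
- CompressionLayer: response compression (Gzip with the compression-gzip feature enabled above; Brotli additionally requires the compression-br feature)
- SetRequestIdLayer / PropagateRequestIdLayer: assign a request ID and copy it onto the response
- TimeoutLayer: request timeouts
- RequestBodyLimitLayer: request body size limits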

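2.2 Database Layer

sqlx: async SQL toolkit

sqlx = { version = "0.8", features = [
    "runtime-tokio-rustls", "postgres", "uuid",
    "chrono", "json", "migrate"
] }

An async database library that can verify SQL statements at compile time. Key features:
- Compile-time SQL checking: the query! macro validates SQL syntax and types at build time
- No ORM overhead: rows map directly to Rust structs, and there is no implicit lazy loading, so N+1 queries only occur if they are written explicitly
- Built-in connection pool: PgPool reuses connections
- Migration management: the sqlx migrate CLI versions the database schema

sea-orm (optional alternative): a full-featured ORM

sea-orm = { version = "1", features = [
    "sqlx-postgres", "runtime-tokio-rustls", "macros"
] }

Suited to cases that need entity-relationship mapping and a rich query builder. It offers a higher-level abstraction than sqlx, at the cost of some runtime overhead.

deadpool-redis: Redis connection pool

deadpool-redis = { version = "0.18", features = ["rt_tokio_1"] }
redis = { version = "0.27", features = ["tokio-comp", "json"] }

A high-performance Redis connection pool with connection reuse, health checks, and automatic reconnection. Used for:
- Session / JWT token blacklist
- Caching hot data
- Distributed locks
- Rate-limit counters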

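2.3 Serialization and Validation

serde / serde_json: serialization framework

serde = { version = "1", features = ["derive"] }
serde_json = "1"

The most important serialization library in the Rust ecosystem. #[derive(Serialize, Deserialize)] converts to and from JSON with no boilerplate.

validator: request body validation

validator = { version = "0.19", features = ["derive"] }

Declarative validation macros, supporting:
- String length, regex, email, and URL validation
- Numeric range validation
- Custom validation functions
- Nested struct validation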

#[derive(Debug, Deserialize, Validate)]
pub struct CreateUserDto {
    #[validate(length(min = 2, max = 50))]
    pub name: String,
    #[validate(email)]
    pub email: String,
    #[validate(length(min = 8))]
    pub password: String,
}

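2.4 Authentication and Security

jsonwebtoken: JWT handling

jsonwebtoken = "9"

Encodes, decodes, and verifies JWTs, with support for algorithms such as HS256, RS256, and ES256.

argon2: password hashing

argon2 = "0.5"
password-hash = { version = "0.5", features = ["std"] }

Argon2id is a currently recommended algorithm for password storage (for example in OWASP guidance). It is memory-hard, which makes GPU brute-force attacks expensive.

axum-extra: additional extractors

axum-extra = { version = "0.10", features = ["typed-header", "cookie"] }

Provides the TypedHeader extractor (which parses Authorization: Bearer <token> automatically) and cookie management.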


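2.5 Configuration and Environment

config: multi-environment configuration

config = "0.14"

Layered configuration loading with support for TOML, YAML, JSON, and environment variables. Precedence: environment variables > environment-specific config file > default config.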
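The precedence rule can be illustrated with a std-only sketch. This is not how the config crate is implemented; `merge_layers` and `layer` are hypothetical helpers, and flat dotted keys such as "server.port" are an assumption made to keep the example small. Later layers simply overwrite earlier ones:

```rust
use std::collections::HashMap;

/// Merge configuration layers in order; a later layer overrides an earlier one.
pub fn merge_layers(layers: &[HashMap<String, String>]) -> HashMap<String, String> {
    let mut merged = HashMap::new();
    for layer in layers {
        for (key, value) in layer {
            merged.insert(key.clone(), value.clone());
        }
    }
    merged
}

/// Hypothetical helper that builds one layer from key/value pairs.
pub fn layer(pairs: &[(&str, &str)]) -> HashMap<String, String> {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

Passing the layers as `[defaults, environment_file, environment_variables]` reproduces the precedence above: a key set in all three takes its value from the environment variables, while a key set only in the defaults keeps its default.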

dotenvy: .env file loading

dotenvy = "0.15"

Injects environment variables from a .env file in development.


2.6 Logging and Tracing

tracing / tracing-subscriber: structured logging

tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-opentelemetry = "0.28"
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }

A combined solution for distributed tracing and structured logging:
- tracing provides macros such as span!, info!, and error!
- tracing-subscriber configures the output format (JSON or pretty) and the filter level
- opentelemetry-otlp exports traces to Jaeger or Tempo


2.7 Types and Utilities

uuid: UUID generation

uuid = { version = "1", features = ["v4", "v7", "serde"] }

Generates UUID v4 (random) or v7 (time-ordered, which suits sortable database primary keys).
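Why v7 values sort by creation time can be seen from the bit layout defined in RFC 9562: the top 48 bits hold the Unix timestamp in milliseconds, followed by the version, 12 random bits, the variant, and 62 more random bits. The function below is a hypothetical illustration of that layout and not the uuid crate's API; the random parts are passed in as parameters so the example stays deterministic.

```rust
/// Assemble a UUIDv7-shaped 128-bit value (illustrative only; use the uuid crate in real code).
pub fn uuid_v7_like(unix_ms: u64, rand_a: u16, rand_b: u64) -> u128 {
    let timestamp = (unix_ms as u128 & 0xFFFF_FFFF_FFFF) << 80; // 48-bit millisecond timestamp
    let version = 0x7u128 << 76;                                // 4-bit version = 7
    let a = (rand_a as u128 & 0x0FFF) << 64;                    // 12 random bits
    let variant = 0b10u128 << 62;                               // 2-bit variant = 10
    let b = rand_b as u128 & 0x3FFF_FFFF_FFFF_FFFF;             // 62 random bits
    timestamp | version | a | variant | b
}
```

Because the timestamp occupies the most significant bits, a value created one millisecond later compares greater regardless of the random bits, which keeps primary-key index inserts roughly append-only.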

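chrono: date and time handling

chrono = { version = "0.4", features = ["serde"] }

A full-featured time library for time zone conversion, formatting, and date arithmetic.

thiserror: error definitions

thiserror = "2"

Defines error enums concisely with the #[derive(Error)] macro, implementing the Display and Error traits automatically.

anyhow: error propagation

anyhow = "1"

Conveniently propagates arbitrary errors in code that does not need to match on precise error types (such as initialization code).

bon: builder pattern

bon = "3"

Generates type-safe builders automatically (#[derive(Builder)] on structs, #[builder] on functions), replacing verbose hand-written builder code.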


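2.8 Async Tasks

tokio-cron-scheduler: scheduled jobs

tokio-cron-scheduler = "0.13"

A Tokio-based cron scheduler that accepts cron expressions and runs scheduled jobs inside the same process.

lapin: RabbitMQ client (optional)

lapin = "2"

An async AMQP 0-9-1 client for connecting to RabbitMQ and processing tasks asynchronously through a message queue.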


3. Project Directory Structure

my-api/
├── Cargo.toml
├── Cargo.lock
├── .env                          # Local development environment variables (not committed to git)
├── .env.example                  # Environment variable template
├── config/
│   ├── default.toml              # Default configuration
│   ├── development.toml          # Development configuration
│   ├── production.toml           # Production configuration
│   └── test.toml                 # Test configuration
├── migrations/                   # sqlx database migration files
│   ├── 20260101000000_create_users.sql
│   └── 20260102000000_create_posts.sql
├── src/
│   ├── main.rs                   # Program entry point
│   ├── lib.rs                    # Library entry point (eases integration testing)
│   ├── app.rs                    # Axum application assembly
│   ├── config/
│   │   ├── mod.rs
│   │   └── settings.rs           # Configuration struct definitions
│   ├── db/
│   │   ├── mod.rs
│   │   └── pool.rs               # Database connection pool initialization
│   ├── cache/
│   │   ├── mod.rs
│   │   └── redis.rs              # Redis connection pool initialization
│   ├── errors/
│   │   ├── mod.rs
│   │   └── app_error.rs          # Global error type definitions
│   ├── extractors/
│   │   ├── mod.rs
│   │   ├── auth.rs               # JWT authentication extractor
│   │   └── validated_json.rs     # JSON extractor with validation
│   ├── middleware/
│   │   ├── mod.rs
│   │   ├── auth.rs               # Authentication middleware
│   │   └── rate_limit.rs         # Rate-limiting middleware
│   ├── models/                   # Database models (mirror the table structure)
│   │   ├── mod.rs
│   │   └── user.rs
│   ├── dto/                      # Data transfer objects (request/response structs)
│   │   ├── mod.rs
│   │   └── user.rs
│   ├── repositories/             # Data access layer
│   │   ├── mod.rs
│   │   └── user_repository.rs
│   ├── services/                 # Business logic layer
│   │   ├── mod.rs
│   │   └── user_service.rs
│   ├── handlers/                 # HTTP handler layer
│   │   ├── mod.rs
│   │   ├── health.rs
│   │   └── user.rs
│   ├── routes/                   # Route registration
│   │   ├── mod.rs
│   │   ├── api_v1.rs
│   │   └── health.rs
│   ├── state.rs                  # Global AppState definition
│   └── telemetry.rs              # Logging and tracing initialization
└── tests/
    ├── common/
    │   └── mod.rs                # Shared integration test utilities
    └── api/
        └── user_tests.rs         # User API integration tests

4. Layered Architecture Design

4.1 Layer Responsibilities

Request
  │
  ▼
┌──────────────────────────────────────────┐
│  Middleware layer                        │
│  - Authentication (JWT verification)     │
│  - Request logging                       │
│  - Rate limiting                         │
│  - CORS / compression                    │
└──────────────────┬───────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────┐
│  Handler layer (handlers/)               │
│  - Parse and validate request input      │
│  - Call the Service layer                │
│  - Convert results to HTTP responses     │
│  - No business logic                     │
└──────────────────┬───────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────┐
│  Service layer (services/)               │
│  - Core business logic                   │
│  - Transaction management                │
│  - Combine data from Repositories        │
│  - External: email, SMS, object storage  │
└──────────────────┬───────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────┐
│  Repository layer (repositories/)        │
│  - Encapsulate all database access       │
│  - CRUD on data only                     │
│  - Return Models or domain objects       │
│  - No business logic                     │
└──────────────────┬───────────────────────┘
                   │
                   ▼
           Database / cache
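The split of responsibilities can be shown without any framework. The following is a minimal std-only sketch, not the project's actual code: `UserRepo`, `InMemoryUserRepo`, `UserService`, and `register_handler` are hypothetical names, the repository is an in-memory map standing in for a database, and the handler returns a plain `(status, body)` tuple instead of an Axum response.

```rust
use std::collections::HashMap;

/// Repository layer: data access only, no business rules.
pub trait UserRepo {
    fn find_by_email(&self, email: &str) -> Option<String>;
    fn insert(&mut self, email: &str, name: &str);
}

/// Hypothetical in-memory stand-in for a database-backed repository.
#[derive(Default)]
pub struct InMemoryUserRepo {
    users: HashMap<String, String>, // email -> name
}

impl UserRepo for InMemoryUserRepo {
    fn find_by_email(&self, email: &str) -> Option<String> {
        self.users.get(email).cloned()
    }

    fn insert(&mut self, email: &str, name: &str) {
        self.users.insert(email.to_string(), name.to_string());
    }
}

/// Service layer: owns the business rule "an email may register only once".
pub struct UserService<R: UserRepo> {
    repo: R,
}

impl<R: UserRepo> UserService<R> {
    pub fn new(repo: R) -> Self {
        Self { repo }
    }

    pub fn register(&mut self, email: &str, name: &str) -> Result<(), String> {
        if self.repo.find_by_email(email).is_some() {
            return Err(format!("email {email} is already in use"));
        }
        self.repo.insert(email, name);
        Ok(())
    }
}

/// Handler layer: only translates the service result into a status code and body.
pub fn register_handler<R: UserRepo>(
    service: &mut UserService<R>,
    email: &str,
    name: &str,
) -> (u16, String) {
    match service.register(email, name) {
        Ok(()) => (201, "created".to_string()),
        Err(message) => (409, message),
    }
}
```

Because the service depends on the `UserRepo` trait rather than a concrete type, the uniqueness rule can be unit-tested against the in-memory repository with no database running.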

4.2 AppState Design

AppState is the container for every injected dependency. Handlers access it through Axum's State extractor:

// src/state.rs
use std::sync::Arc;
use sqlx::PgPool;
use deadpool_redis::Pool as RedisPool;
use crate::config::Settings;

#[derive(Clone)]
pub struct AppState {
    pub db: PgPool,
    pub redis: RedisPool,
    pub config: Arc<Settings>,
}

5. Database and ORM

5.1 sqlx Usage Patterns

Connection pool initialization:

// src/db/pool.rs
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use crate::config::Settings;

pub async fn create_pool(settings: &Settings) -> anyhow::Result<PgPool> {
    PgPoolOptions::new()
        .max_connections(settings.database.max_connections)
        .min_connections(settings.database.min_connections)
        .acquire_timeout(std::time::Duration::from_secs(5))
        .connect(&settings.database.url)
        .await
        .map_err(Into::into)
}

Repository implementation pattern:

// src/repositories/user_repository.rs
use sqlx::PgPool;
use uuid::Uuid;
use crate::models::user::User;
use crate::errors::AppError;

pub struct UserRepository {
    pool: PgPool,
}

impl UserRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    pub async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, AppError> {
        let user = sqlx::query_as!(
            User,
            "SELECT * FROM users WHERE id = $1 AND deleted_at IS NULL",
            id
        )
        .fetch_optional(&self.pool)
        .await?;
        Ok(user)
    }

    pub async fn find_by_email(&self, email: &str) -> Result<Option<User>, AppError> {
        let user = sqlx::query_as!(
            User,
            "SELECT * FROM users WHERE email = $1 AND deleted_at IS NULL",
            email
        )
        .fetch_optional(&self.pool)
        .await?;
        Ok(user)
    }

    pub async fn create(&self, name: &str, email: &str, password_hash: &str) -> Result<User, AppError> {
        let user = sqlx::query_as!(
            User,
            r#"
            INSERT INTO users (id, name, email, password_hash, created_at, updated_at)
            VALUES ($1, $2, $3, $4, NOW(), NOW())
            RETURNING *
            "#,
            Uuid::new_v4(),
            name,
            email,
            password_hash
        )
        .fetch_one(&self.pool)
        .await?;
        Ok(user)
    }
}

5.2 Database Migrations

-- migrations/20260101000000_create_users.sql
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE users (
    id          UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name        VARCHAR(100) NOT NULL,
    email       VARCHAR(255) NOT NULL UNIQUE,
    password_hash TEXT NOT NULL,
    role        VARCHAR(50) NOT NULL DEFAULT 'user',
    is_active   BOOLEAN NOT NULL DEFAULT TRUE,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    deleted_at  TIMESTAMPTZ
);

-- The UNIQUE constraint on email already creates an index, so no separate email index is needed.
CREATE INDEX idx_users_created_at ON users(created_at);

Run the migrations:

sqlx migrate run --database-url $DATABASE_URL

6. Error Handling

6.1 Error Type Definitions

// src/errors/app_error.rs
use axum::{http::StatusCode, response::{IntoResponse, Response}, Json};
use serde_json::json;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum AppError {
    // Business errors
    #[error("Resource not found: {0}")]
    NotFound(String),

    #[error("Unauthorized: {0}")]
    Unauthorized(String),

    #[error("Forbidden: {0}")]
    Forbidden(String),

    #[error("Validation error: {0}")]
    Validation(String),

    #[error("Conflict: {0}")]
    Conflict(String),

    // Infrastructure errors (internal; not exposed to clients)
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),

    #[error("Redis error: {0}")]
    Redis(#[from] deadpool_redis::PoolError),

    #[error("Internal error: {0}")]
    Internal(String),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, code, message) = match &self {
            AppError::NotFound(msg)     => (StatusCode::NOT_FOUND, "NOT_FOUND", msg.clone()),
            AppError::Unauthorized(msg) => (StatusCode::UNAUTHORIZED, "UNAUTHORIZED", msg.clone()),
            AppError::Forbidden(msg)    => (StatusCode::FORBIDDEN, "FORBIDDEN", msg.clone()),
            AppError::Validation(msg)   => (StatusCode::UNPROCESSABLE_ENTITY, "VALIDATION_ERROR", msg.clone()),
            AppError::Conflict(msg)     => (StatusCode::CONFLICT, "CONFLICT", msg.clone()),
            // Infrastructure errors all return 500 without exposing internal details
            AppError::Database(e) => {
                tracing::error!(error = %e, "Database error");
                (StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", "An internal error occurred".into())
            }
            AppError::Redis(e) => {
                tracing::error!(error = %e, "Redis error");
                (StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", "An internal error occurred".into())
            }
            AppError::Internal(msg) => {
                tracing::error!(error = %msg, "Internal error");
                (StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", "An internal error occurred".into())
            }
        };

        let body = Json(json!({
            "success": false,
            "code": code,
            "message": message,
        }));

        (status, body).into_response()
    }
}
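Two mechanics carry this design: a `From` implementation (which `#[from]` generates) lets the `?` operator convert a low-level error into the application error, and the response mapping decides what the client may see. The std-only sketch below mimics both with a simplified stand-in enum; `ParseIntError` plays the role of an infrastructure error such as `sqlx::Error`, and `lookup` and `public` are hypothetical names.

```rust
use std::num::ParseIntError;

/// Simplified stand-in for the AppError enum above.
#[derive(Debug)]
pub enum AppError {
    NotFound(String),
    /// Infrastructure-style error; the inner detail is meant for logs only.
    Parse(ParseIntError),
}

// Hand-written equivalent of what `#[from]` generates.
impl From<ParseIntError> for AppError {
    fn from(e: ParseIntError) -> Self {
        AppError::Parse(e)
    }
}

impl AppError {
    /// Status code and client-facing message; internal detail is never included.
    pub fn public(&self) -> (u16, String) {
        match self {
            AppError::NotFound(what) => (404, format!("Resource not found: {what}")),
            AppError::Parse(_) => (500, "An internal error occurred".to_string()),
        }
    }
}

/// Hypothetical lookup: only id 1 exists.
pub fn lookup(raw_id: &str) -> Result<&'static str, AppError> {
    let id: u32 = raw_id.parse()?; // ParseIntError becomes AppError via From
    if id == 1 {
        Ok("alice")
    } else {
        Err(AppError::NotFound(format!("user {id}")))
    }
}
```

Calling `lookup("abc")` yields a 500 with the generic message, while `lookup("2")` yields a 404 that names the missing resource, mirroring the business versus infrastructure split in the enum above.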

7. Authentication and Authorization

7.1 JWT Structure

// src/extractors/auth.rs
// axum 0.8 uses native async fn in traits, so #[async_trait] is no longer needed
use axum::{extract::FromRequestParts, http::request::Parts};
use axum_extra::{headers::{Authorization, Bearer}, TypedHeader};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{errors::AppError, state::AppState};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
    pub sub: Uuid,          // User ID
    pub email: String,
    pub role: String,
    pub exp: usize,         // Expiration time (Unix timestamp)
    pub iat: usize,         // Issued-at time
}

/// Auth extractor: pulls the JWT from the request headers and verifies it
pub struct AuthUser(pub Claims);

impl FromRequestParts<AppState> for AuthUser {
    type Rejection = AppError;

    async fn from_request_parts(
        parts: &mut Parts,
        state: &AppState,
    ) -> Result<Self, Self::Rejection> {
        // Extract Authorization: Bearer <token>
        let TypedHeader(Authorization(bearer)) =
            TypedHeader::<Authorization<Bearer>>::from_request_parts(parts, state)
                .await
                .map_err(|_| AppError::Unauthorized("Missing or invalid Authorization header".into()))?;

        // Check whether the token is blacklisted (already logged out)
        let token = bearer.token();
        if is_token_blacklisted(&state.redis, token).await? {
            return Err(AppError::Unauthorized("Token has been revoked".into()));
        }

        // Verify and decode the JWT
        let secret = state.config.auth.jwt_secret.as_bytes();
        let token_data = decode::<Claims>(token, &DecodingKey::from_secret(secret), &Validation::default())
            .map_err(|e| AppError::Unauthorized(format!("Invalid token: {e}")))?;

        Ok(AuthUser(token_data.claims))
    }
}

async fn is_token_blacklisted(redis: &deadpool_redis::Pool, token: &str) -> Result<bool, AppError> {
    use redis::AsyncCommands;
    let mut conn = redis.get().await?;
    let key = format!("blacklist:token:{token}");
    let exists: bool = conn.exists(&key).await.map_err(|e| AppError::Internal(e.to_string()))?;
    Ok(exists)
}
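The two checks the extractor relies on, parsing the `Bearer` scheme and rejecting expired tokens, can be sketched with the standard library alone. This is an illustration of the rules, not the code path of `TypedHeader` or `jsonwebtoken`; `bearer_token` and `is_expired` are hypothetical helpers, and the expiry check follows the RFC 7519 rule (the current time must be before `exp`) without the leeway a library may apply.

```rust
/// Extract the token from an `Authorization` header value of the form `Bearer <token>`.
pub fn bearer_token(header_value: &str) -> Option<&str> {
    let (scheme, token) = header_value.split_once(' ')?;
    let token = token.trim();
    if scheme.eq_ignore_ascii_case("Bearer") && !token.is_empty() {
        Some(token)
    } else {
        None
    }
}

/// A token is expired once the current Unix time reaches its `exp` claim.
pub fn is_expired(exp: usize, now: usize) -> bool {
    now >= exp
}
```

A header such as `Basic dXNlcg==` or a bare `Bearer ` yields `None`, which corresponds to the "Missing or invalid Authorization header" rejection above.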

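7.2 RBAC Access Control

use std::marker::PhantomData;

/// A role that a route can require. `&'static str` const generic parameters are
/// not available on stable Rust, so each role is modeled as a marker type.
pub trait Role {
    const NAME: &'static str;
}

pub struct Admin;
impl Role for Admin {
    const NAME: &'static str = "admin";
}

/// Role extractor: builds on AuthUser and additionally restricts the role
pub struct RequireRole<R: Role>(pub Claims, pub PhantomData<R>);

// Usage example:
// async fn admin_handler(RequireRole(claims, _): RequireRole<Admin>) -> ...

impl<R: Role> FromRequestParts<AppState> for RequireRole<R> {
    type Rejection = AppError;

    async fn from_request_parts(parts: &mut Parts, state: &AppState) -> Result<Self, Self::Rejection> {
        let AuthUser(claims) = AuthUser::from_request_parts(parts, state).await?;
        if claims.role != R::NAME {
            return Err(AppError::Forbidden("Insufficient permissions".into()));
        }
        Ok(RequireRole(claims, PhantomData))
    }
}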

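8. Middleware Design

8.1 Middleware Registration Order

Middleware wraps the routes from the outside in: a request passes through the layers from top to bottom, and the response travels back from bottom to top: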

// src/app.rs
use std::time::Duration;
use axum::Router;
use tower_http::{
    cors::CorsLayer,
    trace::TraceLayer,
    compression::CompressionLayer,
    request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
    timeout::TimeoutLayer,
    limit::RequestBodyLimitLayer,
};
use crate::{routes, state::AppState};

pub fn build_app(state: AppState) -> Router {
    Router::new()
        .merge(routes::health::router())
        .nest("/api/v1", routes::api_v1::router())
        .with_state(state)
        // With ServiceBuilder, layers run in the order listed: the first layer is
        // the outermost and sees the request first
        .layer(
            tower::ServiceBuilder::new()
                // 1. Set the request ID (outermost, so every request gets a unique ID)
                .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
                // 2. Tracing
                .layer(TraceLayer::new_for_http())
                // 3. Timeout
                .layer(TimeoutLayer::new(Duration::from_secs(30)))
                // 4. Request body size limit (10 MB)
                .layer(RequestBodyLimitLayer::new(10 * 1024 * 1024))
                // 5. Gzip response compression
                .layer(CompressionLayer::new())
                // 6. Propagate the request ID to the response headers
                .layer(PropagateRequestIdLayer::x_request_id())
                // 7. CORS (innermost)
                .layer(build_cors_layer()),
        )
}
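The "onion" ordering can be made concrete with a std-only model. This is a sketch of the wrapping idea, not Tower's `Service`/`Layer` machinery: `Handler`, `wrap`, and `build_stack` are hypothetical names, and each layer simply records when it sees the request and the response.

```rust
/// A handler (or wrapped handler) that appends events to a log.
pub type Handler = Box<dyn Fn(&mut Vec<String>)>;

/// Wrap `inner` so that `name` records an event on the way in and on the way out.
pub fn wrap(name: &'static str, inner: Handler) -> Handler {
    Box::new(move |log: &mut Vec<String>| {
        log.push(format!("{name}: request"));
        inner(log);
        log.push(format!("{name}: response"));
    })
}

/// Apply the layers so that the FIRST name in the slice ends up outermost,
/// matching a stack that is read from top to bottom.
pub fn build_stack(names: &[&'static str], handler: Handler) -> Handler {
    names
        .iter()
        .rev()
        .fold(handler, |inner, &name| wrap(name, inner))
}
```

Building the stack from `["request_id", "trace", "cors"]` and calling it logs the request events in that order, then the handler, then the response events in reverse, which is why the request ID layer is listed first: it has to run before tracing can record the ID.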

8.2 Rate-Limiting Middleware

// src/middleware/rate_limit.rs
use axum::{extract::Request, middleware::Next, response::Response};
use redis::AsyncCommands;
use crate::{errors::AppError, state::AppState};

pub async fn rate_limit_middleware(
    axum::extract::State(state): axum::extract::State<AppState>,
    req: Request,
    next: Next,
) -> Result<Response, AppError> {
    // Use the client IP as the rate-limit key (in production this could be the user ID).
    // X-Forwarded-For may hold a comma-separated chain; the first entry is the original client.
    // Only trust this header when the service sits behind a proxy that sets it.
    let ip = req
        .headers()
        .get("x-forwarded-for")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.split(',').next())
        .map(str::trim)
        .unwrap_or("unknown");

    // One counter per client per one-minute window
    let key = format!("rate_limit:{}:{}", ip, chrono::Utc::now().timestamp() / 60);

    let mut conn = state.redis.get().await?;
    let count: i64 = conn
        .incr(&key, 1_i64)
        .await
        .map_err(|e| AppError::Internal(e.to_string()))?;

    if count == 1 {
        // Set a 60-second expiry on the first request in the window
        let _: () = conn.expire(&key, 60).await.map_err(|e| AppError::Internal(e.to_string()))?;
    }

    let limit = state.config.server.rate_limit_per_minute;
    if count > limit as i64 {
        // Note: AppError::Validation maps to 422; a dedicated variant mapping to
        // 429 Too Many Requests would describe this condition more accurately.
        return Err(AppError::Validation("Rate limit exceeded. Please slow down.".into()));
    }

    Ok(next.run(req).await)
}
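The Redis logic above is a fixed-window counter: one counter per client per minute, discarded when the window ends. The same algorithm can be sketched in memory with the standard library, which makes the window arithmetic easy to test. `FixedWindowLimiter` is a hypothetical type, and the current time is passed in as a parameter (an assumption made so the example is deterministic).

```rust
use std::collections::HashMap;

/// In-memory fixed-window counter keyed by (client, window index).
#[derive(Default)]
pub struct FixedWindowLimiter {
    counts: HashMap<(String, u64), u32>,
}

impl FixedWindowLimiter {
    /// Returns true if the request at `now_secs` fits within `limit` requests
    /// for the current 60-second window.
    pub fn allow(&mut self, client: &str, now_secs: u64, limit: u32) -> bool {
        let window = now_secs / 60;
        // Drop counters from earlier windows (the job EXPIRE does in Redis).
        self.counts.retain(|(_, w), _| *w >= window);
        let count = self.counts.entry((client.to_string(), window)).or_insert(0);
        *count += 1;
        *count <= limit
    }
}
```

One known trade-off of fixed windows is that a client can send up to twice the limit in a short burst straddling a window boundary; sliding-window or token-bucket schemes smooth this out at the cost of more state.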

9. Configuration Management

9.1 Configuration Structs

// src/config/settings.rs
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
    pub server: ServerSettings,
    pub database: DatabaseSettings,
    pub redis: RedisSettings,
    pub auth: AuthSettings,
}

#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings {
    pub host: String,
    pub port: u16,
    pub rate_limit_per_minute: u32,
    pub env: String,           // "development" | "production" | "test"
}

#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseSettings {
    pub url: String,
    pub max_connections: u32,
    pub min_connections: u32,
}

#[derive(Debug, Deserialize, Clone)]
pub struct RedisSettings {
    pub url: String,
    pub max_connections: usize,
}

#[derive(Debug, Deserialize, Clone)]
pub struct AuthSettings {
    pub jwt_secret: String,
    pub jwt_expires_in_seconds: u64,
    pub refresh_token_expires_in_days: u64,
}

9.2 Loading Configuration

// src/config/mod.rs
pub use settings::Settings;
mod settings;

use config::{Config, Environment, File};

pub fn load_settings() -> anyhow::Result<Settings> {
    let env = std::env::var("APP_ENV").unwrap_or_else(|_| "development".into());

    let settings = Config::builder()
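        // Default configuration (the base layer)
        .add_source(File::with_name("config/default"))
        // Environment-specific configuration (overrides the defaults)
        .add_source(File::with_name(&format!("config/{env}")).required(false))
        // Environment variables (highest priority; format: APP__SERVER__PORT=8080)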
        .add_source(Environment::with_prefix("APP").separator("__"))
        .build()?
        .try_deserialize()?;

    Ok(settings)
}
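How a variable name such as APP__SERVER__PORT corresponds to the nested field `server.port` can be sketched as follows. This is a std-only illustration of the naming convention, not the config crate's implementation, and `env_to_key` is a hypothetical helper. Because the separator is a double underscore, single underscores inside a field name survive the split.

```rust
/// Map an environment variable name to a dotted config key,
/// e.g. "APP__SERVER__PORT" -> "server.port". Returns None if the prefix does not match.
pub fn env_to_key(name: &str, prefix: &str, separator: &str) -> Option<String> {
    let rest = name.strip_prefix(prefix)?.strip_prefix(separator)?;
    if rest.is_empty() {
        return None;
    }
    let parts: Vec<String> = rest
        .split(separator)
        .map(|part| part.to_lowercase())
        .collect();
    Some(parts.join("."))
}
```

For example, APP__SERVER__RATE_LIMIT_PER_MINUTE maps to `server.rate_limit_per_minute`, while an unrelated variable such as PATH is ignored.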

config/default.toml

[server]
host = "0.0.0.0"
port = 8080
rate_limit_per_minute = 100
env = "development"

[database]
max_connections = 10
min_connections = 2

[auth]
jwt_expires_in_seconds = 3600        # 1 hour
refresh_token_expires_in_days = 30

10. Logging and Observability

10.1 Initializing Tracing

// src/telemetry.rs
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

pub fn init_tracing(env: &str) {
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info,sqlx=warn,hyper=warn"));

    let subscriber = tracing_subscriber::registry().with(env_filter);

    if env == "production" {
        // Production: JSON output, easy for log systems to parse (Loki / CloudWatch)
        subscriber
            .with(tracing_subscriber::fmt::layer().json())
            .init();
    } else {
        // Development: colored, human-readable output
        subscriber
            .with(tracing_subscriber::fmt::layer().pretty())
            .init();
    }
}

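10.2 Usage in a Handler

// Assumes AppState also exposes a `user_service` field, which the AppState
// struct shown in section 4.2 does not define.
#[tracing::instrument(name = "create_user", skip(state, body))]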
pub async fn create_user(
    State(state): State<AppState>,
    ValidatedJson(body): ValidatedJson<CreateUserDto>,
) -> Result<impl IntoResponse, AppError> {
    tracing::info!(email = %body.email, "Creating new user");

    let user = state.user_service.create_user(body).await?;

    tracing::info!(user_id = %user.id, "User created successfully");
    Ok((StatusCode::CREATED, Json(ApiResponse::success(user))))
}

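11. Async Tasks and Queues

11.1 Scheduled Jobs

// Register scheduled jobs (called from main.rs)
use tokio_cron_scheduler::{Job, JobScheduler};

pub async fn start_scheduler(state: AppState) -> anyhow::Result<()> {
    let sched = JobScheduler::new().await?;

    // Every day at 03:00, clean up expired entries in the token blacklist
    // (six fields: second, minute, hour, day of month, month, day of week)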
    sched.add(Job::new_async("0 0 3 * * *", move |_, _| {
        let state = state.clone();
        Box::pin(async move {
            if let Err(e) = cleanup_expired_tokens(&state.redis).await {
                tracing::error!(error = %e, "Failed to cleanup expired tokens");
            }
        })
    })?).await?;

    sched.start().await?;
    Ok(())
}

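11.2 Background Tasks (Tokio Spawn)

For work that should run asynchronously without blocking the request (such as sending a welcome email):

// In the Service layer
pub async fn register_user(&self, dto: CreateUserDto) -> Result<User, AppError> {
    // `hash_password` is a hypothetical helper (for example, wrapping argon2) that returns the hash
    let hash = hash_password(&dto.password)?;
    let user = self.user_repo.create(&dto.name, &dto.email, &hash).await?;

    // Fire-and-forget: sending the welcome email does not block the registration response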
    let email_service = self.email_service.clone();
    let user_clone = user.clone();
    tokio::spawn(async move {
        if let Err(e) = email_service.send_welcome(&user_clone).await {
            tracing::error!(error = %e, user_id = %user_clone.id, "Failed to send welcome email");
        }
    });

    Ok(user)
}

12. API Response Conventions

12.1 Unified Response Structure

// src/dto/response.rs
use serde::Serialize;

#[derive(Serialize)]
pub struct ApiResponse<T: Serialize> {
    pub success: bool,
    pub data: Option<T>,
    pub message: Option<String>,
    pub meta: Option<PaginationMeta>,
}

#[derive(Serialize)]
pub struct PaginationMeta {
    pub total: i64,
    pub page: i64,
    pub per_page: i64,
    pub total_pages: i64,
}

impl<T: Serialize> ApiResponse<T> {
    pub fn success(data: T) -> Self {
        Self { success: true, data: Some(data), message: None, meta: None }
    }

    pub fn paginated(data: T, meta: PaginationMeta) -> Self {
        Self { success: true, data: Some(data), message: None, meta: Some(meta) }
    }
}
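The `total_pages` field is derived from `total` and `per_page` by ceiling division. A small std-only sketch of that arithmetic follows; the struct is a stand-in for the PaginationMeta above without the serde derive, and `pagination_meta` and `offset` are hypothetical helpers that assume 1-based page numbers.

```rust
/// Stand-in for the PaginationMeta struct above, without the serde derive.
#[derive(Debug, PartialEq)]
pub struct PaginationMeta {
    pub total: i64,
    pub page: i64,
    pub per_page: i64,
    pub total_pages: i64,
}

/// Build the metadata for one page of results.
pub fn pagination_meta(total: i64, page: i64, per_page: i64) -> PaginationMeta {
    let per_page = per_page.max(1); // guard against division by zero
    let total = total.max(0);
    let total_pages = (total + per_page - 1) / per_page; // ceiling division
    PaginationMeta { total, page: page.max(1), per_page, total_pages }
}

/// Row offset for a SQL OFFSET clause, assuming 1-based page numbers.
pub fn offset(page: i64, per_page: i64) -> i64 {
    (page.max(1) - 1) * per_page.max(1)
}
```

With 101 rows and 20 per page this gives 6 pages, and page 3 starts at offset 40; an empty result set reports 0 pages rather than 1.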

Success response example:

{
  "success": true,
  "data": {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2026-04-29T10:00:00Z"
  },
  "message": null,
  "meta": null
}

Error response example:

{
  "success": false,
  "code": "CONFLICT",
  "message": "Email is already in use"
}

13. Complete Cargo.toml Dependencies

[package]
name    = "my-api"
version = "0.1.0"
edition = "2021"

[dependencies]
# ── Web framework ─────────────────────────────────────────────
axum       = { version = "0.8", features = ["macros", "ws", "multipart"] }
axum-extra = { version = "0.10", features = ["typed-header", "cookie"] }
tokio      = { version = "1",   features = ["full"] }
tower      = { version = "0.5", features = ["full"] }
tower-http = { version = "0.6", features = [
    "cors", "trace", "compression-gzip",
    "request-id", "timeout", "limit", "auth"
] }

# ── Database ──────────────────────────────────────────────────
sqlx = { version = "0.8", features = [
    "runtime-tokio-rustls", "postgres",
    "uuid", "chrono", "json", "migrate"
] }

# ── Cache ─────────────────────────────────────────────────────
deadpool-redis = { version = "0.18", features = ["rt_tokio_1"] }
redis          = { version = "0.27",  features = ["tokio-comp", "json"] }

# ── Serialization ─────────────────────────────────────────────
serde      = { version = "1", features = ["derive"] }
serde_json = "1"

# ── Validation ────────────────────────────────────────────────
validator = { version = "0.19", features = ["derive"] }

# ── Authentication and security ───────────────────────────────
jsonwebtoken   = "9"
argon2         = "0.5"
password-hash  = { version = "0.5", features = ["std"] }

# ── Configuration ─────────────────────────────────────────────
config  = "0.14"
dotenvy = "0.15"

# ── Logging and tracing ───────────────────────────────────────
tracing              = "0.1"
tracing-subscriber   = { version = "0.3", features = ["env-filter", "json"] }
tracing-opentelemetry = "0.28"
opentelemetry        = "0.27"
opentelemetry-otlp   = { version = "0.27", features = ["tonic"] }

# ── Types and utilities ───────────────────────────────────────
uuid    = { version = "1", features = ["v4", "v7", "serde"] }
chrono  = { version = "0.4", features = ["serde"] }
bon     = "3"

# ── Error handling ────────────────────────────────────────────
thiserror = "2"
anyhow    = "1"

# ── Async tasks ───────────────────────────────────────────────
tokio-cron-scheduler = "0.13"

[dev-dependencies]
# HTTP client for integration tests
reqwest    = { version = "0.12", features = ["json"] }
# Integration test helpers
axum-test  = "16"
# Random test data generation
fake       = { version = "3",  features = ["derive", "chrono", "uuid"] }

[profile.release]
opt-level = 3
lto       = "thin"
strip     = true

14. Startup Flow and Code Examples

14.1 main.rs

use std::sync::Arc;
use dotenvy::dotenv;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Load .env (development only)
    dotenv().ok();

    // 2. Load configuration
    let settings = config::load_settings()?;

    // 3. Initialize logging and tracing
    telemetry::init_tracing(&settings.server.env);

    tracing::info!("Starting {} server...", env!("CARGO_PKG_NAME"));

    // 4. Initialize the database connection pool
    let db = db::pool::create_pool(&settings).await?;

    // 5. Run database migrations
    sqlx::migrate!("./migrations").run(&db).await?;
    tracing::info!("Database migrations applied");

    // 6. Initialize the Redis connection pool
    let redis = cache::redis::create_pool(&settings)?;

    // 7. Assemble AppState
    let state = AppState {
        db,
        redis,
        config: Arc::new(settings.clone()),
    };

    // 8. Start scheduled jobs
    tasks::start_scheduler(state.clone()).await?;

    // 9. Build the Axum application
    let app = app::build_app(state);

    // 10. Start the HTTP server
    let addr = format!("{}:{}", settings.server.host, settings.server.port);
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    tracing::info!("Server listening on http://{addr}");

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    Ok(())
}

/// Listen for Ctrl+C and SIGTERM to shut down gracefully
async fn shutdown_signal() {
    use tokio::signal;
    let ctrl_c = async { signal::ctrl_c().await.expect("failed to listen for ctrl_c") };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c    => tracing::info!("Received Ctrl+C, shutting down..."),
        _ = terminate => tracing::info!("Received SIGTERM, shutting down..."),
    }
}

14.2 Route Registration Example

// src/routes/api_v1.rs
use axum::{middleware, routing::{get, post, put, delete}, Router};
use crate::{handlers, middleware::rate_limit::rate_limit_middleware, state::AppState};

// Takes the state so the rate-limit middleware can access it; the caller passes
// a clone, e.g. `.nest("/api/v1", routes::api_v1::router(state.clone()))`.
pub fn router(state: AppState) -> Router<AppState> {
    let public = Router::new()
        .route("/auth/register", post(handlers::auth::register))
        .route("/auth/login",    post(handlers::auth::login))
        .route("/health",        get(handlers::health::check));

    let protected = Router::new()
        .route("/users/me",      get(handlers::user::get_me))
        .route("/users/me",      put(handlers::user::update_me))
        // axum 0.8 writes path parameters as `{id}` (`:id` was the pre-0.8 syntax)
        .route("/users/{id}",    get(handlers::user::get_by_id))
        // Rate limiting applies only to the protected route group
        .layer(middleware::from_fn_with_state(state, rate_limit_middleware));

    Router::new()
        .merge(public)
        .merge(protected)
}

Appendix: Command Quick Reference

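# Hot reload during development (requires cargo-watch)
cargo watch -x run

# Run database migrations
sqlx migrate run --database-url "$DATABASE_URL"

# Revert the most recent migration
sqlx migrate revert --database-url "$DATABASE_URL"

# Create a new migration
sqlx migrate add create_posts_table

# Generate the sqlx offline cache (for CI environments without a database)
cargo sqlx prepare

# Run all tests
cargo test

# Run integration tests
cargo test --test '*'

# Release build
cargo build --release

# Check for security vulnerabilities
cargo audit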

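15. Docker Containerization

15.1 Dockerfile (Multi-Stage Build)

A Rust service compiles to a single native binary with no language runtime to ship, which suits multi-stage builds well: the build image does the compiling, and the runtime image carries only the final binary and a few system libraries. This can bring the image down to roughly 20 MB, depending on the base image. Note that the default gnu target still links dynamically against glibc; a fully static binary requires the musl target described in 15.6.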

# syntax=docker/dockerfile:1.7

# ────────────────────────────────────────────────
# Stage 1: dependency caching
# Isolate the Cargo manifests so Docker layer caching speeds up repeated builds
# ────────────────────────────────────────────────
FROM rust:1.82-bookworm AS chef
RUN cargo install cargo-chef --locked
WORKDIR /app

FROM chef AS planner
COPY . .
# Generate the dependency recipe (recipe.json); it changes only when Cargo.toml/Cargo.lock change
RUN cargo chef prepare --recipe-path recipe.json

# ────────────────────────────────────────────────
# Stage 2: build
# ────────────────────────────────────────────────
FROM chef AS builder

# Optional: install the musl toolchain to produce a fully static binary (see 15.6)
# RUN rustup target add x86_64-unknown-linux-musl
# RUN apt-get update && apt-get install -y musl-tools

# Build dependencies only first (this layer stays cached while dependencies are unchanged)
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json

# Then copy the source code and build the application itself
COPY . .

# Enable sqlx offline mode so the build does not need a database connection
ENV SQLX_OFFLINE=true

RUN cargo build --release --bin my-api

# ────────────────────────────────────────────────
# Stage 3: runtime image
# Uses debian-slim (distroless also works); contains no compiler or source code
# ────────────────────────────────────────────────
FROM debian:bookworm-slim AS runtime

# Install runtime essentials: CA certificates (for outbound HTTPS), time zone data,
# and curl (used by the HEALTHCHECK instruction in this stage)
RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates \
    tzdata \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create a non-root user to run the service (security best practice)
RUN groupadd --gid 1001 appgroup \
    && useradd  --uid 1001 --gid appgroup --shell /bin/bash --create-home appuser

WORKDIR /app

# Copy the compiled binary from the build stage
COPY --from=builder /app/target/release/my-api ./my-api

# Copy configuration files (no secrets)
COPY config/ ./config/

# Copy database migration files (migrations run at startup)
COPY migrations/ ./migrations/

# Switch to the non-root user
USER appuser

# Expose the application port
EXPOSE 8080

# Health check (every 30 seconds; marked unhealthy after 3 consecutive failures)
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD curl -f http://localhost:8080/health/live || exit 1

ENTRYPOINT ["./my-api"]

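Key design points:

| Design | Rationale |
| --- | --- |
| Two-step cargo-chef build | Separates the dependency build layer from the source build layer, so the long dependency compile is skipped when dependencies are unchanged |
| SQLX_OFFLINE=true | No database is reachable during the image build, so sqlx uses the offline cache generated by cargo sqlx prepare |
| debian:bookworm-slim | About 60% smaller than debian:bookworm while keeping glibc (compatible with dynamically linked libraries) |
| Non-root user | The container process runs as uid 1001, which limits the damage if an attacker escapes the container |
| HEALTHCHECK | Docker and Docker Compose use this check to report container health; Kubernetes ignores it and relies on the probes defined in the Deployment |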

15.2 .dockerignore

# Git
.git
.gitignore

# Build artifacts (regenerated inside the build image)
target/

# Development environment files
.env
.env.*
!.env.example

# Editors
.vscode/
.idea/
*.swp
*.swo

# Documentation
*.md
docs/

# CI
.github/

15.3 docker-compose.yml (Local Development Environment)

# docker-compose.yml
version: "3.9"

services:
  # ── Application service ───────────────────────────
  api:
    build:
      context: .
      dockerfile: Dockerfile
      target: runtime           # use the production image
    image: my-api:dev
    container_name: my-api
    restart: unless-stopped
    ports:
      - "8080:8080"
    environment:
      APP_ENV: development
      APP__DATABASE__URL: postgres://postgres:postgres@db:5432/mydb
      APP__REDIS__URL: redis://redis:6379
      APP__AUTH__JWT_SECRET: supersecretkey_change_in_production
      RUST_LOG: info,sqlx=warn
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - app-net
    volumes:
      # Mount local config (overrides the config baked into the image, so no rebuild is needed during development)
      - ./config:/app/config:ro

  # ── PostgreSQL ────────────────────────────────────
  db:
    image: postgres:17-alpine
    container_name: my-api-db
    restart: unless-stopped
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: mydb
    ports:
      - "5432:5432"            # exposed so local GUI tools can connect
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d mydb"]
      interval: 5s
      timeout: 5s
      retries: 10
    networks:
      - app-net

  # ── Redis ─────────────────────────────────────────
  redis:
    image: redis:7-alpine
    container_name: my-api-redis
    restart: unless-stopped
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5
    networks:
      - app-net

  # ── Database admin UI (development only) ──────────
  adminer:
    image: adminer:latest
    container_name: my-api-adminer
    restart: unless-stopped
    ports:
      - "8888:8080"
    depends_on:
      - db
    networks:
      - app-net
    profiles:
      - tools                  # started only with --profile tools

volumes:
  postgres_data:
  redis_data:

networks:
  app-net:
    driver: bridge

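15.4 docker-compose.override.yml (Hot-Reload Development Mode)

With cargo-watch and a source-code bind mount, the container rebuilds and restarts the service on every change, so there is no need to rebuild the image after each edit: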

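# docker-compose.override.yml (git-ignored, for local use only)
version: "3.9"

services:
  api:
    # Override with the development image (includes the build toolchain)
    build:
      target: builder
    image: my-api:dev-watch
    # The builder stage does not ship cargo-watch, so install it before watching
    command: sh -c "cargo install cargo-watch --locked && cargo watch -x run"
    environment:
      RUST_LOG: debug,sqlx=debug
    volumes:
      # Mount the source code (enables hot reload)
      - .:/app
      # Named volumes keep the build caches, so a restart does not recompile from scratch
      - cargo_cache:/usr/local/cargo/registry
      - target_cache:/app/target
    ports:
      - "8080:8080"

volumes:
  cargo_cache:
  target_cache: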

15.5 Build and Run Commands

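# ── Production build ──────────────────────────────
# First build: generate the sqlx offline query cache (requires a database connection)
cargo sqlx prepare

# Build the production image
docker build -t my-api:latest .

# Check the image size
docker images my-api

# ── Local development (docker compose) ────────────
# Start all services (in the background)
docker compose up -d

# Start including the tool services (Adminer)
docker compose --profile tools up -d

# Hot-reload development mode (the override file is merged automatically when present)
docker compose up

# Follow the application logs
docker compose logs -f api

# Open a shell in the application container for debugging
docker compose exec api /bin/bash

# Run database migrations manually (assumes the binary provides a migrate subcommand)
docker compose exec api ./my-api migrate

# Stop and clean up (data volumes are kept)
docker compose down

# Stop and clean up including data volumes (use with care)
docker compose down -v

# ── Pushing the image ─────────────────────────────
# Tag and push to a private registry
docker tag my-api:latest registry.example.com/my-api:1.0.0
docker push registry.example.com/my-api:1.0.0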

15.6 Production Optimization: Fully Static Binary (musl)

To minimize the image (using scratch or distroless as the base), compile a statically linked musl binary:

FROM rust:1.82-bookworm AS builder

# Install the musl toolchain
RUN rustup target add x86_64-unknown-linux-musl
RUN apt-get update && apt-get install -y musl-tools

WORKDIR /app
COPY . .
ENV SQLX_OFFLINE=true

# Build a fully static binary
RUN cargo build --release --target x86_64-unknown-linux-musl --bin my-api

# ── Use the scratch image (smallest, ~10MB) ────────
FROM scratch AS runtime
COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/my-api /my-api
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY config/ /config/
COPY migrations/ /migrations/
EXPOSE 8080
ENTRYPOINT ["/my-api"]

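Note: the scratch image has no shell, which makes debugging difficult. For production, gcr.io/distroless/cc-debian12 is recommended; it ships a minimal runtime and likewise has no shell.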


16. Kubernetes Deployment

Directory Structure

k8s/
├── namespace.yaml
├── configmap.yaml
├── secret.yaml
├── deployment.yaml
├── service.yaml
├── ingress.yaml
├── hpa.yaml
└── pdb.yaml

16.1 Namespace

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: my-api
  labels:
    name: my-api

16.2 ConfigMap (Non-Sensitive Configuration)

# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-api-config
  namespace: my-api
data:
  APP_ENV: "production"
  APP__SERVER__HOST: "0.0.0.0"
  APP__SERVER__PORT: "8080"
  APP__SERVER__RATE_LIMIT_PER_MINUTE: "100"
  APP__DATABASE__MAX_CONNECTIONS: "20"
  APP__DATABASE__MIN_CONNECTIONS: "5"
  RUST_LOG: "info,sqlx=warn,hyper=warn"
  TZ: "Asia/Shanghai"

16.3 Secret (Sensitive Configuration)

In production, prefer Vault or the External Secrets Operator over a bare Secret; this is a minimal example.

# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: my-api-secret
  namespace: my-api
type: Opaque
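# stringData takes plain-text values; the API server base64-encodes them on write.
# Use the data field instead when supplying pre-encoded values: echo -n "value" | base64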
stringData:
  APP__DATABASE__URL: "postgres://user:pass@postgres-svc:5432/mydb"
  APP__REDIS__URL: "redis://:pass@redis-svc:6379"
  APP__AUTH__JWT_SECRET: "your-super-secret-key-change-in-production"

16.4 Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-api
  namespace: my-api
  labels:
    app: my-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-api
  # Rolling update strategy: at most 1 old Pod unavailable and at most 1 extra new Pod at a time
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  template:
    metadata:
      labels:
        app: my-api
      annotations:
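        # Forces a rolling update when the ConfigMap/Secret content changes.
        # The value is a template placeholder: render it with Helm or a similar tool,
        # because plain kubectl apply stores the literal string unchanged.
        configmap-checksum: "{{ checksum configmap.yaml }}"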
    spec:
      # ── Security context ───────────────────────────
      securityContext:
        runAsNonRoot: true
        runAsUser: 1001
        runAsGroup: 1001
        fsGroup: 1001

      # ── Graceful termination period (seconds) ──────
      terminationGracePeriodSeconds: 30

      # ── Image pull credentials (private registry) ──
      imagePullSecrets:
        - name: registry-credentials

      containers:
        - name: my-api
          image: registry.example.com/my-api:1.0.0   # CI injects the actual tag
          imagePullPolicy: IfNotPresent
          ports:
            - name: http
              containerPort: 8080
              protocol: TCP

          # ── Environment variables from the ConfigMap and the Secret ──
          envFrom:
            - configMapRef:
                name: my-api-config
            - secretRef:
                name: my-api-secret

          # ── Resource requests and limits ──────────
          resources:
            requests:
              cpu: "100m"
              memory: "128Mi"
            limits:
              cpu: "500m"
              memory: "512Mi"

          # ── Liveness probe: restart the container when the process hangs ──
          livenessProbe:
            httpGet:
              path: /health/live
              port: http
            initialDelaySeconds: 10
            periodSeconds: 15
            timeoutSeconds: 3
            failureThreshold: 3

          # ── Readiness probe: no traffic is routed until the Pod is ready ──
          readinessProbe:
            httpGet:
              path: /health/ready
              port: http
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 3
            failureThreshold: 3

          # ── Startup probe: allow time for a slow start ──
          startupProbe:
            httpGet:
              path: /health/live
              port: http
            failureThreshold: 12      # wait at most 12 * 5s = 60s
            periodSeconds: 5

          # ── Container security context ────────────
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            capabilities:
              drop: ["ALL"]

          # ── Writable temp directory for the read-only root filesystem ──
          volumeMounts:
            - name: tmp
              mountPath: /tmp

      volumes:
        - name: tmp
          emptyDir: {}

      # ── Topology spread constraints: distribute Pods across nodes ──
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels:
              app: my-api

16.5 Service

# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: my-api-svc
  namespace: my-api
  labels:
    app: my-api
spec:
  type: ClusterIP        # cluster-internal access; exposed externally through the Ingress
  selector:
    app: my-api
  ports:
    - name: http
      port: 80
      targetPort: http   # refers to the container port name
      protocol: TCP

16.6 Ingress (Using the Nginx Ingress Controller)

# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: my-api-ingress
  namespace: my-api
  annotations:
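    # No rewrite-target annotation: combined with path "/" it would rewrite every
    # request path to "/", so paths are forwarded to the backend unchanged
    # Request body size limit
    nginx.ingress.kubernetes.io/proxy-body-size: "10m"
    # Enable HTTP-to-HTTPS redirect
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    # cert-manager issues the TLS certificate automatically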
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - api.example.com
      secretName: my-api-tls
  rules:
    - host: api.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: my-api-svc
                port:
                  name: http

16.7 HorizontalPodAutoscaler (Autoscaling)

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: my-api-hpa
  namespace: my-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: my-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    # Scale out when average CPU utilization exceeds 70%
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Scale out when average memory utilization exceeds 80%
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    # Scale up: 30-second stabilization window, at most 2 new Pods per minute
    scaleUp:
      stabilizationWindowSeconds: 30
      policies:
        - type: Pods
          value: 2
          periodSeconds: 60
    # Scale down: only after 5 minutes of stability, at most 20% per minute, to prevent flapping
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 20
          periodSeconds: 60
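
To make the scaling targets concrete, the sketch below applies the replica formula from the Kubernetes HPA documentation, desired = ceil(current * currentMetric / targetMetric), taking the largest result across metrics and clamping it to minReplicas/maxReplicas. It is a simplified model: the function names are hypothetical, the 10% tolerance is the documented default, and the behavior policies (stabilization windows, per-minute limits) configured above are deliberately ignored.

```rust
/// Default HPA tolerance: ratios within 10% of 1.0 cause no scaling.
const TOLERANCE: f64 = 0.1;

/// Replica count suggested by a single metric (utilization values in percent).
fn desired_for_metric(current_replicas: u32, current: f64, target: f64) -> u32 {
    let ratio = current / target;
    if (ratio - 1.0).abs() <= TOLERANCE {
        return current_replicas;
    }
    (current_replicas as f64 * ratio).ceil() as u32
}

/// Takes the largest suggestion across all metrics, clamped to [min, max].
/// Each metric is a `(current, target)` pair.
fn desired_replicas(current_replicas: u32, metrics: &[(f64, f64)], min: u32, max: u32) -> u32 {
    metrics
        .iter()
        .map(|&(current, target)| desired_for_metric(current_replicas, current, target))
        .max()
        .unwrap_or(current_replicas)
        .clamp(min, max)
}

fn main() {
    // 3 replicas, CPU at 105% of request (target 70%), memory at 40% (target 80%).
    let replicas = desired_replicas(3, &[(105.0, 70.0), (40.0, 80.0)], 2, 10);
    println!("desired replicas: {replicas}");
}
```

Because the largest suggestion wins, a quiet memory metric never cancels out a CPU-driven scale-up.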

16.8 PodDisruptionBudget (Protection During Node Maintenance)

# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: my-api-pdb
  namespace: my-api
spec:
  # Keep at least 2 Pods available during node drains and upgrades
  minAvailable: 2
  selector:
    matchLabels:
      app: my-api
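
A PodDisruptionBudget with minAvailable allows healthy minus minAvailable voluntary evictions at any moment. The sketch below (the function name allowed_disruptions is hypothetical) shows how this budget interacts with the replica counts used in this chapter, including the HPA floor of 2 from section 16.7.

```rust
/// Number of Pods that may be evicted voluntarily right now under a
/// `minAvailable` budget. Never negative: a budget that is already
/// violated simply allows zero further disruptions.
fn allowed_disruptions(healthy_pods: u32, min_available: u32) -> u32 {
    healthy_pods.saturating_sub(min_available)
}

fn main() {
    for healthy in [3, 2] {
        println!(
            "{healthy} healthy Pods, minAvailable 2 -> {} eviction(s) allowed",
            allowed_disruptions(healthy, 2)
        );
    }
}
```

One consequence worth checking against your own cluster: when the HPA has scaled down to its minimum of 2, this budget allows no evictions, so a node drain will wait until the Deployment scales back up or the budget is relaxed.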

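16.9 Health Check Handlers

The three probes in the Deployment call two endpoints, /health/live and /health/ready, which are implemented as follows: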

// src/handlers/health.rs
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use serde_json::json;
use crate::state::AppState;

/// /health/live: liveness check, reports whether the process is running
pub async fn liveness() -> impl IntoResponse {
    (StatusCode::OK, Json(json!({ "status": "ok" })))
}

/// /health/ready: readiness check, reports whether the service can accept traffic (includes dependency checks)
pub async fn readiness(State(state): State<AppState>) -> impl IntoResponse {
    // Check database connectivity
    let db_ok = sqlx::query("SELECT 1")
        .execute(&state.db)
        .await
        .is_ok();

    // Check Redis connectivity
    let redis_ok = state.redis.get().await.is_ok();

    if db_ok && redis_ok {
        (StatusCode::OK, Json(json!({ "status": "ready", "db": "ok", "redis": "ok" })))
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(json!({
                "status": "not ready",
                "db": if db_ok { "ok" } else { "error" },
                "redis": if redis_ok { "ok" } else { "error" },
            })),
        )
    }
}

// src/routes/health.rs
use axum::{routing::get, Router};
use crate::{handlers::health, state::AppState};

pub fn router() -> Router<AppState> {
    Router::new()
        .route("/health/live",  get(health::liveness))
        .route("/health/ready", get(health::readiness))
}

16.10 Common kubectl Commands

# Apply all k8s resources
kubectl apply -f k8s/

# Watch Pod status
kubectl get pods -n my-api -w

# Check rolling update progress
kubectl rollout status deployment/my-api -n my-api

# Roll back to the previous revision
kubectl rollout undo deployment/my-api -n my-api

# Follow Pod logs
kubectl logs -f deployment/my-api -n my-api

# Show detailed events for a Pod (useful for diagnosing CrashLoopBackOff)
kubectl describe pod <pod-name> -n my-api

# Scale manually
kubectl scale deployment/my-api --replicas=5 -n my-api

# Show the current HPA status
kubectl get hpa -n my-api

# Open a shell in a Pod for debugging
kubectl exec -it <pod-name> -n my-api -- /bin/sh

# Force a rolling restart (for when the image tag has not changed)
kubectl rollout restart deployment/my-api -n my-api

17. CI/CD Pipeline

17.1 Pipeline Overview

Code push / PR
     │
     ▼
┌──────────────────────────────────────┐
│  CI (GitHub Actions)                 │
│                                      │
│  1. fmt + clippy (code quality)      │
│  2. Unit tests + integration tests   │
│  3. cargo audit (security scan)      │
│  4. sqlx offline cache check         │
│  5. Docker image build & push        │
└────────────────┬─────────────────────┘
                 │ after merge to main
                 ▼
┌──────────────────────────────────────┐
│  CD (GitHub Actions)                 │
│                                      │
│  1. Update Deployment image tag      │
│  2. kubectl apply                    │
│  3. Wait for rollout to finish       │
│  4. Smoke test (curl /health/ready)  │
│  5. Automatic rollback on failure    │
└──────────────────────────────────────┘

17.2 CI Workflow

# .github/workflows/ci.yml
name: CI

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  CARGO_TERM_COLOR: always
  SQLX_OFFLINE: true          # do not connect to a database; use the offline cache

jobs:
  # ── Code quality checks ───────────────────────────
  lint:
    name: Lint
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install Rust toolchain
        uses: dtolnay/rust-toolchain@stable
        with:
          components: rustfmt, clippy

      - name: Cache cargo
        uses: Swatinem/rust-cache@v2

      - name: Check formatting
        run: cargo fmt --all -- --check

      - name: Run clippy
        run: cargo clippy --all-targets --all-features -- -D warnings

  # ── Tests ─────────────────────────────────────────
  test:
    name: Test
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:17-alpine
        env:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 5s
          --health-timeout 5s
          --health-retries 10
      redis:
        image: redis:7-alpine
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 5s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Install Rust toolchain
        uses: dtolnay/rust-toolchain@stable

      - name: Cache cargo
        uses: Swatinem/rust-cache@v2

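      # cargo sqlx is provided by sqlx-cli, which is not preinstalled on the runner
      - name: Install sqlx-cli
        run: cargo install sqlx-cli --no-default-features --features postgres

      - name: Run database migrations
        run: cargo sqlx migrate run
        env:
          DATABASE_URL: postgres://postgres:postgres@localhost:5432/test_db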

      - name: Run tests
        run: cargo test --all-features
        env:
          APP__DATABASE__URL: postgres://postgres:postgres@localhost:5432/test_db
          APP__REDIS__URL: redis://localhost:6379
          APP__AUTH__JWT_SECRET: test-secret-key
          APP_ENV: test

  # ── Security audit ────────────────────────────────
  audit:
    name: Security Audit
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: rustsec/audit-check@v1
        with:
          token: ${{ secrets.GITHUB_TOKEN }}

  # ── sqlx offline cache consistency check ──────────
  sqlx-check:
    name: SQLx Offline Check
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:17-alpine
        env:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: check_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 5s
          --health-retries 10

    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
      - uses: Swatinem/rust-cache@v2

      - name: Install sqlx-cli
        run: cargo install sqlx-cli --no-default-features --features postgres

      - name: Run migrations
        run: cargo sqlx migrate run
        env:
          DATABASE_URL: postgres://postgres:postgres@localhost:5432/check_db

      - name: Check sqlx offline cache is up-to-date
        run: cargo sqlx prepare --check
        env:
          DATABASE_URL: postgres://postgres:postgres@localhost:5432/check_db

  # ── Build and push the Docker image ───────────────
  build:
    name: Build & Push Image
    runs-on: ubuntu-latest
    needs: [lint, test, audit, sqlx-check]
    if: github.ref == 'refs/heads/main'
    outputs:
      image-tag: ${{ steps.meta.outputs.version }}

    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to registry
        uses: docker/login-action@v3
        with:
          registry: ${{ secrets.REGISTRY_URL }}
          username: ${{ secrets.REGISTRY_USERNAME }}
          password: ${{ secrets.REGISTRY_PASSWORD }}

      - name: Extract Docker metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ secrets.REGISTRY_URL }}/my-api
          tags: |
            type=sha,prefix=,format=short
            type=ref,event=branch
            type=semver,pattern={{version}}

      - name: Build and push
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          # Use the GitHub Actions cache to speed up builds
          cache-from: type=gha
          cache-to: type=gha,mode=max
          # Build arguments
          build-args: |
            BUILDTIME=${{ github.event.repository.updated_at }}
            VERSION=${{ steps.meta.outputs.version }}
            REVISION=${{ github.sha }}

17.3 CD Workflow

# .github/workflows/cd.yml
name: CD

on:
  workflow_run:
    workflows: ["CI"]
    types: [completed]
    branches: [main]

jobs:
  deploy:
    name: Deploy to Kubernetes
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    environment: production      # requires manual approval (configured under GitHub Environments)

    steps:
      - uses: actions/checkout@v4

      # ── Configure kubectl ─────────────────────────
      - name: Set up kubectl
        uses: azure/setup-kubectl@v4

      - name: Configure kubeconfig
        run: |
          mkdir -p ~/.kube
          echo "${{ secrets.KUBECONFIG }}" | base64 -d > ~/.kube/config
          chmod 600 ~/.kube/config

      # ── Resolve the image tag produced by this build ──
      - name: Get image tag
        id: image
        run: |
          SHA=$(echo "${{ github.sha }}" | cut -c1-7)
          echo "tag=${{ secrets.REGISTRY_URL }}/my-api:${SHA}" >> "$GITHUB_OUTPUT"

      # ── Update the Deployment image ───────────────
      - name: Update image
        run: |
          kubectl set image deployment/my-api \
            my-api=${{ steps.image.outputs.tag }} \
            -n my-api

      # ── Wait for the rolling update to finish (up to 5 minutes) ──
      - name: Wait for rollout
        run: |
          kubectl rollout status deployment/my-api \
            -n my-api \
            --timeout=300s

      # ── Smoke test: verify the health check endpoint ──
      - name: Smoke test
        run: |
          ENDPOINT="https://api.example.com/health/ready"
          for i in $(seq 1 5); do
            STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$ENDPOINT")
            if [ "$STATUS" = "200" ]; then
              echo "Smoke test passed (attempt $i)"
              exit 0
            fi
            echo "Attempt $i failed (status $STATUS), retrying..."
            sleep 10
          done
          echo "Smoke test failed after 5 attempts"
          exit 1

      # ── Roll back automatically on failure ────────
      - name: Rollback on failure
        if: failure()
        run: |
          echo "Deployment failed, rolling back..."
          kubectl rollout undo deployment/my-api -n my-api
          kubectl rollout status deployment/my-api -n my-api --timeout=120s

      # ── Notifications (Slack / DingTalk) ──────────
      # slack-github-action v2 takes the webhook as an input rather than an env variable
      - name: Notify success
        if: success()
        uses: slackapi/slack-github-action@v2
        with:
          webhook: ${{ secrets.SLACK_WEBHOOK_URL }}
          webhook-type: incoming-webhook
          payload: |
            {
              "text": ":white_check_mark: *my-api* deployed successfully\nTag: `${{ steps.image.outputs.tag }}`\nCommit: ${{ github.sha }}"
            }

      - name: Notify failure
        if: failure()
        uses: slackapi/slack-github-action@v2
        with:
          webhook: ${{ secrets.SLACK_WEBHOOK_URL }}
          webhook-type: incoming-webhook
          payload: |
            {
              "text": ":x: *my-api* deployment FAILED and rolled back\nCommit: ${{ github.sha }}\nSee: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
            }
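
The smoke test step is a bounded retry loop: probe the endpoint, stop at the first 200, give up after a fixed number of attempts. The same control flow is sketched here in Rust, the project's language, with the HTTP call abstracted behind a closure so the logic can be exercised without a network. The name smoke_test and the closure-based probe are hypothetical; a real version would perform the request and sleep between attempts.

```rust
/// Calls `probe` up to `max_attempts` times and stops at the first HTTP 200.
/// Returns `Ok(attempt)` with the 1-based attempt that succeeded, or
/// `Err(max_attempts)` when every attempt failed.
fn smoke_test<F>(max_attempts: u32, mut probe: F) -> Result<u32, u32>
where
    F: FnMut(u32) -> u16,
{
    for attempt in 1..=max_attempts {
        let status = probe(attempt);
        if status == 200 {
            return Ok(attempt);
        }
        // The workflow sleeps 10 seconds here before retrying.
        eprintln!("attempt {attempt} failed (status {status}), retrying...");
    }
    Err(max_attempts)
}

fn main() {
    // Simulated endpoint: unavailable twice while Pods roll, then healthy.
    let statuses = [503u16, 503, 200];
    let result = smoke_test(5, |attempt| statuses[(attempt as usize - 1).min(2)]);
    match result {
        Ok(attempt) => println!("smoke test passed on attempt {attempt}"),
        Err(attempts) => println!("smoke test failed after {attempts} attempts"),
    }
}
```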

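17.4 Branching Strategy and Git Flow

main ──────────────────────────────────────────► production deployment
  ▲                  ▲
  │ PR + Review      │ PR + Review
  │                  │
develop ─────────────────────────────────────► test (staging) environment
  ▲         ▲         ▲
  │         │         │
feat/xxx  fix/yyy  chore/zzz

| Branch | Triggered action |
| --- | --- |
| feat/* / fix/* | Runs lint + test only |
| Merge into develop | Runs the full CI and deploys automatically to the staging environment |
| Merge into main | Runs the full CI + CD and deploys to production after manual approval |
| Tag v*.*.* | Additionally builds a version-tagged image and creates a GitHub Release |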
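
The branch rules above can be read as a mapping from a Git ref to a pipeline. The sketch below encodes that mapping so the rules can be checked mechanically. The enum, its variant names, and the function names are all hypothetical, and treating chore/* like feat/* and fix/* is an assumption based on the diagram; in the real setup these rules live in the workflow triggers.

```rust
#[derive(Debug, PartialEq)]
enum Pipeline {
    /// Feature-style branches: lint and tests only.
    LintAndTest,
    /// develop: full CI, then automatic deployment to staging.
    FullCiDeployStaging,
    /// main: full CI plus CD, production after manual approval.
    FullCiAndCd,
    /// Version tags: build a versioned image and a GitHub Release.
    ReleaseImage,
    /// Anything else triggers nothing.
    Skip,
}

/// Returns true for tags of the form `v<digits>.<digits>.<digits>`.
fn is_version_tag(tag: &str) -> bool {
    let Some(rest) = tag.strip_prefix('v') else {
        return false;
    };
    let parts: Vec<&str> = rest.split('.').collect();
    parts.len() == 3
        && parts
            .iter()
            .all(|p| !p.is_empty() && p.chars().all(|c| c.is_ascii_digit()))
}

/// Maps a full Git ref (for example `refs/heads/main`) to the pipeline it triggers.
fn pipeline_for(git_ref: &str) -> Pipeline {
    if let Some(tag) = git_ref.strip_prefix("refs/tags/") {
        return if is_version_tag(tag) { Pipeline::ReleaseImage } else { Pipeline::Skip };
    }
    match git_ref.strip_prefix("refs/heads/") {
        Some("main") => Pipeline::FullCiAndCd,
        Some("develop") => Pipeline::FullCiDeployStaging,
        Some(branch)
            if ["feat/", "fix/", "chore/"].iter().any(|p| branch.starts_with(p)) =>
        {
            Pipeline::LintAndTest
        }
        _ => Pipeline::Skip,
    }
}

fn main() {
    for git_ref in ["refs/heads/feat/login", "refs/heads/develop", "refs/heads/main", "refs/tags/v1.2.3"] {
        println!("{git_ref} -> {:?}", pipeline_for(git_ref));
    }
}
```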

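17.5 Secrets Checklist

Configure the following under Settings → Secrets and variables → Actions in the GitHub repository:

| Secret name | Description |
| --- | --- |
| REGISTRY_URL | Image registry address (for example ghcr.io/org) |
| REGISTRY_USERNAME | Registry login username |
| REGISTRY_PASSWORD | Registry login password / token |
| KUBECONFIG | Base64-encoded kubeconfig file |
| SLACK_WEBHOOK_URL | Slack notification webhook URL |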

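18. Testing Strategy

18.1 Test Layers

┌──────────────────────────────────────────────────┐
│  E2E tests (very few; verify critical paths)     │  ← slow, expensive
├──────────────────────────────────────────────────┤
│  Integration tests (API layer, real DB/Redis)    │  ← medium speed, cover business flows
├──────────────────────────────────────────────────┤
│  Unit tests (services / pure functions)          │  ← fast, cover details
└──────────────────────────────────────────────────┘

New dependencies: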

[dev-dependencies]
axum-test   = "16"           # in-memory HTTP testing, no port binding needed
reqwest     = { version = "0.12", features = ["json"] }
sqlx        = { version = "0.8", features = ["macros", "migrate"] }   # #[sqlx::test] is provided by the macros and migrate features
tokio       = { version = "1", features = ["full", "test-util"] }
fake        = { version = "3", features = ["derive", "uuid", "chrono"] }
wiremock    = "0.6"          # mocks external HTTP services
serial_test = "3"            # for tests that must run serially (shared DB state)

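18.2 Unit Tests

Unit tests target the pure business logic of the Service layer, mocking Repository dependencies where a service needs them. The functions tested below are pure helpers, so no mock is required:

// src/services/user_service.rs (end of file)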
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_password_hash_and_verify() {
        let plain = "my_secure_password";
        let hash = hash_password(plain).unwrap();

        assert_ne!(plain, hash);
        assert!(verify_password(plain, &hash).unwrap());
        assert!(!verify_password("wrong_password", &hash).unwrap());
    }

    #[tokio::test]
    async fn test_generate_jwt_and_parse_claims() {
        let user_id = uuid::Uuid::new_v4();
        let secret = "test-secret";

        let token = generate_jwt(user_id, "user", secret, 3600).unwrap();
        let claims = parse_jwt(&token, secret).unwrap();

        assert_eq!(claims.sub, user_id);
        assert_eq!(claims.role, "user");
    }

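    #[tokio::test]
    async fn test_expired_jwt_returns_error() {
        let token = generate_jwt(uuid::Uuid::new_v4(), "user", "secret", 0).unwrap();
        // A lifetime of 0 seconds means the token expires immediately.
        // This assumes parse_jwt validates with zero leeway; a validator that tolerates
        // clock skew (jsonwebtoken defaults to 60 seconds) would still accept the token.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let result = parse_jwt(&token, "secret");
        assert!(result.is_err());
    }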
}
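
Whether the expired-token test passes depends on how much clock-skew leeway the validator allows. The sketch below is a simplified model of an expiry check with leeway, written to show the arithmetic only. The function is_expired is hypothetical and is not the implementation of parse_jwt; the 60-second figure is the default leeway of the jsonwebtoken crate's Validation, and it is an assumption that the project's JWT code is built on that crate.

```rust
/// Simplified expiry check: a token counts as expired only once `now`
/// is more than `leeway_secs` past its `exp` claim (all values in Unix seconds).
fn is_expired(exp: u64, now: u64, leeway_secs: u64) -> bool {
    exp.saturating_add(leeway_secs) < now
}

fn main() {
    let issued_at = 1_700_000_000;
    let exp = issued_at; // lifetime of 0 seconds, as in the unit test
    let now = issued_at + 1; // after the 1-second sleep

    println!("leeway 60s -> expired: {}", is_expired(exp, now, 60));
    println!("leeway 0s  -> expired: {}", is_expired(exp, now, 0));
}
```

If the validator keeps a 60-second leeway, the test can be made deterministic either by setting the leeway to zero in test configuration or by issuing a token whose exp lies well in the past, which also removes the one-second sleep.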

18.3 Integration Test Helpers

// tests/common/mod.rs
use axum_test::TestServer;
use sqlx::PgPool;
use my_api::{app::build_app, state::AppState, config::load_settings};
use std::sync::Arc;

pub struct TestApp {
    pub server: TestServer,
    pub db: PgPool,
}

impl TestApp {
    pub async fn spawn() -> Self {
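        // Load the test environment configuration
        std::env::set_var("APP_ENV", "test");
        dotenvy::from_filename(".env.test").ok();
        let settings = load_settings().expect("Failed to load test settings");

        // Create a dedicated schema per test for isolation
        let schema = format!("test_{}", uuid::Uuid::new_v4().simple());
        let admin = PgPool::connect(&settings.database.url).await.unwrap();
        sqlx::query(&format!("CREATE SCHEMA {schema}"))
            .execute(&admin)
            .await
            .unwrap();

        // search_path is per connection, so set it in after_connect; a single SET
        // issued on the pool would only affect whichever connection ran it
        let db = sqlx::postgres::PgPoolOptions::new()
            .after_connect(move |conn, _meta| {
                let stmt = format!("SET search_path TO {schema}");
                Box::pin(async move {
                    sqlx::query(&stmt).execute(&mut *conn).await?;
                    Ok(())
                })
            })
            .connect(&settings.database.url)
            .await
            .unwrap();
        sqlx::migrate!("./migrations").run(&db).await.unwrap();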

        let redis = my_api::cache::redis::create_pool(&settings).unwrap();
        let state = AppState { db: db.clone(), redis, config: Arc::new(settings) };
        let app = build_app(state);
        let server = TestServer::new(app).unwrap();

        TestApp { server, db }
    }
}

/// Convenience helper: registers a user, logs in, and returns the Bearer token
pub async fn create_user_and_login(app: &TestApp) -> String {
    use fake::{faker::internet::en::*, Fake};

    let email: String = SafeEmail().fake();
    let password = "Test@123456";

    app.server
        .post("/api/v1/auth/register")
        .json(&serde_json::json!({ "name": "Test User", "email": email, "password": password }))
        .await
        .assert_status_created();

    let resp = app.server
        .post("/api/v1/auth/login")
        .json(&serde_json::json!({ "email": email, "password": password }))
        .await;
    resp.assert_status_ok();

    let body: serde_json::Value = resp.json();
    format!("Bearer {}", body["data"]["access_token"].as_str().unwrap())
}

18.4 API Integration Test Examples

// tests/user_tests.rs (at the top level of tests/, so that mod common resolves to tests/common/mod.rs)
mod common;
use common::{TestApp, create_user_and_login};

#[tokio::test]
async fn test_register_success() {
    let app = TestApp::spawn().await;

    let resp = app.server
        .post("/api/v1/auth/register")
        .json(&serde_json::json!({
            "name": "Alice",
            "email": "alice@example.com",
            "password": "Password@123"
        }))
        .await;

    resp.assert_status_created();
    let body: serde_json::Value = resp.json();
    assert_eq!(body["success"], true);
    assert_eq!(body["data"]["email"], "alice@example.com");
}

#[tokio::test]
async fn test_register_duplicate_email_returns_conflict() {
    let app = TestApp::spawn().await;
    let payload = serde_json::json!({
        "name": "Bob", "email": "bob@example.com", "password": "Password@123"
    });

    app.server.post("/api/v1/auth/register").json(&payload).await.assert_status_created();
    // Register the same email a second time
    let resp = app.server.post("/api/v1/auth/register").json(&payload).await;
    resp.assert_status(axum::http::StatusCode::CONFLICT);
}

#[tokio::test]
async fn test_get_me_without_token_returns_401() {
    let app = TestApp::spawn().await;
    app.server.get("/api/v1/users/me").await.assert_status_unauthorized();
}

#[tokio::test]
async fn test_get_me_with_valid_token() {
    let app = TestApp::spawn().await;
    let token = create_user_and_login(&app).await;

    let resp = app.server
        .get("/api/v1/users/me")
        .add_header("Authorization", token)
        .await;

    resp.assert_status_ok();
    let body: serde_json::Value = resp.json();
    assert_eq!(body["success"], true);
    assert!(body["data"]["id"].is_string());
}

#[tokio::test]
async fn test_validation_rejects_invalid_email() {
    let app = TestApp::spawn().await;

    let resp = app.server
        .post("/api/v1/auth/register")
        .json(&serde_json::json!({
            "name": "X",
            "email": "not-an-email",
            "password": "short"
        }))
        .await;

    resp.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY);
    let body: serde_json::Value = resp.json();
    assert_eq!(body["code"], "VALIDATION_ERROR");
}

18.5 Mock 外部 HTTP 服务

使用 wiremock Mock 第三方 API(如邮件服务、短信网关):

// tests/api/email_tests.rs
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path};

#[tokio::test]
async fn test_register_triggers_welcome_email() {
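    // Start a local mock server
    let mock_server = MockServer::start().await;

    // Expect one request: POST /v1/mail/send, answered with 202
    Mock::given(method("POST"))
        .and(path("/v1/mail/send"))
        .respond_with(ResponseTemplate::new(202))
        .expect(1)           // assert that it is called exactly once
        .mount(&mock_server)
        .await;

    // Point the application's email client at the mock server.
    // Note: environment variables are process-wide, so tests that set them can race
    // when run in parallel (and set_var is unsafe in the 2024 edition); passing the
    // URL through the test configuration is safer.
    std::env::set_var("APP__EMAIL__BASE_URL", mock_server.uri());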

    let app = TestApp::spawn().await;
    app.server
        .post("/api/v1/auth/register")
        .json(&serde_json::json!({
            "name": "Carol", "email": "carol@example.com", "password": "Password@123"
        }))
        .await
        .assert_status_created();

    // wiremock verifies expect(1) automatically when the MockServer is dropped
}

18.6 Test Coverage

# Install cargo-tarpaulin (best supported on Linux)
cargo install cargo-tarpaulin

# Generate an HTML coverage report
cargo tarpaulin --out Html --output-dir coverage/

# Or use cargo-llvm-cov (cross-platform)
cargo install cargo-llvm-cov
cargo llvm-cov --html --output-dir coverage/

# In CI, emit lcov output for upload to Codecov
cargo llvm-cov --lcov --output-path lcov.info

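Add coverage reporting to CI:

# Steps to append to .github/workflows/ci.yml
# (cargo-llvm-cov must already be installed on the runner, e.g. via cargo install cargo-llvm-cov)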
- name: Generate coverage report
  run: cargo llvm-cov --lcov --output-path lcov.info
  env:
    DATABASE_URL: postgres://postgres:postgres@localhost:5432/test_db

- name: Upload to Codecov
  uses: codecov/codecov-action@v4
  with:
    files: lcov.info
    token: ${{ secrets.CODECOV_TOKEN }}

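19. Automatic API Documentation

Use utoipa to generate an OpenAPI specification from code annotations and serve it through an embedded Swagger UI. utoipa 5, pinned below, emits OpenAPI 3.1.

19.1 New Dependencies

[dependencies]
utoipa            = { version = "5", features = ["axum_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "8", features = ["axum"] }
utoipa-redoc      = { version = "5", features = ["axum"] }   # optional: an alternative, cleaner documentation UI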

19.2 Adding Schema Annotations to DTOs

// src/dto/user.rs
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use validator::Validate;

#[derive(Debug, Deserialize, Validate, ToSchema)]
pub struct CreateUserDto {
    /// Display name of the user
    #[validate(length(min = 2, max = 50))]
    #[schema(example = "Alice")]
    pub name: String,

    /// Email address (must be unique)
    #[validate(email)]
    #[schema(example = "alice@example.com")]
    pub email: String,

    /// Login password (at least 8 characters)
    #[validate(length(min = 8))]
    #[schema(example = "Password@123", format = "password")]
    pub password: String,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct UserResponse {
    #[schema(example = "550e8400-e29b-41d4-a716-446655440000")]
    pub id: uuid::Uuid,
    #[schema(example = "Alice")]
    pub name: String,
    #[schema(example = "alice@example.com")]
    pub email: String,
    pub role: String,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

// LoginDto is a request body, so it must implement Deserialize rather than Serialize
#[derive(Debug, Deserialize, ToSchema)]
pub struct LoginDto {
    #[schema(example = "alice@example.com")]
    pub email: String,
    #[schema(example = "Password@123", format = "password")]
    pub password: String,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct TokenResponse {
    pub access_token: String,
    pub token_type: String,
    pub expires_in: u64,
}

19.3 Adding Path Annotations to Handlers

// src/handlers/auth.rs
use axum::response::IntoResponse;
use crate::dto::user::{CreateUserDto, LoginDto, TokenResponse, UserResponse};
// ApiResponse and ErrorResponse (the response wrappers from section 12) must also be in scope.

/// Register a user
#[utoipa::path(
    post,
    path = "/api/v1/auth/register",
    tag = "Auth",
    request_body = CreateUserDto,
    responses(
        (status = 201, description = "Registered successfully", body = ApiResponse<UserResponse>),
        (status = 409, description = "Email already exists", body = ErrorResponse),
        (status = 422, description = "Validation failed", body = ErrorResponse),
    )
)]
pub async fn register(/* ... */) -> impl IntoResponse { /* ... */ }

/// Log in
#[utoipa::path(
    post,
    path = "/api/v1/auth/login",
    tag = "Auth",
    request_body = LoginDto,
    responses(
        (status = 200, description = "Login successful", body = ApiResponse<TokenResponse>),
        (status = 401, description = "Invalid email or password", body = ErrorResponse),
    )
)]
pub async fn login(/* ... */) -> impl IntoResponse { /* ... */ }

// src/handlers/user.rs
/// Get the current user
#[utoipa::path(
    get,
    path = "/api/v1/users/me",
    tag = "User",
    security(("bearer_auth" = [])),
    responses(
        (status = 200, description = "Success", body = ApiResponse<UserResponse>),
        (status = 401, description = "Not authenticated", body = ErrorResponse),
    )
)]
pub async fn get_me(/* ... */) -> impl IntoResponse { /* ... */ }

19.4 Assembling the OpenAPI Document and Mounting Swagger UI

// src/openapi.rs
use utoipa::{openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme}, OpenApi};
use utoipa_swagger_ui::SwaggerUi;

#[derive(OpenApi)]
#[openapi(
    info(
        title = "My API",
        version = "1.0.0",
        description = "Rust + Axum backend service",
        contact(name = "Backend Team", email = "backend@example.com"),
        license(name = "MIT"),
    ),
    paths(
        crate::handlers::auth::register,
        crate::handlers::auth::login,
        crate::handlers::user::get_me,
    ),
    components(
        schemas(
            crate::dto::user::CreateUserDto,
            crate::dto::user::LoginDto,
            crate::dto::user::UserResponse,
            crate::dto::user::TokenResponse,
        )
    ),
    modifiers(&SecurityAddon),
    tags(
        (name = "Auth",   description = "Authentication endpoints"),
        (name = "User",   description = "User management endpoints"),
        (name = "Health", description = "Health checks"),
    )
)]
pub struct ApiDoc;

struct SecurityAddon;
impl utoipa::Modify for SecurityAddon {
    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
        let components = openapi.components.get_or_insert_with(Default::default);
        components.add_security_scheme(
            "bearer_auth",
            SecurityScheme::Http(
                HttpBuilder::new()
                    .scheme(HttpAuthScheme::Bearer)
                    .bearer_format("JWT")
                    .build(),
            ),
        );
    }
}

/// Build the router that serves the API documentation
pub fn swagger_router() -> axum::Router {
    SwaggerUi::new("/docs")
        .url("/api-docs/openapi.json", ApiDoc::openapi())
        .into()
}

Mount it in app.rs (non-production environments only):

// src/app.rs
pub fn build_app(state: AppState) -> Router {
    let mut app = Router::new()
        .merge(routes::health::router())
        .nest("/api/v1", routes::api_v1::router())
        .with_state(state.clone());

    // Disable Swagger UI in production to avoid exposing API details
    if state.config.server.env != "production" {
        app = app.merge(crate::openapi::swagger_router());
    }

    app.layer(/* middleware */)
}

Docs URL: http://localhost:8080/docs
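
The comparison state.config.server.env != "production" in build_app enables the docs for any other string, including a misspelled or differently cased value. A minimal sketch of a stricter check, assuming a hypothetical AppEnv enum and docs_enabled helper (neither is part of the original configuration code); unknown values are treated like production, so the docs stay off unless the environment is recognized:

```rust
use std::str::FromStr;

/// Hypothetical deployment environment type (not part of the original config).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppEnv {
    Development,
    Staging,
    Production,
}

impl FromStr for AppEnv {
    type Err = String;

    /// Parses the environment name case-insensitively, accepting a few common aliases.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "development" | "dev" | "local" => Ok(AppEnv::Development),
            "staging" | "stage" => Ok(AppEnv::Staging),
            "production" | "prod" => Ok(AppEnv::Production),
            other => Err(format!("unknown environment: {other:?}")),
        }
    }
}

/// Fails closed: an unknown or misspelled value disables the docs, just like production.
pub fn docs_enabled(raw_env: &str) -> bool {
    match raw_env.parse::<AppEnv>() {
        Ok(AppEnv::Development) | Ok(AppEnv::Staging) => true,
        Ok(AppEnv::Production) | Err(_) => false,
    }
}
```

With such a helper, the condition in build_app could read docs_enabled(&state.config.server.env), assuming env is stored as a String.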


20. Prometheus Metrics

20.1 New Dependencies

[dependencies]
axum-prometheus   = "0.7"
prometheus-client = "0.22"    # custom metrics

20.2 Built-in HTTP Metrics

axum-prometheus automatically records the request count, latency distribution, and status codes for every route:

// src/app.rs
use axum_prometheus::PrometheusMetricLayer;

pub fn build_app(state: AppState) -> Router {
    // Create the metrics layer and the handle that renders the Prometheus text output
    let (prometheus_layer, metric_handle) = PrometheusMetricLayer::pair();

    Router::new()
        // Metrics endpoint (normally scraped by the Prometheus server)
        .route("/metrics", axum::routing::get(move || async move {
            metric_handle.render()
        }))
        .merge(routes::health::router())
        .nest("/api/v1", routes::api_v1::router())
        .with_state(state)
        .layer(prometheus_layer)   // records HTTP metrics for all routes
        .layer(/* other middleware */)
}

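Metrics generated automatically:

Metric | Type | Description
axum_http_requests_total | Counter | Total number of requests (labels: method, endpoint, status)
axum_http_requests_duration_seconds | Histogram | Request latency distribution
axum_http_requests_pending | Gauge | Number of requests currently in flight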

20.3 Custom Business Metrics

// src/metrics.rs
use prometheus_client::{
    encoding::EncodeLabelSet,
    metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram},
    registry::Registry,
};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct AuthLabels {
    pub method: String,   // "register" | "login"
    pub result: String,   // "success" | "failure"
}

#[derive(Clone)]
pub struct AppMetrics {
    /// Number of register/login attempts
    pub auth_attempts: Family<AuthLabels, Counter>,
    /// Number of currently active user sessions
    pub active_sessions: Gauge,
    /// Distribution of database query durations, in seconds
    pub db_query_duration: Histogram,
}

impl AppMetrics {
    pub fn new(registry: &mut Registry) -> Self {
        let auth_attempts = Family::<AuthLabels, Counter>::default();
        let active_sessions = Gauge::default();
        let db_query_duration = Histogram::new(
            [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0].into_iter(),
        );

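        // prometheus-client appends `_total` to counter names when encoding,
        // so this counter is exposed as `app_auth_attempts_total`.
        registry.register("app_auth_attempts",    "Authentication attempts", auth_attempts.clone());
        registry.register("app_active_sessions",  "Active sessions",   active_sessions.clone());
        registry.register("app_db_query_duration_seconds", "Database query duration in seconds", db_query_duration.clone());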

        Self { auth_attempts, active_sessions, db_query_duration }
    }
}

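The metrics above live in a prometheus-client Registry, which is separate from the recorder behind axum-prometheus's metric_handle.render(). To expose them, the registry also has to be encoded with prometheus_client::encoding::text::encode and served from a metrics endpoint.

Instrument the service layer: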

// src/services/auth_service.rs
pub async fn login(&self, dto: LoginDto) -> Result<TokenResponse, AppError> {
    let result = self.do_login(&dto).await;

    // Record the outcome of the login attempt
    self.metrics.auth_attempts
        .get_or_create(&AuthLabels {
            method: "login".into(),
            result: if result.is_ok() { "success" } else { "failure" }.into(),
        })
        .inc();

    result
}
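
The db_query_duration histogram is registered in AppMetrics, but none of the examples record into it. A minimal sketch of one way to time an operation, assuming a hypothetical helper named timed (not part of the original code). The observe closure stands in for a call such as metrics.db_query_duration.observe(secs). The sketch is synchronous; async code would await the query between the two timestamps instead.

```rust
use std::time::Instant;

/// Runs `op`, then reports the elapsed wall-clock time in seconds to `observe`.
/// `observe` is a stand-in for recording into a histogram; `timed` is a
/// hypothetical helper used only for illustration.
pub fn timed<T>(observe: impl FnOnce(f64), op: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = op();
    // Seconds as f64 matches the `_seconds` unit in the metric name.
    observe(start.elapsed().as_secs_f64());
    out
}
```

Reporting the duration whether or not the operation succeeded keeps slow failures, such as timeouts, visible in the latency distribution.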

20.4 Prometheus Scrape Configuration

# prometheus.yml
scrape_configs:
  - job_name: my-api
    static_configs:
      - targets: ["my-api-svc:80"]
    metrics_path: /metrics
    scrape_interval: 15s

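In Kubernetes, annotate the Pods so that Prometheus can discover them automatically. These annotations are a convention: they only take effect if the Prometheus scrape configuration uses Kubernetes service discovery with relabeling rules that honor them.

# Add to the Pod template annotations in deployment.yaml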
annotations:
  prometheus.io/scrape: "true"
  prometheus.io/path:   "/metrics"
  prometheus.io/port:   "8080"

20.5 Recommended Grafana Dashboard Panels

Panel | Example PromQL
QPS | sum(rate(axum_http_requests_total[1m]))
P99 latency | histogram_quantile(0.99, sum by (le) (rate(axum_http_requests_duration_seconds_bucket[5m])))
Error rate | sum(rate(axum_http_requests_total{status=~"5.."}[1m])) / sum(rate(axum_http_requests_total[1m]))
In-flight requests | sum(axum_http_requests_pending)
DB query P95 | histogram_quantile(0.95, sum by (le) (rate(app_db_query_duration_seconds_bucket[5m])))
Login failures per second | sum(rate(app_auth_attempts_total{method="login",result="failure"}[5m]))
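
The P99 and P95 panels rely on histogram_quantile, which estimates a quantile from cumulative bucket counts rather than from raw samples, so its precision depends on the bucket boundaries chosen in AppMetrics::new. The sketch below is a simplified model of that estimate (linear interpolation inside the bucket that contains the target rank). It is an illustration written for this section, not Prometheus's implementation, and it ignores edge cases such as negative bounds.

```rust
/// Estimates quantile `q` (0.0..=1.0) from cumulative histogram buckets.
/// `buckets` holds `(upper_bound, cumulative_count)` pairs sorted by bound,
/// and the last bound must be +Inf, as in the Prometheus exposition format.
pub fn histogram_quantile(q: f64, buckets: &[(f64, f64)]) -> Option<f64> {
    let (last_bound, total) = *buckets.last()?;
    if !last_bound.is_infinite() || total <= 0.0 || !(0.0..=1.0).contains(&q) {
        return None;
    }
    let rank = q * total;
    // First bucket whose cumulative count reaches the target rank.
    let idx = buckets.iter().position(|&(_, count)| count >= rank)?;
    if idx == buckets.len() - 1 {
        // The rank falls into the +Inf bucket: report the highest finite bound.
        return buckets.len().checked_sub(2).map(|i| buckets[i].0);
    }
    let (upper, count) = buckets[idx];
    // The first bucket is assumed to start at 0.
    let (lower, prev_count) = if idx == 0 { (0.0, 0.0) } else { buckets[idx - 1] };
    let in_bucket = count - prev_count;
    if in_bucket <= 0.0 {
        return Some(upper);
    }
    // Linear interpolation between the bucket's lower and upper bound.
    Some(lower + (upper - lower) * (rank - prev_count) / in_bucket)
}
```

Because the estimate can never be finer than the bucket it lands in, boundaries should be dense around the latencies that matter for alerting.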

21. File Upload and Object Storage

21.1 New Dependencies

[dependencies]
# AWS S3 and S3-compatible stores (MinIO, Cloudflare R2)
aws-config     = { version = "1", features = ["behavior-version-latest"] }
aws-sdk-s3     = "1"

# File type detection (guards against a forged Content-Type)
infer          = "0.16"

# Streaming large files
tokio-util     = { version = "0.7", features = ["io"] }
bytes          = "1"

21.2 Initializing the S3 Client

// src/storage/s3.rs
use aws_config::BehaviorVersion;
use aws_sdk_s3::{config::Region, Client};
use crate::config::Settings;

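pub async fn create_s3_client(settings: &Settings) -> Client {
    let mut loader = aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new(settings.storage.region.clone()));

    // Only override the endpoint when one is configured (MinIO, R2, ...).
    // The default config leaves it empty, and an empty URL must not reach the SDK.
    let custom_endpoint = !settings.storage.endpoint_url.is_empty();
    if custom_endpoint {
        loader = loader.endpoint_url(&settings.storage.endpoint_url);
    }
    let shared = loader.load().await;

    // MinIO typically needs path-style addressing (http://host/bucket/key)
    let s3_config = aws_sdk_s3::config::Builder::from(&shared)
        .force_path_style(custom_endpoint)
        .build();

    Client::from_conf(s3_config)
}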

Configuration additions:

# config/default.toml
[storage]
bucket          = "my-api-uploads"
region          = "ap-northeast-1"
endpoint_url    = ""                  # set for MinIO, e.g. "http://minio:9000"
cdn_base_url    = ""                  # base URL used by the upload handler to build file links
max_file_size   = 10485760            # 10 MiB, in bytes
allowed_types   = ["image/jpeg", "image/png", "image/webp", "application/pdf"]

21.3 File Upload Handler

// src/handlers/upload.rs
use axum::{
    extract::{Multipart, State},
    http::StatusCode,
    response::IntoResponse,
    Json,
};
use bytes::Bytes;
use uuid::Uuid;
use crate::{errors::AppError, extractors::auth::AuthUser, state::AppState};

const MAX_FILE_SIZE: usize = 10 * 1024 * 1024;  // 10MB

#[utoipa::path(
    post,
    path = "/api/v1/upload",
    tag = "Upload",
    security(("bearer_auth" = [])),
    request_body(content_type = "multipart/form-data"),
    responses(
        (status = 201, description = "Uploaded successfully", body = UploadResponse),
        (status = 400, description = "Unsupported file type or file too large"),
    )
)]
pub async fn upload_file(
    State(state): State<AppState>,
    AuthUser(claims): AuthUser,
    mut multipart: Multipart,
) -> Result<impl IntoResponse, AppError> {
    let field = multipart
        .next_field()
        .await
        .map_err(|e| AppError::Validation(format!("Failed to read multipart: {e}")))?
        .ok_or_else(|| AppError::Validation("No file field found".into()))?;

    let original_name = field
        .file_name()
        .unwrap_or("unknown")
        .to_string();

    // Read the whole field into memory (axum's request body limit bounds its size)
    let data: Bytes = field
        .bytes()
        .await
        .map_err(|e| AppError::Validation(format!("Failed to read file: {e}")))?;

    // Size check
    if data.len() > MAX_FILE_SIZE {
        return Err(AppError::Validation(
            format!("File too large: {} bytes (max {})", data.len(), MAX_FILE_SIZE)
        ));
    }

    // Detect the file type from its magic number instead of trusting the Content-Type header
    let mime = infer::get(&data)
        .map(|t| t.mime_type())
        .unwrap_or("application/octet-stream");

    let allowed = &state.config.storage.allowed_types;
    if !allowed.iter().any(|t| t == mime) {
        return Err(AppError::Validation(
            format!("File type '{mime}' is not allowed")
        ));
    }

    // Build a unique storage key: users/{user_id}/{uuid}.{ext}.
    // Prefer the extension of the detected type; the client-supplied file name is only a fallback.
    let ext = infer::get(&data)
        .map(|t| t.extension())
        .or_else(|| {
            std::path::Path::new(&original_name)
                .extension()
                .and_then(|e| e.to_str())
        })
        .unwrap_or("bin");
    let key = format!("users/{}/{}.{}", claims.sub, Uuid::now_v7(), ext);

    // Record the size now: `data` is moved into the request body below
    let size = data.len();

    // Upload to S3
    state.s3
        .put_object()
        .bucket(&state.config.storage.bucket)
        .key(&key)
        .body(data.into())
        .content_type(mime)
        // Keep the object private; serve it through a CDN or a presigned URL
        .acl(aws_sdk_s3::types::ObjectCannedAcl::Private)
        .send()
        .await
        .map_err(|e| AppError::Internal(format!("S3 upload failed: {e}")))?;

    let url = format!("{}/{}/{}", state.config.storage.cdn_base_url, state.config.storage.bucket, key);

    Ok((
        StatusCode::CREATED,
        Json(serde_json::json!({
            "success": true,
            "data": { "key": key, "url": url, "mime_type": mime, "size": size }
        })),
    ))
}
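
The handler relies on infer to recognize a file by its leading "magic" bytes. To make that idea concrete, here is a minimal standard-library sketch covering only the four types in allowed_types. The names sniff_mime and is_allowed are hypothetical; the real crate recognizes many more formats and should be preferred in practice.

```rust
/// Returns a MIME type based on the file's leading bytes, or None if unrecognized.
/// Covers only PNG, JPEG, WebP and PDF; a sketch of the idea, not a replacement for `infer`.
pub fn sniff_mime(data: &[u8]) -> Option<&'static str> {
    const PNG: &[u8] = &[0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A];
    const JPEG: &[u8] = &[0xFF, 0xD8, 0xFF];

    if data.starts_with(PNG) {
        Some("image/png")
    } else if data.starts_with(JPEG) {
        Some("image/jpeg")
    } else if data.starts_with(b"RIFF") && data.get(8..12) == Some(&b"WEBP"[..]) {
        // WebP is a RIFF container: "RIFF", a 4-byte size, then "WEBP".
        Some("image/webp")
    } else if data.starts_with(b"%PDF-") {
        Some("application/pdf")
    } else {
        None
    }
}

/// True only if the detected type is on the allow-list; unknown content is rejected.
pub fn is_allowed(data: &[u8], allowed: &[&str]) -> bool {
    sniff_mime(data).map_or(false, |mime| allowed.contains(&mime))
}
```

Because the decision uses the bytes themselves, renaming an executable to photo.png or sending a forged Content-Type header does not get it past the check.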

21.4 Generating Presigned Download URLs

Private files are not exposed through a direct S3 URL; instead, generate a presigned link that expires:

// src/services/storage_service.rs
use aws_sdk_s3::presigning::PresigningConfig;
use std::time::Duration;

pub async fn get_presigned_url(
    s3: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
    expires_in: Duration,
) -> Result<String, AppError> {
    let presigned = s3
        .get_object()
        .bucket(bucket)
        .key(key)
        .presigned(
            PresigningConfig::expires_in(expires_in)
                .map_err(|e| AppError::Internal(e.to_string()))?,
        )
        .await
        .map_err(|e| AppError::Internal(format!("Failed to generate presigned URL: {e}")))?;

    Ok(presigned.uri().to_string())
}

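Using it in a handler:

// src/handlers/upload.rs
use axum::extract::Path;
use std::time::Duration;
use crate::services::storage_service::get_presigned_url;

pub async fn get_file_url(
    State(state): State<AppState>,
    AuthUser(_claims): AuthUser,
    Path(key): Path<String>,
) -> Result<impl IntoResponse, AppError> {
    // NOTE: as written, any authenticated user can request a URL for any key.
    // Check that `key` belongs to the caller before signing it.
    let url = get_presigned_url(
        &state.s3,
        &state.config.storage.bucket,
        &key,
        Duration::from_secs(3600),  // valid for 1 hour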
    ).await?;

    Ok(Json(serde_json::json!({ "success": true, "data": { "url": url } })))
}
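
The handler above signs whatever key the caller supplies and ignores the authenticated user's claims. Since the upload handler stores objects under users/{user_id}/..., one possible guard is to verify that prefix before signing. The sketch below assumes a hypothetical helper named key_belongs_to_user and a flat layout with exactly one file name under the user's prefix. How a failed check maps onto AppError is left open, because the original does not show a suitable variant.

```rust
/// Returns true only if `key` has the form `users/{user_id}/{file}` with a single,
/// non-empty file segment. Hypothetical helper; assumes the key layout produced
/// by the upload handler in this section.
pub fn key_belongs_to_user(key: &str, user_id: &str) -> bool {
    // An empty ID or one containing '/' could match someone else's prefix.
    if user_id.is_empty() || user_id.contains('/') {
        return false;
    }
    let file = key
        .strip_prefix("users/")
        .and_then(|rest| rest.strip_prefix(user_id))
        .and_then(|rest| rest.strip_prefix('/'));
    match file {
        // Reject nested paths and traversal segments such as "..".
        Some(name) => !name.is_empty() && !name.contains('/') && name != "." && name != "..",
        None => false,
    }
}
```

Stripping the prefix piece by piece, rather than calling starts_with on "users/42", ensures that user 42 cannot reach keys belonging to user 421.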

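21.5 Multipart Upload for Large Files

For files larger than 100 MB, use S3 multipart upload so that no single request has to carry the whole object. Note that this function still receives the complete payload as Bytes, so the caller holds the file in memory; for truly large files, feed the parts from a stream instead.

// src/services/storage_service.rs
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use bytes::Bytes;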
pub async fn multipart_upload(
    s3: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
    data: Bytes,
) -> Result<(), AppError> {
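    const PART_SIZE: usize = 5 * 1024 * 1024;  // 5 MiB, the S3 minimum for every part except the last

    // 1. Start the multipart upload
    let upload = s3.create_multipart_upload()
        .bucket(bucket)
        .key(key)
        .send()
        .await
        .map_err(|e| AppError::Internal(e.to_string()))?;

    let upload_id = upload
        .upload_id()
        .ok_or_else(|| AppError::Internal("S3 did not return an upload ID".into()))?;

    // 2. Split into parts and upload them one at a time (sequentially).
    //    Bytes::slice shares the underlying buffer, so nothing is copied.
    let chunks: Vec<Bytes> = (0..data.len())
        .step_by(PART_SIZE)
        .map(|start| data.slice(start..data.len().min(start + PART_SIZE)))
        .collect();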

    let mut parts = Vec::new();
    for (i, chunk) in chunks.iter().enumerate() {
        let part_number = (i + 1) as i32;
        let resp = s3.upload_part()
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id)
            .part_number(part_number)
            .body(chunk.clone().into())
            .send()
            .await
            .map_err(|e| AppError::Internal(e.to_string()))?;

        parts.push(
            CompletedPart::builder()
                .part_number(part_number)
                .e_tag(resp.e_tag().unwrap_or_default())
                .build(),
        );
    }

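    // 3. Complete the multipart upload.
    //    (If an earlier step fails, call abort_multipart_upload so that orphaned parts are not left behind.)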
    s3.complete_multipart_upload()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .multipart_upload(
            CompletedMultipartUpload::builder()
                .set_parts(Some(parts))
                .build(),
        )
        .send()
        .await
        .map_err(|e| AppError::Internal(e.to_string()))?;

    Ok(())
}
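
The part layout can be planned and validated before any request is sent. The sketch below assumes a hypothetical plan_parts helper. It enforces S3's two limits: every part except the last must be at least 5 MiB, and an upload may have at most 10,000 parts. It returns the 1-based part numbers with their byte ranges.

```rust
use std::ops::Range;

/// S3 requires every part except the last to be at least 5 MiB.
pub const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
/// S3 allows at most 10,000 parts per multipart upload.
pub const MAX_PARTS: usize = 10_000;

/// Splits `total_len` bytes into `(part_number, byte_range)` pairs.
/// Hypothetical helper for illustration; part numbers start at 1, as S3 expects.
pub fn plan_parts(total_len: usize, part_size: usize) -> Result<Vec<(i32, Range<usize>)>, String> {
    if total_len == 0 {
        return Err("nothing to upload".to_string());
    }
    if part_size < MIN_PART_SIZE {
        return Err(format!("part size {part_size} is below the 5 MiB minimum"));
    }
    // Ceiling division without risking overflow.
    let count = total_len / part_size + usize::from(total_len % part_size != 0);
    if count > MAX_PARTS {
        return Err(format!("{count} parts exceed the limit of {MAX_PARTS}"));
    }
    Ok((0..count)
        .map(|i| {
            let start = i * part_size;
            let end = usize::min(start + part_size, total_len);
            ((i + 1) as i32, start..end)
        })
        .collect())
}
```

Each range can be passed to Bytes::slice to obtain a part without copying. Because the ranges are independent, the same plan also works if the parts are later uploaded concurrently.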

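21.6 Route Registration

// Additions to src/routes/api_v1.rs
use axum::extract::DefaultBodyLimit;
use axum::routing::{get, post};
use crate::handlers::upload;

// Add to the protected route group.
// axum caps request bodies at 2 MB by default, so raise the limit on the upload route
// (10 MiB of file data plus some room for multipart framing).
// Storage keys contain '/', so clients must percent-encode the key (%2F) for it to
// match the single :key segment.
.route("/upload",          post(upload::upload_file).layer(DefaultBodyLimit::max(11 * 1024 * 1024)))
.route("/files/:key/url",  get(upload::get_file_url))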