Rust + Axum 后端架构设计文档
目录
- 技术选型概览
- 核心依赖库详解
- 项目目录结构
- 分层架构设计
- 数据库与 ORM
- 错误处理体系
- 认证与授权
- 中间件设计
- 配置管理
- 日志与可观测性
- 异步任务与队列
- API 响应规范
- Cargo.toml 完整依赖
- 启动流程与代码示例
- Docker 容器化
- Kubernetes 部署
- CI/CD 流程
- 测试策略
- API 文档自动生成
- Prometheus Metrics 监控
- 文件上传与对象存储
1. 技术选型概览
整体架构图
┌─────────────────────────────────────────────────────┐
│ 客户端 / API 消费方 │
└──────────────────────────┬──────────────────────────┘
│ HTTP / WebSocket
┌──────────────────────────▼──────────────────────────┐
│ 反向代理 (Nginx / Caddy) │
└──────────────────────────┬──────────────────────────┘
│
┌──────────────────────────▼──────────────────────────┐
│ Axum Web 服务器 │
│ ┌────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ 中间件层 │ │ 路由层 │ │ 提取器层 │ │
│ │ (Tower) │ │ (Router) │ │ (Extractor) │ │
│ └────────────┘ └──────────────┘ └─────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Handler 层 │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Service 层 │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Repository 层 │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────────┐ ┌──────────┐ ┌──────────────┐
│ PostgreSQL │ │ Redis │ │ 消息队列 │
│ (sqlx) │ │ (deadpool│ │ (RabbitMQ/ │
└─────────────┘ │ -redis) │ │ Kafka) │
└──────────┘ └──────────────┘
设计原则
- 依赖注入:通过 Axum 的 State 传递应用状态,避免全局变量
- 分层解耦:Handler → Service → Repository,各层职责清晰
- 类型安全:充分利用 Rust 类型系统,在编译期消除运行时错误
- 异步优先:全链路 async/await,基于 Tokio 运行时
- 可观测性:结构化日志 + 链路追踪 + 指标上报
2. 核心依赖库详解
2.1 Web 框架层
axum — Web 框架核心
axum = { version = "0.8", features = ["macros", "ws", "multipart"] }
Tokio 团队出品的异步 Web 框架,基于 Tower 中间件生态。核心特性:
- 符合人体工学的路由声明;#[debug_handler] 宏可改善提取器相关的编译错误信息
- 零成本的请求提取器(Path, Query, Json, State)
- 原生支持 WebSocket、SSE、Multipart
- 与 Tower 中间件完全兼容
tokio — 异步运行时
tokio = { version = "1", features = ["full"] }
Rust 生态事实标准的异步运行时,提供:
- 多线程工作窃取调度器
- 异步 I/O(网络、文件)
- 定时器、通道、同步原语
- #[tokio::main] 宏简化入口
tower / tower-http — 中间件生态
tower = { version = "0.5", features = ["full"] }
tower-http = { version = "0.6", features = [
"cors", "trace", "compression-gzip", "request-id",
"timeout", "limit", "fs", "auth"
] }
Tower 提供 Service / Layer trait,实现可组合的中间件。tower-http 包含:
- CorsLayer — CORS 跨域控制
- TraceLayer — 自动请求/响应追踪
- CompressionLayer — Gzip/Brotli 响应压缩
- RequestIdLayer — 自动注入请求 ID
- TimeoutLayer — 请求超时控制
- RequestBodyLimitLayer — 请求体大小限制
2.2 数据库层
sqlx — 异步 SQL 工具库
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls", "postgres", "uuid",
"chrono", "json", "migrate"
] }
编译期验证 SQL 语句的异步数据库库,核心特性:
- 编译期 SQL 检查:query! 宏在编译时验证 SQL 语法和类型
- 零 ORM 开销:查询结果直接映射到 Rust 结构体,无惰性加载,不会产生隐式 N+1 查询
- 内置连接池:PgPool 支持连接复用
- 迁移管理:sqlx migrate CLI 管理数据库版本
sea-orm(可选替代)— 全功能 ORM
sea-orm = { version = "1", features = [
"sqlx-postgres", "runtime-tokio-rustls", "macros"
] }
适合需要实体关系映射、复杂查询构建器的场景。与 sqlx 相比提供更高层抽象,但有一定运行时开销。
deadpool-redis — Redis 连接池
deadpool-redis = { version = "0.18", features = ["rt_tokio_1"] }
redis = { version = "0.27", features = ["tokio-comp", "json"] }
高性能 Redis 连接池,支持连接复用、健康检查、自动重连。用于:
- Session / JWT Token 黑名单
- 缓存热点数据
- 分布式锁
- 限流计数器
2.3 序列化与验证
serde / serde_json — 序列化框架
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Rust 生态最重要的序列化库,#[derive(Serialize, Deserialize)] 零模板代码实现 JSON 互转。
validator — 请求体验证
validator = { version = "0.19", features = ["derive"] }
声明式验证宏,支持:
- 字符串长度、正则、邮箱、URL 验证
- 数字范围验证
- 自定义验证函数
- 嵌套结构验证
#[derive(Debug, Deserialize, Validate)]
pub struct CreateUserDto {
#[validate(length(min = 2, max = 50))]
pub name: String,
#[validate(email)]
pub email: String,
#[validate(length(min = 8))]
pub password: String,
}
2.4 认证与安全
jsonwebtoken — JWT 处理
jsonwebtoken = "9"
JWT 的编码、解码、验证,支持 HS256/RS256/ES256 等算法。
argon2 — 密码哈希
argon2 = "0.5"
password-hash = { version = "0.5", features = ["std"] }
Argon2id 是目前密码存储的最佳实践算法,抗 GPU 暴力破解,内存硬化。
axum-extra — 扩展提取器
axum-extra = { version = "0.10", features = ["typed-header", "cookie"] }
提供 TypedHeader 提取器(自动解析 Authorization: Bearer <token>)和 Cookie 管理。
2.5 配置与环境
config — 多环境配置
config = "0.14"
分层配置加载,支持 TOML/YAML/JSON/环境变量,优先级:环境变量 > 环境配置文件 > 默认配置。
dotenvy — .env 文件加载
dotenvy = "0.15"
在开发环境从 .env 文件注入环境变量。
2.6 日志与追踪
tracing / tracing-subscriber — 结构化日志
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-opentelemetry = "0.28"
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
分布式链路追踪 + 结构化日志一体化方案:
- tracing 提供 span!, info!, error! 等宏
- tracing-subscriber 配置输出格式(JSON/Pretty)和过滤级别
- opentelemetry-otlp 将 trace 上报至 Jaeger / Tempo
2.7 类型与工具
uuid — UUID 生成
uuid = { version = "1", features = ["v4", "v7", "serde"] }
生成 UUID v4(随机)或 v7(时间有序,适合数据库主键排序)。
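v7 适合做数据库主键,是因为其前 48 位是大端的 Unix 毫秒时间戳,按字节序排序即按生成时间排序。下面用标准库演示这一布局特性(仅为示意,只取时间戳前缀,省略版本位与随机位):

```rust
/// UUID v7 的前 6 字节:大端毫秒时间戳(取 u64 的低 48 位)
fn v7_timestamp_prefix(unix_ms: u64) -> [u8; 6] {
    let b = unix_ms.to_be_bytes(); // 8 字节大端表示
    [b[2], b[3], b[4], b[5], b[6], b[7]]
}
```

由于前缀按字节序单调递增,后生成的 UUID 在 B-Tree 索引中总是追加到右侧,避免了 v4 随机主键带来的页分裂。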
chrono — 时间处理
chrono = { version = "0.4", features = ["serde"] }
全功能时间库,处理时区转换、时间格式化、时间运算。
thiserror — 错误定义
thiserror = "2"
通过 #[derive(Error)] 宏简洁定义错误枚举,自动实现 Display 和 Error trait。
anyhow — 错误传播
anyhow = "1"
在不需要精确匹配错误类型的场景(如初始化代码)中方便地传播任意错误。
bon — 构建者模式
bon = "3"
通过 #[builder] 宏为结构体自动生成类型安全的 Builder,替代冗长的手写 Builder 代码。
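#[builder] 宏生成的代码大致等价于下面的手写 Builder(仅为示意,EmailJob 及其字段是虚构的例子;bon 实际用类型状态模式在编译期就拒绝缺失必填字段的调用,而手写版只能在运行期检查):

```rust
#[derive(Debug, PartialEq)]
struct EmailJob {
    to: String,
    retries: u32,
}

struct EmailJobBuilder {
    to: Option<String>,
    retries: u32,
}

impl EmailJob {
    fn builder() -> EmailJobBuilder {
        EmailJobBuilder { to: None, retries: 3 } // retries 默认 3 次
    }
}

impl EmailJobBuilder {
    fn to(mut self, v: impl Into<String>) -> Self {
        self.to = Some(v.into());
        self
    }
    fn retries(mut self, n: u32) -> Self {
        self.retries = n;
        self
    }
    /// 手写版在运行期检查必填字段;bon 生成的版本在编译期保证
    fn build(self) -> Result<EmailJob, &'static str> {
        Ok(EmailJob {
            to: self.to.ok_or("missing field: to")?,
            retries: self.retries,
        })
    }
}
```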
2.8 异步任务
tokio-cron-scheduler — 定时任务
tokio-cron-scheduler = "0.13"
基于 Tokio 的 Cron 调度器,支持标准 Cron 表达式,在同一进程内运行定时任务。
lapin — RabbitMQ 客户端(可选)
lapin = "2"
异步 AMQP 0-9-1 客户端,用于接入 RabbitMQ 消息队列实现异步任务处理。
3. 项目目录结构
my-api/
├── Cargo.toml
├── Cargo.lock
├── .env # 本地开发环境变量(不提交 git)
├── .env.example # 环境变量模板
├── config/
│ ├── default.toml # 默认配置
│ ├── development.toml # 开发环境配置
│ ├── production.toml # 生产环境配置
│ └── test.toml # 测试环境配置
├── migrations/ # sqlx 数据库迁移文件
│ ├── 20260101000000_create_users.sql
│ └── 20260102000000_create_posts.sql
├── src/
│ ├── main.rs # 程序入口
│ ├── lib.rs # 库入口(便于集成测试)
│ ├── app.rs # Axum 应用组装
│ ├── config/
│ │ ├── mod.rs
│ │ └── settings.rs # 配置结构体定义
│ ├── db/
│ │ ├── mod.rs
│ │ └── pool.rs # 数据库连接池初始化
│ ├── cache/
│ │ ├── mod.rs
│ │ └── redis.rs # Redis 连接池初始化
│ ├── errors/
│ │ ├── mod.rs
│ │ └── app_error.rs # 全局错误类型定义
│ ├── extractors/
│ │ ├── mod.rs
│ │ ├── auth.rs # JWT 认证提取器
│ │ └── validated_json.rs # 带验证的 JSON 提取器
│ ├── middleware/
│ │ ├── mod.rs
│ │ ├── auth.rs # 认证中间件
│ │ └── rate_limit.rs # 限流中间件
│ ├── models/ # 数据库模型(对应表结构)
│ │ ├── mod.rs
│ │ └── user.rs
│ ├── dto/ # 数据传输对象(请求/响应结构体)
│ │ ├── mod.rs
│ │ └── user.rs
│ ├── repositories/ # 数据访问层
│ │ ├── mod.rs
│ │ └── user_repository.rs
│ ├── services/ # 业务逻辑层
│ │ ├── mod.rs
│ │ └── user_service.rs
│ ├── handlers/ # HTTP 处理器层
│ │ ├── mod.rs
│ │ ├── health.rs
│ │ └── user.rs
│ ├── routes/ # 路由注册
│ │ ├── mod.rs
│ │ ├── api_v1.rs
│ │ └── health.rs
│ ├── state.rs # 全局 AppState 定义
│ └── telemetry.rs # 日志与追踪初始化
└── tests/
├── common/
│ └── mod.rs # 集成测试通用工具
└── api/
└── user_tests.rs # 用户接口集成测试
4. 分层架构设计
4.1 各层职责
请求
│
▼
┌──────────────────────────────────────────┐
│ Middleware 层 │
│ - 认证检查(JWT 验证) │
│ - 请求日志记录 │
│ - 限流 │
│ - CORS / 压缩 │
└──────────────────┬───────────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ Handler 层(handlers/) │
│ - 解析并验证请求参数 │
│ - 调用 Service 层 │
│ - 将结果转换为 HTTP 响应 │
│ - 不包含任何业务逻辑 │
└──────────────────┬───────────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ Service 层(services/) │
│ - 核心业务逻辑 │
│ - 事务管理 │
│ - 调用多个 Repository 进行数据组合 │
│ - 调用外部服务(邮件、短信、对象存储等) │
└──────────────────┬───────────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ Repository 层(repositories/) │
│ - 封装所有数据库操作 │
│ - 只关心数据的增删改查 │
│ - 返回 Model 或领域对象 │
│ - 无业务逻辑 │
└──────────────────┬───────────────────────┘
│
▼
数据库 / 缓存
4.2 AppState 设计
AppState 是注入所有依赖的容器,通过 Axum 的 State 提取器在 handler 中访问:
// src/state.rs
use std::sync::Arc;
use sqlx::PgPool;
use deadpool_redis::Pool as RedisPool;
use crate::config::Settings;
#[derive(Clone)]
pub struct AppState {
pub db: PgPool,
pub redis: RedisPool,
pub config: Arc<Settings>,
}
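AppState 可以放心 derive Clone:PgPool、deadpool 的 Pool 内部都是引用计数句柄,克隆只是增加计数,不会复制连接。下面用标准库 Arc 做一个最小示意(FakePool、DemoState 均为虚构的模拟类型,非 sqlx 真实实现):

```rust
use std::sync::Arc;

// 模拟连接池句柄:真实的 sqlx::PgPool 内部同样持有 Arc 包裹的共享状态
#[derive(Clone)]
struct FakePool(Arc<Vec<u8>>);

// 模拟 AppState:克隆时仅复制 Arc 指针,底层数据全程只有一份
#[derive(Clone)]
struct DemoState {
    db: FakePool,
}

fn shares_same_pool(a: &DemoState, b: &DemoState) -> bool {
    Arc::ptr_eq(&a.db.0, &b.db.0)
}
```

因此在每个请求中克隆 AppState 的开销可以忽略,这正是 Axum State 提取器的预期用法。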
5. 数据库与 ORM
5.1 sqlx 使用模式
连接池初始化:
// src/db/pool.rs
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use crate::config::Settings;
pub async fn create_pool(settings: &Settings) -> anyhow::Result<PgPool> {
PgPoolOptions::new()
.max_connections(settings.database.max_connections)
.min_connections(settings.database.min_connections)
.acquire_timeout(std::time::Duration::from_secs(5))
.connect(&settings.database.url)
.await
.map_err(Into::into)
}
Repository 实现模式:
// src/repositories/user_repository.rs
use sqlx::PgPool;
use uuid::Uuid;
use crate::models::user::User;
use crate::errors::AppError;
pub struct UserRepository {
pool: PgPool,
}
impl UserRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
pub async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, AppError> {
let user = sqlx::query_as!(
User,
"SELECT * FROM users WHERE id = 1 AND deleted_at IS NULL",
id
)
.fetch_optional(&self.pool)
.await?;
Ok(user)
}
pub async fn find_by_email(&self, email: &str) -> Result<Option<User>, AppError> {
let user = sqlx::query_as!(
User,
"SELECT * FROM users WHERE email =1 AND deleted_at IS NULL",
email
)
.fetch_optional(&self.pool)
.await?;
Ok(user)
}
pub async fn create(&self, name: &str, email: &str, password_hash: &str) -> Result<User, AppError> {
let user = sqlx::query_as!(
User,
r#"
INSERT INTO users (id, name, email, password_hash, created_at, updated_at)
VALUES ($1, $2, $3, $4, NOW(), NOW())
RETURNING *
"#,
Uuid::new_v4(),
name,
email,
password_hash
)
.fetch_one(&self.pool)
.await?;
Ok(user)
}
}
5.2 数据库迁移
-- migrations/20260101000000_create_users.sql
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(100) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
role VARCHAR(50) NOT NULL DEFAULT 'user',
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_created_at ON users(created_at);
运行迁移:
sqlx migrate run --database-url $DATABASE_URL
6. 错误处理体系
6.1 错误类型定义
// src/errors/app_error.rs
use axum::{http::StatusCode, response::{IntoResponse, Response}, Json};
use serde_json::json;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum AppError {
// 业务错误
#[error("Resource not found: {0}")]
NotFound(String),
#[error("Unauthorized: {0}")]
Unauthorized(String),
#[error("Forbidden: {0}")]
Forbidden(String),
#[error("Validation error: {0}")]
Validation(String),
#[error("Conflict: {0}")]
Conflict(String),
// 基础设施错误(内部错误,不暴露给客户端)
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Redis error: {0}")]
Redis(#[from] deadpool_redis::PoolError),
#[error("Internal error: {0}")]
Internal(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, code, message) = match &self {
AppError::NotFound(msg) => (StatusCode::NOT_FOUND, "NOT_FOUND", msg.clone()),
AppError::Unauthorized(msg) => (StatusCode::UNAUTHORIZED, "UNAUTHORIZED", msg.clone()),
AppError::Forbidden(msg) => (StatusCode::FORBIDDEN, "FORBIDDEN", msg.clone()),
AppError::Validation(msg) => (StatusCode::UNPROCESSABLE_ENTITY, "VALIDATION_ERROR", msg.clone()),
AppError::Conflict(msg) => (StatusCode::CONFLICT, "CONFLICT", msg.clone()),
// 基础设施错误统一返回 500,不暴露内部细节
AppError::Database(e) => {
tracing::error!(error = %e, "Database error");
(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", "An internal error occurred".into())
}
AppError::Redis(e) => {
tracing::error!(error = %e, "Redis error");
(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", "An internal error occurred".into())
}
AppError::Internal(msg) => {
tracing::error!(error = %msg, "Internal error");
(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", "An internal error occurred".into())
}
};
let body = Json(json!({
"success": false,
"code": code,
"message": message,
}));
(status, body).into_response()
}
}
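「错误变体 → HTTP 状态码 + 业务错误码」的映射也可以抽成与 axum 无关的纯函数,便于单元测试。下面是一个独立的最小示意(只取三个变体,状态码用裸 u16 表示,非文中 AppError 的真实定义):

```rust
// 与框架解耦的错误元信息映射:返回 (HTTP 状态码, 业务错误码)
enum AppErrorKind {
    NotFound,
    Unauthorized,
    Internal,
}

fn error_meta(kind: &AppErrorKind) -> (u16, &'static str) {
    match kind {
        AppErrorKind::NotFound => (404, "NOT_FOUND"),
        AppErrorKind::Unauthorized => (401, "UNAUTHORIZED"),
        // 基础设施错误统一 500,不向客户端暴露内部细节
        AppErrorKind::Internal => (500, "INTERNAL_ERROR"),
    }
}
```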
7. 认证与授权
7.1 JWT 结构
// src/extractors/auth.rs
use axum::{extract::FromRequestParts, http::request::Parts};
use axum_extra::{headers::{Authorization, Bearer}, TypedHeader};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{errors::AppError, state::AppState};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
    pub sub: Uuid,    // 用户 ID
    pub email: String,
    pub role: String,
    pub exp: usize,   // 过期时间(Unix timestamp)
    pub iat: usize,   // 签发时间
}
/// 认证提取器:自动从请求头提取并验证 JWT
pub struct AuthUser(pub Claims);
// axum 0.8 中 FromRequestParts 使用原生 async fn in trait,无需 #[async_trait]
impl FromRequestParts<AppState> for AuthUser {
type Rejection = AppError;
async fn from_request_parts(
parts: &mut Parts,
state: &AppState,
) -> Result<Self, Self::Rejection> {
// 提取 Authorization: Bearer <token>
let TypedHeader(Authorization(bearer)) =
TypedHeader::<Authorization<Bearer>>::from_request_parts(parts, state)
.await
.map_err(|_| AppError::Unauthorized("Missing or invalid Authorization header".into()))?;
// 检查 token 是否在黑名单(已登出)
let token = bearer.token();
if is_token_blacklisted(&state.redis, token).await? {
return Err(AppError::Unauthorized("Token has been revoked".into()));
}
// 验证并解码 JWT
let secret = state.config.auth.jwt_secret.as_bytes();
let token_data = decode::<Claims>(token, &DecodingKey::from_secret(secret), &Validation::default())
.map_err(|e| AppError::Unauthorized(format!("Invalid token: {e}")))?;
Ok(AuthUser(token_data.claims))
}
}
async fn is_token_blacklisted(redis: &deadpool_redis::Pool, token: &str) -> Result<bool, AppError> {
use redis::AsyncCommands;
let mut conn = redis.get().await?;
let key = format!("blacklist:token:{token}");
let exists: bool = conn.exists(&key).await.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(exists)
}
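登出时把 token 写入黑名单,key 的 TTL 取 token 的剩余有效期即可:到期后 Redis 自动删除该 key(11.1 中的定时清理可作兜底)。相关计算可以抽成纯函数(示意,函数名为文档虚构;saturating_sub 防止已过期 token 得到负 TTL):

```rust
/// 黑名单 key 的 TTL(秒):token 剩余有效期,已过期则为 0
fn blacklist_ttl(exp: u64, now: u64) -> u64 {
    exp.saturating_sub(now)
}

/// 黑名单 key 的格式,与 is_token_blacklisted 中的查询保持一致
fn blacklist_key(token: &str) -> String {
    format!("blacklist:token:{token}")
}
```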
7.2 RBAC 权限控制
/// 角色约束:Rust 的 const 泛型暂不支持 &'static str,
/// 这里用零大小标记类型 + trait 关联常量携带角色名,达到同样的编译期约束
pub trait Role {
    const NAME: &'static str;
}

pub struct Admin;
impl Role for Admin {
    const NAME: &'static str = "admin";
}

/// 角色提取器:在 AuthUser 基础上限制角色
pub struct RequireRole<R: Role>(pub Claims, std::marker::PhantomData<R>);

// 使用示例:
// async fn admin_handler(RequireRole(claims, _): RequireRole<Admin>) -> ...

impl<R: Role> FromRequestParts<AppState> for RequireRole<R> {
    type Rejection = AppError;
    async fn from_request_parts(parts: &mut Parts, state: &AppState) -> Result<Self, Self::Rejection> {
        let AuthUser(claims) = AuthUser::from_request_parts(parts, state).await?;
        if claims.role != R::NAME {
            return Err(AppError::Forbidden("Insufficient permissions".into()));
        }
        Ok(RequireRole(claims, std::marker::PhantomData))
    }
}
8. 中间件设计
8.1 中间件注册顺序
中间件按照由外到内的顺序包裹路由,请求流过时从上到下,响应返回时从下到上:
// src/app.rs
use std::time::Duration;
use axum::Router;
use tower::ServiceBuilder;
use tower_http::{
    cors::CorsLayer,
    trace::TraceLayer,
    compression::CompressionLayer,
    request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
    timeout::TimeoutLayer,
    limit::RequestBodyLimitLayer,
};
use crate::{routes, state::AppState};
pub fn build_app(state: AppState) -> Router {
    Router::new()
        .merge(routes::health::router())
        .nest("/api/v1", routes::api_v1::router(state.clone()))
        .with_state(state)
        // ServiceBuilder 中先添加的层在最外层:请求自上而下依次经过下列各层
        .layer(
            ServiceBuilder::new()
                // 1. 设置请求 ID(最外层,确保每个请求都有唯一 ID)
                .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
                // 2. 链路追踪
                .layer(TraceLayer::new_for_http())
                // 3. 超时控制
                .layer(TimeoutLayer::new(Duration::from_secs(30)))
                // 4. 请求体大小限制(10MB)
                .layer(RequestBodyLimitLayer::new(10 * 1024 * 1024))
                // 5. Gzip 响应压缩
                .layer(CompressionLayer::new())
                // 6. 传播请求 ID 到响应头
                .layer(PropagateRequestIdLayer::x_request_id())
                // 7. CORS(最内层)
                .layer(build_cors_layer()),
        )
}
8.2 限流中间件
// src/middleware/rate_limit.rs
use axum::{extract::Request, middleware::Next, response::Response};
use redis::AsyncCommands;
use crate::{errors::AppError, state::AppState};
pub async fn rate_limit_middleware(
axum::extract::State(state): axum::extract::State<AppState>,
req: Request,
next: Next,
) -> Result<Response, AppError> {
// 以 IP 地址为限流 key(生产环境可换为用户 ID)
let ip = req
.headers()
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown");
let key = format!("rate_limit:{}:{}", ip, chrono::Utc::now().timestamp() / 60);
let mut conn = state.redis.get().await?;
let count: i64 = conn
.incr(&key, 1_i64)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
if count == 1 {
// 第一次请求时设置 60 秒过期
let _: () = conn.expire(&key, 60).await.map_err(|e| AppError::Internal(e.to_string()))?;
}
let limit = state.config.server.rate_limit_per_minute;
    if count > limit as i64 {
        // 语义上应返回 429 Too Many Requests;此处复用现有错误类型示意,
        // 生产中建议为 AppError 增加 TooManyRequests 变体并映射到 429
        return Err(AppError::Validation("Rate limit exceeded. Please slow down.".into()));
    }
Ok(next.run(req).await)
}
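上述固定窗口算法的关键逻辑(窗口 key 的生成与超限判断)可以抽成不依赖 Redis 的纯函数单独测试,如下示意(函数名为文档虚构):

```rust
/// 固定窗口 key:同一分钟内的请求落在同一个计数器上
fn rate_limit_key(ip: &str, now_secs: i64) -> String {
    format!("rate_limit:{}:{}", ip, now_secs / 60)
}

/// 计数器递增之后的超限判断(count 为 INCR 返回值)
fn is_limited(count: i64, limit: u32) -> bool {
    count > limit as i64
}
```

固定窗口实现简单,但窗口边界处最坏可放行约 2 倍流量;对精度要求高的场景可改用滑动窗口或令牌桶。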
9. 配置管理
9.1 配置结构体
// src/config/settings.rs
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
pub server: ServerSettings,
pub database: DatabaseSettings,
pub redis: RedisSettings,
pub auth: AuthSettings,
}
#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings {
pub host: String,
pub port: u16,
pub rate_limit_per_minute: u32,
pub env: String, // "development" | "production" | "test"
}
#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseSettings {
pub url: String,
pub max_connections: u32,
pub min_connections: u32,
}
#[derive(Debug, Deserialize, Clone)]
pub struct RedisSettings {
pub url: String,
pub max_connections: usize,
}
#[derive(Debug, Deserialize, Clone)]
pub struct AuthSettings {
pub jwt_secret: String,
pub jwt_expires_in_seconds: u64,
pub refresh_token_expires_in_days: u64,
}
9.2 配置加载
// src/config/mod.rs
pub use settings::Settings;
mod settings;
use config::{Config, Environment, File};
pub fn load_settings() -> anyhow::Result<Settings> {
let env = std::env::var("APP_ENV").unwrap_or_else(|_| "development".into());
let settings = Config::builder()
// 默认配置(基础)
.add_source(File::with_name("config/default"))
// 环境专属配置(覆盖默认值)
.add_source(File::with_name(&format!("config/{env}")).required(false))
// 环境变量(最高优先级,格式:APP__SERVER__PORT=8080)
.add_source(Environment::with_prefix("APP").separator("__"))
.build()?
.try_deserialize()?;
Ok(settings)
}
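环境变量覆盖的命名规则是:前缀 APP、分隔符 __、其余各段依次对应配置路径。config crate 的映射语义大致如下(示意实现,非其真实源码):

```rust
/// 将 APP__SERVER__PORT 形式的变量名映射为 server.port 形式的配置路径
fn env_to_config_path(var: &str, prefix: &str, sep: &str) -> Option<String> {
    let rest = var.strip_prefix(prefix)?.strip_prefix(sep)?;
    Some(
        rest.split(sep)
            .map(|seg| seg.to_ascii_lowercase())
            .collect::<Vec<_>>()
            .join("."),
    )
}
```

因此在容器或 K8s 中无需改动配置文件,用 APP__DATABASE__URL 之类的环境变量即可覆盖任意配置项。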
config/default.toml:
[server]
host = "0.0.0.0"
port = 8080
rate_limit_per_minute = 100
env = "development"
[database]
max_connections = 10
min_connections = 2
[auth]
jwt_expires_in_seconds = 3600 # 1 小时
refresh_token_expires_in_days = 30
10. 日志与可观测性
10.1 初始化链路追踪
// src/telemetry.rs
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
pub fn init_tracing(env: &str) {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,sqlx=warn,hyper=warn"));
let subscriber = tracing_subscriber::registry().with(env_filter);
if env == "production" {
// 生产环境:JSON 格式,方便日志系统解析(Loki / CloudWatch)
subscriber
.with(tracing_subscriber::fmt::layer().json())
.init();
} else {
// 开发环境:彩色人类可读格式
subscriber
.with(tracing_subscriber::fmt::layer().pretty())
.init();
}
}
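"info,sqlx=warn,hyper=warn" 这类过滤指令的语义是:无 target 的段作为默认级别,带 target 的段单独覆盖,且对 target 的子模块同样生效。其匹配逻辑大致如下(示意实现,非 tracing-subscriber 的真实 EnvFilter 源码):

```rust
/// 解析形如 "info,sqlx=warn" 的过滤串,返回 (默认级别, [(target, 级别)])
fn parse_filter(spec: &str) -> (String, Vec<(String, String)>) {
    let mut default = String::from("info");
    let mut targets = Vec::new();
    for part in spec.split(',') {
        match part.split_once('=') {
            Some((t, lvl)) => targets.push((t.to_string(), lvl.to_string())),
            None => default = part.to_string(),
        }
    }
    (default, targets)
}

/// 某个模块路径的生效级别:有专属指令(含子模块匹配)用专属,否则用默认
fn effective_level<'a>(default: &'a str, targets: &'a [(String, String)], target: &str) -> &'a str {
    targets
        .iter()
        .find(|(t, _)| target == t.as_str() || target.starts_with(&format!("{t}::")))
        .map(|(_, lvl)| lvl.as_str())
        .unwrap_or(default)
}
```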
10.2 在 Handler 中使用
#[tracing::instrument(name = "create_user", skip(state, body))]
pub async fn create_user(
State(state): State<AppState>,
ValidatedJson(body): ValidatedJson<CreateUserDto>,
) -> Result<impl IntoResponse, AppError> {
tracing::info!(email = %body.email, "Creating new user");
let user = state.user_service.create_user(body).await?;
tracing::info!(user_id = %user.id, "User created successfully");
Ok((StatusCode::CREATED, Json(ApiResponse::success(user))))
}
11. 异步任务与队列
11.1 定时任务
// 在 main.rs 中注册定时任务
use tokio_cron_scheduler::{Job, JobScheduler};
pub async fn start_scheduler(state: AppState) -> anyhow::Result<()> {
let sched = JobScheduler::new().await?;
// 每天凌晨 3 点清理过期的 Token 黑名单
sched.add(Job::new_async("0 0 3 * * *", move |_, _| {
let state = state.clone();
Box::pin(async move {
if let Err(e) = cleanup_expired_tokens(&state.redis).await {
tracing::error!(error = %e, "Failed to cleanup expired tokens");
}
})
})?).await?;
sched.start().await?;
Ok(())
}
11.2 后台任务(Tokio Spawn)
对于需要异步执行但不阻塞请求的任务(如发送欢迎邮件):
// 在 Service 层
pub async fn register_user(&self, dto: CreateUserDto) -> Result<User, AppError> {
let user = self.user_repo.create(&dto.name, &dto.email, &hash).await?;
// fire-and-forget:发送欢迎邮件不阻塞注册响应
let email_service = self.email_service.clone();
let user_clone = user.clone();
tokio::spawn(async move {
if let Err(e) = email_service.send_welcome(&user_clone).await {
tracing::error!(error = %e, user_id = %user_clone.id, "Failed to send welcome email");
}
});
Ok(user)
}
12. API 响应规范
12.1 统一响应结构
// src/dto/response.rs
use serde::Serialize;
#[derive(Serialize)]
pub struct ApiResponse<T: Serialize> {
pub success: bool,
pub data: Option<T>,
pub message: Option<String>,
pub meta: Option<PaginationMeta>,
}
#[derive(Serialize)]
pub struct PaginationMeta {
pub total: i64,
pub page: i64,
pub per_page: i64,
pub total_pages: i64,
}
impl<T: Serialize> ApiResponse<T> {
pub fn success(data: T) -> Self {
Self { success: true, data: Some(data), message: None, meta: None }
}
pub fn paginated(data: T, meta: PaginationMeta) -> Self {
Self { success: true, data: Some(data), message: None, meta: Some(meta) }
}
}
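meta 中的 total_pages 按「总数除以每页条数向上取整」计算,可以抽成小函数(示意):

```rust
/// 总页数:向上取整;total 为 0 或 per_page 非法时返回 0
fn total_pages(total: i64, per_page: i64) -> i64 {
    if per_page <= 0 {
        return 0;
    }
    (total + per_page - 1) / per_page
}
```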
成功响应示例:
{
"success": true,
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "Alice",
"email": "alice@example.com",
"created_at": "2026-04-29T10:00:00Z"
},
"message": null,
"meta": null
}
错误响应示例:
{
"success": false,
"code": "VALIDATION_ERROR",
"message": "Email is already in use"
}
13. Cargo.toml 完整依赖
[package]
name = "my-api"
version = "0.1.0"
edition = "2021"
[dependencies]
# ── Web 框架 ──────────────────────────────────────────────────
axum = { version = "0.8", features = ["macros", "ws", "multipart"] }
axum-extra = { version = "0.10", features = ["typed-header", "cookie"] }
tokio = { version = "1", features = ["full"] }
tower = { version = "0.5", features = ["full"] }
tower-http = { version = "0.6", features = [
"cors", "trace", "compression-gzip",
"request-id", "timeout", "limit", "auth"
] }
# ── 数据库 ────────────────────────────────────────────────────
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls", "postgres",
"uuid", "chrono", "json", "migrate"
] }
# ── 缓存 ──────────────────────────────────────────────────────
deadpool-redis = { version = "0.18", features = ["rt_tokio_1"] }
redis = { version = "0.27", features = ["tokio-comp", "json"] }
# ── 序列化 ────────────────────────────────────────────────────
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# ── 验证 ──────────────────────────────────────────────────────
validator = { version = "0.19", features = ["derive"] }
# ── 认证与安全 ────────────────────────────────────────────────
jsonwebtoken = "9"
argon2 = "0.5"
password-hash = { version = "0.5", features = ["std"] }
# ── 配置 ──────────────────────────────────────────────────────
config = "0.14"
dotenvy = "0.15"
# ── 日志与追踪 ────────────────────────────────────────────────
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-opentelemetry = "0.28"
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
# ── 类型与工具 ────────────────────────────────────────────────
uuid = { version = "1", features = ["v4", "v7", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
bon = "3"
# ── 错误处理 ──────────────────────────────────────────────────
thiserror = "2"
anyhow = "1"
# ── 异步任务 ──────────────────────────────────────────────────
tokio-cron-scheduler = "0.13"
[dev-dependencies]
# 集成测试 HTTP 客户端
reqwest = { version = "0.12", features = ["json"] }
# 集成测试框架辅助
axum-test = "16"
# 随机测试数据生成
fake = { version = "3", features = ["derive", "chrono", "uuid"] }
[profile.release]
opt-level = 3
lto = "thin"
strip = true
14. 启动流程与代码示例
14.1 main.rs
use std::sync::Arc;
use dotenvy::dotenv;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 1. 加载 .env(仅开发环境)
dotenv().ok();
// 2. 加载配置
let settings = config::load_settings()?;
// 3. 初始化日志与链路追踪
telemetry::init_tracing(&settings.server.env);
tracing::info!("Starting {} server...", env!("CARGO_PKG_NAME"));
// 4. 初始化数据库连接池
let db = db::pool::create_pool(&settings).await?;
// 5. 运行数据库迁移
sqlx::migrate!("./migrations").run(&db).await?;
tracing::info!("Database migrations applied");
// 6. 初始化 Redis 连接池
let redis = cache::redis::create_pool(&settings)?;
// 7. 组装 AppState
let state = AppState {
db,
redis,
config: Arc::new(settings.clone()),
};
// 8. 启动定时任务
tasks::start_scheduler(state.clone()).await?;
// 9. 构建 Axum 应用
let app = app::build_app(state);
// 10. 启动 HTTP 服务器
let addr = format!("{}:{}", settings.server.host, settings.server.port);
let listener = tokio::net::TcpListener::bind(&addr).await?;
tracing::info!("Server listening on http://{addr}");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}
/// 监听 Ctrl+C 和 SIGTERM 信号,实现优雅停机
async fn shutdown_signal() {
use tokio::signal;
let ctrl_c = async { signal::ctrl_c().await.expect("failed to listen for ctrl_c") };
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => tracing::info!("Received Ctrl+C, shutting down..."),
_ = terminate => tracing::info!("Received SIGTERM, shutting down..."),
}
}
14.2 路由注册示例
// src/routes/api_v1.rs
use axum::{middleware, routing::{get, post, put}, Router};
use crate::{handlers, middleware::rate_limit::rate_limit_middleware, state::AppState};
pub fn router(state: AppState) -> Router<AppState> {
    let public = Router::new()
        .route("/auth/register", post(handlers::auth::register))
        .route("/auth/login", post(handlers::auth::login))
        .route("/health", get(handlers::health::check));
    let protected = Router::new()
        .route("/users/me", get(handlers::user::get_me))
        .route("/users/me", put(handlers::user::update_me))
        // axum 0.8 起路径参数使用 {id} 语法(0.7 及之前为 :id)
        .route("/users/{id}", get(handlers::user::get_by_id))
        // 限流仅应用于 protected 路由组;from_fn_with_state 构建时需要真实 state
        .layer(middleware::from_fn_with_state(state, rate_limit_middleware));
    Router::new()
        .merge(public)
        .merge(protected)
}
附录:常用命令速查
# 开发热重载(需安装 cargo-watch)
cargo watch -x run
# 运行数据库迁移
sqlx migrate run --database-url $DATABASE_URL
# 撤销最近一次迁移
sqlx migrate revert --database-url $DATABASE_URL
# 创建新迁移
sqlx migrate add create_posts_table
# 生成 sqlx 离线缓存(CI 环境无数据库时使用)
cargo sqlx prepare
# 运行所有测试
cargo test
# 运行集成测试
cargo test --test '*'
# Release 构建
cargo build --release
# 检查安全漏洞
cargo audit
15. Docker 容器化
15.1 Dockerfile(多阶段构建)
Rust 编译产物是单个自包含二进制(默认动态链接 glibc),非常适合多阶段构建:构建镜像负责编译,运行镜像只放最终二进制与配置。基于 debian-slim 的运行镜像约 100MB;配合 15.6 的 musl 静态编译方案可进一步压缩到 ~20MB。
# syntax=docker/dockerfile:1.7
# ────────────────────────────────────────────────
# Stage 1: 依赖缓存层
# 单独复制 Cargo 文件,利用 Docker 层缓存加速重复构建
# ────────────────────────────────────────────────
FROM rust:1.82-bookworm AS chef
RUN cargo install cargo-chef --locked
WORKDIR /app
FROM chef AS planner
COPY . .
# 生成依赖清单(recipe.json),内容仅在 Cargo.toml/Cargo.lock 变化时改变
RUN cargo chef prepare --recipe-path recipe.json
# ────────────────────────────────────────────────
# Stage 2: 编译层
# ────────────────────────────────────────────────
FROM chef AS builder
# (可选)安装 musl 工具链,用于编译全静态二进制(见 15.6)
# RUN rustup target add x86_64-unknown-linux-musl
# RUN apt-get update && apt-get install -y musl-tools
# 先只编译依赖(利用缓存:只要依赖不变,此层命中缓存)
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
# 再复制源代码并编译应用本体
COPY . .
# 设置 sqlx 离线模式,避免编译时连接数据库
ENV SQLX_OFFLINE=true
RUN cargo build --release --bin my-api
# ────────────────────────────────────────────────
# Stage 3: 运行镜像
# 使用 distroless 或 debian-slim,不含编译器和源码
# ────────────────────────────────────────────────
FROM debian:bookworm-slim AS runtime
# 安装运行时必要的 CA 证书(HTTPS 请求需要)、时区数据,以及健康检查用的 curl
RUN apt-get update && apt-get install -y \
    ca-certificates \
    tzdata \
    curl \
    && rm -rf /var/lib/apt/lists/*
# 创建非 root 运行用户(安全最佳实践)
RUN groupadd --gid 1001 appgroup \
&& useradd --uid 1001 --gid appgroup --shell /bin/bash --create-home appuser
WORKDIR /app
# 从构建层复制编译产物
COPY --from=builder /app/target/release/my-api ./my-api
# 复制配置文件(不含敏感信息)
COPY config/ ./config/
# 复制数据库迁移文件(运行时执行 migrate)
COPY migrations/ ./migrations/
# 切换到非 root 用户
USER appuser
# 暴露应用端口
EXPOSE 8080
# 健康检查(每 30 秒,失败 3 次标记为 unhealthy)
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
ENTRYPOINT ["./my-api"]
关键设计点说明:
| 设计 | 原因 |
|---|---|
cargo-chef 两步构建 |
将依赖编译层与源码编译层分离,依赖不变时跳过漫长的依赖编译 |
SQLX_OFFLINE=true |
构建期间无法连接数据库,使用 cargo sqlx prepare 生成的离线缓存 |
debian:bookworm-slim |
比 debian:bookworm 小 ~60%,保留 glibc(兼容动态链接库) |
| 非 root 用户 | 容器进程以 uid 1001 运行,防止容器逃逸后拥有宿主机 root 权限 |
HEALTHCHECK |
Kubernetes / Docker Compose 依赖此检查决定是否路由流量 |
15.2 .dockerignore
# Git
.git
.gitignore
# 构建产物(构建镜像内会重新生成)
target/
# 开发环境文件
.env
.env.*
!.env.example
# 编辑器
.vscode/
.idea/
*.swp
*.swo
# 文档
*.md
docs/
# CI
.github/
15.3 docker-compose.yml(本地开发环境)
# docker-compose.yml
version: "3.9"
services:
# ── 应用服务 ──────────────────────────────────────
api:
build:
context: .
dockerfile: Dockerfile
target: runtime # 使用生产镜像
image: my-api:dev
container_name: my-api
restart: unless-stopped
ports:
- "8080:8080"
environment:
APP_ENV: development
APP__DATABASE__URL: postgres://postgres:postgres@db:5432/mydb
APP__REDIS__URL: redis://redis:6379
APP__AUTH__JWT_SECRET: supersecretkey_change_in_production
RUST_LOG: info,sqlx=warn
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
networks:
- app-net
volumes:
# 挂载本地配置(开发时覆盖容器内配置,无需重新构建镜像)
- ./config:/app/config:ro
# ── PostgreSQL ────────────────────────────────────
db:
image: postgres:17-alpine
container_name: my-api-db
restart: unless-stopped
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: mydb
ports:
- "5432:5432" # 暴露端口供本机 GUI 工具连接
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d mydb"]
interval: 5s
timeout: 5s
retries: 10
networks:
- app-net
# ── Redis ─────────────────────────────────────────
redis:
image: redis:7-alpine
container_name: my-api-redis
restart: unless-stopped
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
volumes:
- redis_data:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5
networks:
- app-net
# ── 数据库管理 UI(开发用)────────────────────────
adminer:
image: adminer:latest
container_name: my-api-adminer
restart: unless-stopped
ports:
- "8888:8080"
depends_on:
- db
networks:
- app-net
profiles:
- tools # 只在 --profile tools 时启动
volumes:
postgres_data:
redis_data:
networks:
app-net:
driver: bridge
15.4 docker-compose.override.yml(热重载开发模式)
使用 cargo-watch + 源码挂载,在容器内实现热重载,无需每次修改代码都重新构建镜像:
# docker-compose.override.yml(git 忽略,仅本地使用)
version: "3.9"
services:
api:
# 覆盖为开发镜像(含编译工具链)
build:
target: builder
image: my-api:dev-watch
command: cargo watch -x run # 需在 builder 阶段预装 cargo-watch(RUN cargo install cargo-watch)
environment:
RUST_LOG: debug,sqlx=debug
volumes:
# 挂载源代码(实现热重载)
- .:/app
# 使用具名卷保留编译缓存,避免每次重启都重新编译
- cargo_cache:/usr/local/cargo/registry
- target_cache:/app/target
ports:
- "8080:8080"
volumes:
cargo_cache:
target_cache:
15.5 构建与运行命令
# ── 生产构建 ──────────────────────────────────────
# 首次构建(生成 sqlx 离线查询缓存,需要数据库连接)
cargo sqlx prepare
# 构建生产镜像
docker build -t my-api:latest .
# 查看镜像大小
docker images my-api
# ── 本地开发(docker-compose)────────────────────
# 启动所有服务(后台运行)
docker compose up -d
# 启动含工具服务(Adminer)
docker compose --profile tools up -d
# 热重载开发模式(自动合并 override 文件)
docker compose up
# 查看应用日志(实时)
docker compose logs -f api
# 进入应用容器调试
docker compose exec api /bin/bash
# 手动执行数据库迁移
docker compose exec api ./my-api migrate
# 停止并清理(保留数据卷)
docker compose down
# 停止并清理(含数据卷,慎用)
docker compose down -v
# ── 镜像推送 ──────────────────────────────────────
# 打标签并推送到私有镜像仓库
docker tag my-api:latest registry.example.com/my-api:1.0.0
docker push registry.example.com/my-api:1.0.0
15.6 生产环境优化:全静态二进制(musl)
如果目标是最小化镜像(使用 scratch 或 distroless),可以编译为静态链接的 musl 二进制:
FROM rust:1.82-bookworm AS builder
# 安装 musl 工具链
RUN rustup target add x86_64-unknown-linux-musl
RUN apt-get update && apt-get install -y musl-tools
WORKDIR /app
COPY . .
ENV SQLX_OFFLINE=true
# 编译为全静态二进制
RUN cargo build --release --target x86_64-unknown-linux-musl --bin my-api
# ── 使用 scratch 镜像(最小,~10MB)────────────────
FROM scratch AS runtime
COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/my-api /my-api
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY config/ /config/
COPY migrations/ /migrations/
EXPOSE 8080
ENTRYPOINT ["/my-api"]
注意:
scratch 镜像没有 shell,调试困难。生产推荐 gcr.io/distroless/cc-debian12,它包含最小运行时但无 shell。
16. Kubernetes 部署
目录结构
k8s/
├── namespace.yaml
├── configmap.yaml
├── secret.yaml
├── deployment.yaml
├── service.yaml
├── ingress.yaml
├── hpa.yaml
└── pdb.yaml
16.1 Namespace
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: my-api
labels:
name: my-api
16.2 ConfigMap(非敏感配置)
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: my-api-config
namespace: my-api
data:
APP_ENV: "production"
APP__SERVER__HOST: "0.0.0.0"
APP__SERVER__PORT: "8080"
APP__SERVER__RATE_LIMIT_PER_MINUTE: "100"
APP__DATABASE__MAX_CONNECTIONS: "20"
APP__DATABASE__MIN_CONNECTIONS: "5"
RUST_LOG: "info,sqlx=warn,hyper=warn"
TZ: "Asia/Shanghai"
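ConfigMap 中 APP__ 前缀与双下划线分隔符,对应 config crate 的层级映射约定。下面用一小段 shell 直观演示这种映射规则(假设前缀为 APP、分隔符为 "__",仅为示意,并非 config crate 的真实实现):

```shell
# 将环境变量名 APP__SERVER__PORT 还原为配置路径 server.port
# (模拟 Environment::with_prefix("APP").separator("__") 的映射)
var="APP__SERVER__PORT"
path=$(echo "${var#APP__}" | tr '[:upper:]' '[:lower:]' | sed 's/__/./g')
echo "$path"  # server.port
```

因此上面的 APP__DATABASE__MAX_CONNECTIONS 会覆盖 settings.database.max_connections。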
16.3 Secret(敏感配置)
生产环境推荐用 Vault 或 External Secrets Operator 替代裸 Secret,此处为最小化示例。
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: my-api-secret
namespace: my-api
type: Opaque
# stringData 接受明文,Kubernetes 会自动做 base64 编码
# (若改用 data 字段,才需手动编码:echo -n "value" | base64)
stringData:
APP__DATABASE__URL: "postgres://user:pass@postgres-svc:5432/mydb"
APP__REDIS__URL: "redis://:pass@redis-svc:6379"
APP__AUTH__JWT_SECRET: "your-super-secret-key-change-in-production"
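Secret 的 data 字段要求 base64 编码的值,而 stringData 接受明文(由 Kubernetes 自动编码)。编码与解码互逆,可以在本地验证(连接串为演示用的假设值):

```shell
# data 字段的值需 base64 编码;stringData 可直接写明文
plain='postgres://user:pass@postgres-svc:5432/mydb'
encoded=$(printf '%s' "$plain" | base64)
echo "$encoded"
# 解码还原,验证与原文一致
decoded=$(printf '%s' "$encoded" | base64 -d)
echo "$decoded"  # postgres://user:pass@postgres-svc:5432/mydb
```

注意用 printf '%s' 而非 echo,避免把换行符一并编码进值里。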
16.4 Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-api
namespace: my-api
labels:
app: my-api
spec:
replicas: 3
selector:
matchLabels:
app: my-api
# 滚动更新策略:最多下线 1 个旧 Pod,最多超出 1 个新 Pod
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 1
template:
metadata:
labels:
app: my-api
annotations:
# 当 ConfigMap/Secret 内容变化时强制滚动更新
# (此模板语法需 Helm/Kustomize 渲染;裸 kubectl 可改用 kubectl rollout restart)
configmap-checksum: "{{ checksum configmap.yaml }}"
spec:
# ── 安全上下文 ─────────────────────────────────
securityContext:
runAsNonRoot: true
runAsUser: 1001
runAsGroup: 1001
fsGroup: 1001
# ── 优雅终止等待时间(秒)────────────────────
terminationGracePeriodSeconds: 30
# ── 镜像拉取凭证(私有仓库)──────────────────
imagePullSecrets:
- name: registry-credentials
containers:
- name: my-api
image: registry.example.com/my-api:1.0.0 # 由 CI 注入具体 tag
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 8080
protocol: TCP
# ── 环境变量:来自 ConfigMap ──────────────
envFrom:
- configMapRef:
name: my-api-config
- secretRef:
name: my-api-secret
# ── 资源限制 ─────────────────────────────
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "512Mi"
# ── 存活探针:进程假死时重启 Pod ──────────
livenessProbe:
httpGet:
path: /health/live
port: http
initialDelaySeconds: 10
periodSeconds: 15
timeoutSeconds: 3
failureThreshold: 3
# ── 就绪探针:未就绪时不路由流量 ──────────
readinessProbe:
httpGet:
path: /health/ready
port: http
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
# ── 启动探针:给慢启动留出时间 ────────────
startupProbe:
httpGet:
path: /health/live
port: http
failureThreshold: 12 # 最多等待 12 * 5s = 60s
periodSeconds: 5
# ── 容器安全上下文 ────────────────────────
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
# ── 只读根文件系统时挂载可写临时目录 ──────
volumeMounts:
- name: tmp
mountPath: /tmp
volumes:
- name: tmp
emptyDir: {}
# ── 拓扑分散约束:Pod 分布到不同节点 ─────────
topologySpreadConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
app: my-api
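三类探针的参数共同决定故障检测时延,可以直接推算(数值取自上面 Deployment 中的配置):

```shell
# startupProbe:允许的最长慢启动时间 = failureThreshold * periodSeconds
startup_budget=$(( 12 * 5 ))
# livenessProbe:进程假死后到判定失败的最长时间
# ≈ periodSeconds * failureThreshold(不计 initialDelay)
liveness_detect=$(( 15 * 3 ))
echo "startup budget: ${startup_budget}s"    # startup budget: 60s
echo "liveness detect: ${liveness_detect}s"  # liveness detect: 45s
```

若应用冷启动可能超过 60 秒(例如在启动时执行迁移),应调大 startupProbe 的 failureThreshold。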
16.5 Service
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
name: my-api-svc
namespace: my-api
labels:
app: my-api
spec:
type: ClusterIP # 集群内部访问;对外由 Ingress 暴露
selector:
app: my-api
ports:
- name: http
port: 80
targetPort: http # 引用容器 port name
protocol: TCP
16.6 Ingress(以 Nginx Ingress Controller 为例)
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: my-api-ingress
namespace: my-api
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
# 请求体大小限制
nginx.ingress.kubernetes.io/proxy-body-size: "10m"
# 启用 HTTPS 重定向
nginx.ingress.kubernetes.io/ssl-redirect: "true"
# cert-manager 自动签发 TLS 证书
cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
ingressClassName: nginx
tls:
- hosts:
- api.example.com
secretName: my-api-tls
rules:
- host: api.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: my-api-svc
port:
name: http
16.7 HorizontalPodAutoscaler(自动扩缩容)
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: my-api-hpa
namespace: my-api
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: my-api
minReplicas: 2
maxReplicas: 10
metrics:
# CPU 使用率超过 70% 时扩容
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# 内存使用率超过 80% 时扩容
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
# 扩容:立即响应,每次最多新增 2 个 Pod
scaleUp:
stabilizationWindowSeconds: 30
policies:
- type: Pods
value: 2
periodSeconds: 60
# 缩容:稳定 5 分钟后才缩,防止抖动
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 20
periodSeconds: 60
16.8 PodDisruptionBudget(节点维护保护)
# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: my-api-pdb
namespace: my-api
spec:
# 节点驱逐/升级时,始终保持至少 2 个 Pod 在线
minAvailable: 2
selector:
matchLabels:
app: my-api
16.9 健康检查 Handler
Deployment 中引用的三个探针端点对应代码实现:
// src/handlers/health.rs
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use serde_json::json;
use crate::state::AppState;
/// /health/live — 存活检查:进程是否正常运行
pub async fn liveness() -> impl IntoResponse {
(StatusCode::OK, Json(json!({ "status": "ok" })))
}
/// /health/ready — 就绪检查:是否可以接收流量(含依赖检查)
pub async fn readiness(State(state): State<AppState>) -> impl IntoResponse {
// 检查数据库连通性
let db_ok = sqlx::query("SELECT 1")
.execute(&state.db)
.await
.is_ok();
// 检查 Redis 连通性
let redis_ok = state.redis.get().await.is_ok();
if db_ok && redis_ok {
(StatusCode::OK, Json(json!({ "status": "ready", "db": "ok", "redis": "ok" })))
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({
"status": "not ready",
"db": if db_ok { "ok" } else { "error" },
"redis": if redis_ok { "ok" } else { "error" },
})),
)
}
}
// src/routes/health.rs
use axum::{routing::get, Router};
use crate::{handlers::health, state::AppState};
pub fn router() -> Router<AppState> {
Router::new()
.route("/health/live", get(health::liveness))
.route("/health/ready", get(health::readiness))
}
16.10 常用 kubectl 命令
# 应用所有 k8s 资源
kubectl apply -f k8s/
# 查看 Pod 状态
kubectl get pods -n my-api -w
# 查看滚动更新进度
kubectl rollout status deployment/my-api -n my-api
# 回滚到上一个版本
kubectl rollout undo deployment/my-api -n my-api
# 查看 Pod 日志(实时)
kubectl logs -f deployment/my-api -n my-api
# 查看某个 Pod 的详细事件(排查 CrashLoopBackOff)
kubectl describe pod <pod-name> -n my-api
# 手动触发扩缩容
kubectl scale deployment/my-api --replicas=5 -n my-api
# 查看 HPA 当前状态
kubectl get hpa -n my-api
# 进入 Pod 调试
kubectl exec -it <pod-name> -n my-api -- /bin/sh
# 强制滚动更新(镜像 tag 未变时触发)
kubectl rollout restart deployment/my-api -n my-api
17. CI/CD 流程
17.1 流程总览
代码推送 / PR
│
▼
┌─────────────────────────────────┐
│ CI — GitHub Actions │
│ │
│ 1. fmt + clippy(代码质量) │
│ 2. 单元测试 + 集成测试 │
│ 3. cargo audit(安全扫描) │
│ 4. sqlx 离线缓存验证 │
│ 5. Docker 镜像构建 & 推送 │
└───────────────┬─────────────────┘
│ main 分支合并后
▼
┌─────────────────────────────────┐
│ CD — GitHub Actions │
│ │
│ 1. 更新 k8s Deployment 镜像 tag │
│ 2. kubectl apply │
│ 3. 等待 rollout 完成 │
│ 4. 烟雾测试(curl /health) │
│ 5. 失败自动回滚 │
└─────────────────────────────────┘
17.2 CI 工作流
# .github/workflows/ci.yml
name: CI
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
env:
CARGO_TERM_COLOR: always
SQLX_OFFLINE: true # 不连接数据库,使用离线缓存
jobs:
# ── 代码质量检查 ──────────────────────────────────
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy
- name: Cache cargo
uses: Swatinem/rust-cache@v2
- name: Check formatting
run: cargo fmt --all -- --check
- name: Run clippy
run: cargo clippy --all-targets --all-features -- -D warnings
# ── 测试 ──────────────────────────────────────────
test:
name: Test
runs-on: ubuntu-latest
services:
postgres:
image: postgres:17-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_db
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 5s
--health-timeout 5s
--health-retries 10
redis:
image: redis:7-alpine
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 5s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
- name: Cache cargo
uses: Swatinem/rust-cache@v2
- name: Run database migrations
run: cargo sqlx migrate run
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/test_db
- name: Run tests
run: cargo test --all-features
env:
APP__DATABASE__URL: postgres://postgres:postgres@localhost:5432/test_db
APP__REDIS__URL: redis://localhost:6379
APP__AUTH__JWT_SECRET: test-secret-key
APP_ENV: test
# ── 安全审计 ──────────────────────────────────────
audit:
name: Security Audit
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: rustsec/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
# ── sqlx 离线缓存一致性验证 ───────────────────────
sqlx-check:
name: SQLx Offline Check
runs-on: ubuntu-latest
services:
postgres:
image: postgres:17-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: check_db
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 5s
--health-retries 10
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: Install sqlx-cli
run: cargo install sqlx-cli --no-default-features --features postgres
- name: Run migrations
run: cargo sqlx migrate run
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/check_db
- name: Check sqlx offline cache is up-to-date
run: cargo sqlx prepare --check
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/check_db
# ── 构建并推送 Docker 镜像 ────────────────────────
build:
name: Build & Push Image
runs-on: ubuntu-latest
needs: [lint, test, audit, sqlx-check]
if: github.ref == 'refs/heads/main'
outputs:
image-tag: ${{ steps.meta.outputs.version }}
steps:
- uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to registry
uses: docker/login-action@v3
with:
registry: ${{ secrets.REGISTRY_URL }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ secrets.REGISTRY_URL }}/my-api
tags: |
type=sha,prefix=,format=short
type=ref,event=branch
type=semver,pattern={{version}}
- name: Build and push
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# 使用 GitHub Actions 缓存加速构建
cache-from: type=gha
cache-to: type=gha,mode=max
# 构建参数
build-args: |
BUILDTIME=${{ github.event.repository.updated_at }}
VERSION=${{ steps.meta.outputs.version }}
REVISION=${{ github.sha }}
17.3 CD 工作流
# .github/workflows/cd.yml
name: CD
on:
workflow_run:
workflows: ["CI"]
types: [completed]
branches: [main]
jobs:
deploy:
name: Deploy to Kubernetes
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
environment: production # 需要手动审批(在 GitHub Environments 配置)
steps:
- uses: actions/checkout@v4
# ── 配置 kubectl ──────────────────────────────
- name: Set up kubectl
uses: azure/setup-kubectl@v4
- name: Configure kubeconfig
run: |
mkdir -p ~/.kube
echo "${{ secrets.KUBECONFIG }}" | base64 -d > ~/.kube/config
chmod 600 ~/.kube/config
# ── 获取本次构建的镜像 tag ────────────────────
- name: Get image tag
id: image
run: |
SHA=$(echo ${{ github.sha }} | cut -c1-7)
echo "tag=${{ secrets.REGISTRY_URL }}/my-api:${SHA}" >> "$GITHUB_OUTPUT"
# ── 更新 Deployment 镜像 ──────────────────────
- name: Update image
run: |
kubectl set image deployment/my-api \
my-api=${{ steps.image.outputs.tag }} \
-n my-api
# ── 等待滚动更新完成(最多 5 分钟)────────────
- name: Wait for rollout
run: |
kubectl rollout status deployment/my-api \
-n my-api \
--timeout=300s
# ── 烟雾测试:验证健康检查接口 ────────────────
- name: Smoke test
run: |
ENDPOINT="https://api.example.com/health/ready"
for i in $(seq 1 5); do
STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$ENDPOINT")
if [ "$STATUS" = "200" ]; then
echo "Smoke test passed (attempt $i)"
exit 0
fi
echo "Attempt $i failed (status $STATUS), retrying..."
sleep 10
done
echo "Smoke test failed after 5 attempts"
exit 1
# ── 失败时自动回滚 ────────────────────────────
- name: Rollback on failure
if: failure()
run: |
echo "Deployment failed, rolling back..."
kubectl rollout undo deployment/my-api -n my-api
kubectl rollout status deployment/my-api -n my-api --timeout=120s
# ── 通知(Slack / 钉钉)──────────────────────
- name: Notify success
if: success()
uses: slackapi/slack-github-action@v2
with:
payload: |
{
"text": ":white_check_mark: *my-api* deployed successfully\nTag: `${{ steps.image.outputs.tag }}`\nCommit: ${{ github.sha }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
- name: Notify failure
if: failure()
uses: slackapi/slack-github-action@v2
with:
payload: |
{
"text": ":x: *my-api* deployment FAILED and rolled back\nCommit: ${{ github.sha }}\nSee: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
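CD 中 Get image tag 一步把 commit SHA 截断为前 7 位,与 CI 侧 metadata-action 的 type=sha 短格式保持一致。截断逻辑可以在本地验证(完整 SHA 与仓库地址均为演示用的假设值):

```shell
# 模拟 CD 工作流中镜像 tag 的推导:取 commit SHA 前 7 位
GITHUB_SHA='1a2b3c4d5e6f7890abcdef1234567890abcdef12'  # 假设的完整 SHA
SHA=$(echo "$GITHUB_SHA" | cut -c1-7)
echo "registry.example.com/my-api:${SHA}"  # registry.example.com/my-api:1a2b3c4
```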
17.4 分支策略与 Git Flow
main ──────────────────────────────────────────► 生产部署
▲ ▲
│ PR + Review │ PR + Review
│ │
develop ─────────────────────────────────────► 测试环境
▲ ▲ ▲
│ │ │
feat/xxx fix/yyy chore/zzz
| 分支 | 触发动作 |
|---|---|
| feat/* / fix/* | 仅运行 lint + test |
| develop 合并 | 运行全量 CI,自动部署至 staging 环境 |
| main 合并 | 运行全量 CI + CD,手动审批后部署至 production |
| Tag v*.*.* | 额外构建带版本号的镜像,生成 GitHub Release |
17.5 Secrets 配置清单
在 GitHub 仓库 Settings → Secrets and variables → Actions 中配置:
| Secret 名称 | 说明 |
|---|---|
| REGISTRY_URL | 镜像仓库地址(如 ghcr.io/org) |
| REGISTRY_USERNAME | 仓库登录用户名 |
| REGISTRY_PASSWORD | 仓库登录密码 / Token |
| KUBECONFIG | base64 编码的 kubeconfig 文件 |
| SLACK_WEBHOOK_URL | Slack 通知 Webhook 地址 |
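其中 KUBECONFIG 需存入 base64 编码后的完整 kubeconfig 文件,供 CD 工作流中 base64 -d 还原。生成方式如下(这里用临时文件代替真实的 ~/.kube/config 做演示):

```shell
# 对 kubeconfig 文件整体做 base64 编码,结果粘贴为 KUBECONFIG Secret 的值
tmp=$(mktemp)
printf 'apiVersion: v1\nkind: Config\n' > "$tmp"   # 演示用的最小 kubeconfig
encoded=$(base64 < "$tmp")
echo "$encoded"
# 还原验证(与 CD 工作流中的解码步骤一致)
decoded=$(printf '%s\n' "$encoded" | base64 -d)
rm -f "$tmp"
```

实际文件较长时可加 -w0 参数避免 base64 输出换行。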
18. 测试策略
18.1 测试分层
┌─────────────────────────────────────┐
│ E2E 测试(极少量,验证关键路径) │ ← 慢、成本高
├─────────────────────────────────────┤
│ 集成测试(API 层,含真实 DB/Redis) │ ← 中速、覆盖业务
├─────────────────────────────────────┤
│ 单元测试(Service/纯函数逻辑) │ ← 快、覆盖细节
└─────────────────────────────────────┘
新增依赖:
[dev-dependencies]
axum-test = "16" # 内存级 HTTP 测试,无需绑端口
reqwest = { version = "0.12", features = ["json"] }
sqlx = { version = "0.8", features = ["test"] }
tokio = { version = "1", features = ["full", "test-util"] }
fake = { version = "3", features = ["derive", "uuid", "chrono"] }
wiremock = "0.6" # Mock 外部 HTTP 服务
serial_test = "3" # 需要串行执行的测试(共享 DB 状态)
18.2 单元测试
针对 Service 层纯业务逻辑,Mock Repository 依赖:
// src/services/user_service.rs 末尾
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_password_hash_and_verify() {
let plain = "my_secure_password";
let hash = hash_password(plain).unwrap();
assert_ne!(plain, hash);
assert!(verify_password(plain, &hash).unwrap());
assert!(!verify_password("wrong_password", &hash).unwrap());
}
#[tokio::test]
async fn test_generate_jwt_and_parse_claims() {
let user_id = uuid::Uuid::new_v4();
let secret = "test-secret";
let token = generate_jwt(user_id, "user", secret, 3600).unwrap();
let claims = parse_jwt(&token, secret).unwrap();
assert_eq!(claims.sub, user_id);
assert_eq!(claims.role, "user");
}
#[tokio::test]
async fn test_expired_jwt_returns_error() {
let token = generate_jwt(uuid::Uuid::new_v4(), "user", "secret", 0).unwrap();
// 过期时间为 0,立即过期
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let result = parse_jwt(&token, "secret");
assert!(result.is_err());
}
}
18.3 集成测试工具函数
// tests/common/mod.rs
use axum_test::TestServer;
use sqlx::PgPool;
use my_api::{app::build_app, state::AppState, config::load_settings};
use std::sync::Arc;
pub struct TestApp {
pub server: TestServer,
pub db: PgPool,
}
impl TestApp {
pub async fn spawn() -> Self {
// 加载测试环境配置
std::env::set_var("APP_ENV", "test");
dotenvy::from_filename(".env.test").ok();
let settings = load_settings().expect("Failed to load test settings");
// 连接数据库,每个测试使用独立 schema 隔离
// 注意:SET search_path 只对执行它的那条连接生效,
// 因此把连接池限制为 1 条连接,保证后续查询都命中该 schema
let db = sqlx::postgres::PgPoolOptions::new()
.max_connections(1)
.connect(&settings.database.url)
.await
.unwrap();
let schema = format!("test_{}", uuid::Uuid::new_v4().simple());
sqlx::query(&format!("CREATE SCHEMA {schema}"))
.execute(&db)
.await
.unwrap();
sqlx::query(&format!("SET search_path TO {schema}"))
.execute(&db)
.await
.unwrap();
sqlx::migrate!("./migrations").run(&db).await.unwrap();
let redis = my_api::cache::redis::create_pool(&settings).unwrap();
let state = AppState { db: db.clone(), redis, config: Arc::new(settings) };
let app = build_app(state);
let server = TestServer::new(app).unwrap();
TestApp { server, db }
}
}
/// 快捷注册 + 登录,返回 Bearer token
pub async fn create_user_and_login(app: &TestApp) -> String {
use fake::{faker::internet::en::*, Fake};
let email: String = SafeEmail().fake();
let password = "Test@123456";
app.server
.post("/api/v1/auth/register")
.json(&serde_json::json!({ "name": "Test User", "email": email, "password": password }))
.await
.assert_status_created();
let resp = app.server
.post("/api/v1/auth/login")
.json(&serde_json::json!({ "email": email, "password": password }))
.await;
resp.assert_status_ok();
let body: serde_json::Value = resp.json();
format!("Bearer {}", body["data"]["access_token"].as_str().unwrap())
}
18.4 API 集成测试示例
// tests/api/user_tests.rs
mod common;
use common::{TestApp, create_user_and_login};
#[tokio::test]
async fn test_register_success() {
let app = TestApp::spawn().await;
let resp = app.server
.post("/api/v1/auth/register")
.json(&serde_json::json!({
"name": "Alice",
"email": "alice@example.com",
"password": "Password@123"
}))
.await;
resp.assert_status_created();
let body: serde_json::Value = resp.json();
assert_eq!(body["success"], true);
assert_eq!(body["data"]["email"], "alice@example.com");
}
#[tokio::test]
async fn test_register_duplicate_email_returns_conflict() {
let app = TestApp::spawn().await;
let payload = serde_json::json!({
"name": "Bob", "email": "bob@example.com", "password": "Password@123"
});
app.server.post("/api/v1/auth/register").json(&payload).await.assert_status_created();
// 第二次注册相同邮箱
let resp = app.server.post("/api/v1/auth/register").json(&payload).await;
resp.assert_status(axum::http::StatusCode::CONFLICT);
}
#[tokio::test]
async fn test_get_me_without_token_returns_401() {
let app = TestApp::spawn().await;
app.server.get("/api/v1/users/me").await.assert_status_unauthorized();
}
#[tokio::test]
async fn test_get_me_with_valid_token() {
let app = TestApp::spawn().await;
let token = create_user_and_login(&app).await;
let resp = app.server
.get("/api/v1/users/me")
.add_header("Authorization", token)
.await;
resp.assert_status_ok();
let body: serde_json::Value = resp.json();
assert_eq!(body["success"], true);
assert!(body["data"]["id"].is_string());
}
#[tokio::test]
async fn test_validation_rejects_invalid_email() {
let app = TestApp::spawn().await;
let resp = app.server
.post("/api/v1/auth/register")
.json(&serde_json::json!({
"name": "X",
"email": "not-an-email",
"password": "short"
}))
.await;
resp.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY);
let body: serde_json::Value = resp.json();
assert_eq!(body["code"], "VALIDATION_ERROR");
}
18.5 Mock 外部 HTTP 服务
使用 wiremock Mock 第三方 API(如邮件服务、短信网关):
// tests/api/email_tests.rs
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path};
#[tokio::test]
async fn test_register_triggers_welcome_email() {
// 启动本地 Mock 服务器
let mock_server = MockServer::start().await;
// 注册预期请求:POST /v1/mail/send 返回 202
Mock::given(method("POST"))
.and(path("/v1/mail/send"))
.respond_with(ResponseTemplate::new(202))
.expect(1) // 断言必须被调用 1 次
.mount(&mock_server)
.await;
// 将 Mock 服务器地址注入到应用配置
std::env::set_var("APP__EMAIL__BASE_URL", mock_server.uri());
let app = TestApp::spawn().await;
app.server
.post("/api/v1/auth/register")
.json(&serde_json::json!({
"name": "Carol", "email": "carol@example.com", "password": "Password@123"
}))
.await
.assert_status_created();
// wiremock 在 drop 时自动验证 expect(1)
}
18.6 测试覆盖率
# 安装 cargo-tarpaulin(Linux)
cargo install cargo-tarpaulin
# 生成覆盖率报告(HTML)
cargo tarpaulin --out Html --output-dir coverage/
# 或使用 cargo-llvm-cov(跨平台)
cargo install cargo-llvm-cov
cargo llvm-cov --html --output-dir coverage/
# CI 中输出 lcov 格式上传 Codecov
cargo llvm-cov --lcov --output-path lcov.info
在 CI 中添加覆盖率上报:
# .github/workflows/ci.yml 追加步骤
- name: Generate coverage report
run: cargo llvm-cov --lcov --output-path lcov.info
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/test_db
- name: Upload to Codecov
uses: codecov/codecov-action@v4
with:
files: lcov.info
token: ${{ secrets.CODECOV_TOKEN }}
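lcov 格式以 LF(可检测总行数)/ LH(命中行数)记录每个文件的行覆盖,可据此在 CI 里自行做覆盖率门禁。下面是一个最小示意(lcov 片段与数值均为假设):

```shell
# 从 lcov.info 汇总行覆盖率:sum(LH) * 100 / sum(LF)
cat > lcov.info <<'EOF'
SF:src/lib.rs
LF:100
LH:87
end_of_record
EOF
coverage=$(awk -F: '/^LF/ {lf+=$2} /^LH/ {lh+=$2} END { printf "%.1f", lh * 100 / lf }' lcov.info)
echo "${coverage}%"  # 87.0%
rm -f lcov.info
```

在门禁脚本中可接 `awk 'BEGIN { exit !(cov >= 80) }'` 之类的阈值判断使 CI 失败。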
19. API 文档自动生成
使用 utoipa 从代码注解生成 OpenAPI 规范,并内嵌 Swagger UI。
19.1 新增依赖
[dependencies]
utoipa = { version = "5", features = ["axum_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "8", features = ["axum"] }
utoipa-redoc = { version = "5", features = ["axum"] } # 可选:更美观的文档 UI
19.2 为 DTO 添加 Schema 注解
// src/dto/user.rs
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use validator::Validate;
#[derive(Debug, Deserialize, Validate, ToSchema)]
pub struct CreateUserDto {
/// 用户显示名称
#[validate(length(min = 2, max = 50))]
#[schema(example = "Alice")]
pub name: String,
/// 用户邮箱(唯一)
#[validate(email)]
#[schema(example = "alice@example.com")]
pub email: String,
/// 登录密码(至少 8 位)
#[validate(length(min = 8))]
#[schema(example = "Password@123", format = "password")]
pub password: String,
}
#[derive(Debug, Serialize, ToSchema)]
pub struct UserResponse {
#[schema(example = "550e8400-e29b-41d4-a716-446655440000")]
pub id: uuid::Uuid,
#[schema(example = "Alice")]
pub name: String,
#[schema(example = "alice@example.com")]
pub email: String,
pub role: String,
pub created_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Deserialize, ToSchema)]
pub struct LoginDto {
#[schema(example = "alice@example.com")]
pub email: String,
#[schema(example = "Password@123", format = "password")]
pub password: String,
}
#[derive(Debug, Serialize, ToSchema)]
pub struct TokenResponse {
pub access_token: String,
pub token_type: String,
pub expires_in: u64,
}
19.3 为 Handler 添加路径注解
// src/handlers/auth.rs
use utoipa::openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme};
/// 用户注册
#[utoipa::path(
post,
path = "/api/v1/auth/register",
tag = "Auth",
request_body = CreateUserDto,
responses(
(status = 201, description = "注册成功", body = ApiResponse<UserResponse>),
(status = 409, description = "邮箱已存在", body = ErrorResponse),
(status = 422, description = "参数校验失败", body = ErrorResponse),
)
)]
pub async fn register(/* ... */) -> impl IntoResponse { /* ... */ }
/// 用户登录
#[utoipa::path(
post,
path = "/api/v1/auth/login",
tag = "Auth",
request_body = LoginDto,
responses(
(status = 200, description = "登录成功", body = ApiResponse<TokenResponse>),
(status = 401, description = "邮箱或密码错误", body = ErrorResponse),
)
)]
pub async fn login(/* ... */) -> impl IntoResponse { /* ... */ }
/// 获取当前用户信息
#[utoipa::path(
get,
path = "/api/v1/users/me",
tag = "User",
security(("bearer_auth" = [])),
responses(
(status = 200, description = "获取成功", body = ApiResponse<UserResponse>),
(status = 401, description = "未认证", body = ErrorResponse),
)
)]
pub async fn get_me(/* ... */) -> impl IntoResponse { /* ... */ }
19.4 组装 OpenAPI 文档并挂载 Swagger UI
// src/openapi.rs
use utoipa::{openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme}, OpenApi};
use utoipa_swagger_ui::SwaggerUi;
#[derive(OpenApi)]
#[openapi(
info(
title = "My API",
version = "1.0.0",
description = "Rust + Axum 后端服务",
contact(name = "Backend Team", email = "backend@example.com"),
license(name = "MIT"),
),
paths(
crate::handlers::auth::register,
crate::handlers::auth::login,
crate::handlers::user::get_me,
),
components(
schemas(
crate::dto::user::CreateUserDto,
crate::dto::user::LoginDto,
crate::dto::user::UserResponse,
crate::dto::user::TokenResponse,
)
),
modifiers(&SecurityAddon),
tags(
(name = "Auth", description = "认证相关接口"),
(name = "User", description = "用户管理接口"),
(name = "Health", description = "健康检查"),
)
)]
pub struct ApiDoc;
struct SecurityAddon;
impl utoipa::Modify for SecurityAddon {
fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
let components = openapi.components.get_or_insert_with(Default::default);
components.add_security_scheme(
"bearer_auth",
SecurityScheme::Http(
HttpBuilder::new()
.scheme(HttpAuthScheme::Bearer)
.bearer_format("JWT")
.build(),
),
);
}
}
/// 注册文档路由
pub fn swagger_router() -> axum::Router {
SwaggerUi::new("/docs")
.url("/api-docs/openapi.json", ApiDoc::openapi())
.into()
}
在 app.rs 中挂载(仅非生产环境):
// src/app.rs
pub fn build_app(state: AppState) -> Router {
let mut app = Router::new()
.merge(routes::health::router())
.nest("/api/v1", routes::api_v1::router())
.with_state(state.clone());
// 生产环境关闭 Swagger UI,避免接口信息泄露
if state.config.server.env != "production" {
app = app.merge(crate::openapi::swagger_router());
}
app.layer(/* 中间件 */)
}
访问地址:http://localhost:8080/docs
20. Prometheus Metrics 监控
20.1 新增依赖
[dependencies]
axum-prometheus = "0.7"
prometheus-client = "0.22" # 自定义指标
20.2 内置 HTTP 指标
axum-prometheus 自动记录每个路由的请求数、延迟分布、状态码分布:
// src/app.rs
use axum_prometheus::PrometheusMetricLayer;
pub fn build_app(state: AppState) -> Router {
// 初始化 Prometheus exporter
let (prometheus_layer, metric_handle) = PrometheusMetricLayer::pair();
Router::new()
// 指标暴露端点(通常由 Prometheus Server 抓取)
.route("/metrics", axum::routing::get(move || async move {
metric_handle.render()
}))
.merge(routes::health::router())
.nest("/api/v1", routes::api_v1::router())
.with_state(state)
.layer(prometheus_layer) // 自动统计所有路由的 HTTP 指标
.layer(/* 其他中间件 */)
}
自动生成的指标:
| 指标名 | 类型 | 说明 |
|---|---|---|
| axum_http_requests_total | Counter | 请求总数(含 method、path、status) |
| axum_http_requests_duration_seconds | Histogram | 请求延迟分布 |
| axum_http_requests_pending | Gauge | 当前正在处理的请求数 |
20.3 自定义业务指标
// src/metrics.rs
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram},
registry::Registry,
};
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct AuthLabels {
pub method: String, // "register" | "login"
pub result: String, // "success" | "failure"
}
#[derive(Clone)]
pub struct AppMetrics {
/// 注册/登录次数
pub auth_attempts: Family<AuthLabels, Counter>,
/// 当前活跃用户 session 数
pub active_sessions: Gauge,
/// 数据库查询耗时分布
pub db_query_duration: Histogram,
}
impl AppMetrics {
pub fn new(registry: &mut Registry) -> Self {
let auth_attempts = Family::<AuthLabels, Counter>::default();
let active_sessions = Gauge::default();
let db_query_duration = Histogram::new(
[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0].into_iter(),
);
registry.register("app_auth_attempts", "认证尝试次数", auth_attempts.clone());
registry.register("app_active_sessions", "活跃会话数", active_sessions.clone());
registry.register("app_db_query_duration_seconds", "DB 查询耗时", db_query_duration.clone());
Self { auth_attempts, active_sessions, db_query_duration }
}
}
在 Service 层埋点:
// src/services/auth_service.rs
pub async fn login(&self, dto: LoginDto) -> Result<TokenResponse, AppError> {
let result = self.do_login(&dto).await;
// 记录登录结果
self.metrics.auth_attempts
.get_or_create(&AuthLabels {
method: "login".into(),
result: if result.is_ok() { "success" } else { "failure" }.into(),
})
.inc();
result
}
20.4 Prometheus 抓取配置
# prometheus.yml
scrape_configs:
- job_name: my-api
static_configs:
- targets: ["my-api-svc:80"]
metrics_path: /metrics
scrape_interval: 15s
K8s 中使用注解让 Prometheus 自动发现:
# deployment.yaml 中 Pod template annotations 追加
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/metrics"
prometheus.io/port: "8080"
20.5 推荐 Grafana Dashboard 面板
| 面板 | PromQL 示例 |
|---|---|
| QPS | rate(axum_http_requests_total[1m]) |
| P99 延迟 | histogram_quantile(0.99, rate(axum_http_requests_duration_seconds_bucket[5m])) |
| 错误率 | rate(axum_http_requests_total{status=~"5.."}[1m]) / rate(axum_http_requests_total[1m]) |
| 活跃请求 | axum_http_requests_pending |
| DB 查询 P95 | histogram_quantile(0.95, rate(app_db_query_duration_seconds_bucket[5m])) |
| 登录失败率 | rate(app_auth_attempts{method="login",result="failure"}[5m]) |
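表中"错误率"一条是两条 rate 之商,其含义可以用一组假设的计数增量直观演示(1 分钟窗口内 5xx 增量 12、总请求增量 4800,均为虚构数值):

```shell
# 错误率 = 5xx 请求速率 / 总请求速率
errors_5xx=12
total=4800
rate=$(awk -v e="$errors_5xx" -v t="$total" 'BEGIN { printf "%.4f", e / t }')
echo "$rate"  # 0.0025
```

即 0.25% 的错误率;Grafana 面板中通常再乘 100 以百分比展示。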
21. 文件上传与对象存储
21.1 新增依赖
[dependencies]
# AWS S3 兼容(也支持 MinIO、Cloudflare R2)
aws-config = { version = "1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1"
# 文件类型检测(防止伪造 Content-Type)
infer = "0.16"
# 流式处理大文件
tokio-util = { version = "0.7", features = ["io"] }
bytes = "1"
21.2 S3 客户端初始化
// src/storage/s3.rs
use aws_config::BehaviorVersion;
use aws_sdk_s3::{config::Region, Client};
use crate::config::Settings;
pub async fn create_s3_client(settings: &Settings) -> Client {
let config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(settings.storage.region.clone()))
// 兼容 MinIO:设置自定义 endpoint
.endpoint_url(&settings.storage.endpoint_url)
.load()
.await;
Client::new(&config)
}
配置新增:
# config/default.toml
[storage]
bucket = "my-api-uploads"
region = "ap-northeast-1"
endpoint_url = "" # MinIO 时填写,如 "http://minio:9000"
cdn_base_url = "https://cdn.example.com" # 拼接上传文件访问 URL(Handler 中引用)
max_file_size = 10485760 # 10MB(字节)
allowed_types = ["image/jpeg", "image/png", "image/webp", "application/pdf"]
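max_file_size 的取值即 10 MB 换算为字节:

```shell
# 10 MB = 10 * 1024 * 1024 字节
size=$(( 10 * 1024 * 1024 ))
echo "$size"  # 10485760
```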
21.3 文件上传 Handler
// src/handlers/upload.rs
use axum::{
extract::{Multipart, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use bytes::Bytes;
use uuid::Uuid;
use crate::{errors::AppError, extractors::auth::AuthUser, state::AppState};
const MAX_FILE_SIZE: usize = 10 * 1024 * 1024; // 10MB
#[utoipa::path(
post,
path = "/api/v1/upload",
tag = "Upload",
security(("bearer_auth" = [])),
request_body(content_type = "multipart/form-data"),
responses(
(status = 201, description = "上传成功", body = UploadResponse),
(status = 400, description = "文件类型不支持或超出大小限制"),
)
)]
pub async fn upload_file(
State(state): State<AppState>,
AuthUser(claims): AuthUser,
mut multipart: Multipart,
) -> Result<impl IntoResponse, AppError> {
let field = multipart
.next_field()
.await
.map_err(|e| AppError::Validation(format!("Failed to read multipart: {e}")))?
.ok_or_else(|| AppError::Validation("No file field found".into()))?;
let original_name = field
.file_name()
.unwrap_or("unknown")
.to_string();
// 读取文件内容
let data: Bytes = field
.bytes()
.await
.map_err(|e| AppError::Validation(format!("Failed to read file: {e}")))?;
// 大小检查
if data.len() > MAX_FILE_SIZE {
return Err(AppError::Validation(
format!("File too large: {} bytes (max {})", data.len(), MAX_FILE_SIZE)
));
}
// 文件类型检测(读取文件魔数,而非信任 Content-Type 头)
let mime = infer::get(&data)
.map(|t| t.mime_type())
.unwrap_or("application/octet-stream");
let allowed = &state.config.storage.allowed_types;
if !allowed.iter().any(|t| t == mime) {
return Err(AppError::Validation(
format!("File type '{mime}' is not allowed")
));
}
// 生成唯一存储路径:users/{user_id}/{uuid}.{ext}
// 注意不能用 rsplit('.').next():文件名不含点号时会把整个文件名当成扩展名
let ext = original_name.rsplit_once('.').map(|(_, e)| e).filter(|e| !e.is_empty()).unwrap_or("bin");
let key = format!("users/{}/{}.{}", claims.sub, Uuid::now_v7(), ext); // 需启用 uuid 的 "v7" feature
// 上传至 S3(data 会被移动进请求体,先记录大小)
let size = data.len();
state.s3
.put_object()
.bucket(&state.config.storage.bucket)
.key(&key)
.body(data.into())
.content_type(mime)
// 防止直接访问,通过 CDN 或预签名 URL 提供
.acl(aws_sdk_s3::types::ObjectCannedAcl::Private)
.send()
.await
.map_err(|e| AppError::Internal(format!("S3 upload failed: {e}")))?;
let url = format!("{}/{}/{}", state.config.storage.cdn_base_url, state.config.storage.bucket, key);
Ok((
StatusCode::CREATED,
Json(serde_json::json!({
"success": true,
"data": { "key": key, "url": url, "mime_type": mime, "size": size }
})),
))
}
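Handler 中从用户可控文件名提取扩展名的逻辑值得抽成独立函数并单测:文件名可能不含点号、以点号结尾,甚至夹带路径分隔符。下面是一个示意性实现(safe_extension 为假设的命名,校验规则按本文思路从严):

```rust
/// 从用户上传的原始文件名中提取一个安全的扩展名。
/// 无扩展名、空扩展名、过长或含非字母数字字符时统一回退到 "bin"。
fn safe_extension(filename: &str) -> &str {
    match filename.rsplit_once('.') {
        Some((_, ext))
            if !ext.is_empty()
                && ext.len() <= 8
                && ext.chars().all(|c| c.is_ascii_alphanumeric()) =>
        {
            ext
        }
        _ => "bin",
    }
}

fn main() {
    assert_eq!(safe_extension("photo.jpeg"), "jpeg");
    assert_eq!(safe_extension("archive.tar.gz"), "gz"); // 只取最后一段
    assert_eq!(safe_extension("unknown"), "bin");       // 无扩展名
    assert_eq!(safe_extension("evil.."), "bin");        // 空扩展名
    assert_eq!(safe_extension("a.b/c"), "bin");         // 含路径分隔符
    println!("ok");
}
```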
21.4 生成预签名下载 URL
对私有文件,不直接暴露 S3 URL,而是生成有时效的预签名链接:
// src/services/storage_service.rs
use aws_sdk_s3::presigning::PresigningConfig;
use std::time::Duration;
use crate::errors::AppError;
pub async fn get_presigned_url(
s3: &aws_sdk_s3::Client,
bucket: &str,
key: &str,
expires_in: Duration,
) -> Result<String, AppError> {
let presigned = s3
.get_object()
.bucket(bucket)
.key(key)
.presigned(
PresigningConfig::expires_in(expires_in)
.map_err(|e| AppError::Internal(e.to_string()))?,
)
.await
.map_err(|e| AppError::Internal(format!("Failed to generate presigned URL: {e}")))?;
Ok(presigned.uri().to_string())
}
Handler 中使用(还需引入 axum::extract::Path 与上面的 get_presigned_url):
pub async fn get_file_url(
State(state): State<AppState>,
AuthUser(_claims): AuthUser,
Path(key): Path<String>,
) -> Result<impl IntoResponse, AppError> {
let url = get_presigned_url(
&state.s3,
&state.config.storage.bucket,
&key,
Duration::from_secs(3600), // 1 小时有效
).await?;
Ok(Json(serde_json::json!({ "success": true, "data": { "url": url } })))
}
21.5 大文件分片上传(Multipart Upload)
超过 100MB 的文件使用 S3 分片上传,避免单个请求超时(下例为简化仍将全部数据读入内存,生产中可改为边读边传的流式分片):
// src/services/storage_service.rs
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use bytes::Bytes;
pub async fn multipart_upload(
s3: &aws_sdk_s3::Client,
bucket: &str,
key: &str,
data: Bytes,
) -> Result<(), AppError> {
const PART_SIZE: usize = 5 * 1024 * 1024; // 5MB,S3 规定的最小分片大小(最后一块可以更小)
// 1. 创建分片上传任务
let upload = s3.create_multipart_upload()
.bucket(bucket)
.key(key)
.send()
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
let upload_id = upload.upload_id()
.ok_or_else(|| AppError::Internal("S3 response missing upload_id".into()))?;
// 2. 依次上传各分片(如需并发,可配合 futures 的 buffer_unordered 并行发送)
let chunks: Vec<Bytes> = data.chunks(PART_SIZE)
.map(Bytes::copy_from_slice)
.collect();
let mut parts = Vec::new();
for (i, chunk) in chunks.iter().enumerate() {
let part_number = (i + 1) as i32;
let resp = s3.upload_part()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.part_number(part_number)
.body(chunk.clone().into())
.send()
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
parts.push(
CompletedPart::builder()
.part_number(part_number)
.e_tag(resp.e_tag().unwrap_or_default())
.build(),
);
}
// 3. 完成分片上传
s3.complete_multipart_upload()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.multipart_upload(
CompletedMultipartUpload::builder()
.set_parts(Some(parts))
.build(),
)
.send()
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(())
}
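分块与编号逻辑可以抽成纯函数独立验证:S3 要求 part_number 从 1 开始,且除最后一块外每块不小于 5MB。下面用小分片大小演示这一规则(split_parts 为示例用的假设命名):

```rust
/// 将数据按 part_size 切块,并标上从 1 开始的分片编号
fn split_parts(data: &[u8], part_size: usize) -> Vec<(i32, &[u8])> {
    data.chunks(part_size)
        .enumerate()
        .map(|(i, chunk)| ((i + 1) as i32, chunk))
        .collect()
}

fn main() {
    let data = vec![0u8; 12];
    let parts = split_parts(&data, 5);
    // 12 字节按 5 字节一块:5 + 5 + 2,共 3 块
    assert_eq!(parts.len(), 3);
    assert_eq!(parts[0].0, 1);       // 编号从 1 开始
    assert_eq!(parts[2].1.len(), 2); // 最后一块允许小于 part_size
    println!("ok");
}
```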
21.6 路由注册
// src/routes/api_v1.rs 追加
use axum::routing::{get, post};
use crate::handlers::upload;
// 在 protected 路由组中添加
.route("/upload", post(upload::upload_file))
// key 形如 users/{id}/{uuid}.{ext},含斜杠;:key 只匹配单个路径段,
// 调用方需对 key 整体做 URL 编码(/ 编码为 %2F)
.route("/files/:key/url", get(upload::get_file_url))