Skip to content

微服务 —— 结合 Rust 实践

1. 微服务简介

微服务架构(Microservices Architecture)将单体应用拆分为一组小型、独立部署的服务,每个服务围绕单一业务能力构建,通过网络相互通信。

单体 vs 微服务

单体架构:
  ┌──────────────────────────────┐
  │  用户模块 │ 订单模块 │ 支付模块 │
  │  商品模块 │ 通知模块 │ 报表模块 │
  └──────────────────────────────┘
              单一进程部署
  缺点:代码耦合、扩展困难、技术栈单一、一处故障全站宕机

微服务架构:
  [用户服务] ──┐
  [订单服务] ──┼──▶ [API Gateway] ──▶ Client
  [支付服务] ──┤
  [通知服务] ──┘
  每个服务独立部署、独立扩展、独立技术栈

微服务的收益与代价

收益 代价
独立部署,快速迭代 分布式系统复杂度大幅增加
按需扩展,资源利用率高 服务间网络通信引入延迟和故障点
技术栈自由 需要服务发现、链路追踪等基础设施
故障隔离,一个服务宕机不影响全局 分布式事务难以处理
团队可独立开发(康威定律) 运维复杂度高(需要容器编排)

为什么选 Rust 做微服务?

性能:接近 C 的运行效率,极低内存占用(镜像小、冷启动快)
安全:编译期内存安全,无 GC 停顿,适合低延迟要求
并发:async/await + Tokio,高并发无需多线程
生态:Axum / Actix-web / tonic / sqlx 已足够生产使用

2. 核心概念与设计原则

设计原则

单一职责(SRP)    每个服务只做一件事,围绕业务能力划分
高内聚低耦合       服务内部紧密协作,服务间依赖最小化
独立部署           服务可以单独构建、测试、上线,不依赖其他服务
去中心化           数据存储独立,避免共享数据库
容错设计           假设网络和依赖服务随时会失败,设计降级策略
可观测性           日志、指标、链路追踪三位一体

服务拆分策略

按业务领域(DDD 限界上下文):
  电商平台 →
    用户域:注册、登录、个人信息
    商品域:商品管理、库存、价格
    订单域:下单、订单状态机
    支付域:支付渠道、对账
    通知域:短信、邮件、Push

按变更频率:频繁变更的模块单独拆分
按团队边界:一个团队负责一个或多个服务
按性能需求:高并发的服务单独扩展

服务通信模式

同步通信(请求-响应):
  REST/HTTP  ──▶ 简单、通用,适合对外 API
  gRPC       ──▶ 高性能、强类型,适合内部服务间
  GraphQL    ──▶ 灵活查询,适合 BFF 层

异步通信(事件驱动):
  消息队列(Kafka/RabbitMQ)──▶ 解耦,削峰,最终一致性
  事件总线                  ──▶ 服务间事件广播

3. Rust 微服务技术栈全景

# 核心框架
axum            = "0.7"      # HTTP Web 框架(Tower 生态)
tonic           = "0.11"     # gRPC 框架
tower           = "0.4"      # 中间件抽象层
tower-http      = "0.5"      # HTTP 中间件(限流、压缩、CORS 等)

# 异步运行时
tokio           = { version = "1", features = ["full"] }

# 序列化
serde           = { version = "1", features = ["derive"] }
serde_json      = "1"

# 数据库
sqlx            = { version = "0.7", features = ["postgres", "runtime-tokio", "migrate"] }
sea-orm         = "0.12"     # 全功能 ORM(可选)

# 可观测性
tracing         = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
opentelemetry   = "0.22"
metrics         = "0.22"
metrics-exporter-prometheus = "0.13"

# 服务发现
consul-client   = "0.5"      # Consul
etcd-client     = "0.13"     # etcd

# 熔断 / 限流
tower           = { version = "0.4", features = ["limit", "timeout", "retry"] }

# JWT 认证
jsonwebtoken    = "9"

# 配置
config          = "0.14"

4. HTTP 服务框架(Axum)

4.1 基础服务结构

// src/main.rs
use axum::{
    Router,
    routing::{get, post, put, delete},
    extract::{Path, Query, State, Json},
    http::StatusCode,
    response::{IntoResponse, Json as JsonResponse},
    middleware,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::net::TcpListener;

#[derive(Clone)]
struct AppState {
    db: sqlx::PgPool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 初始化日志
    tracing_subscriber::fmt()
        .with_env_filter("info,tower_http=debug")
        .json()
        .init();

    let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    let state = Arc::new(AppState { db });

    let app = Router::new()
        .route("/health",          get(health_check))
        .nest("/api/v1", api_routes())
        .with_state(state)
        .layer(
            tower_http::trace::TraceLayer::new_for_http()
        )
        .layer(
            tower_http::cors::CorsLayer::permissive()
        );

    let listener = TcpListener::bind("0.0.0.0:8080").await?;
    tracing::info!("服务启动于 0.0.0.0:8080");
    axum::serve(listener, app).await?;
    Ok(())
}

fn api_routes() -> Router<Arc<AppState>> {
    Router::new()
        .route("/users",     get(list_users).post(create_user))
        .route("/users/:id", get(get_user).put(update_user).delete(delete_user))
        .route("/orders",    post(create_order))
}

async fn health_check() -> impl IntoResponse {
    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}

4.2 请求处理与提取器

#[derive(Debug, Serialize, Deserialize)]
struct User {
    id:       i64,
    username: String,
    email:    String,
}

#[derive(Debug, Deserialize)]
struct CreateUserRequest {
    username: String,
    email:    String,
    password: String,
}

#[derive(Debug, Deserialize)]
struct ListQuery {
    page:     Option<u32>,
    per_page: Option<u32>,
    status:   Option<String>,
}

// GET /api/v1/users?page=1&per_page=20
async fn list_users(
    State(state): State<Arc<AppState>>,
    Query(params): Query<ListQuery>,
) -> Result<JsonResponse<Vec<User>>, AppError> {
    let page     = params.page.unwrap_or(1);
    let per_page = params.per_page.unwrap_or(20).min(100);
    let offset   = ((page - 1) * per_page) as i64;

    let users = sqlx::query_as!(
        User,
        "SELECT id, username, email FROM users LIMIT $1 OFFSET $2",
        per_page as i64,
        offset,
    )
    .fetch_all(&state.db)
    .await?;

    Ok(JsonResponse(users))
}

// GET /api/v1/users/:id
async fn get_user(
    State(state): State<Arc<AppState>>,
    Path(id): Path<i64>,
) -> Result<JsonResponse<User>, AppError> {
    let user = sqlx::query_as!(User,
        "SELECT id, username, email FROM users WHERE id = $1",
        id
    )
    .fetch_optional(&state.db)
    .await?
    .ok_or(AppError::NotFound("用户不存在".into()))?;

    Ok(JsonResponse(user))
}

// POST /api/v1/users
async fn create_user(
    State(state): State<Arc<AppState>>,
    Json(req): Json<CreateUserRequest>,
) -> Result<impl IntoResponse, AppError> {
    // 输入验证
    if req.username.is_empty() || req.email.is_empty() {
        return Err(AppError::BadRequest("username 和 email 不能为空".into()));
    }

    let user = sqlx::query_as!(
        User,
        "INSERT INTO users (username, email, password_hash) VALUES ($1, $2, $3) RETURNING id, username, email",
        req.username,
        req.email,
        bcrypt_hash(&req.password),
    )
    .fetch_one(&state.db)
    .await?;

    Ok((StatusCode::CREATED, JsonResponse(user)))
}

4.3 统一错误处理

use axum::{
    response::{IntoResponse, Response},
    http::StatusCode,
    Json,
};

#[derive(Debug, thiserror::Error)]
enum AppError {
    #[error("资源不存在: {0}")]
    NotFound(String),

    #[error("请求参数错误: {0}")]
    BadRequest(String),

    #[error("未授权")]
    Unauthorized,

    #[error("权限不足")]
    Forbidden,

    #[error("数据库错误: {0}")]
    Database(#[from] sqlx::Error),

    #[error("内部错误: {0}")]
    Internal(#[from] anyhow::Error),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, message) = match &self {
            AppError::NotFound(msg)    => (StatusCode::NOT_FOUND,            msg.clone()),
            AppError::BadRequest(msg)  => (StatusCode::BAD_REQUEST,          msg.clone()),
            AppError::Unauthorized     => (StatusCode::UNAUTHORIZED,          "未授权".into()),
            AppError::Forbidden        => (StatusCode::FORBIDDEN,             "权限不足".into()),
            AppError::Database(e)      => {
                tracing::error!("数据库错误: {}", e);
                (StatusCode::INTERNAL_SERVER_ERROR, "服务器内部错误".into())
            }
            AppError::Internal(e)      => {
                tracing::error!("内部错误: {}", e);
                (StatusCode::INTERNAL_SERVER_ERROR, "服务器内部错误".into())
            }
        };

        (status, Json(serde_json::json!({
            "error":   status.as_u16(),
            "message": message,
        }))).into_response()
    }
}

4.4 中间件

use axum::{
    extract::Request,
    middleware::Next,
    response::Response,
    http::HeaderMap,
};
use std::time::Instant;

// 请求耗时日志中间件
async fn logging_middleware(req: Request, next: Next) -> Response {
    let method = req.method().clone();
    let uri    = req.uri().clone();
    let start  = Instant::now();

    let response = next.run(req).await;

    tracing::info!(
        method  = %method,
        uri     = %uri,
        status  = response.status().as_u16(),
        latency = ?start.elapsed(),
        "HTTP request"
    );

    response
}

// 请求 ID 注入中间件
async fn request_id_middleware(mut req: Request, next: Next) -> Response {
    let id = uuid::Uuid::new_v4().to_string();
    req.headers_mut().insert(
        "x-request-id",
        id.parse().unwrap(),
    );
    next.run(req).await
}

// 注册中间件
let app = Router::new()
    .route("/api", get(handler))
    .layer(middleware::from_fn(logging_middleware))
    .layer(middleware::from_fn(request_id_middleware));

5. gRPC 服务(tonic)

5.1 定义 Proto 文件

// proto/user.proto
syntax = "proto3";
package user;

service UserService {
    rpc GetUser    (GetUserRequest)    returns (UserResponse);
    rpc ListUsers  (ListUsersRequest)  returns (ListUsersResponse);
    rpc CreateUser (CreateUserRequest) returns (UserResponse);
    rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
}

message GetUserRequest {
    int64 id = 1;
}

message CreateUserRequest {
    string username = 1;
    string email    = 2;
}

message UserResponse {
    int64  id       = 1;
    string username = 2;
    string email    = 3;
}

message ListUsersRequest {
    int32 page     = 1;
    int32 per_page = 2;
}

message ListUsersResponse {
    repeated UserResponse users = 1;
    int32 total = 2;
}

message DeleteUserRequest {
    int64 id = 1;
}

message DeleteUserResponse {
    bool success = 1;
}
# Cargo.toml
[dependencies]
tonic   = "0.11"
prost   = "0.12"

[build-dependencies]
tonic-build = "0.11"
// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true)
        .build_client(true)
        .compile(&["proto/user.proto"], &["proto"])?;
    Ok(())
}

5.2 gRPC 服务端

use tonic::{transport::Server, Request, Response, Status};

// 引入 proto 生成的代码
pub mod user_proto {
    tonic::include_proto!("user");
}
use user_proto::{
    user_service_server::{UserService, UserServiceServer},
    *,
};

#[derive(Debug, Clone)]
struct UserServiceImpl {
    db: sqlx::PgPool,
}

#[tonic::async_trait]
impl UserService for UserServiceImpl {
    async fn get_user(
        &self,
        req: Request<GetUserRequest>,
    ) -> Result<Response<UserResponse>, Status> {
        let id = req.into_inner().id;

        let user = sqlx::query!(
            "SELECT id, username, email FROM users WHERE id = $1",
            id
        )
        .fetch_optional(&self.db)
        .await
        .map_err(|e| Status::internal(e.to_string()))?
        .ok_or_else(|| Status::not_found(format!("用户 {} 不存在", id)))?;

        Ok(Response::new(UserResponse {
            id:       user.id,
            username: user.username,
            email:    user.email,
        }))
    }

    async fn create_user(
        &self,
        req: Request<CreateUserRequest>,
    ) -> Result<Response<UserResponse>, Status> {
        let r = req.into_inner();

        if r.username.is_empty() {
            return Err(Status::invalid_argument("username 不能为空"));
        }

        let user = sqlx::query!(
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, username, email",
            r.username, r.email
        )
        .fetch_one(&self.db)
        .await
        .map_err(|e| Status::internal(e.to_string()))?;

        Ok(Response::new(UserResponse {
            id:       user.id,
            username: user.username,
            email:    user.email,
        }))
    }

    async fn list_users(
        &self,
        req: Request<ListUsersRequest>,
    ) -> Result<Response<ListUsersResponse>, Status> {
        let r      = req.into_inner();
        let limit  = r.per_page.max(1).min(100) as i64;
        let offset = ((r.page.max(1) - 1) * r.per_page) as i64;

        let rows = sqlx::query!(
            "SELECT id, username, email FROM users LIMIT $1 OFFSET $2",
            limit, offset
        )
        .fetch_all(&self.db)
        .await
        .map_err(|e| Status::internal(e.to_string()))?;

        Ok(Response::new(ListUsersResponse {
            users: rows.into_iter().map(|r| UserResponse {
                id: r.id, username: r.username, email: r.email
            }).collect(),
            total: limit as i32,
        }))
    }

    async fn delete_user(
        &self,
        req: Request<DeleteUserRequest>,
    ) -> Result<Response<DeleteUserResponse>, Status> {
        let id = req.into_inner().id;
        sqlx::query!("DELETE FROM users WHERE id = $1", id)
            .execute(&self.db)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;

        Ok(Response::new(DeleteUserResponse { success: true }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    let addr = "0.0.0.0:50051".parse()?;

    Server::builder()
        .add_service(UserServiceServer::new(UserServiceImpl { db }))
        .serve(addr)
        .await?;

    Ok(())
}

5.3 gRPC 客户端(服务间调用)

use user_proto::user_service_client::UserServiceClient;

async fn call_user_service() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = UserServiceClient::connect("http://user-service:50051").await?;

    // 获取用户
    let response = client
        .get_user(tonic::Request::new(GetUserRequest { id: 1 }))
        .await?;
    println!("用户: {:?}", response.into_inner());

    // 创建用户
    let user = client
        .create_user(tonic::Request::new(CreateUserRequest {
            username: "alice".to_string(),
            email:    "alice@example.com".to_string(),
        }))
        .await?
        .into_inner();
    println!("创建成功: {:?}", user);

    Ok(())
}

5.4 gRPC 拦截器(中间件)

use tonic::{Request, Status};

// 认证拦截器
fn auth_interceptor(req: Request<()>) -> Result<Request<()>, Status> {
    let token = req
        .metadata()
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .ok_or_else(|| Status::unauthenticated("缺少 Authorization header"))?;

    // 验证 JWT
    verify_jwt(token).map_err(|_| Status::unauthenticated("Token 无效"))?;

    Ok(req)
}

// 注册拦截器
Server::builder()
    .add_service(
        UserServiceServer::with_interceptor(UserServiceImpl { db }, auth_interceptor)
    )
    .serve(addr)
    .await?;

6. 服务发现与注册

6.1 为什么需要服务发现

问题:微服务实例 IP 和端口动态变化(扩缩容、重启、故障迁移)
     不能在配置文件中写死 "user-service:8080"

解决:
  服务注册:启动时向注册中心登记(IP、端口、健康检查接口)
  服务发现:调用方从注册中心查询可用实例列表
  心跳维护:定时上报健康状态,注册中心剔除不健康实例

注册中心选型:
  Consul  ──▶ 功能全面(服务发现 + 配置 + 健康检查)
  etcd    ──▶ 强一致,Kubernetes 内置
  Nacos   ──▶ 阿里开源,Java 生态友好
  ZooKeeper ──▶ 经典,较重

6.2 Consul 服务注册(Rust)

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde   = { version = "1", features = ["derive"] }
use reqwest::Client;
use serde::Serialize;
use std::time::Duration;
use tokio::time;

#[derive(Serialize)]
struct ConsulRegistration<'a> {
    #[serde(rename = "ID")]
    id:      &'a str,
    #[serde(rename = "Name")]
    name:    &'a str,
    #[serde(rename = "Address")]
    address: &'a str,
    #[serde(rename = "Port")]
    port:    u16,
    #[serde(rename = "Tags")]
    tags:    Vec<&'a str>,
    #[serde(rename = "Check")]
    check:   ConsulHealthCheck<'a>,
}

#[derive(Serialize)]
struct ConsulHealthCheck<'a> {
    #[serde(rename = "HTTP")]
    http:     &'a str,
    #[serde(rename = "Interval")]
    interval: &'a str,
    #[serde(rename = "Timeout")]
    timeout:  &'a str,
}

struct ServiceRegistry {
    client:      Client,
    consul_url:  String,
    service_id:  String,
}

impl ServiceRegistry {
    async fn register(&self) -> anyhow::Result<()> {
        let reg = ConsulRegistration {
            id:      &self.service_id,
            name:    "user-service",
            address: "10.0.0.1",
            port:    8080,
            tags:    vec!["v1", "rust"],
            check:   ConsulHealthCheck {
                http:     "http://10.0.0.1:8080/health",
                interval: "10s",
                timeout:  "3s",
            },
        };

        self.client
            .put(format!("{}/v1/agent/service/register", self.consul_url))
            .json(&reg)
            .send()
            .await?
            .error_for_status()?;

        tracing::info!("服务注册成功: {}", self.service_id);
        Ok(())
    }

    async fn deregister(&self) -> anyhow::Result<()> {
        self.client
            .put(format!("{}/v1/agent/service/deregister/{}", self.consul_url, self.service_id))
            .send()
            .await?
            .error_for_status()?;

        tracing::info!("服务注销成功: {}", self.service_id);
        Ok(())
    }
}

// 在 main 中优雅关闭时注销
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let registry = ServiceRegistry {
        client:     Client::new(),
        consul_url: "http://consul:8500".to_string(),
        service_id: format!("user-service-{}", uuid::Uuid::new_v4()),
    };

    registry.register().await?;

    // 监听 SIGTERM/SIGINT
    tokio::signal::ctrl_c().await?;

    registry.deregister().await?;
    Ok(())
}

6.3 服务发现 + 客户端负载均衡

use rand::seq::SliceRandom;

struct ServiceDiscovery {
    client:     Client,
    consul_url: String,
}

impl ServiceDiscovery {
    async fn get_instances(&self, service: &str) -> anyhow::Result<Vec<String>> {
        #[derive(serde::Deserialize)]
        struct ServiceEntry {
            #[serde(rename = "Service")]
            service: ServiceInfo,
        }
        #[derive(serde::Deserialize)]
        struct ServiceInfo {
            #[serde(rename = "Address")]
            address: String,
            #[serde(rename = "Port")]
            port:    u16,
        }

        let entries: Vec<ServiceEntry> = self.client
            .get(format!("{}/v1/health/service/{}?passing=true", self.consul_url, service))
            .send()
            .await?
            .json()
            .await?;

        let addresses = entries
            .into_iter()
            .map(|e| format!("http://{}:{}", e.service.address, e.service.port))
            .collect();

        Ok(addresses)
    }

    // 随机负载均衡
    async fn pick_instance(&self, service: &str) -> anyhow::Result<String> {
        let instances = self.get_instances(service).await?;
        instances
            .choose(&mut rand::thread_rng())
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("没有可用的 {} 实例", service))
    }
}

7. API 网关

7.1 网关职责

客户端                    API Gateway                    内部服务
  │                           │                              │
  │── GET /api/orders ──────▶ │── 认证 (JWT验证) ──────────▶ │
  │                           │── 限流 (100req/s/IP) ──────▶ │
  │                           │── 路由 (→ order-service) ──▶ │
  │                           │── 熔断 (服务不可用时降级) ──▶ │
  │                           │── 日志 & 链路追踪 ─────────▶ │
  │◀── 200 OK ─────────────── │◀─────────────────────────── │

7.2 用 Axum 实现轻量 API 网关

use axum::{
    Router,
    extract::{Request, State},
    http::{HeaderMap, Uri},
    response::{IntoResponse, Response},
    routing::any,
    body::Body,
};
use reqwest::Client;
use std::{collections::HashMap, sync::Arc};

#[derive(Clone)]
struct GatewayState {
    client:   Client,
    routes:   Arc<HashMap<String, String>>, // prefix → upstream
}

async fn proxy_handler(
    State(state): State<Arc<GatewayState>>,
    uri:          Uri,
    headers:      HeaderMap,
    req:          Request,
) -> Result<Response, AppError> {
    let path = uri.path();

    // 路由匹配
    let upstream = state.routes
        .iter()
        .find(|(prefix, _)| path.starts_with(prefix.as_str()))
        .map(|(_, upstream)| upstream)
        .ok_or_else(|| AppError::NotFound("路由未匹配".into()))?;

    let target_url = format!("{}{}", upstream, uri.path_and_query().map(|p| p.as_str()).unwrap_or(""));

    // 转发请求
    let method  = req.method().clone();
    let body    = axum::body::to_bytes(req.into_body(), usize::MAX).await?;

    let mut builder = state.client.request(method, &target_url);
    for (name, value) in headers.iter() {
        builder = builder.header(name, value);
    }

    let upstream_resp = builder.body(body).send().await?;

    // 转发响应
    let status  = upstream_resp.status();
    let headers = upstream_resp.headers().clone();
    let body    = upstream_resp.bytes().await?;

    let mut resp = Response::new(Body::from(body));
    *resp.status_mut() = status;
    resp.headers_mut().extend(headers);

    Ok(resp)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut routes = HashMap::new();
    routes.insert("/api/users".to_string(),  "http://user-service:8080".to_string());
    routes.insert("/api/orders".to_string(), "http://order-service:8080".to_string());
    routes.insert("/api/pay".to_string(),    "http://pay-service:8080".to_string());

    let state = Arc::new(GatewayState {
        client: Client::builder().timeout(std::time::Duration::from_secs(30)).build()?,
        routes: Arc::new(routes),
    });

    let app = Router::new()
        .route("/*path", any(proxy_handler))
        .with_state(state);

    axum::serve(TcpListener::bind("0.0.0.0:80").await?, app).await?;
    Ok(())
}

8. 熔断器与限流

8.1 熔断器(Circuit Breaker)

三种状态:
  Closed(关闭)  ──▶ 正常工作,记录失败次数
  Open(开启)    ──▶ 失败率超阈值,直接返回错误,不调用下游
  Half-Open(半开)──▶ 超时后尝试放行少量请求,成功则关闭,失败则重新开启

状态转换:
  Closed ──[失败率>50%]──▶ Open ──[30s后]──▶ Half-Open ──[成功]──▶ Closed
                                                          ──[失败]──▶ Open
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

#[derive(Debug, Clone, PartialEq)]
enum CircuitState {
    Closed,
    Open { until: Instant },
    HalfOpen,
}

struct CircuitBreaker {
    state:         Arc<RwLock<CircuitState>>,
    failure_count: Arc<AtomicU32>,
    success_count: Arc<AtomicU32>,
    threshold:     u32,       // 失败次数阈值
    reset_timeout: Duration,  // Open 状态持续时间
    half_open_max: u32,       // Half-Open 允许的最大请求数
}

impl CircuitBreaker {
    fn new(threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            state:         Arc::new(RwLock::new(CircuitState::Closed)),
            failure_count: Arc::new(AtomicU32::new(0)),
            success_count: Arc::new(AtomicU32::new(0)),
            threshold,
            reset_timeout,
            half_open_max: 3,
        }
    }

    async fn call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitError<E>>
    where
        F:   FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // 检查状态
        {
            let state = self.state.read().await;
            match &*state {
                CircuitState::Open { until } => {
                    if Instant::now() < *until {
                        return Err(CircuitError::Open); // 快速失败
                    }
                    // 超时,转为 Half-Open
                    drop(state);
                    *self.state.write().await = CircuitState::HalfOpen;
                }
                CircuitState::HalfOpen => {
                    let successes = self.success_count.load(Ordering::Relaxed);
                    if successes >= self.half_open_max {
                        return Err(CircuitError::Open);
                    }
                }
                CircuitState::Closed => {}
            }
        }

        // 执行调用
        match f().await {
            Ok(val) => {
                self.on_success().await;
                Ok(val)
            }
            Err(e) => {
                self.on_failure().await;
                Err(CircuitError::Downstream(e))
            }
        }
    }

    async fn on_success(&self) {
        let mut state = self.state.write().await;
        if *state == CircuitState::HalfOpen {
            let count = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
            if count >= self.half_open_max {
                *state = CircuitState::Closed;
                self.failure_count.store(0, Ordering::Relaxed);
                self.success_count.store(0, Ordering::Relaxed);
                tracing::info!("熔断器关闭,恢复正常");
            }
        }
    }

    async fn on_failure(&self) {
        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
        if failures >= self.threshold {
            let mut state = self.state.write().await;
            *state = CircuitState::Open {
                until: Instant::now() + self.reset_timeout,
            };
            tracing::warn!("熔断器开启!失败次数: {}", failures);
        }
    }
}

#[derive(Debug)]
enum CircuitError<E> {
    Open,           // 熔断器开启,快速失败
    Downstream(E),  // 下游实际错误
}

// 使用示例
let cb = Arc::new(CircuitBreaker::new(5, Duration::from_secs(30)));

let result = cb.call(|| async {
    reqwest::get("http://user-service:8080/api/users/1").await
}).await;

match result {
    Ok(resp)                      => println!("成功: {:?}", resp),
    Err(CircuitError::Open)       => println!("熔断器开启,降级处理"),
    Err(CircuitError::Downstream(e)) => println!("下游错误: {}", e),
}

8.2 限流(Rate Limiting)

use tower::ServiceBuilder;
use tower_http::limit::RequestBodyLimitLayer;

// Tower 中间件限流(基于 tower::limit)
let app = Router::new()
    .route("/api", get(handler))
    .layer(
        ServiceBuilder::new()
            // 限制并发请求数
            .concurrency_limit(100)
            // 请求超时
            .timeout(Duration::from_secs(30))
            // 请求体大小限制
            .layer(RequestBodyLimitLayer::new(10 * 1024 * 1024)) // 10MB
    );
// 基于 Redis 的分布式限流(滑动窗口)
use redis::AsyncCommands;

struct RateLimiter {
    redis: redis::Client,
    limit: u64,           // 窗口内最大请求数
    window: u64,          // 窗口大小(秒)
}

impl RateLimiter {
    async fn is_allowed(&self, key: &str) -> anyhow::Result<bool> {
        let mut conn = self.redis.get_async_connection().await?;
        let redis_key = format!("rate_limit:{}", key);
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?.as_millis() as u64;

        let window_start = now - self.window * 1000;

        // Lua 脚本保证原子性
        let script = r#"
            local key    = KEYS[1]
            local now    = tonumber(ARGV[1])
            local window = tonumber(ARGV[2])
            local limit  = tonumber(ARGV[3])

            redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
            local count = redis.call('ZCARD', key)

            if count < limit then
                redis.call('ZADD', key, now, now)
                redis.call('EXPIRE', key, window + 1)
                return 1
            end
            return 0
        "#;

        let allowed: i32 = redis::Script::new(script)
            .key(&redis_key)
            .arg(now)
            .arg(self.window)
            .arg(self.limit)
            .invoke_async(&mut conn)
            .await?;

        Ok(allowed == 1)
    }
}

// 限流中间件
async fn rate_limit_middleware(
    State(limiter): State<Arc<RateLimiter>>,
    req: Request,
    next: Next,
) -> Response {
    // 以 IP 为限流 key
    let ip = req.headers()
        .get("x-forwarded-for")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("unknown");

    match limiter.is_allowed(ip).await {
        Ok(true)  => next.run(req).await,
        Ok(false) => (
            StatusCode::TOO_MANY_REQUESTS,
            Json(serde_json::json!({"error": "请求过于频繁,请稍后再试"})),
        ).into_response(),
        Err(e) => {
            tracing::error!("限流检查失败: {}", e);
            next.run(req).await // 限流组件故障时放行(fail-open)
        }
    }
}

9. 链路追踪与可观测性

9.1 三大支柱

可观测性三支柱:
  Logs(日志)    ──▶ 结构化 JSON 日志,包含 trace_id
  Metrics(指标) ──▶ QPS、延迟 P99、错误率、资源使用率
  Traces(链路)  ──▶ 跨服务调用链,定位性能瓶颈

9.2 结构化日志(tracing)

use tracing::{info, warn, error, instrument, Span};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn init_tracing() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::new(
                std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into())
            )
        )
        .with(tracing_subscriber::fmt::layer().json()) // JSON 格式,便于 ELK 收集
        .init();
}

// #[instrument] 自动记录函数入参和耗时
#[instrument(skip(db), fields(user.id = %user_id))]
async fn get_user_by_id(user_id: i64, db: &sqlx::PgPool) -> anyhow::Result<User> {
    info!("开始查询用户");

    let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", user_id)
        .fetch_optional(db)
        .await?;

    match user {
        Some(u) => {
            info!(username = %u.username, "查询成功");
            Ok(u)
        }
        None => {
            warn!("用户不存在");
            Err(anyhow::anyhow!("用户 {} 不存在", user_id))
        }
    }
}

9.3 OpenTelemetry 分布式追踪

[dependencies]
opentelemetry       = "0.22"
opentelemetry-otlp  = { version = "0.15", features = ["grpc-tonic"] }
opentelemetry_sdk   = "0.22"
tracing-opentelemetry = "0.23"
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;

fn init_telemetry() -> anyhow::Result<()> {
    // 发送到 Jaeger/Tempo(通过 OTLP 协议)
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("http://jaeger:4317")
        )
        .with_trace_config(
            opentelemetry_sdk::trace::config()
                .with_resource(opentelemetry_sdk::Resource::new(vec![
                    opentelemetry::KeyValue::new("service.name", "user-service"),
                    opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
                ]))
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer().json())
        .with(OpenTelemetryLayer::new(tracer))  // 自动将 span 上报到 Jaeger
        .init();

    Ok(())
}

// 手动创建 span
async fn process_order(order_id: i64) {
    let span = tracing::info_span!("process_order", order.id = order_id);
    let _guard = span.enter();

    // 子 span
    {
        let _s = tracing::info_span!("validate_inventory").entered();
        // ...
    }
    {
        let _s = tracing::info_span!("charge_payment").entered();
        // ...
    }
}

9.4 Prometheus 指标

use metrics::{counter, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;

fn init_metrics() {
    PrometheusBuilder::new()
        .with_http_listener(([0, 0, 0, 0], 9090)) // /metrics 接口
        .install()
        .expect("Prometheus 初始化失败");
}

// 在业务代码中埋点
async fn handle_request(req: Request) -> Response {
    let start = Instant::now();

    counter!("http.requests.total",
        "method" => req.method().to_string(),
        "path"   => req.uri().path().to_string(),
    );

    let resp = process(req).await;

    histogram!("http.request.duration_seconds",
        start.elapsed().as_secs_f64(),
        "status" => resp.status().as_u16().to_string(),
    );

    if resp.status().is_server_error() {
        counter!("http.errors.total");
    }

    gauge!("active.connections", -1.0); // 请求结束,减少活跃连接数

    resp
}

10. 配置中心

10.1 分层配置加载

[dependencies]
config = "0.14"
serde  = { version = "1", features = ["derive"] }
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
    pub server:   ServerConfig,
    pub database: DatabaseConfig,
    pub redis:    RedisConfig,
    pub jwt:      JwtConfig,
}

#[derive(Debug, Deserialize, Clone)]
pub struct ServerConfig {
    pub host: String,
    pub port: u16,
}

#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
    pub url:          String,
    pub max_conn:     u32,
    pub min_conn:     u32,
}

#[derive(Debug, Deserialize, Clone)]
pub struct JwtConfig {
    pub secret:     String,
    pub expires_in: u64,  // 秒
}

impl Settings {
    pub fn load() -> Result<Self, ConfigError> {
        let env = std::env::var("APP_ENV").unwrap_or_else(|_| "development".into());

        Config::builder()
            // 1. 默认配置
            .add_source(File::with_name("config/default"))
            // 2. 环境特定配置(覆盖默认)
            .add_source(File::with_name(&format!("config/{}", env)).required(false))
            // 3. 本地配置(覆盖,不提交 git)
            .add_source(File::with_name("config/local").required(false))
            // 4. 环境变量(最高优先级)APP_SERVER__PORT=9090
            .add_source(Environment::with_prefix("APP").separator("__"))
            .build()?
            .try_deserialize()
    }
}
# config/default.yaml
server:
  host: "0.0.0.0"
  port: 8080

database:
  url: "postgres://postgres:password@localhost/app"
  max_conn: 20
  min_conn: 5

jwt:
  secret: "change-me-in-production"
  expires_in: 86400

10.2 动态配置(Consul KV / etcd)

use etcd_client::{Client, WatchOptions};

async fn watch_config(key: &str) -> anyhow::Result<()> {
    let mut client = Client::connect(["http://etcd:2379"], None).await?;

    // 获取当前值
    let resp = client.get(key, None).await?;
    if let Some(kv) = resp.kvs().first() {
        println!("当前配置: {}", kv.value_str()?);
    }

    // 监听变化(热更新)
    let (_, mut stream) = client.watch(key, Some(WatchOptions::new())).await?;
    while let Some(resp) = stream.message().await? {
        for event in resp.events() {
            if let Some(kv) = event.kv() {
                println!("配置更新: {} = {}", kv.key_str()?, kv.value_str()?);
                // 通知应用层重新加载配置
                // config_tx.send(kv.value_str()?.to_string()).await?;
            }
        }
    }

    Ok(())
}

11. 数据层设计

11.1 每个服务独立数据库

原则:服务间绝不共享数据库,各自拥有独立的 schema 或独立的数据库实例

user-service   ──▶ postgres://user-db/users
order-service  ──▶ postgres://order-db/orders
pay-service    ──▶ postgres://pay-db/payments

跨服务查询通过 API 调用实现,不用 JOIN

11.2 sqlx 数据访问层

use sqlx::{PgPool, PgTransaction};

// Repository 模式:隔离数据访问逻辑
struct UserRepository {
    pool: PgPool,
}

impl UserRepository {
    async fn find_by_id(&self, id: i64) -> anyhow::Result<Option<User>> {
        Ok(sqlx::query_as!(User,
            "SELECT id, username, email, created_at FROM users WHERE id = $1",
            id
        )
        .fetch_optional(&self.pool)
        .await?)
    }

    async fn find_by_email(&self, email: &str) -> anyhow::Result<Option<User>> {
        Ok(sqlx::query_as!(User,
            "SELECT id, username, email, created_at FROM users WHERE email = $1",
            email
        )
        .fetch_optional(&self.pool)
        .await?)
    }

    // 在事务中执行
    async fn create_with_tx(&self, tx: &mut PgTransaction<'_>, req: &CreateUserReq) -> anyhow::Result<User> {
        Ok(sqlx::query_as!(User,
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, username, email, created_at",
            req.username, req.email
        )
        .fetch_one(&mut **tx)
        .await?)
    }
}

// 事务使用示例
async fn register_user(pool: &PgPool, req: CreateUserReq) -> anyhow::Result<User> {
    let mut tx = pool.begin().await?;

    let repo = UserRepository { pool: pool.clone() };

    // 检查 email 唯一性
    if repo.find_by_email(&req.email).await?.is_some() {
        return Err(anyhow::anyhow!("邮箱已注册"));
    }

    let user = repo.create_with_tx(&mut tx, &req).await?;

    // 发送欢迎邮件事件(写 outbox,同一事务)
    sqlx::query!(
        "INSERT INTO outbox (topic, payload) VALUES ($1, $2)",
        "user.registered",
        serde_json::to_string(&user)?
    )
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(user)
}

11.3 数据库迁移(sqlx migrate)

# 创建迁移文件
sqlx migrate add create_users_table

# 执行迁移
sqlx migrate run

# 回滚
sqlx migrate revert
-- migrations/20260101_create_users_table.sql
CREATE TABLE users (
    id         BIGSERIAL PRIMARY KEY,
    username   VARCHAR(50)  NOT NULL,
    email      VARCHAR(100) NOT NULL UNIQUE,
    created_at TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_users_email ON users(email);
// 应用启动时自动执行迁移
sqlx::migrate!("./migrations")
    .run(&pool)
    .await
    .expect("数据库迁移失败");

12. 服务间异步通信(Event-Driven)

12.1 事件驱动架构

同步调用:                       事件驱动:
  OrderService ──HTTP──▶ UserService    OrderService ──[order.created]──▶ MQ
                                                                          ├──▶ UserService(更新积分)
                                        耦合,链式依赖                    ├──▶ NotifyService(发通知)
                                                                          └──▶ AnalyticsService(统计)
                                                                          解耦,扇出,异步

12.2 事件定义规范

use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};

// 基础事件结构(所有事件的公共信息)
#[derive(Debug, Serialize, Deserialize)]
pub struct DomainEvent<T> {
    pub id:         String,            // 事件唯一 ID(幂等键)
    pub event_type: String,            // "order.created" / "user.registered"
    pub version:    u32,               // schema 版本,用于兼容演进
    pub source:     String,            // 来源服务
    pub occurred_at: DateTime<Utc>,   // 事件发生时间
    pub data:       T,                 // 业务数据
}

// 具体业务事件
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderCreatedEvent {
    pub order_id: i64,
    pub user_id:  i64,
    pub items:    Vec<OrderItem>,
    pub total:    f64,
}

// 构造函数
impl<T: Serialize> DomainEvent<T> {
    pub fn new(event_type: &str, source: &str, data: T) -> Self {
        Self {
            id:          uuid::Uuid::new_v4().to_string(),
            event_type:  event_type.to_string(),
            version:     1,
            source:      source.to_string(),
            occurred_at: Utc::now(),
            data,
        }
    }
}

12.3 事件发布(结合 Outbox 模式)

// 业务操作 + 写 outbox,同一数据库事务,保证原子性
pub async fn create_order(pool: &PgPool, req: CreateOrderReq) -> anyhow::Result<Order> {
    let mut tx = pool.begin().await?;

    // 写订单
    let order = sqlx::query_as!(Order,
        "INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *",
        req.user_id, req.total
    ).fetch_one(&mut *tx).await?;

    // 写 outbox(同一事务)
    let event = DomainEvent::new(
        "order.created",
        "order-service",
        OrderCreatedEvent { order_id: order.id, user_id: order.user_id, items: req.items, total: order.total },
    );
    sqlx::query!(
        "INSERT INTO outbox (id, topic, payload, status) VALUES ($1, $2, $3, 'pending')",
        event.id,
        "order.created",
        serde_json::to_value(&event)?
    ).execute(&mut *tx).await?;

    tx.commit().await?;

    Ok(order)
}

// Outbox Relay:轮询 outbox,发送到 Kafka
pub async fn outbox_relay(pool: &PgPool, producer: &FutureProducer) {
    loop {
        let rows = sqlx::query!(
            "SELECT id, topic, payload FROM outbox WHERE status = 'pending' ORDER BY created_at LIMIT 50 FOR UPDATE SKIP LOCKED"
        )
        .fetch_all(pool)
        .await
        .unwrap_or_default();

        for row in rows {
            let record = FutureRecord::to(&row.topic)
                .key(&row.id)
                .payload(&row.payload.to_string());

            let status = match producer.send(record, Duration::from_secs(5)).await {
                Ok(_)  => "sent",
                Err(_) => "failed",
            };

            sqlx::query!("UPDATE outbox SET status = $1 WHERE id = $2", status, row.id)
                .execute(pool)
                .await
                .ok();
        }

        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}

13. 分布式事务

13.1 Saga 模式(推荐)

Saga = 一系列本地事务,每个本地事务完成后发布事件触发下一步
失败时执行补偿事务(撤销前面的操作)

下单 Saga:
  1. OrderService  创建订单(PENDING)  ──▶ 发布 [OrderCreated]
  2. InventoryService 锁定库存           ──▶ 发布 [InventoryReserved]
  3. PayService    扣款                  ──▶ 发布 [PaymentCompleted]
  4. OrderService  更新订单为 CONFIRMED

失败补偿(反向执行):
  扣款失败 ──▶ 发布 [PaymentFailed]
           ──▶ InventoryService 释放库存
           ──▶ OrderService 取消订单
// Saga 状态机实现
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SagaState {
    OrderCreated,
    InventoryReserved,
    PaymentProcessed,
    Completed,
    // 补偿状态
    CompensatingInventory,
    CompensatingOrder,
    Failed,
}

pub struct OrderSaga {
    pub saga_id:   String,
    pub order_id:  i64,
    pub state:     SagaState,
}

impl OrderSaga {
    pub fn next_state(&self, event: &str) -> Option<SagaState> {
        match (&self.state, event) {
            (SagaState::OrderCreated,      "inventory.reserved")  => Some(SagaState::InventoryReserved),
            (SagaState::InventoryReserved, "payment.completed")   => Some(SagaState::PaymentProcessed),
            (SagaState::PaymentProcessed,  "order.confirmed")     => Some(SagaState::Completed),
            // 失败补偿
            (SagaState::InventoryReserved, "payment.failed")      => Some(SagaState::CompensatingInventory),
            (SagaState::OrderCreated,      "inventory.failed")    => Some(SagaState::CompensatingOrder),
            _ => None,
        }
    }
}

14. 容器化与部署

14.1 多阶段 Dockerfile(最小镜像)

# ---- 构建阶段 ----
FROM rust:1.78-slim AS builder

WORKDIR /app

# 依赖缓存层(只有 Cargo.toml 变化才重新下载)
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && echo "fn main(){}" > src/main.rs
RUN cargo build --release
RUN rm src/main.rs

# 编译实际代码
COPY src ./src
RUN touch src/main.rs && cargo build --release

# ---- 运行阶段(最小镜像)----
FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY --from=builder /app/target/release/user-service .
COPY config ./config

# 非 root 用户运行
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8080
HEALTHCHECK --interval=10s --timeout=3s CMD curl -f http://localhost:8080/health || exit 1

ENTRYPOINT ["./user-service"]

14.2 Docker Compose(本地开发环境)

# docker-compose.yml
version: "3.8"

services:
  user-service:
    build: ./user-service
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=postgres://postgres:password@postgres:5432/users
      - REDIS_URL=redis://redis:6379
      - APP_ENV=development
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped

  order-service:
    build: ./order-service
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=postgres://postgres:password@postgres:5432/orders
      - KAFKA_BROKERS=kafka:9092
    depends_on: [postgres, kafka]

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: password
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  kafka:
    image: confluentinc/cp-kafka:latest
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    depends_on: [zookeeper]

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686" # UI
      - "4317:4317" # OTLP gRPC

volumes:
  pgdata:

14.3 Kubernetes 部署

# k8s/user-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: myregistry/user-service:v1.2.0
          ports:
            - containerPort: 8080
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: user-service-secrets
                  key: database-url
          resources:
            requests:
              memory: "64Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "500m"
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 20

---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 80
      targetPort: 8080
  type: ClusterIP

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: user-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: user-service
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

15. 安全:认证与授权

15.1 JWT 认证中间件

[dependencies]
jsonwebtoken = "9"
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
    pub sub:   i64,              // 用户 ID
    pub email: String,
    pub roles: Vec<String>,
    pub exp:   usize,            // 过期时间(Unix 时间戳)
    pub iat:   usize,            // 签发时间
}

pub fn create_token(claims: &Claims, secret: &[u8]) -> anyhow::Result<String> {
    Ok(encode(
        &Header::default(),
        claims,
        &EncodingKey::from_secret(secret),
    )?)
}

pub fn verify_token(token: &str, secret: &[u8]) -> anyhow::Result<Claims> {
    let data = decode::<Claims>(
        token,
        &DecodingKey::from_secret(secret),
        &Validation::default(),
    )?;
    Ok(data.claims)
}

// JWT 认证中间件
pub async fn auth_middleware(
    State(secret): State<Arc<String>>,
    mut req: Request,
    next: Next,
) -> Result<Response, AppError> {
    let token = req
        .headers()
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .ok_or(AppError::Unauthorized)?;

    let claims = verify_token(token, secret.as_bytes())
        .map_err(|_| AppError::Unauthorized)?;

    // 将 claims 注入请求扩展,后续 handler 可提取
    req.extensions_mut().insert(claims);

    Ok(next.run(req).await)
}

// Handler 中提取已认证用户
async fn get_profile(
    Extension(claims): Extension<Claims>,
    State(state): State<Arc<AppState>>,
) -> Result<JsonResponse<User>, AppError> {
    let user = state.user_repo.find_by_id(claims.sub).await?
        .ok_or(AppError::NotFound("用户不存在".into()))?;
    Ok(JsonResponse(user))
}

15.2 RBAC 权限控制

// 基于角色的访问控制
fn require_role(role: &'static str) -> impl Fn(Extension<Claims>, Request, Next) -> std::pin::Pin<Box<dyn std::future::Future<Output = Response> + Send>> + Clone {
    move |Extension(claims): Extension<Claims>, req: Request, next: Next| {
        Box::pin(async move {
            if claims.roles.contains(&role.to_string()) {
                next.run(req).await
            } else {
                (StatusCode::FORBIDDEN, "权限不足").into_response()
            }
        })
    }
}

// 路由级别授权
let admin_routes = Router::new()
    .route("/admin/users", get(list_all_users))
    .route_layer(middleware::from_fn_with_state(state.clone(), auth_middleware))
    .route_layer(middleware::from_fn(require_role("admin")));

16. 生产实践与最佳实践

优雅关闭

use tokio::signal;

async fn graceful_shutdown() {
    let ctrl_c = async {
        signal::ctrl_c().await.expect("CTRL+C 监听失败");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("SIGTERM 监听失败")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c    => tracing::info!("收到 CTRL+C"),
        _ = terminate => tracing::info!("收到 SIGTERM"),
    }

    tracing::info!("开始优雅关闭...");
}

// 在 main 中使用
axum::serve(listener, app)
    .with_graceful_shutdown(graceful_shutdown())
    .await?;

健康检查接口

#[derive(Serialize)]
struct HealthResponse {
    status:   &'static str,
    database: &'static str,
    version:  &'static str,
}

async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    // 检查数据库连接
    let db_ok = sqlx::query("SELECT 1")
        .fetch_one(&state.db)
        .await
        .is_ok();

    let status = if db_ok { "ok" } else { "degraded" };
    let http_status = if db_ok { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE };

    (http_status, Json(HealthResponse {
        status,
        database: if db_ok { "ok" } else { "error" },
        version: env!("CARGO_PKG_VERSION"),
    }))
}

常见反模式

❌ 服务间共享数据库          → 每个服务独立数据库
❌ 链式同步调用(A→B→C→D)  → 超时雪崩,改用异步事件
❌ 无熔断器直接调用下游      → 下游故障会拖垮调用方
❌ 日志不带 trace_id         → 跨服务问题无法排查
❌ 配置硬编码在代码中         → 环境变量 + 配置文件
❌ 容器内用 root 运行        → 安全风险,用非特权用户
❌ 忽略 graceful shutdown    → 正在处理的请求被强制中断
❌ 不做健康检查              → k8s 无法判断服务是否就绪

架构演进路径

第一阶段:单体起步
  一个 Axum 服务,一个数据库,快速验证业务

第二阶段:功能模块化
  同进程内,代码按 module 隔离,为拆分做准备

第三阶段:垂直拆分(按团队 / 流量热点)
  高频访问的模块率先独立为微服务(如用户服务、商品服务)

第四阶段:完整微服务 + 服务网格
  服务发现 + API 网关 + Sidecar(Istio/Linkerd)

原则:不要提前过度设计,根据实际痛点驱动演进