Microservices in Practice with Rust
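1. Introduction to Microservices
Microservices architecture splits a monolithic application into a set of small, independently deployable services. Each service is built around a single business capability, and services communicate with each other over the network.
Monolith vs. Microservices
Monolithic architecture:
┌────────────────────────────────────────────┐
│ User         │ Order        │ Payment      │
│ Product      │ Notification │ Reporting    │
└────────────────────────────────────────────┘
Deployed as a single process
Drawbacks: tightly coupled code, hard to scale, a single tech stack, and one failure can bring down the whole site
Microservice architecture:
[User Service]         ──┐
[Order Service]        ──┼──▶ [API Gateway] ──▶ Client
[Payment Service]      ──┤
[Notification Service] ──┘
Each service is deployed, scaled, and built with its own tech stack independently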
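Benefits and Costs of Microservices
| Benefit | Cost |
|---|---|
| Independent deployment, fast iteration | Much higher distributed-systems complexity |
| Scale on demand, better resource utilization | Network calls between services add latency and points of failure |
| Freedom of technology choice | Requires infrastructure such as service discovery and distributed tracing |
| Fault isolation: a failing service need not take down the whole system | Distributed transactions are hard to handle |
| Teams can develop independently (Conway's law) | High operational complexity (container orchestration required) |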
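Why Rust for Microservices?
Performance: runtime efficiency close to C and a very small memory footprint (small images, fast cold starts)
Safety: memory safety enforced at compile time and no GC pauses, which suits latency-sensitive services
Concurrency: async/await + Tokio handle high concurrency without dedicating one OS thread per connection
Ecosystem: Axum / Actix-web / tonic / sqlx are mature enough for production use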
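2. Core Concepts and Design Principles
Design principles
Single responsibility (SRP): each service does one thing, scoped around a business capability
High cohesion, low coupling: tight collaboration inside a service, minimal dependencies between services
Independent deployment: a service can be built, tested, and released on its own, without depending on other services
Decentralization: each service owns its data store; avoid shared databases
Design for failure: assume the network and downstream services can fail at any moment, and plan degradation strategies
Observability: logs, metrics, and traces working together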
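Service decomposition strategies
By business domain (DDD bounded contexts):
E-commerce platform →
User domain: sign-up, login, profile
Product domain: catalog management, inventory, pricing
Order domain: order placement, order state machine
Payment domain: payment channels, reconciliation
Notification domain: SMS, email, push
By rate of change: split out modules that change frequently
By team boundaries: one team owns one or more services
By performance needs: split out high-traffic services so they can scale on their own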
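Service communication patterns
Synchronous (request-response):
REST/HTTP ──▶ simple and universal, good for external APIs
gRPC ──▶ high performance, strongly typed, good for internal service-to-service calls
GraphQL ──▶ flexible queries, good for a BFF layer
Asynchronous (event-driven):
Message queues (Kafka/RabbitMQ) ──▶ decoupling, absorbing traffic spikes, eventual consistency
Event bus ──▶ broadcasting events between services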
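3. Rust Microservices Tech Stack Overview
[dependencies]
# Core frameworks
axum = "0.7" # HTTP web framework (Tower ecosystem)
tonic = "0.11" # gRPC framework
# Middleware abstraction layer; limit/timeout/retry provide concurrency limits, timeouts and retries
# (a TOML table cannot declare `tower` twice, so its features are listed here once)
tower = { version = "0.4", features = ["limit", "timeout", "retry"] }
# HTTP middleware (request tracing, CORS, body-size limits, timeouts, etc.)
tower-http = { version = "0.5", features = ["trace", "cors", "limit", "timeout"] }
# Async runtime
tokio = { version = "1", features = ["full"] }
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Database (json: JSON/JSONB columns such as the outbox payload)
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "migrate", "json"] }
sea-orm = "0.12" # full-featured ORM (optional)
# Observability
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
opentelemetry = "0.22"
metrics = "0.22"
metrics-exporter-prometheus = "0.13"
# Service discovery
consul-client = "0.5" # Consul
etcd-client = "0.13" # etcd
# Circuit breaking / rate limiting: covered by the tower features above
# JWT authentication
jsonwebtoken = "9"
# Configuration
config = "0.14"
# Error handling and utilities used throughout the examples
anyhow = "1"
thiserror = "1"
uuid = { version = "1", features = ["v4"] }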
4. HTTP Service Framework (Axum)
4.1 Basic Service Structure
// src/main.rs
use axum::{
Router,
routing::{get, post},
extract::{Path, Query, State, Json},
http::StatusCode,
response::{IntoResponse, Json as JsonResponse},
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::net::TcpListener;
#[derive(Clone)]
struct AppState {
db: sqlx::PgPool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize logging (JSON output)
tracing_subscriber::fmt()
.with_env_filter("info,tower_http=debug")
.json()
.init();
let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
let state = Arc::new(AppState { db });
let app = Router::new()
.route("/health", get(health_check))
.nest("/api/v1", api_routes())
.with_state(state)
.layer(
tower_http::trace::TraceLayer::new_for_http()
)
.layer(
tower_http::cors::CorsLayer::permissive()
);
let listener = TcpListener::bind("0.0.0.0:8080").await?;
tracing::info!("service listening on 0.0.0.0:8080");
axum::serve(listener, app).await?;
Ok(())
}
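// update_user, delete_user and create_order follow the same pattern as the handlers in 4.2 and are not shown
fn api_routes() -> Router<Arc<AppState>> {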
Router::new()
.route("/users", get(list_users).post(create_user))
.route("/users/:id", get(get_user).put(update_user).delete(delete_user))
.route("/orders", post(create_order))
}
async fn health_check() -> impl IntoResponse {
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
}
4.2 Request Handling and Extractors
#[derive(Debug, Serialize, Deserialize)]
struct User {
id: i64,
username: String,
email: String,
}
#[derive(Debug, Deserialize)]
struct CreateUserRequest {
username: String,
email: String,
password: String,
}
#[derive(Debug, Deserialize)]
struct ListQuery {
page: Option<u32>,
per_page: Option<u32>,
status: Option<String>,
}
// GET /api/v1/users?page=1&per_page=20
async fn list_users(
State(state): State<Arc<AppState>>,
Query(params): Query<ListQuery>,
) -> Result<JsonResponse<Vec<User>>, AppError> {
let page = params.page.unwrap_or(1).max(1); // treat page=0 as 1 so `page - 1` cannot underflow
let per_page = params.per_page.unwrap_or(20).clamp(1, 100);
let offset = (page - 1) as i64 * per_page as i64;
let users = sqlx::query_as!(
User,
"SELECT id, username, email FROM users ORDER BY id LIMIT $1 OFFSET $2",
per_page as i64,
offset,
)
.fetch_all(&state.db)
.await?;
Ok(JsonResponse(users))
}
// GET /api/v1/users/:id
async fn get_user(
State(state): State<Arc<AppState>>,
Path(id): Path<i64>,
) -> Result<JsonResponse<User>, AppError> {
let user = sqlx::query_as!(User,
"SELECT id, username, email FROM users WHERE id = $1",
id
)
.fetch_optional(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("user not found".into()))?;
Ok(JsonResponse(user))
}
// POST /api/v1/users
async fn create_user(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateUserRequest>,
) -> Result<impl IntoResponse, AppError> {
// Input validation
if req.username.is_empty() || req.email.is_empty() {
return Err(AppError::BadRequest("username and email must not be empty".into()));
}
let user = sqlx::query_as!(
User,
"INSERT INTO users (username, email, password_hash) VALUES ($1, $2, $3) RETURNING id, username, email",
req.username,
req.email,
bcrypt_hash(&req.password), // password-hashing helper, not shown (hypothetical name)
)
.fetch_one(&state.db)
.await?;
Ok((StatusCode::CREATED, JsonResponse(user)))
}
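The page/offset arithmetic in `list_users` is easy to get wrong: with `u32`, `page = 0` makes `page - 1` underflow (a panic in debug builds), and an unbounded `per_page` lets a client scan the whole table. A minimal std-only sketch of a clamped helper; `page_window` is a hypothetical name, and its defaults (page 1, 20 per page, at most 100) simply mirror the handler above.

```rust
/// Turns optional `page` / `per_page` query values into SQL `(LIMIT, OFFSET)`.
/// Hypothetical helper: page defaults to 1, per_page defaults to 20 and is clamped to 1..=100.
fn page_window(page: Option<u32>, per_page: Option<u32>) -> (i64, i64) {
    // Treat page 0 as page 1 so `page - 1` can never underflow.
    let page = page.unwrap_or(1).max(1) as i64;
    // 0 would make every page empty; huge values would scan the table.
    let limit = per_page.unwrap_or(20).clamp(1, 100) as i64;
    // Computed in i64, so large page numbers cannot overflow u32.
    let offset = (page - 1) * limit;
    (limit, offset)
}
```

For example, `page_window(Some(3), Some(20))` yields `(20, 40)`, and `page_window(Some(0), Some(500))` is clamped to `(100, 0)`.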
4.3 Unified Error Handling
use axum::{
response::{IntoResponse, Response},
http::StatusCode,
Json,
};
#[derive(Debug, thiserror::Error)]
enum AppError {
#[error("not found: {0}")]
NotFound(String),
#[error("bad request: {0}")]
BadRequest(String),
#[error("unauthorized")]
Unauthorized,
#[error("forbidden")]
Forbidden,
#[error("database error: {0}")]
Database(#[from] sqlx::Error),
#[error("internal error: {0}")]
Internal(#[from] anyhow::Error),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, message) = match &self {
AppError::NotFound(msg) => (StatusCode::NOT_FOUND, msg.clone()),
AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg.clone()),
AppError::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized".into()),
AppError::Forbidden => (StatusCode::FORBIDDEN, "forbidden".into()),
// Log the details, but do not leak internal errors to the client
AppError::Database(e) => {
tracing::error!("database error: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "internal server error".into())
}
AppError::Internal(e) => {
tracing::error!("internal error: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "internal server error".into())
}
};
(status, Json(serde_json::json!({
"error": status.as_u16(),
"message": message,
}))).into_response()
}
}
4.4 Middleware
use axum::{
extract::Request,
middleware::Next,
response::Response,
};
use std::time::Instant;
// Request latency logging middleware
async fn logging_middleware(req: Request, next: Next) -> Response {
let method = req.method().clone();
let uri = req.uri().clone();
let start = Instant::now();
let response = next.run(req).await;
tracing::info!(
method = %method,
uri = %uri,
status = response.status().as_u16(),
latency = ?start.elapsed(),
"HTTP request"
);
response
}
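// Request ID middleware: keep an incoming x-request-id (e.g. set by the gateway) so the ID stays
// the same across services, otherwise generate one; also echo it on the response
async fn request_id_middleware(mut req: Request, next: Next) -> Response {
let existing = req.headers().get("x-request-id").cloned();
let id = match existing {
Some(v) => v,
None => {
// A UUID string is always a valid header value
let v = axum::http::HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap();
req.headers_mut().insert("x-request-id", v.clone());
v
}
};
let mut resp = next.run(req).await;
resp.headers_mut().insert("x-request-id", id);
resp
}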
// Register the middleware (the last .layer() is outermost, so request_id_middleware runs first)
let app = Router::new()
.route("/api", get(handler))
.layer(middleware::from_fn(logging_middleware))
.layer(middleware::from_fn(request_id_middleware));
5. gRPC Services (tonic)
5.1 Defining the Proto File
// proto/user.proto
syntax = "proto3";
package user;
service UserService {
rpc GetUser (GetUserRequest) returns (UserResponse);
rpc ListUsers (ListUsersRequest) returns (ListUsersResponse);
rpc CreateUser (CreateUserRequest) returns (UserResponse);
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
}
message GetUserRequest {
int64 id = 1;
}
message CreateUserRequest {
string username = 1;
string email = 2;
}
message UserResponse {
int64 id = 1;
string username = 2;
string email = 3;
}
message ListUsersRequest {
int32 page = 1;
int32 per_page = 2;
}
message ListUsersResponse {
repeated UserResponse users = 1;
int32 total = 2;
}
message DeleteUserRequest {
int64 id = 1;
}
message DeleteUserResponse {
bool success = 1;
}
# Cargo.toml
[dependencies]
tonic = "0.11"
prost = "0.12"
[build-dependencies]
tonic-build = "0.11"
// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(true)
.build_client(true)
.compile(&["proto/user.proto"], &["proto"])?;
Ok(())
}
5.2 gRPC Server
use tonic::{transport::Server, Request, Response, Status};
// Include the code generated from the proto file
pub mod user_proto {
tonic::include_proto!("user");
}
use user_proto::{
user_service_server::{UserService, UserServiceServer},
*,
};
#[derive(Debug, Clone)]
struct UserServiceImpl {
db: sqlx::PgPool,
}
#[tonic::async_trait]
impl UserService for UserServiceImpl {
async fn get_user(
&self,
req: Request<GetUserRequest>,
) -> Result<Response<UserResponse>, Status> {
let id = req.into_inner().id;
let user = sqlx::query!(
"SELECT id, username, email FROM users WHERE id = $1",
id
)
.fetch_optional(&self.db)
.await
.map_err(|e| Status::internal(e.to_string()))?
.ok_or_else(|| Status::not_found(format!("user {} not found", id)))?;
Ok(Response::new(UserResponse {
id: user.id,
username: user.username,
email: user.email,
}))
}
async fn create_user(
&self,
req: Request<CreateUserRequest>,
) -> Result<Response<UserResponse>, Status> {
let r = req.into_inner();
if r.username.is_empty() {
return Err(Status::invalid_argument("username must not be empty"));
}
let user = sqlx::query!(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, username, email",
r.username, r.email
)
.fetch_one(&self.db)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(UserResponse {
id: user.id,
username: user.username,
email: user.email,
}))
}
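async fn list_users(
&self,
req: Request<ListUsersRequest>,
) -> Result<Response<ListUsersResponse>, Status> {
let r = req.into_inner();
let limit = r.per_page.clamp(1, 100) as i64;
// i64 arithmetic avoids i32 overflow and uses the same clamped page size as LIMIT
let offset = (r.page.max(1) as i64 - 1) * limit;
let rows = sqlx::query!(
"SELECT id, username, email FROM users ORDER BY id LIMIT $1 OFFSET $2",
limit, offset
)
.fetch_all(&self.db)
.await
.map_err(|e| Status::internal(e.to_string()))?;
// total is the total number of users, not the size of this page
let total = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
.fetch_one(&self.db)
.await
.map_err(|e| Status::internal(e.to_string()))?
.unwrap_or(0);
Ok(Response::new(ListUsersResponse {
users: rows.into_iter().map(|r| UserResponse {
id: r.id, username: r.username, email: r.email
}).collect(),
total: total as i32,
}))
}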
async fn delete_user(
&self,
req: Request<DeleteUserRequest>,
) -> Result<Response<DeleteUserResponse>, Status> {
let id = req.into_inner().id;
let result = sqlx::query!("DELETE FROM users WHERE id = $1", id)
.execute(&self.db)
.await
.map_err(|e| Status::internal(e.to_string()))?;
// success is false when no user with that id existed
Ok(Response::new(DeleteUserResponse { success: result.rows_affected() > 0 }))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
let addr = "0.0.0.0:50051".parse()?;
Server::builder()
.add_service(UserServiceServer::new(UserServiceImpl { db }))
.serve(addr)
.await?;
Ok(())
}
5.3 gRPC Client (Service-to-Service Calls)
use user_proto::{user_service_client::UserServiceClient, CreateUserRequest, GetUserRequest};
async fn call_user_service() -> Result<(), Box<dyn std::error::Error>> {
// Connect once and reuse the client (it wraps a Channel, which is cheap to clone)
let mut client = UserServiceClient::connect("http://user-service:50051").await?;
// Fetch a user
let response = client
.get_user(tonic::Request::new(GetUserRequest { id: 1 }))
.await?;
println!("user: {:?}", response.into_inner());
// Create a user
let user = client
.create_user(tonic::Request::new(CreateUserRequest {
username: "alice".to_string(),
email: "alice@example.com".to_string(),
}))
.await?
.into_inner();
println!("created: {:?}", user);
Ok(())
}
5.4 gRPC Interceptors (Middleware)
use tonic::{Request, Status};
// Authentication interceptor
fn auth_interceptor(req: Request<()>) -> Result<Request<()>, Status> {
let token = req
.metadata()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "))
.ok_or_else(|| Status::unauthenticated("missing Authorization header"))?;
// Verify the JWT (verify_jwt is a helper not shown, e.g. built on jsonwebtoken::decode)
verify_jwt(token).map_err(|_| Status::unauthenticated("invalid token"))?;
Ok(req)
}
// Register the interceptor (inside main, with db and addr as in 5.2)
Server::builder()
.add_service(
UserServiceServer::with_interceptor(UserServiceImpl { db }, auth_interceptor)
)
.serve(addr)
.await?;
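6. Service Discovery and Registration
6.1 Why Service Discovery Is Needed
Problem: microservice instance IPs and ports change dynamically (scaling, restarts, failover),
so "user-service:8080" cannot simply be hard-coded in a config file
Solution:
Service registration: on startup, each instance registers with the registry (IP, port, health-check endpoint)
Service discovery: callers ask the registry for the list of available instances
Health maintenance: instances send periodic heartbeats (or the registry probes their health endpoint), and the registry evicts unhealthy instances
Registry options:
Consul ──▶ feature-rich (service discovery + configuration + health checks)
etcd ──▶ strongly consistent; the data store behind Kubernetes
Nacos ──▶ open-sourced by Alibaba, friendly to the Java ecosystem
ZooKeeper ──▶ the classic choice, relatively heavyweight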
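The register / discover / heartbeat cycle above can be modeled in a few lines. A minimal std-only sketch, assuming a hypothetical in-memory `Registry` with a TTL: an instance that has not sent a heartbeat within `ttl` is treated as unhealthy and dropped from discovery results. Real registries do this server-side (etcd with leases, Consul with TTL or HTTP checks); here time is passed in explicitly so the behavior is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory registry: service name -> (instance address -> last heartbeat).
struct Registry {
    ttl: Duration,
    services: HashMap<String, HashMap<String, Instant>>,
}

impl Registry {
    fn new(ttl: Duration) -> Self {
        Self { ttl, services: HashMap::new() }
    }

    /// Registration and heartbeat are the same operation: refresh the instance's timestamp.
    fn heartbeat(&mut self, service: &str, addr: &str, now: Instant) {
        self.services
            .entry(service.to_string())
            .or_default()
            .insert(addr.to_string(), now);
    }

    /// Discovery: evict instances whose last heartbeat is older than `ttl`, return the rest sorted.
    fn discover(&mut self, service: &str, now: Instant) -> Vec<String> {
        let ttl = self.ttl;
        let Some(instances) = self.services.get_mut(service) else {
            return Vec::new();
        };
        instances.retain(|_, last| now.duration_since(*last) <= ttl);
        let mut addrs: Vec<String> = instances.keys().cloned().collect();
        addrs.sort();
        addrs
    }
}
```

With a 30 s TTL, an instance last seen at t=0 disappears from `discover` at t=40 s, while one that sent a heartbeat at t=20 s is still returned.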
6.2 Service Registration with Consul (Rust)
[dependencies]
reqwest = { version = "0.12", features = ["json"] } # 0.12 uses http 1.x types, matching axum 0.7 (the gateway in 7.2 relies on this)
serde = { version = "1", features = ["derive"] }
use reqwest::Client;
use serde::Serialize;
use std::time::Duration;
use tokio::time;
#[derive(Serialize)]
struct ConsulRegistration<'a> {
#[serde(rename = "ID")]
id: &'a str,
#[serde(rename = "Name")]
name: &'a str,
#[serde(rename = "Address")]
address: &'a str,
#[serde(rename = "Port")]
port: u16,
#[serde(rename = "Tags")]
tags: Vec<&'a str>,
#[serde(rename = "Check")]
check: ConsulHealthCheck<'a>,
}
#[derive(Serialize)]
struct ConsulHealthCheck<'a> {
#[serde(rename = "HTTP")]
http: &'a str,
#[serde(rename = "Interval")]
interval: &'a str,
#[serde(rename = "Timeout")]
timeout: &'a str,
}
struct ServiceRegistry {
client: Client,
consul_url: String,
service_id: String,
}
impl ServiceRegistry {
async fn register(&self) -> anyhow::Result<()> {
let reg = ConsulRegistration {
id: &self.service_id,
name: "user-service",
address: "10.0.0.1",
port: 8080,
tags: vec!["v1", "rust"],
check: ConsulHealthCheck {
http: "http://10.0.0.1:8080/health",
interval: "10s",
timeout: "3s",
},
};
self.client
.put(format!("{}/v1/agent/service/register", self.consul_url))
.json(&reg)
.send()
.await?
.error_for_status()?;
tracing::info!("service registered: {}", self.service_id);
Ok(())
}
async fn deregister(&self) -> anyhow::Result<()> {
self.client
.put(format!("{}/v1/agent/service/deregister/{}", self.consul_url, self.service_id))
.send()
.await?
.error_for_status()?;
tracing::info!("service deregistered: {}", self.service_id);
Ok(())
}
}
// Deregister in main during graceful shutdown
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let registry = ServiceRegistry {
client: Client::new(),
consul_url: "http://consul:8500".to_string(),
service_id: format!("user-service-{}", uuid::Uuid::new_v4()),
};
registry.register().await?;
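// Wait for SIGINT (Ctrl-C) or SIGTERM, which Docker/Kubernetes send on stop (Unix-only API).
// In a real service the HTTP server runs alongside, e.g. axum::serve(...).with_graceful_shutdown(...).
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
tokio::select! {
_ = tokio::signal::ctrl_c() => {}
_ = sigterm.recv() => {}
}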
registry.deregister().await?;
Ok(())
}
6.3 Service Discovery + Client-Side Load Balancing
use rand::seq::SliceRandom;
struct ServiceDiscovery {
client: Client,
consul_url: String,
}
impl ServiceDiscovery {
async fn get_instances(&self, service: &str) -> anyhow::Result<Vec<String>> {
#[derive(serde::Deserialize)]
struct ServiceEntry {
#[serde(rename = "Service")]
service: ServiceInfo,
}
#[derive(serde::Deserialize)]
struct ServiceInfo {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "Port")]
port: u16,
}
let entries: Vec<ServiceEntry> = self.client
.get(format!("{}/v1/health/service/{}?passing=true", self.consul_url, service))
.send()
.await?
.json()
.await?;
let addresses = entries
.into_iter()
.map(|e| format!("http://{}:{}", e.service.address, e.service.port))
.collect();
Ok(addresses)
}
// Random load balancing (queries Consul on every call; cache the instance list in production)
async fn pick_instance(&self, service: &str) -> anyhow::Result<String> {
let instances = self.get_instances(service).await?;
instances
.choose(&mut rand::thread_rng())
.cloned()
.ok_or_else(|| anyhow::anyhow!("no available instance of {}", service))
}
}
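Random selection is simple but can be uneven over short windows. As an alternative to the random strategy above (not something the code above does), a round-robin picker keeps a shared counter. A minimal std-only sketch, assuming the instance list comes from a call such as `get_instances`; `RoundRobin` is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin picker, safe to share across tasks/threads (e.g. inside an Arc).
struct RoundRobin {
    next: AtomicUsize,
}

impl RoundRobin {
    fn new() -> Self {
        Self { next: AtomicUsize::new(0) }
    }

    /// Picks the next instance in turn; returns None when the list is empty.
    fn pick<'a>(&self, instances: &'a [String]) -> Option<&'a String> {
        if instances.is_empty() {
            return None;
        }
        // fetch_add wraps on overflow, and the modulo keeps the index in range
        // even if the instance list changes size between calls.
        let i = self.next.fetch_add(1, Ordering::Relaxed) % instances.len();
        instances.get(i)
    }
}
```

Because the counter is atomic, one `RoundRobin` can be shared by all request handlers without a lock.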
7. API Gateway
7.1 Gateway Responsibilities
Client                      API Gateway                       Internal services
│                           │                                 │
│── GET /api/orders ──────▶ │── Authentication (JWT) ───────▶ │
│                           │── Rate limit (100 req/s/IP) ──▶ │
│                           │── Routing (→ order-service) ──▶ │
│                           │── Circuit breaker (fallback) ─▶ │
│                           │── Logging & tracing ──────────▶ │
│◀── 200 OK ─────────────── │◀─────────────────────────────── │
7.2 A Lightweight API Gateway with Axum
use axum::{
Router,
extract::{Request, State},
http::{header, HeaderMap, Uri},
response::Response,
routing::any,
body::Body,
};
use reqwest::Client; // assumes reqwest 0.12, whose http 1.x Method/HeaderMap types match axum 0.7
use std::{collections::HashMap, sync::Arc};
use tokio::net::TcpListener;
#[derive(Clone)]
struct GatewayState {
client: Client,
routes: Arc<HashMap<String, String>>, // prefix → upstream
}
async fn proxy_handler(
State(state): State<Arc<GatewayState>>,
uri: Uri,
headers: HeaderMap,
req: Request,
) -> Result<Response, AppError> {
let path = uri.path();
// Route matching: first matching prefix (HashMap order is unspecified, so keep prefixes non-overlapping)
let upstream = state.routes
.iter()
.find(|(prefix, _)| path.starts_with(prefix.as_str()))
.map(|(_, upstream)| upstream)
.ok_or_else(|| AppError::NotFound("no matching route".into()))?;
let target_url = format!("{}{}", upstream, uri.path_and_query().map(|p| p.as_str()).unwrap_or(""));
// Forward the request (body buffered in memory, capped at 10 MB)
let method = req.method().clone();
let body = axum::body::to_bytes(req.into_body(), 10 * 1024 * 1024)
.await
.map_err(anyhow::Error::from)?;
let mut builder = state.client.request(method, &target_url);
for (name, value) in headers.iter() {
// Skip Host so that reqwest sets it for the upstream; a production proxy should also
// drop other hop-by-hop headers (Connection, Transfer-Encoding, ...)
if *name != header::HOST {
builder = builder.header(name, value);
}
}
let upstream_resp = builder.body(body).send().await.map_err(anyhow::Error::from)?;
// Forward the response
let status = upstream_resp.status();
let headers = upstream_resp.headers().clone();
let body = upstream_resp.bytes().await.map_err(anyhow::Error::from)?;
let mut resp = Response::new(Body::from(body));
*resp.status_mut() = status;
resp.headers_mut().extend(headers);
Ok(resp)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut routes = HashMap::new();
routes.insert("/api/users".to_string(), "http://user-service:8080".to_string());
routes.insert("/api/orders".to_string(), "http://order-service:8080".to_string());
routes.insert("/api/pay".to_string(), "http://pay-service:8080".to_string());
let state = Arc::new(GatewayState {
client: Client::builder().timeout(std::time::Duration::from_secs(30)).build()?,
routes: Arc::new(routes),
});
let app = Router::new()
.route("/*path", any(proxy_handler))
.with_state(state);
// Binding port 80 usually requires elevated privileges
axum::serve(TcpListener::bind("0.0.0.0:80").await?, app).await?;
Ok(())
}
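The gateway above takes the first `HashMap` entry whose key is a prefix of the path. Because `HashMap` iteration order is unspecified, overlapping prefixes (for example `/api/pay` and a hypothetical `/api/payments`) could be routed nondeterministically, and `/api/users` would also capture `/api/usersettings`. A minimal std-only sketch of longest-prefix matching on path-segment boundaries; `match_route` is a hypothetical helper, not part of axum.

```rust
use std::collections::HashMap;

/// Returns the upstream for the longest route prefix that matches `path` on a segment boundary.
fn match_route<'a>(routes: &'a HashMap<String, String>, path: &str) -> Option<&'a str> {
    routes
        .iter()
        .filter(|(prefix, _)| {
            // "/api/users" matches "/api/users" and "/api/users/1", but not "/api/usersettings".
            path.strip_prefix(prefix.as_str())
                .map_or(false, |rest| rest.is_empty() || rest.starts_with('/'))
        })
        // The longest matching prefix wins, independent of HashMap iteration order.
        .max_by_key(|(prefix, _)| prefix.len())
        .map(|(_, upstream)| upstream.as_str())
}
```

In `proxy_handler`, this would replace the `.iter().find(...)` chain, with a `None` result still mapped to `AppError::NotFound`.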
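8. Circuit Breakers and Rate Limiting
8.1 Circuit Breaker
Three states:
Closed ──▶ normal operation; failures are counted
Open ──▶ the failure threshold was exceeded; return an error immediately without calling downstream
Half-Open ──▶ after the timeout, let a few trial requests through; close on success, reopen on failure
State transitions:
Closed ──[failure rate > 50%]──▶ Open ──[after 30s]──▶ Half-Open ──[success]──▶ Closed
                                                                 ──[failure]──▶ Open
The implementation below uses a simpler trigger than a failure rate: a failure-count threshold.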
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[derive(Debug, Clone, PartialEq)]
enum CircuitState {
Closed,
Open { until: Instant },
HalfOpen,
}
struct CircuitBreaker {
state: Arc<RwLock<CircuitState>>,
failure_count: Arc<AtomicU32>,
success_count: Arc<AtomicU32>,
threshold: u32, // failures (since the last success) that open the breaker
reset_timeout: Duration, // how long the Open state lasts
half_open_max: u32, // successes needed in Half-Open before closing again
}
impl CircuitBreaker {
fn new(threshold: u32, reset_timeout: Duration) -> Self {
Self {
state: Arc::new(RwLock::new(CircuitState::Closed)),
failure_count: Arc::new(AtomicU32::new(0)),
success_count: Arc::new(AtomicU32::new(0)),
threshold,
reset_timeout,
half_open_max: 3,
}
}
async fn call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitError<E>>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
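// Check the current state
{
let state = self.state.read().await;
let open_until = match *state {
CircuitState::Open { until } => Some(until),
_ => None,
};
if let Some(until) = open_until {
if Instant::now() < until {
return Err(CircuitError::Open); // fail fast
}
// Timeout elapsed: move to Half-Open. Re-check under the write lock,
// because another task may already have switched the state.
drop(state);
let mut state = self.state.write().await;
if matches!(*state, CircuitState::Open { .. }) {
*state = CircuitState::HalfOpen;
self.success_count.store(0, Ordering::Relaxed);
}
}
// Note: this sketch does not cap the number of concurrent probe requests in Half-Open.
}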
// 执行调用
match f().await {
Ok(val) => {
self.on_success().await;
Ok(val)
}
Err(e) => {
self.on_failure().await;
Err(CircuitError::Downstream(e))
}
}
}
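async fn on_success(&self) {
let mut state = self.state.write().await;
if *state == CircuitState::Closed {
// In Closed, a success resets the count, so only failures without an intervening success trip the breaker
self.failure_count.store(0, Ordering::Relaxed);
} else if *state == CircuitState::HalfOpen {
let count = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
if count >= self.half_open_max {
*state = CircuitState::Closed;
self.failure_count.store(0, Ordering::Relaxed);
self.success_count.store(0, Ordering::Relaxed);
tracing::info!("circuit breaker closed, back to normal");
}
}
}
async fn on_failure(&self) {
let mut state = self.state.write().await;
let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
// Any failure in Half-Open reopens immediately; in Closed, open once the threshold is reached
let should_open = match &*state {
CircuitState::HalfOpen => true,
CircuitState::Closed => failures >= self.threshold,
CircuitState::Open { .. } => false, // already open: do not extend the timeout
};
if should_open {
*state = CircuitState::Open {
until: Instant::now() + self.reset_timeout,
};
self.success_count.store(0, Ordering::Relaxed);
tracing::warn!("circuit breaker opened! failures: {}", failures);
}
}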
}
#[derive(Debug)]
enum CircuitError<E> {
Open, // breaker is open: fail fast
Downstream(E), // the downstream call's own error
}
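// Usage example (inside an async fn)
let cb = Arc::new(CircuitBreaker::new(5, Duration::from_secs(30)));
let result = cb.call(|| async {
// error_for_status turns 4xx/5xx into errors; otherwise only transport errors would count as failures
reqwest::get("http://user-service:8080/api/users/1").await
.and_then(|r| r.error_for_status())
}).await;
match result {
Ok(resp) => println!("ok: {:?}", resp),
Err(CircuitError::Open) => println!("circuit open, falling back"),
Err(CircuitError::Downstream(e)) => println!("downstream error: {}", e),
}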
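Because the async breaker above reads `Instant::now()` internally, its transitions are awkward to unit-test. A minimal synchronous sketch of the same Closed → Open → Half-Open cycle with the clock passed in explicitly. It is a simplified single-threaded model (consecutive-failure threshold; exactly one probe in Half-Open, whose result decides), not a drop-in replacement, and `Breaker` / `State` are hypothetical names.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed { failures: u32 },
    Open { until: Instant },
    HalfOpen,
}

/// Hypothetical single-threaded breaker: opens after `threshold` consecutive failures.
struct Breaker {
    state: State,
    threshold: u32,
    reset_timeout: Duration,
}

impl Breaker {
    fn new(threshold: u32, reset_timeout: Duration) -> Self {
        Self { state: State::Closed { failures: 0 }, threshold, reset_timeout }
    }

    /// Should a request go through at time `now`? Moves Open -> HalfOpen once the timeout expires.
    fn allow(&mut self, now: Instant) -> bool {
        let state = self.state;
        match state {
            State::Closed { .. } => true,
            State::Open { until } if now < until => false, // fail fast
            State::Open { .. } => {
                self.state = State::HalfOpen; // timeout elapsed: let one probe through
                true
            }
            State::HalfOpen => false, // a probe is already in flight
        }
    }

    /// Records the outcome of a request that `allow` let through.
    fn record(&mut self, ok: bool, now: Instant) {
        self.state = match (self.state, ok) {
            // A success resets Closed and closes Half-Open.
            (_, true) => State::Closed { failures: 0 },
            (State::Closed { failures }, false) if failures + 1 < self.threshold => {
                State::Closed { failures: failures + 1 }
            }
            // Threshold reached in Closed, or the Half-Open probe failed: (re)open.
            (_, false) => State::Open { until: now + self.reset_timeout },
        };
    }
}
```

Passing `now` explicitly lets a test walk through the whole cycle without sleeping; the async version could adopt the same idea by injecting a clock.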
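8.2 Rate Limiting
use std::time::Duration;
use tower::ServiceBuilder;
use tower_http::{limit::RequestBodyLimitLayer, timeout::TimeoutLayer};
// Tower middleware (concurrency limit from tower::limit; timeout and body limit from tower-http)
let app = Router::new()
.route("/api", get(handler))
.layer(
ServiceBuilder::new()
// Cap in-flight requests (excess requests wait for a slot rather than being rejected)
.concurrency_limit(100)
// Request timeout: tower-http's TimeoutLayer answers 408 itself, whereas tower's timeout
// returns an error type that axum's Router::layer does not accept without HandleErrorLayer
.layer(TimeoutLayer::new(Duration::from_secs(30)))
// Request body size limit
.layer(RequestBodyLimitLayer::new(10 * 1024 * 1024)) // 10 MB
);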
// Distributed rate limiting with Redis (sliding-window log)
use redis::AsyncCommands;
struct RateLimiter {
redis: redis::Client,
limit: u64, // max requests per window
window: u64, // window size (seconds)
}
impl RateLimiter {
async fn is_allowed(&self, key: &str) -> anyhow::Result<bool> {
// A new connection per call keeps the example short; share a multiplexed connection in production
let mut conn = self.redis.get_async_connection().await?;
let redis_key = format!("rate_limit:{}", key);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?.as_millis() as u64;
// The Lua script makes "trim, count, add" atomic
let script = r#"
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, ARGV[4])
redis.call('EXPIRE', key, window + 1)
return 1
end
return 0
"#;
let allowed: i32 = redis::Script::new(script)
.key(&redis_key)
.arg(now)
.arg(self.window)
.arg(self.limit)
// Unique set member: using `now` as the member would merge requests arriving in the same millisecond
.arg(uuid::Uuid::new_v4().to_string())
.invoke_async(&mut conn)
.await?;
Ok(allowed == 1)
}
}
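// Rate-limiting middleware (register with middleware::from_fn_with_state(limiter, rate_limit_middleware))
async fn rate_limit_middleware(
State(limiter): State<Arc<RateLimiter>>,
req: Request,
next: Next,
) -> Response {
// Use the client IP as the key. X-Forwarded-For may hold "client, proxy1, ..." and can be
// spoofed by clients, so only trust it when your own gateway/proxy sets it
let ip = req.headers()
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.split(',').next())
.map(str::trim)
.unwrap_or("unknown");
match limiter.is_allowed(ip).await {
Ok(true) => next.run(req).await,
Ok(false) => (
StatusCode::TOO_MANY_REQUESTS,
Json(serde_json::json!({"error": "too many requests, please try again later"})),
).into_response(),
Err(e) => {
tracing::error!("rate limit check failed: {}", e);
next.run(req).await // fail open: let requests through if the limiter is down
}
}
}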
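9. Distributed Tracing and Observability
9.1 The Three Pillars
The three pillars of observability:
Logs ──▶ structured JSON logs that carry the trace_id
Metrics ──▶ QPS, P99 latency, error rate, resource usage
Traces ──▶ call chains across services, for locating performance bottlenecks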
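P99 latency is the value below which 99% of requests complete. Prometheus usually estimates it from histogram buckets (`histogram_quantile` in PromQL); on raw samples, a minimal std-only sketch using the nearest-rank definition looks like this (`percentile` is a hypothetical helper).

```rust
/// Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it.
/// Returns None for an empty slice or p outside (0, 100].
fn percentile(samples: &[f64], p: f64) -> Option<f64> {
    if samples.is_empty() || !(p > 0.0 && p <= 100.0) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    // rank = ceil(p / 100 * n), 1-based
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.max(1) - 1])
}
```

With only ten samples, P99 is simply the maximum; percentiles in the tail need many samples to be meaningful.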
9.2 Structured Logging (tracing)
use tracing::{info, warn, error, instrument, Span};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn init_tracing() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into())
)
)
.with(tracing_subscriber::fmt::layer().json()) // JSON output, easy to ship to ELK
.init();
}
// #[instrument] creates a span for each call and records the arguments as fields (db is skipped)
#[instrument(skip(db), fields(user.id = %user_id))]
async fn get_user_by_id(user_id: i64, db: &sqlx::PgPool) -> anyhow::Result<User> {
info!("querying user");
// Select columns explicitly: query_as! requires them to match User's fields
let user = sqlx::query_as!(User, "SELECT id, username, email FROM users WHERE id = $1", user_id)
.fetch_optional(db)
.await?;
match user {
Some(u) => {
info!(username = %u.username, "user found");
Ok(u)
}
None => {
warn!("user not found");
Err(anyhow::anyhow!("user {} not found", user_id))
}
}
}
9.3 Distributed Tracing with OpenTelemetry
[dependencies]
opentelemetry = "0.22"
opentelemetry-otlp = { version = "0.15", features = ["grpc-tonic"] }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] } # rt-tokio provides runtime::Tokio
tracing-opentelemetry = "0.23"
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn init_telemetry() -> anyhow::Result<()> {
// Export to Jaeger/Tempo over OTLP (gRPC, port 4317)
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://jaeger:4317")
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "user-service"),
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
]))
)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().json())
.with(OpenTelemetryLayer::new(tracer)) // exports spans to Jaeger via OTLP
.init();
Ok(())
}
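// Creating spans manually. In async code, do not hold a span.enter() guard across .await;
// attach spans to futures with .instrument() instead.
use tracing::Instrument;
async fn process_order(order_id: i64) {
let span = tracing::info_span!("process_order", order.id = order_id);
async {
// Child spans (created while process_order is the current span)
async {
// ... validate inventory
}
.instrument(tracing::info_span!("validate_inventory"))
.await;
async {
// ... charge payment
}
.instrument(tracing::info_span!("charge_payment"))
.await;
}
.instrument(span)
.await;
}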
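9.4 Prometheus Metrics
use axum::{extract::Request, response::Response};
use metrics::{counter, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::time::Instant;
// Call from within the Tokio runtime: the exporter spawns its HTTP listener there
fn init_metrics() {
PrometheusBuilder::new()
.with_http_listener(([0, 0, 0, 0], 9090)) // serves the /metrics endpoint
.install()
.expect("failed to install the Prometheus exporter");
}
// Instrumenting business code (metrics 0.22: the macros return a handle; call increment/record on it)
async fn handle_request(req: Request) -> Response {
let start = Instant::now();
// Raw paths such as /users/123 create one time series per ID; prefer the route template (axum's MatchedPath)
counter!("http_requests_total",
"method" => req.method().to_string(),
"path" => req.uri().path().to_string()
).increment(1);
gauge!("http_active_requests").increment(1.0); // request started
let resp = process(req).await; // process: the actual request handling (not shown)
histogram!("http_request_duration_seconds",
"status" => resp.status().as_u16().to_string()
).record(start.elapsed().as_secs_f64());
if resp.status().is_server_error() {
counter!("http_errors_total").increment(1);
}
gauge!("http_active_requests").decrement(1.0); // request finished
resp
}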
10. Configuration Management
10.1 Layered Configuration Loading
[dependencies]
config = "0.14"
serde = { version = "1", features = ["derive"] }
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
pub server: ServerConfig,
pub database: DatabaseConfig,
pub redis: RedisConfig,
pub jwt: JwtConfig,
}
#[derive(Debug, Deserialize, Clone)]
pub struct ServerConfig {
pub host: String,
pub port: u16,
}
#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
pub url: String,
pub max_conn: u32,
pub min_conn: u32,
}
#[derive(Debug, Deserialize, Clone)]
pub struct RedisConfig {
pub url: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct JwtConfig {
pub secret: String,
pub expires_in: u64, // seconds
}
impl Settings {
pub fn load() -> Result<Self, ConfigError> {
let env = std::env::var("APP_ENV").unwrap_or_else(|_| "development".into());
Config::builder()
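// 1. Defaults
.add_source(File::with_name("config/default"))
// 2. Environment-specific config (overrides the defaults)
.add_source(File::with_name(&format!("config/{}", env)).required(false))
// 3. Local overrides (not committed to git)
.add_source(File::with_name("config/local").required(false))
// 4. Environment variables (highest priority), e.g. APP_SERVER__PORT=9090.
// prefix_separator("_") is needed: otherwise "__" is also expected after the prefix (APP__SERVER__PORT)
.add_source(Environment::with_prefix("APP").prefix_separator("_").separator("__"))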
.build()?
.try_deserialize()
}
}
# config/default.yaml
server:
  host: "0.0.0.0"
  port: 8080
database:
  url: "postgres://postgres:password@localhost/app"
  max_conn: 20
  min_conn: 5
redis:
  url: "redis://localhost:6379"
jwt:
  secret: "change-me-in-production"
  expires_in: 86400
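How an environment variable such as `APP_SERVER__PORT` ends up overriding `server.port`: the `APP_` prefix is stripped, `__` separates nesting levels, keys are lowercased, and later layers win. A minimal std-only sketch of those two rules; `env_to_key` and `merge_layers` are hypothetical helpers that illustrate the idea and do not reproduce every rule of the `config` crate.

```rust
use std::collections::HashMap;

/// Maps an env var name to a dotted config key, e.g. APP_SERVER__PORT -> "server.port".
/// Returns None for variables without the `APP_` prefix.
fn env_to_key(name: &str) -> Option<String> {
    let rest = name.strip_prefix("APP_")?;
    if rest.is_empty() {
        return None;
    }
    // "__" marks a nesting level; a single "_" stays inside a key name (MAX_CONN -> max_conn).
    Some(
        rest.split("__")
            .map(|part| part.to_lowercase())
            .collect::<Vec<_>>()
            .join("."),
    )
}

/// Later layers override earlier ones (default < environment file < local < env vars).
fn merge_layers(layers: &[HashMap<String, String>]) -> HashMap<String, String> {
    let mut merged = HashMap::new();
    for layer in layers {
        for (k, v) in layer {
            merged.insert(k.clone(), v.clone());
        }
    }
    merged
}
```

So `APP_DATABASE__MAX_CONN=50` would override `database.max_conn: 20` from `default.yaml`.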
10.2 Dynamic Configuration (Consul KV / etcd)
use etcd_client::{Client, WatchOptions};
async fn watch_config(key: &str) -> anyhow::Result<()> {
let mut client = Client::connect(["http://etcd:2379"], None).await?;
// Read the current value
let resp = client.get(key, None).await?;
if let Some(kv) = resp.kvs().first() {
println!("current config: {}", kv.value_str()?);
}
// Watch for changes (hot reload); keep the watcher handle alive while consuming the stream
let (_watcher, mut stream) = client.watch(key, Some(WatchOptions::new())).await?;
while let Some(resp) = stream.message().await? {
for event in resp.events() {
if let Some(kv) = event.kv() {
println!("config updated: {} = {}", kv.key_str()?, kv.value_str()?);
// Notify the application layer to reload its config
// config_tx.send(kv.value_str()?.to_string()).await?;
}
}
}
Ok(())
}
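The commented-out `config_tx.send(...)` line leaves open how the application picks up the new value. One common approach, sketched here with std only and assuming readers can tolerate briefly seeing the previous value, is to keep the current config behind `RwLock<Arc<T>>`: each request clones the `Arc` (cheap), and the watcher swaps in a new one. `LiveConfig` and `RuntimeConfig` are hypothetical; in the async services above the writer would be a Tokio task fed by the etcd stream.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical subset of settings that may change at runtime.
#[derive(Debug, Clone, PartialEq)]
struct RuntimeConfig {
    rate_limit: u64,
}

/// Holds the current config; readers get a cheap Arc snapshot, the watcher swaps the whole value.
struct LiveConfig {
    current: RwLock<Arc<RuntimeConfig>>,
}

impl LiveConfig {
    fn new(initial: RuntimeConfig) -> Self {
        Self { current: RwLock::new(Arc::new(initial)) }
    }

    /// Snapshot for one request: the lock is held only long enough to clone the Arc.
    fn get(&self) -> Arc<RuntimeConfig> {
        Arc::clone(&self.current.read().unwrap())
    }

    /// Called by the watcher when a new value arrives (e.g. parsed from an etcd watch event).
    fn replace(&self, new: RuntimeConfig) {
        *self.current.write().unwrap() = Arc::new(new);
    }
}
```

A request that already holds a snapshot keeps using it until it finishes, so one request never sees a mix of old and new values.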
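11. Data Layer Design
11.1 One Database per Service
Principle: services never share a database; each owns its own schema or its own database instance
user-service ──▶ postgres://user-db/users
order-service ──▶ postgres://order-db/orders
pay-service ──▶ postgres://pay-db/payments
Cross-service queries go through API calls, not JOINs
11.2 Data Access Layer with sqlx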
use sqlx::{PgPool, Postgres, Transaction};
// Repository pattern: isolate data-access logic (User is the struct from 4.2)
struct UserRepository {
pool: PgPool,
}
impl UserRepository {
async fn find_by_id(&self, id: i64) -> anyhow::Result<Option<User>> {
Ok(sqlx::query_as!(User,
"SELECT id, username, email FROM users WHERE id = $1",
id
)
.fetch_optional(&self.pool)
.await?)
}
async fn find_by_email(&self, email: &str) -> anyhow::Result<Option<User>> {
Ok(sqlx::query_as!(User,
"SELECT id, username, email FROM users WHERE email = $1",
email
)
.fetch_optional(&self.pool)
.await?)
}
// Runs inside the caller's transaction
async fn create_with_tx(&self, tx: &mut Transaction<'_, Postgres>, req: &CreateUserReq) -> anyhow::Result<User> {
Ok(sqlx::query_as!(User,
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, username, email",
req.username, req.email
)
.fetch_one(&mut **tx)
.await?)
}
}
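// Transaction usage example (CreateUserReq: a request struct with username and email, not shown)
async fn register_user(pool: &PgPool, req: CreateUserReq) -> anyhow::Result<User> {
let mut tx = pool.begin().await?;
let repo = UserRepository { pool: pool.clone() };
// Check that the email is free. This only produces a friendlier error: under concurrency,
// the UNIQUE constraint on users.email is what actually prevents duplicates.
if repo.find_by_email(&req.email).await?.is_some() {
return Err(anyhow::anyhow!("email already registered"));
}
let user = repo.create_with_tx(&mut tx, &req).await?;
// Emit a "send welcome email" event by writing to the outbox in the same transaction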
sqlx::query!(
"INSERT INTO outbox (topic, payload) VALUES ($1, $2)",
"user.registered",
serde_json::to_string(&user)?
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(user)
}
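11.3 Database Migrations (sqlx migrate)
# Create a migration file
sqlx migrate add create_users_table
# Apply pending migrations
sqlx migrate run
# Revert the most recent migration (requires reversible migrations, created with `sqlx migrate add -r`)
sqlx migrate revert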
-- migrations/20260101_create_users_table.sql
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- No separate index on email is needed: the UNIQUE constraint already creates one
// Run migrations automatically at application startup
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("database migration failed");
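12. Asynchronous Service Communication (Event-Driven)
12.1 Event-Driven Architecture
Synchronous calls:                          Event-driven:
OrderService ──HTTP──▶ UserService          OrderService ──[order.created]──▶ MQ
                                                                              ├──▶ UserService (update loyalty points)
Tightly coupled, chained dependencies                                         ├──▶ NotifyService (send notifications)
                                                                              └──▶ AnalyticsService (statistics)
                                            Decoupled, fan-out, asynchronous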
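The fan-out on the right can be illustrated in-process. A minimal std-only sketch, assuming a hypothetical `EventBus` that gives each subscriber its own `mpsc` channel. A real broker such as Kafka additionally persists events so that a consumer that is down can catch up later, which this sketch does not do.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-process bus: every subscriber of a topic receives its own copy of each event.
#[derive(Default)]
struct EventBus {
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl EventBus {
    fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Publishes to all current subscribers and returns how many received the event.
    /// The publisher never names its consumers, which is where the decoupling comes from.
    fn publish(&mut self, topic: &str, payload: &str) -> usize {
        let Some(subs) = self.subscribers.get_mut(topic) else {
            return 0;
        };
        // Drop subscribers whose receiver has gone away.
        subs.retain(|tx| tx.send(payload.to_string()).is_ok());
        subs.len()
    }
}
```

Adding a new consumer (say, an analytics service) means one more `subscribe` call; `OrderService`'s publishing code does not change.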
12.2 Event Definition Conventions
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
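// Base event envelope (fields common to all events)
#[derive(Debug, Serialize, Deserialize)]
pub struct DomainEvent<T> {
pub id: String, // unique event ID (idempotency key)
pub event_type: String, // "order.created" / "user.registered"
pub version: u32, // schema version, for backward-compatible evolution
pub source: String, // originating service
pub occurred_at: DateTime<Utc>, // when the event happened
pub data: T, // business payload
}
// A concrete business event (OrderItem: the order line-item type, not shown)
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderCreatedEvent {
pub order_id: i64,
pub user_id: i64,
pub items: Vec<OrderItem>,
pub total: f64, // f64 keeps the example short; real money amounts should use integer minor units or a decimal type
}
// Constructor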
impl<T: Serialize> DomainEvent<T> {
pub fn new(event_type: &str, source: &str, data: T) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
event_type: event_type.to_string(),
version: 1,
source: source.to_string(),
occurred_at: Utc::now(),
data,
}
}
}
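The `id` field serves as an idempotency key. Outbox relays and Kafka typically deliver at least once, so consumers should skip events they have already handled. A minimal std-only sketch, assuming a hypothetical `IdempotentConsumer` that remembers processed IDs in a `HashSet`. In production the processed-ID set would live in the consumer's own database and be updated in the same transaction as the business change.

```rust
use std::collections::HashSet;

/// Hypothetical consumer wrapper that applies each event ID at most once.
struct IdempotentConsumer {
    seen: HashSet<String>,
    applied: u32, // stands in for the real side effect (e.g. adding loyalty points)
}

impl IdempotentConsumer {
    fn new() -> Self {
        Self { seen: HashSet::new(), applied: 0 }
    }

    /// Returns true if the event was processed, false if it was a duplicate delivery.
    fn handle(&mut self, event_id: &str) -> bool {
        // HashSet::insert returns false when the ID is already present.
        if !self.seen.insert(event_id.to_string()) {
            return false;
        }
        self.applied += 1;
        true
    }
}
```

A redelivered `order.created` event with the same `id` is therefore acknowledged without awarding points twice.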
12.3 Publishing Events (with the Outbox Pattern)
// The business write and the outbox write share one database transaction, so they commit or roll back together
pub async fn create_order(pool: &PgPool, req: CreateOrderReq) -> anyhow::Result<Order> {
let mut tx = pool.begin().await?;
// Insert the order
let order = sqlx::query_as!(Order,
"INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *",
req.user_id, req.total
).fetch_one(&mut *tx).await?;
// Write to the outbox (same transaction)
let event = DomainEvent::new(
"order.created",
"order-service",
OrderCreatedEvent { order_id: order.id, user_id: order.user_id, items: req.items, total: order.total },
);
sqlx::query!(
"INSERT INTO outbox (id, topic, payload, status) VALUES ($1, $2, $3, 'pending')",
event.id,
"order.created",
serde_json::to_value(&event)?
).execute(&mut *tx).await?;
tx.commit().await?;
Ok(order)
}
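use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
// Outbox relay: poll the outbox and publish to Kafka. Delivery is at-least-once,
// so consumers should deduplicate by event id.
pub async fn outbox_relay(pool: &PgPool, producer: &FutureProducer) {
loop {
if let Err(e) = relay_batch(pool, producer).await {
tracing::error!("outbox relay failed: {}", e);
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
async fn relay_batch(pool: &PgPool, producer: &FutureProducer) -> anyhow::Result<()> {
// FOR UPDATE SKIP LOCKED only holds its row locks inside a transaction;
// that is what lets several relay instances run without sending the same rows
let mut tx = pool.begin().await?;
let rows = sqlx::query!(
"SELECT id, topic, payload FROM outbox WHERE status = 'pending' ORDER BY created_at LIMIT 50 FOR UPDATE SKIP LOCKED"
)
.fetch_all(&mut *tx)
.await?;
for row in rows {
let payload = row.payload.to_string(); // bind first: the record borrows it
let record = FutureRecord::to(&row.topic).key(&row.id).payload(&payload);
// On failure the row stays 'pending' and is retried in the next round
if producer.send(record, Duration::from_secs(5)).await.is_ok() {
sqlx::query!("UPDATE outbox SET status = 'sent' WHERE id = $1", row.id)
.execute(&mut *tx)
.await?;
}
}
tx.commit().await?;
Ok(())
}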
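13. Distributed Transactions
13.1 The Saga Pattern (Recommended)
Saga = a sequence of local transactions; each one publishes an event on completion that triggers the next step
On failure, compensating transactions undo the steps that already succeeded
Order saga:
1. OrderService creates the order (PENDING) ──▶ publishes [OrderCreated]
2. InventoryService reserves the stock ──▶ publishes [InventoryReserved]
3. PayService charges the payment ──▶ publishes [PaymentCompleted]
4. OrderService updates the order to CONFIRMED
Compensation on failure (in reverse order):
Payment fails ──▶ publish [PaymentFailed]
──▶ InventoryService releases the stock
──▶ OrderService cancels the order
// Saga state machine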
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SagaState {
OrderCreated,
InventoryReserved,
PaymentProcessed,
Completed,
// Compensation states
CompensatingInventory,
CompensatingOrder,
Failed,
}
pub struct OrderSaga {
pub saga_id: String,
pub order_id: i64,
pub state: SagaState,
}
impl OrderSaga {
pub fn next_state(&self, event: &str) -> Option<SagaState> {
match (&self.state, event) {
(SagaState::OrderCreated, "inventory.reserved") => Some(SagaState::InventoryReserved),
(SagaState::InventoryReserved, "payment.completed") => Some(SagaState::PaymentProcessed),
(SagaState::PaymentProcessed, "order.confirmed") => Some(SagaState::Completed),
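// Failure compensation
(SagaState::InventoryReserved, "payment.failed") => Some(SagaState::CompensatingInventory),
(SagaState::OrderCreated, "inventory.failed") => Some(SagaState::CompensatingOrder),
// Compensation steps completing (event names assumed): stock released, then order cancelled
(SagaState::CompensatingInventory, "inventory.released") => Some(SagaState::CompensatingOrder),
(SagaState::CompensatingOrder, "order.cancelled") => Some(SagaState::Failed),
_ => None,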
}
}
}
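The `next_state` match only computes single transitions, so something still has to feed it events in order. The following std-only sketch shows such a driver. It redeclares the enum without serde derives so that it compiles on its own. It also assumes that `inventory.released` and `order.cancelled` are the events signalling that a compensation step has finished. Those two names, and `run_saga`, are hypothetical. Events that do not apply in the current state (for example, a duplicate delivery) are ignored rather than treated as errors.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaState {
    OrderCreated,
    InventoryReserved,
    PaymentProcessed,
    Completed,
    CompensatingInventory,
    CompensatingOrder,
    Failed,
}

impl SagaState {
    /// Transition table; None means the event is not valid in this state.
    pub fn next(self, event: &str) -> Option<SagaState> {
        match (self, event) {
            (Self::OrderCreated, "inventory.reserved") => Some(Self::InventoryReserved),
            (Self::InventoryReserved, "payment.completed") => Some(Self::PaymentProcessed),
            (Self::PaymentProcessed, "order.confirmed") => Some(Self::Completed),
            // Compensation path (the last two event names are assumptions)
            (Self::InventoryReserved, "payment.failed") => Some(Self::CompensatingInventory),
            (Self::OrderCreated, "inventory.failed") => Some(Self::CompensatingOrder),
            (Self::CompensatingInventory, "inventory.released") => Some(Self::CompensatingOrder),
            (Self::CompensatingOrder, "order.cancelled") => Some(Self::Failed),
            _ => None,
        }
    }

    pub fn is_terminal(self) -> bool {
        matches!(self, Self::Completed | Self::Failed)
    }
}

/// Apply events in arrival order, ignoring ones that do not apply
/// (e.g. duplicates from an at-least-once broker). Returns the final state.
pub fn run_saga<'a>(events: impl IntoIterator<Item = &'a str>) -> SagaState {
    let mut state = SagaState::OrderCreated;
    for event in events {
        if state.is_terminal() {
            break;
        }
        if let Some(next) = state.next(event) {
            state = next;
        }
    }
    state
}
```

In a real service, the current state would be persisted per `saga_id` after each transition, so that a restarted instance can resume the saga.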
14. Containerization and Deployment
14.1 Multi-stage Dockerfile (Minimal Image)
# ---- Build stage ----
FROM rust:1.78-slim AS builder
WORKDIR /app
# Dependency cache layer (rebuilt only when Cargo.toml or Cargo.lock changes)
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && echo "fn main(){}" > src/main.rs
RUN cargo build --release
RUN rm src/main.rs
# Compile the real source
COPY src ./src
RUN touch src/main.rs && cargo build --release
# ---- Runtime stage (minimal image) ----
FROM debian:bookworm-slim
# curl is needed by the HEALTHCHECK below; the slim base image does not include it
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY --from=builder /app/target/release/user-service .
COPY config ./config
# Run as a non-root user
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser
EXPOSE 8080
HEALTHCHECK --interval=10s --timeout=3s CMD curl -f http://localhost:8080/health || exit 1
ENTRYPOINT ["./user-service"]
14.2 Docker Compose (Local Development Environment)
# docker-compose.yml
version: "3.8"
services:
user-service:
build: ./user-service
ports:
- "8080:8080"
environment:
- DATABASE_URL=postgres://postgres:password@postgres:5432/users
- REDIS_URL=redis://redis:6379
- APP_ENV=development
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
order-service:
build: ./order-service
ports:
- "8081:8080"
environment:
- DATABASE_URL=postgres://postgres:password@postgres:5432/orders
- KAFKA_BROKERS=kafka:9092
depends_on: [postgres, kafka]
postgres:
image: postgres:16-alpine
environment:
POSTGRES_PASSWORD: password
volumes:
- pgdata:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
ports:
- "6379:6379"
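kafka:
image: confluentinc/cp-kafka:7.6.1 # pinned to 7.x: newer images no longer support ZooKeeper mode
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # single broker; the default of 3 cannot be met
depends_on: [zookeeper]
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181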
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # UI
- "4317:4317" # OTLP gRPC
volumes:
pgdata:
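Each service receives its settings through the `environment:` entries above. Here is a minimal std-only sketch of reading them at startup. It assumes the variable names used in this Compose file, treats `DATABASE_URL` as mandatory and `REDIS_URL` as optional, and falls back to `development` for `APP_ENV`. `ServiceConfig` and `from_lookup` are hypothetical names. The lookup is passed in as a closure, so the same code can read the real environment in production and a fixed map in tests.

```rust
#[derive(Debug, PartialEq)]
pub struct ServiceConfig {
    pub database_url: String,
    pub redis_url: Option<String>,
    pub app_env: String,
}

impl ServiceConfig {
    /// Build the config from any key lookup.
    pub fn from_lookup<F>(lookup: F) -> Result<Self, String>
    where
        F: Fn(&str) -> Option<String>,
    {
        // Fail fast at startup if the one mandatory setting is missing
        let database_url = lookup("DATABASE_URL")
            .ok_or_else(|| "DATABASE_URL is not set".to_string())?;
        Ok(ServiceConfig {
            database_url,
            redis_url: lookup("REDIS_URL"),
            app_env: lookup("APP_ENV").unwrap_or_else(|| "development".to_string()),
        })
    }

    /// Read from the real process environment.
    pub fn from_env() -> Result<Self, String> {
        Self::from_lookup(|key| std::env::var(key).ok())
    }
}
```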
14.3 Kubernetes Deployment
# k8s/user-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: myregistry/user-service:v1.2.0
ports:
- containerPort: 8080
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: user-service-secrets
key: database-url
resources:
requests:
memory: "64Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "500m"
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
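livenessProbe:
# TCP check: /health returns 503 when the database is down, and a failed liveness
# probe restarts the pod, which cannot fix a database outage
tcpSocket:
port: 8080
initialDelaySeconds: 15
periodSeconds: 20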
---
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: user-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: user-service
minReplicas: 2
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
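`averageUtilization: 70` is a percentage of the container's CPU *request* (100m above), not of its limit. The Kubernetes documentation gives the core scaling rule as desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric). Below is a std-only sketch of that rule, including the `minReplicas`/`maxReplicas` clamp. It deliberately ignores the controller's tolerance band and stabilization windows, and `desired_replicas` is a hypothetical helper name.

```rust
/// Core HPA rule: desired = ceil(current * current_util / target_util),
/// clamped to [min, max]. Utilizations are percentages of the CPU request.
pub fn desired_replicas(current: u32, current_util: u32, target_util: u32, min: u32, max: u32) -> u32 {
    assert!(target_util > 0, "target utilization must be positive");
    let numerator = current as u64 * current_util as u64;
    // Integer ceiling division avoids floating-point rounding surprises
    let desired = (numerator + target_util as u64 - 1) / target_util as u64;
    let desired = u32::try_from(desired).unwrap_or(u32::MAX);
    desired.clamp(min, max)
}
```

For example, with 3 replicas averaging 140% against the 70% target, the sketch asks for ceil(3 × 140 / 70) = 6 replicas. With 3 replicas at 20%, the raw value of 1 is raised to `minReplicas` = 2.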
15. Security: Authentication and Authorization
15.1 JWT Authentication Middleware
[dependencies]
jsonwebtoken = "9"
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
pub sub: i64, // user ID
pub email: String,
pub roles: Vec<String>,
pub exp: usize, // expiry time (Unix timestamp, seconds)
pub iat: usize, // issued-at time (Unix timestamp, seconds)
}
pub fn create_token(claims: &Claims, secret: &[u8]) -> anyhow::Result<String> {
Ok(encode(
&Header::default(),
claims,
&EncodingKey::from_secret(secret),
)?)
}
pub fn verify_token(token: &str, secret: &[u8]) -> anyhow::Result<Claims> {
let data = decode::<Claims>(
token,
&DecodingKey::from_secret(secret),
&Validation::default(),
)?;
Ok(data.claims)
}
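`Validation::default()` in jsonwebtoken rejects tokens whose `exp` is in the past, allowing a default leeway of 60 seconds. This means `exp` and `iat` must hold Unix timestamps in seconds. The std-only sketch below shows how those two fields might be filled and models the expiry rule itself. It assumes a caller-chosen TTL, and `issue_times` and `is_expired` are hypothetical helpers that illustrate the rule rather than replace the library's own check.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds since the Unix epoch, the unit JWT `iat` and `exp` use.
pub fn now_unix() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before 1970")
        .as_secs()
}

/// Hypothetical helper: (iat, exp) for a token valid for `ttl_secs`,
/// typed as usize to match the `Claims` struct above.
pub fn issue_times(now: u64, ttl_secs: u64) -> (usize, usize) {
    (now as usize, (now + ttl_secs) as usize)
}

/// Expiry with clock-skew leeway: the token counts as expired only once
/// `exp` lies more than `leeway` seconds in the past.
pub fn is_expired(exp: u64, now: u64, leeway: u64) -> bool {
    exp.saturating_add(leeway) < now
}
```

A typical call would be `let (iat, exp) = issue_times(now_unix(), 3600);`, after which both values are placed into `Claims` before `create_token`.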
// JWT 认证中间件
pub async fn auth_middleware(
State(secret): State<Arc<String>>,
mut req: Request,
next: Next,
) -> Result<Response, AppError> {
let token = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "))
.ok_or(AppError::Unauthorized)?;
let claims = verify_token(token, secret.as_bytes())
.map_err(|_| AppError::Unauthorized)?;
// Put the claims into the request extensions so downstream handlers can extract them
req.extensions_mut().insert(claims);
Ok(next.run(req).await)
}
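The middleware's `strip_prefix("Bearer ")` is case-sensitive, whereas RFC 7235 treats the authentication scheme name as case-insensitive. As a result, a client that sends `bearer <token>` would receive a 401. Here is a std-only sketch of a more tolerant parser. It assumes a hypothetical `parse_bearer` helper that could replace the `strip_prefix` step.

```rust
/// Extract the token from an `Authorization` header value.
/// The scheme is matched case-insensitively and surrounding whitespace is trimmed.
pub fn parse_bearer(header: &str) -> Option<&str> {
    let header = header.trim();
    let (scheme, token) = header.split_once(' ')?;
    let token = token.trim();
    if scheme.eq_ignore_ascii_case("bearer") && !token.is_empty() {
        Some(token)
    } else {
        None
    }
}
```

In the middleware, this would be used as `.and_then(|v| v.to_str().ok()).and_then(parse_bearer)`.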
// Extract the authenticated user in a handler
async fn get_profile(
Extension(claims): Extension<Claims>,
State(state): State<Arc<AppState>>,
) -> Result<JsonResponse<User>, AppError> {
let user = state.user_repo.find_by_id(claims.sub).await?
.ok_or_else(|| AppError::NotFound("user not found".into()))?;
Ok(JsonResponse(user))
}
15.2 RBAC Authorization
// Role-based access control: reject the request unless the user holds `role`
fn require_role(role: &'static str) -> impl Fn(Extension<Claims>, Request, Next) -> std::pin::Pin<Box<dyn std::future::Future<Output = Response> + Send>> + Clone {
move |Extension(claims): Extension<Claims>, req: Request, next: Next| {
Box::pin(async move {
if claims.roles.iter().any(|r| r == role) {
next.run(req).await
} else {
(StatusCode::FORBIDDEN, "insufficient permissions").into_response()
}
})
}
}
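// Route-level authorization. Each route_layer wraps the ones added before it, so the
// last one runs first: auth_middleware is added last so that Claims are already in the
// request extensions when require_role runs
let admin_routes = Router::new()
.route("/admin/users", get(list_all_users))
.route_layer(middleware::from_fn(require_role("admin")))
// jwt_secret: Arc<String> (hypothetical variable), the state type auth_middleware extracts
.route_layer(middleware::from_fn_with_state(jwt_secret.clone(), auth_middleware));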
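Axum layers nest outside-in. Each `.route_layer(...)` call wraps everything added before it, so the layer added last is the first to see the request. For `require_role` to find the `Claims` inserted by `auth_middleware`, the authentication layer must therefore be the one added last. The following std-only model demonstrates this ordering with plain boxed closures. `Handler`, `wrap` and the string trace are hypothetical and illustrate only the wrapping order, not Axum's API.

```rust
/// A request is modeled as a trace of which layers touched it, in order.
pub type Handler = Box<dyn Fn(Vec<&'static str>) -> Vec<&'static str>>;

/// Wrap `inner` with a layer that records `name` and then delegates,
/// the way each `.route_layer()` call wraps the router built so far.
pub fn wrap(inner: Handler, name: &'static str) -> Handler {
    Box::new(move |mut trace: Vec<&'static str>| {
        trace.push(name);
        inner(trace)
    })
}

pub fn build() -> Handler {
    let handler: Handler = Box::new(|mut trace: Vec<&'static str>| {
        trace.push("handler");
        trace
    });
    // Same call order as the router: the first layer added is the innermost
    let with_role = wrap(handler, "require_role");
    wrap(with_role, "auth_middleware")
}
```

Calling `build()(Vec::new())` yields `["auth_middleware", "require_role", "handler"]`, so authentication runs before the role check.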
16. Production Best Practices
Graceful Shutdown
use tokio::signal;
async fn graceful_shutdown() {
let ctrl_c = async {
signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => tracing::info!("received Ctrl+C"),
_ = terminate => tracing::info!("received SIGTERM"),
}
tracing::info!("starting graceful shutdown...");
}
// Usage in main
axum::serve(listener, app)
.with_graceful_shutdown(graceful_shutdown())
.await?;
Health Check Endpoint
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
database: &'static str,
version: &'static str,
}
async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
// Check database connectivity
let db_ok = sqlx::query("SELECT 1")
.fetch_one(&state.db)
.await
.is_ok();
let status = if db_ok { "ok" } else { "degraded" };
let http_status = if db_ok { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE };
(http_status, Json(HealthResponse {
status,
database: if db_ok { "ok" } else { "error" },
version: env!("CARGO_PKG_VERSION"),
}))
}
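When a service depends on more than one backend, the same idea extends to aggregating several checks. The following std-only sketch assumes each check has already run and been reduced to a name and a boolean. `CheckResult` and `aggregate` are hypothetical names. Any failing dependency turns the response into 503 with status `degraded`, matching the single-database handler above, and the failing names can be reported in the body.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct CheckResult {
    pub name: &'static str,
    pub ok: bool,
}

/// Fold individual dependency checks into
/// (HTTP status code, overall status, names of failing dependencies).
pub fn aggregate(checks: &[CheckResult]) -> (u16, &'static str, Vec<&'static str>) {
    let failing: Vec<&'static str> = checks.iter().filter(|c| !c.ok).map(|c| c.name).collect();
    if failing.is_empty() {
        (200, "ok", failing)
    } else {
        (503, "degraded", failing)
    }
}
```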
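Common Anti-patterns
❌ Services sharing one database → give each service its own database
❌ Chained synchronous calls (A→B→C→D) → timeouts cascade into an avalanche; switch to asynchronous events
❌ Calling downstream services without a circuit breaker → a failing downstream drags the caller down with it
❌ Logs without a trace_id → cross-service problems cannot be traced
❌ Configuration hard-coded in source → use environment variables + config files
❌ Running as root inside the container → security risk; use an unprivileged user
❌ Ignoring graceful shutdown → in-flight requests are cut off mid-processing
❌ No health checks → Kubernetes cannot tell whether the service is ready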
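The core idea behind the circuit-breaker item fits in a small state machine. After a number of consecutive failures, the breaker opens and rejects calls immediately. Once a cool-down has passed, it lets trial calls through again. The std-only sketch below assumes a hypothetical `CircuitBreaker` whose failure threshold and cool-down are chosen by the caller. Time is passed in as an `Instant` so that the logic can be tested without sleeping.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
    Closed,
    Open { until: Instant },
    HalfOpen,
}

pub struct CircuitBreaker {
    state: BreakerState,
    consecutive_failures: u32,
    threshold: u32,
    cool_down: Duration,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, cool_down: Duration) -> Self {
        Self { state: BreakerState::Closed, consecutive_failures: 0, threshold, cool_down }
    }

    /// Should a call to the downstream service be attempted at time `now`?
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            BreakerState::Closed | BreakerState::HalfOpen => true,
            BreakerState::Open { until } if now >= until => {
                // Cool-down elapsed: switch to half-open and allow trial calls
                self.state = BreakerState::HalfOpen;
                true
            }
            BreakerState::Open { .. } => false,
        }
    }

    /// Record the outcome of a call that `allow` permitted.
    pub fn record(&mut self, success: bool, now: Instant) {
        if success {
            self.consecutive_failures = 0;
            self.state = BreakerState::Closed;
        } else {
            self.consecutive_failures += 1;
            // A failed trial call, or too many failures in a row, (re)opens the breaker
            if self.state == BreakerState::HalfOpen || self.consecutive_failures >= self.threshold {
                self.state = BreakerState::Open { until: now + self.cool_down };
            }
        }
    }

    pub fn state(&self) -> BreakerState {
        self.state
    }
}
```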
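Architecture Evolution Path
Stage 1: Start as a monolith
One Axum service and one database, to validate the business quickly
Stage 2: Modularize
Still one process, but code is isolated by module in preparation for splitting
Stage 3: Vertical split (by team / traffic hotspot)
The most heavily accessed modules are extracted first as microservices (e.g. the user service or the product service)
Stage 4: Full microservices + service mesh
Service discovery + API gateway + sidecars (Istio/Linkerd)
Principle: don't over-engineer up front; let real pain points drive the evolution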
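Stage two (modules inside one process) is easiest to split later if other modules communicate with a module only through a trait. The std-only sketch below assumes a hypothetical `UserDirectory` trait. An in-process implementation backs it for now, and a future HTTP or gRPC client could implement the same trait without changing callers such as the hypothetical `order::place_order`.

```rust
mod user {
    /// The boundary other modules depend on; a remote client could implement it later.
    pub trait UserDirectory {
        fn exists(&self, user_id: i64) -> bool;
    }

    /// In-process implementation used while everything lives in one binary.
    pub struct InMemoryUsers {
        pub ids: Vec<i64>,
    }

    impl UserDirectory for InMemoryUsers {
        fn exists(&self, user_id: i64) -> bool {
            self.ids.contains(&user_id)
        }
    }
}

mod order {
    use super::user::UserDirectory;

    /// The order module sees only the trait, never the user module's storage.
    pub fn place_order(users: &dyn UserDirectory, user_id: i64) -> Result<String, String> {
        if users.exists(user_id) {
            Ok(format!("order created for user {user_id}"))
        } else {
            Err(format!("unknown user {user_id}"))
        }
    }
}
```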