版本: 1.0
日期: 2026-02-06
状态: DRAFT — 待评审
将 FS9 从"可运行的开发阶段"提升至"支撑百万级用户的生产系统"。
改造分两个阶段:
| 阶段 | 目标 | 改造项 |
|---|---|---|
| Phase 1 (P0) | 不做就不能上线 | 5 项 |
| Phase 2 (P1) | 百万用户前必须完成 | 7 项 |
Client → [LB] → fs9-server (Axum) → VfsRouter → Provider (memfs/pagefs/localfs/plugin)
↓
fs9-meta (SQLite)
核心瓶颈:fs9-server 有状态(Handle 在内存)、无可观测性、无容错韧性。
- 最小侵入: 优先改 middleware 层和配置,避免改动 FsProvider trait
- 渐进式: 每个改造项可独立合并、独立回滚
- 向后兼容: 不破坏现有 API 契约和 SDK 版本
- 可验证: 每项改造附带明确的验收测试
server/src/main.rs:107:
axum::serve(listener, app).await.unwrap();SIGTERM 立即终止进程。进行中的请求被中断,open 的 file handle 永远不会被 close,provider 侧资源泄漏。
SIGTERM/SIGINT
│
▼
┌─────────────────────┐
│ Shutdown Signal │
│ (tokio::signal) │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Stop accepting │
│ new connections │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Drain in-flight │
│ requests (30s) │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Close all handles │
│ per namespace │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Unload plugins │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Exit(0) │
└─────────────────────┘
| 文件 | 改动 |
|---|---|
server/src/main.rs |
添加 shutdown signal handler + graceful shutdown 逻辑 |
server/src/namespace.rs |
新增 NamespaceManager::drain_all() 方法 |
core/src/handle.rs |
新增 HandleRegistry::close_all() 方法 |
config/src/types.rs |
新增 shutdown_timeout_secs 配置项 |
server/src/main.rs:
// 替换现有的 axum::serve 调用
let shutdown_timeout = Duration::from_secs(
config.server.shutdown_timeout_secs.unwrap_or(30)
);
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(shutdown_timeout, state.clone()))
.await
.unwrap();
async fn shutdown_signal(timeout: Duration, state: Arc<AppState>) {
let ctrl_c = async {
tokio::signal::ctrl_c().await.expect("install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
() = ctrl_c => tracing::info!("Received Ctrl+C, shutting down"),
() = terminate => tracing::info!("Received SIGTERM, shutting down"),
}
// Drain phase: close all open handles across all namespaces
tracing::info!("Draining open handles...");
state.namespace_manager.drain_all().await;
tracing::info!("Shutdown complete");
}core/src/handle.rs — 新增 close_all:
/// Close all handles in the registry. Used during graceful shutdown.
/// Returns count of handles closed.
pub async fn close_all(&self) -> usize {
let mut total = 0;
for shard in &self.shards {
let handles: Vec<(HandleId, HandleState)> = {
let mut map = shard.handles.write().await;
map.drain().collect()
};
// 锁已释放,逐个 close
for (id, state) in handles {
let _ = state.provider.close(state.provider_handle, true).await;
total += 1;
tracing::debug!(handle_id = id, path = %state.path, "Closed handle during shutdown");
}
}
total
}server/src/namespace.rs — 新增 drain_all:
impl NamespaceManager {
/// Drain all namespaces: close all open handles. Used during graceful shutdown.
pub async fn drain_all(&self) {
let namespaces: Vec<Arc<Namespace>> = {
self.namespaces.read().await
.values()
.map(|(ns, _)| ns.clone())
.collect()
};
for ns in namespaces {
let count = ns.handle_registry.close_all().await;
if count > 0 {
tracing::info!(namespace = %ns.name, closed = count, "Drained handles");
}
}
}
}server:
shutdown_timeout_secs: 30 # 新增,默认 30 秒-
kill -SIGTERM <pid>后,正在执行的 read/write 请求能正常完成 - 所有 open handle 被 close(通过日志验证 "Drained handles" 输出)
- 新连接在 shutdown 阶段被拒绝(503)
- 超过
shutdown_timeout_secs后强制退出
server/src/main.rs:100 仅有全局并发限制:
.layer(ConcurrencyLimitLayer::new(max_concurrent)) // 默认 1000任意单租户可耗尽所有 1000 并发槽,形成"吵闹邻居"问题。
Request → Auth Middleware → Rate Limit Middleware → Handler
│ │
│ ├─ 全局: 总并发 ≤ N
│ ├─ Per-NS: 每 namespace QPS ≤ M
│ └─ Per-User: 每 user QPS ≤ K
│
└─ 提取 namespace + user_id
采用两层限流:
- 全局层(已有):
ConcurrencyLimitLayer保护总体资源 - 租户层(新增): 基于
governorcrate 的令牌桶算法,按(namespace, user_id)分桶
| 文件 | 改动 |
|---|---|
server/Cargo.toml |
新增 governor 依赖 |
server/src/rate_limit.rs |
新文件: 限流 middleware 实现 |
server/src/main.rs |
插入限流 layer |
server/src/lib.rs |
导出新模块 |
config/src/types.rs |
新增限流配置项 |
server/src/rate_limit.rs(新文件):
use axum::{body::Body, extract::Request, http::StatusCode, middleware::Next, response::Response};
use governor::{DefaultKeyedRateLimiter, Quota, RateLimiter};
use governor::clock::DefaultClock;
use governor::state::keyed::DashMapStateStore;
use std::num::NonZeroU32;
use std::sync::Arc;
use crate::auth::RequestContext;
/// Composite key for rate limiting: (namespace, user_id)
type RateLimitKey = String;
#[derive(Clone)]
pub struct RateLimitState {
/// Per-namespace limiter
ns_limiter: Arc<DefaultKeyedRateLimiter<String>>,
/// Per-user limiter (key = "ns:user")
user_limiter: Arc<DefaultKeyedRateLimiter<String>>,
enabled: bool,
}
impl RateLimitState {
pub fn new(ns_qps: u32, user_qps: u32) -> Self {
let ns_limiter = Arc::new(RateLimiter::dashmap(
Quota::per_second(NonZeroU32::new(ns_qps).unwrap_or(NonZeroU32::new(1000).unwrap())),
));
let user_limiter = Arc::new(RateLimiter::dashmap(
Quota::per_second(NonZeroU32::new(user_qps).unwrap_or(NonZeroU32::new(100).unwrap())),
));
Self {
ns_limiter,
user_limiter,
enabled: true,
}
}
pub fn disabled() -> Self {
Self {
ns_limiter: Arc::new(RateLimiter::dashmap(
Quota::per_second(NonZeroU32::new(1).unwrap()),
)),
user_limiter: Arc::new(RateLimiter::dashmap(
Quota::per_second(NonZeroU32::new(1).unwrap()),
)),
enabled: false,
}
}
}
pub async fn rate_limit_middleware(
axum::extract::State(state): axum::extract::State<RateLimitState>,
request: Request<Body>,
next: Next,
) -> Response {
if !state.enabled {
return next.run(request).await;
}
// Skip rate limiting for health checks
if request.uri().path() == "/health" {
return next.run(request).await;
}
// Extract context (set by auth middleware which runs before this)
let ctx = request.extensions().get::<RequestContext>();
if let Some(ctx) = ctx {
// Check namespace-level limit
if state.ns_limiter.check_key(&ctx.ns).is_err() {
return (
StatusCode::TOO_MANY_REQUESTS,
"Namespace rate limit exceeded",
).into_response();
}
// Check user-level limit
let user_key = format!("{}:{}", ctx.ns, ctx.user_id);
if state.user_limiter.check_key(&user_key).is_err() {
return (
StatusCode::TOO_MANY_REQUESTS,
"User rate limit exceeded",
).into_response();
}
}
next.run(request).await
}server/src/main.rs — 限流 layer 插入位置(在 auth 之后):
// Layer 顺序(从外到内):
// TraceLayer → ConcurrencyLimit → Timeout → RateLimit → Auth → Handler
let rate_limit_state = RateLimitState::new(
config.server.rate_limit.namespace_qps.unwrap_or(1000),
config.server.rate_limit.user_qps.unwrap_or(100),
);
let app = api::create_router(state)
.layer(middleware::from_fn_with_state(
rate_limit_state,
rate_limit::rate_limit_middleware,
))
.layer(middleware::from_fn_with_state(
auth_middleware_state,
auth::auth_middleware,
))
.layer(TimeoutLayer::new(request_timeout))
.layer(ConcurrencyLimitLayer::new(max_concurrent))
.layer(TraceLayer::new_for_http());注意: Rate limit 层必须在 auth 层之上(即在请求流中 auth 先执行),因为限流需要
RequestContext中的 namespace 和 user_id。
server:
rate_limit:
namespace_qps: 1000 # 每 namespace 每秒最大请求数,默认 1000
user_qps: 100 # 每 user 每秒最大请求数,默认 100- 单用户超过
user_qps后收到429 Too Many Requests - 同 namespace 其他用户不受影响
- Health endpoint 不受限流影响
- 限流关闭时(配置不设置或
enabled: false)无性能开销
零可观测性指标。无法监控 QPS、延迟分位数、错误率、handle 数量等关键指标。
采用 metrics facade + metrics-exporter-prometheus 方案。
┌─────────────────────────┐
│ fs9-server │
│ │
Request ──────► │ ┌──────────────────┐ │ GET /metrics
│ │ Metrics Middleware│ │ ◄──── Prometheus
│ │ (per-request) │ │
│ └──────┬───────────┘ │
│ │ │
│ ┌──────▼───────────┐ │
│ │ metrics crate │ │
│ │ (facade/macros) │ │
│ └──────┬───────────┘ │
│ │ │
│ ┌──────▼───────────┐ │
│ │ Prometheus │ │
│ │ Exporter (HTTP) │ │
│ └─────────────────┘ │
└─────────────────────────┘
| 指标名 | 类型 | Labels | 说明 |
|---|---|---|---|
fs9_http_requests_total |
Counter | method, path, status, namespace |
请求总数 |
fs9_http_request_duration_seconds |
Histogram | method, path, namespace |
请求延迟 |
fs9_active_handles |
Gauge | namespace |
当前打开的 handle 数 |
fs9_active_namespaces |
Gauge | — | 活跃 namespace 数 |
fs9_token_cache_hits_total |
Counter | — | Token 缓存命中 |
fs9_token_cache_misses_total |
Counter | — | Token 缓存未命中 |
fs9_token_cache_size |
Gauge | — | Token 缓存条目数 |
fs9_meta_requests_total |
Counter | endpoint, status |
到 meta 服务的请求数 |
fs9_meta_request_duration_seconds |
Histogram | endpoint |
到 meta 服务的延迟 |
fs9_provider_operations_total |
Counter | operation, provider, namespace |
Provider 操作计数 |
fs9_provider_operation_duration_seconds |
Histogram | operation, provider |
Provider 操作延迟 |
| 文件 | 改动 |
|---|---|
server/Cargo.toml |
新增 metrics, metrics-exporter-prometheus 依赖 |
server/src/metrics.rs |
新文件: 指标定义 + metrics middleware + exporter 初始化 |
server/src/main.rs |
初始化 exporter,挂载 /metrics endpoint |
server/src/api/mod.rs |
添加 /metrics route |
server/src/token_cache.rs |
在 get/set 中埋点 |
server/src/meta_client.rs |
在 HTTP 调用中埋点 |
server/src/lib.rs |
导出新模块 |
server/src/metrics.rs(新文件):
use axum::{body::Body, extract::Request, middleware::Next, response::Response};
use metrics::{counter, gauge, histogram};
use std::time::Instant;
use crate::auth::RequestContext;
/// Initialize the Prometheus metrics exporter.
/// Returns the PrometheusHandle for installing the /metrics endpoint.
pub fn init_exporter() -> metrics_exporter_prometheus::PrometheusHandle {
let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
builder
.install_recorder()
.expect("failed to install Prometheus recorder")
}
/// Axum middleware that records per-request metrics.
pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
let method = request.method().clone();
let path = request.uri().path().to_string();
// Normalize path: strip dynamic segments for cardinality control
let path_label = normalize_path(&path);
let ns = request
.extensions()
.get::<RequestContext>()
.map(|ctx| ctx.ns.clone())
.unwrap_or_else(|| "unknown".to_string());
let start = Instant::now();
let response = next.run(request).await;
let duration = start.elapsed().as_secs_f64();
let status = response.status().as_u16().to_string();
let labels = [
("method", method.to_string()),
("path", path_label),
("status", status),
("namespace", ns),
];
counter!("fs9_http_requests_total", &labels).increment(1);
histogram!("fs9_http_request_duration_seconds", &labels[..3]).record(duration);
response
}
/// Normalize API paths to reduce label cardinality.
/// `/api/v1/namespaces/myns` → `/api/v1/namespaces/:ns`
fn normalize_path(path: &str) -> String {
if path.starts_with("/api/v1/namespaces/") && path.matches('/').count() == 4 {
return "/api/v1/namespaces/:ns".to_string();
}
path.to_string()
}
/// Handler for GET /metrics — returns Prometheus text format.
pub async fn metrics_handler(
axum::extract::State(handle): axum::extract::State<metrics_exporter_prometheus::PrometheusHandle>,
) -> String {
handle.render()
}Token cache 埋点 — server/src/token_cache.rs:
// 在 get() 中:
pub async fn get(&self, token: &str) -> Option<CachedToken> {
let result = self.cache.get(token).await.and_then(|entry| { ... });
match &result {
Some(_) => metrics::counter!("fs9_token_cache_hits_total").increment(1),
None => metrics::counter!("fs9_token_cache_misses_total").increment(1),
}
result
}server:
metrics:
enabled: true # 默认 true
path: "/metrics" # 默认 /metrics-
GET /metrics返回 Prometheus text format - 发送 10 个请求后,
fs9_http_requests_total计数正确 - 延迟直方图包含合理的 bucket 分布
-
/metrics本身不被 metrics middleware 记录(避免递归) - Grafana 可正常导入并展示 dashboard
meta/src/db/mod.rs:43:
#[cfg(feature = "postgres")]
{
todo!("PostgreSQL support not yet implemented");
}SQLite 单节点最大 5 连接,无法支撑百万用户的 token 验证频率。
实现 PostgresStore,与 SqliteStore 接口完全对齐。
MetaStore (enum)
├── Sqlite(SqliteStore) ← 已有
└── Postgres(PostgresStore) ← 新增
| 文件 | 改动 |
|---|---|
meta/Cargo.toml |
确认 postgres feature 已声明(已有) |
meta/src/db/postgres.rs |
新文件: PostgresStore 完整实现 |
meta/src/db/mod.rs |
添加 #[cfg(feature = "postgres")] mod postgres; + MetaStore 枚举分支 |
meta/src/db/postgres.rs 核心结构:
use sqlx::{postgres::PgPoolOptions, PgPool};
pub struct PostgresStore {
pool: PgPool,
}
impl PostgresStore {
pub async fn connect(dsn: &str) -> Result<Self> {
let pool = PgPoolOptions::new()
.max_connections(50) // 生产建议 50-200
.min_connections(5) // 保持最小连接避免冷启动
.acquire_timeout(Duration::from_secs(5))
.idle_timeout(Duration::from_secs(300))
.connect(dsn)
.await
.map_err(|e| MetaError::Database(format!("PG connect failed: {e}")))?;
Ok(Self { pool })
}
}SQL 差异处理:
| 操作 | SQLite | PostgreSQL |
|---|---|---|
| 占位符 | ? |
$1, $2, ... |
| 自增 ID | 不使用(UUID) | 不使用(UUID) |
| 时间类型 | TEXT (ISO8601) | TIMESTAMPTZ |
| 布尔 | INTEGER | BOOLEAN |
| JSON | TEXT | JSONB |
由于 FS9 的 meta service 使用 UUID 主键和文本时间戳,大部分 SQL 可以几乎原样复用。
主要差异在:
- 占位符语法
?→$N - 时间列类型建议改为
TIMESTAMPTZ(但可继续用 TEXT 保持一致性,代价是无法用 PG 时间函数)
推荐方案: 时间列使用 TIMESTAMPTZ,利用 sqlx 的 chrono::DateTime<Utc> 自动绑定。
meta/src/db/mod.rs — 扩展 MetaStore enum:
#[derive(Clone)]
pub enum MetaStore {
#[cfg(feature = "sqlite")]
Sqlite(SqliteStore),
#[cfg(feature = "postgres")]
Postgres(PostgresStore),
}
// 每个方法增加分支:
pub async fn create_namespace(&self, name: &str, created_by: &str) -> Result<Namespace> {
match self {
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.create_namespace(name, created_by).await,
#[cfg(feature = "postgres")]
Self::Postgres(store) => store.create_namespace(name, created_by).await,
}
}Migration: PostgresStore 的 migrate() 使用相同表结构,但使用 PG 原生类型:
CREATE TABLE IF NOT EXISTS namespaces (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT UNIQUE NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT NOT NULL,
updated_at TIMESTAMPTZ,
deleted_at TIMESTAMPTZ
);
-- 索引与 SQLite 版本一致
CREATE INDEX IF NOT EXISTS idx_namespaces_name ON namespaces(name);# meta.yaml
database:
# SQLite (默认,开发/测试)
dsn: "sqlite:fs9-meta.db"
# PostgreSQL (生产)
# dsn: "postgres://fs9:password@pg-host:5432/fs9_meta"
# max_connections: 50 # 新增
# min_connections: 5 # 新增-
cargo build -p fs9-meta --features postgres编译通过 - 所有现有 SQLite 测试在 Postgres 后端上同样通过
- 连接池配置可调
- 使用 Postgres 时
fs9-meta可承受 1000+ QPS 的 token validation
Axum 默认 body limit 2MB(axum 0.7+ 默认开启),但未被显式配置,行为不透明。
写入大文件时用户得到的错误信息不友好。
显式设置全局 body limit + write 端点单独的更大 limit。
| 文件 | 改动 |
|---|---|
server/src/main.rs |
添加 DefaultBodyLimit layer |
server/src/api/mod.rs |
write 路由单独设置更大 limit |
config/src/types.rs |
新增 max_body_size 配置 |
server/src/main.rs:
use axum::extract::DefaultBodyLimit;
// 全局默认限制: 2MB(给 JSON API 请求用)
let default_body_limit = config.server.max_body_size_bytes.unwrap_or(2 * 1024 * 1024);
// Write 端点限制: 256MB(大文件上传)
let write_body_limit = config.server.max_write_size_bytes.unwrap_or(256 * 1024 * 1024);
let app = api::create_router(state, write_body_limit)
.layer(DefaultBodyLimit::max(default_body_limit))
// ... 其他 layersserver/src/api/mod.rs — write 路由单独 limit:
pub fn create_router(state: Arc<AppState>, write_body_limit: usize) -> Router {
Router::new()
// ... 其他路由
.route("/api/v1/write", post(handlers::write)
.layer(DefaultBodyLimit::max(write_body_limit)))
// ...
.with_state(state)
}server:
max_body_size_bytes: 2097152 # 2MB,默认值,控制 JSON API 请求
max_write_size_bytes: 268435456 # 256MB,默认值,控制文件写入- 超过 limit 的请求返回
413 Payload Too Large - write 端点可以上传 > 2MB 但 ≤ 256MB 的文件
- 其他端点(open/close/stat 等)body > 2MB 被拒绝
HandleRegistry 和 HandleMap 在内存中,多实例部署时 handle 不跨实例共享。
不改应用代码。通过 LB 层 + 文档解决。
┌──────────────────┐
│ Load Balancer │
│ │
│ hash(namespace) │──► 固定路由到同一实例
│ 或 cookie-based │
└──────┬───────────┘
│
┌──────────┼──────────┐
│ │ │
┌──────▼──┐ ┌─────▼───┐ ┌───▼──────┐
│ Server1 │ │ Server2 │ │ Server3 │
│ NS: a,b │ │ NS: c,d │ │ NS: e,f │
└─────────┘ └─────────┘ └──────────┘
路由策略: hash(JWT.ns claim) % instance_count
- 优点: 零代码改动,同一 namespace 的所有请求路由到同一实例
- 缺点: namespace 粒度的"热点"可能导致不均衡
- 缓解: 监控每实例 handle 数(§2.3 metrics),配合 namespace 再分配
| 交付物 | 说明 |
|---|---|
docs/deployment/sticky-session.md |
部署指南文档 |
docs/deployment/nginx-sticky.conf |
Nginx 配置示例 |
docs/deployment/envoy-sticky.yaml |
Envoy 配置示例 |
| Health check 增强 | /health 返回 instance_id 用于 debug |
upstream fs9_servers {
# 基于 $http_authorization 中 JWT 的 ns claim 做一致性 hash
# 简化方案: 基于 Authorization header hash
hash $http_authorization consistent;
server fs9-1:9999;
server fs9-2:9999;
server fs9-3:9999;
}- Nginx/Envoy 配置文档完成并验证
- 同一 namespace 的请求稳定路由到同一实例(日志验证)
- 实例下线后,LB 自动将 namespace 迁移到其他实例
- Health endpoint 返回 instance_id
server/src/meta_client.rs 硬编码 10s timeout,无重试,无熔断。
server/src/auth.rs:382-389 meta 不可用时 fallback 到本地 JWT,但每个请求仍尝试一次 HTTP 调用(10s timeout 浪费)。
Token Validation Request
│
▼
┌─────────────┐ CLOSED ┌──────────────┐
│ Circuit │──────────────►│ Meta Client │
│ Breaker │ │ HTTP Call │
│ │◄──────────────│ │
│ OPEN: │ success/ └──────────────┘
│ skip HTTP, │ failure
│ 直接 local │
│ JWT decode │
└─────┬───────┘
│ OPEN / HALF_OPEN
▼
┌─────────────┐
│ Local JWT │
│ Decode │
│ (fallback) │
└─────────────┘
熔断器状态机:
- CLOSED: 正常转发请求到 meta。连续 5 次失败 → OPEN
- OPEN: 跳过 meta,直接 local JWT decode。持续 30s → HALF_OPEN
- HALF_OPEN: 放过 1 个请求到 meta。成功 → CLOSED,失败 → OPEN
| 文件 | 改动 |
|---|---|
server/src/circuit_breaker.rs |
新文件: 通用熔断器实现 |
server/src/meta_client.rs |
集成熔断器 + 重试逻辑 |
server/src/auth.rs |
使用熔断器包装的 meta client |
server/src/state.rs |
AppState 持有 CircuitBreaker |
server/src/lib.rs |
导出新模块 |
config/src/types.rs |
新增熔断器配置 |
server/src/circuit_breaker.rs(新文件):
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
Closed,
Open,
HalfOpen,
}
pub struct CircuitBreaker {
state: RwLock<CircuitState>,
failure_count: AtomicU32,
failure_threshold: u32,
recovery_timeout: Duration,
last_failure: RwLock<Option<Instant>>,
}
impl CircuitBreaker {
pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self { ... }
pub async fn state(&self) -> CircuitState { ... }
pub async fn record_success(&self) { ... }
pub async fn record_failure(&self) { ... }
pub async fn allow_request(&self) -> bool { ... }
}重试策略: 在 MetaClient::validate_token 中:
impl MetaClient {
pub async fn validate_token_with_retry(
&self,
token: &str,
circuit_breaker: &CircuitBreaker,
) -> Result<ValidateResponse, MetaClientError> {
if !circuit_breaker.allow_request().await {
return Err(MetaClientError::CircuitOpen);
}
let mut last_err = None;
for attempt in 0..3 {
if attempt > 0 {
// 指数退避: 100ms, 200ms
tokio::time::sleep(Duration::from_millis(100 * (1 << attempt))).await;
}
match self.validate_token(token).await {
Ok(resp) => {
circuit_breaker.record_success().await;
return Ok(resp);
}
Err(e) if e.is_transient() => {
last_err = Some(e);
continue;
}
Err(e) => {
// 非瞬态错误(如 401)不重试
circuit_breaker.record_failure().await;
return Err(e);
}
}
}
circuit_breaker.record_failure().await;
Err(last_err.unwrap())
}
}server:
meta:
circuit_breaker:
failure_threshold: 5 # 连续失败 N 次后熔断
recovery_timeout_secs: 30 # 熔断持续时间
retry:
max_attempts: 3 # 最大重试次数(含首次)
base_delay_ms: 100 # 退避基础延迟- Meta 服务关闭后,前 5 个请求逐个失败,第 6 个起直接 fallback(无 10s 等待)
- 熔断 30s 后自动尝试恢复
- Meta 恢复后,下一个 half-open 请求成功即恢复正常
- 瞬态错误(网络超时)触发重试,非瞬态错误(401)不重试
- Metrics 中可观测到
fs9_circuit_breaker_state指标
auth.rs:208: 过期 token 7 天内可 refresh,被盗窗口过大- Token cache 无主动失效机制,被盗 token 在 TTL(5min)内持续有效
- 无 token 黑名单
Token 生命周期:
┌──────────────────────────────────────────────┐
│ 签发(iat) 过期(exp) 宽限期结束 │
│ │ │ │ │
│ │ 正常使用 │ 可refresh │ 彻底失效 │
│ │◄────────────────►│◄────────►│ │
│ │ │ 4小时 │ │
└──────────────────────────────────────────────┘
Token 撤销:
┌──────────────┐ revoke ┌──────────────┐
│ Admin / User │────────►│ Revocation │
│ │ │ Set (moka) │
└──────────────┘ └──────┬───────┘
│
┌──────▼───────┐
│ Auth MW 检查 │
│ token 是否在 │
│ revocation │
│ set 中 │
└──────────────┘
| 文件 | 改动 |
|---|---|
server/src/auth.rs |
7 天 → 4 小时;增加 revocation set 检查 |
server/src/token_revocation.rs |
新文件: Token 撤销集合 |
server/src/state.rs |
AppState 持有 RevocationSet |
server/src/api/handlers.rs |
新增 POST /api/v1/auth/revoke 端点 |
server/src/api/mod.rs |
注册新路由 |
宽限期收紧: auth.rs 中修改:
let grace_period = 4 * 60 * 60; // 7 天 → 4 小时server/src/token_revocation.rs(新文件):
use moka::future::Cache;
use std::time::Duration;
/// In-memory set of revoked token JTI (JWT ID) or token hash.
/// TTL = max possible token lifetime (exp + grace period) 保证撤销生效。
pub struct RevocationSet {
revoked: Cache<String, ()>,
}
impl RevocationSet {
pub fn new() -> Self {
Self {
// 撤销记录保留 25 小时(24h max token TTL + 1h buffer)
revoked: Cache::builder()
.max_capacity(500_000)
.time_to_live(Duration::from_secs(25 * 3600))
.build(),
}
}
pub async fn revoke(&self, token_hash: &str) {
self.revoked.insert(token_hash.to_string(), ()).await;
}
pub async fn is_revoked(&self, token_hash: &str) -> bool {
self.revoked.get(token_hash).await.is_some()
}
}auth middleware 增加检查:
// 在 token cache hit 和 JWT decode 成功后,都检查 revocation set
let token_hash = sha256_short(token); // 取前 16 bytes 作为 key
if state.app_state.revocation_set.is_revoked(&token_hash).await {
return unauthorized("Token has been revoked");
}新增 API:
POST /api/v1/auth/revoke
Authorization: Bearer <admin-token>
Body: { "token": "<token-to-revoke>" }
Response: 204 No Content
server:
auth:
refresh_grace_period_hours: 4 # 默认 4 小时(原 168 小时 = 7 天)
revocation_capacity: 500000 # 撤销集合最大条目数- 过期超过 4 小时的 token 无法 refresh
-
POST /api/v1/auth/revoke后,被撤销 token 立即返回 401 - Token cache 中的已撤销 token 也被拒绝
- 撤销记录在 25 小时后自动过期(moka TTL)
core/src/handle.rs:177-185:
let mut handles = shard.handles.write().await; // 获取写锁
for id in to_close {
if let Some(state) = handles.remove(&id) {
let _ = state.provider.close(state.provider_handle, false).await; // 持锁 await!
// ...
}
}写锁在 provider.close().await 期间持有。如果 provider(如 ProxyFs)close 慢(网络超时 10s),该 shard 的所有 handle 操作被阻塞。
拆分为"收集"和"关闭"两步,锁只在收集阶段持有:
pub async fn cleanup_stale(&self) -> Vec<HandleId> {
let now = Instant::now();
let mut all_closed = Vec::new();
for shard in &self.shards {
// Step 1: 在读锁下识别过期 handle
let stale_ids: Vec<HandleId> = {
let handles = shard.handles.read().await;
let mut ids = Vec::new();
for (id, state) in handles.iter() {
let last_access = *state.last_access.read().await;
if now.duration_since(last_access) > self.ttl {
ids.push(*id);
}
}
ids
};
// 读锁已释放
if stale_ids.is_empty() {
continue;
}
// Step 2: 在写锁下批量 remove(不 await close)
let removed: Vec<(HandleId, HandleState)> = {
let mut handles = shard.handles.write().await;
stale_ids.iter()
.filter_map(|id| handles.remove(id).map(|s| (*id, s)))
.collect()
};
// 写锁已释放
// Step 3: 无锁状态下逐个 close
for (id, state) in removed {
let _ = state.provider.close(state.provider_handle, false).await;
all_closed.push(id);
}
}
all_closed
}| 文件 | 改动 |
|---|---|
core/src/handle.rs |
重写 cleanup_stale 方法 |
- 现有
cleanup_stale单元测试继续通过 - 新增测试:在 cleanup 期间并发注册 handle 不被阻塞
- Provider close 耗时不影响同 shard 的 handle 操作延迟
server/src/namespace.rs:103:
pub struct NamespaceManager {
namespaces: RwLock<HashMap<String, (Arc<Namespace>, NamespaceInfo)>>,
// ...
}单个 RwLock<HashMap> 管理所有 namespace。百万租户每请求都需 read() 查找 namespace,高并发下读锁获取也有开销。
替换为 dashmap::DashMap,零锁读取。
use dashmap::DashMap;
pub struct NamespaceManager {
namespaces: DashMap<String, (Arc<Namespace>, NamespaceInfo)>,
handle_ttl: Duration,
}DashMap 内部使用分片 HashMap + 细粒度锁,读操作几乎无开销。
| 文件 | 改动 |
|---|---|
server/Cargo.toml |
新增 dashmap 依赖 |
server/src/namespace.rs |
RwLock<HashMap> → DashMap,API 全部改为同步(去掉 .read().await) |
核心变更点:
// Before:
pub async fn get(&self, name: &str) -> Option<Arc<Namespace>> {
self.namespaces.read().await
.get(name).map(|(ns, _)| ns.clone())
}
// After:
pub fn get(&self, name: &str) -> Option<Arc<Namespace>> {
self.namespaces.get(name).map(|r| r.value().0.clone())
}注意: get 从 async fn 变为 fn。调用方(handlers.rs 中的 resolve_ns)需要同步更新,这是纯机械改动。
- 所有现有 namespace 相关测试通过
- 并发创建 1000 个 namespace 无 panic
-
resolve_ns调用延迟下降(benchmark 验证)
TraceLayer 仅记录 HTTP 日志,无 trace context 跨 fs9-server → fs9-meta 传播。
Client ──(trace-id in header)──► fs9-server ──(trace-id propagated)──► fs9-meta
│ │ │
│ ┌─────────────┐ │ ┌─────────────┐ │
└───►│ Span: │ └───►│ Span: │ │
│ HTTP req │ │ meta call │ │
│ duration │ │ duration │ │
└─────────────┘ └─────────────┘
│
┌───────────▼──────────┐
│ OTLP Collector │
│ (Jaeger / Tempo) │
└──────────────────────┘
| 文件 | 改动 |
|---|---|
server/Cargo.toml |
新增 opentelemetry, opentelemetry-otlp, tracing-opentelemetry |
server/src/main.rs |
初始化 OTLP exporter + tracing layer |
server/src/meta_client.rs |
注入 trace context headers(W3C TraceContext) |
meta/Cargo.toml |
新增同样的 OTEL 依赖 |
meta/src/main.rs |
初始化 OTLP exporter |
server/src/main.rs — tracing 初始化变更:
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;
fn init_tracing(config: &Fs9Config) {
let otel_endpoint = config.server.tracing.otlp_endpoint
.as_deref()
.unwrap_or("http://localhost:4317");
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(otel_endpoint)
.build()
.expect("OTLP exporter");
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.build();
let tracer = provider.tracer("fs9-server");
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(tracing_subscriber::EnvFilter::new(&filter))
.with(OpenTelemetryLayer::new(tracer))
.init();
}meta_client.rs — trace context 传播:
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
impl MetaClient {
pub async fn validate_token(&self, token: &str) -> Result<ValidateResponse, MetaClientError> {
let mut req = self.client.post(&url).json(&body);
// 注入 W3C TraceContext headers
let propagator = TraceContextPropagator::new();
let cx = opentelemetry::Context::current();
let mut headers = HashMap::new();
propagator.inject_context(&cx, &mut headers);
for (k, v) in headers {
req = req.header(&k, &v);
}
req.send().await?
}
}server:
tracing:
otlp_endpoint: "http://localhost:4317" # gRPC OTLP collector
enabled: true # 默认 true
sample_rate: 0.1 # 采样率,1.0 = 全量- Jaeger/Tempo 中可看到
fs9-server和fs9-meta的关联 trace - 单个请求的 span 包含: HTTP handler → meta validation → provider operation
- 采样率可配置,0.1 时仅 10% 请求产生 trace
- OTLP endpoint 不可达时不影响正常请求处理
handlers.rs:160:
let data = ns.vfs.read(&Handle::new(handle_id), req.offset, req.size).await?;
Ok((StatusCode::OK, data)) // 整个 Bytes 一次性返回读取 100MB 文件 = 100MB 内存分配。百万用户场景下几个大文件请求就能 OOM。
分层改造,Phase 2 仅改 HTTP 层,不动 FsProvider trait。
Phase 2 (HTTP 层)
┌──────────────────────────┐
Client ◄────────│ Chunked Transfer-Encoding│
│ (Stream<Bytes>) │
└──────────┬───────────────┘
│ 分块调用
┌──────────▼───────────────┐
│ Loop: │
│ read(handle, offset, │
│ CHUNK_SIZE) │
│ yield chunk │
│ offset += chunk.len() │
│ Until: chunk.len() < CS │
│ or offset >= size │
└──────────────────────────┘
CHUNK_SIZE: 256KB(兼顾吞吐和内存)
| 文件 | 改动 |
|---|---|
server/src/api/handlers.rs |
read handler 改为 streaming response |
server/src/api/models.rs |
ReadRequest 增加可选 stream: bool 参数 |
handlers.rs — streaming read:
use axum::body::Body;
use futures::stream;
use tokio_stream::StreamExt;
const STREAM_CHUNK_SIZE: usize = 256 * 1024; // 256KB
pub async fn read(
State(state): State<Arc<AppState>>,
Extension(ctx): Extension<RequestContext>,
Json(req): Json<ReadRequest>,
) -> AppResult<Response> {
let ns = resolve_ns(&state, &ctx).await?;
let handle_id = ns.handle_map.read().await
.get_id(&req.handle_id)
.ok_or_else(|| FsError::invalid_argument("invalid handle_id"))?;
let total_size = req.size;
// 小请求(≤ 1MB): 保持原有行为,一次性返回
if total_size <= 1024 * 1024 {
let data = ns.vfs.read(&Handle::new(handle_id), req.offset, total_size).await?;
return Ok((StatusCode::OK, data).into_response());
}
// 大请求: 流式返回
let vfs = ns.vfs.clone();
let handle = Handle::new(handle_id);
let mut offset = req.offset;
let end_offset = req.offset + total_size as u64;
let stream = async_stream::stream! {
while offset < end_offset {
let remaining = (end_offset - offset) as usize;
let chunk_size = remaining.min(STREAM_CHUNK_SIZE);
match vfs.read(&handle, offset, chunk_size).await {
Ok(data) => {
let len = data.len();
if len == 0 { break; }
offset += len as u64;
yield Ok::<_, std::io::Error>(data);
if len < chunk_size { break; } // EOF
}
Err(e) => {
yield Err(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()));
break;
}
}
}
};
Ok(Response::builder()
.status(StatusCode::OK)
.header("transfer-encoding", "chunked")
.body(Body::from_stream(stream))
.unwrap())
}- 请求 size ≤ 1MB: 行为不变(一次性
Bytes返回) - 请求 size > 1MB: 自动切换为 chunked streaming
- Client 端:
reqwest默认支持 chunked 解码,Rust/Python client 无需改动
- 读取 100MB 文件时,server 进程内存增长 < 10MB
- 1MB 以下请求行为和延迟不变
- Python client 和 Rust client 均能正常接收 streaming response
- 中途断连时 stream 正确终止,无资源泄漏
Phase 1(可并行):
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 2.1 停机 │ │ 2.2 限流 │ │ 2.3 指标 │ │ 2.4 PG │ │ 2.5 Body │
└────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │ │ │
└──────────────┴──────┬───────┴──────────────┴──────────────┘
│
Phase 2: │ 依赖
┌──────────────────────────▼────────────────────────────────────────┐
│ │
│ ┌──────────┐ ┌──────────┐ │
│ │3.4 锁优化│ │3.5 NS分片│ ← 可立即开始,无依赖 │
│ └──────────┘ └──────────┘ │
│ │
│ ┌──────────┐ │
│ │3.2 熔断器│ ← 依赖 2.3(需要 metrics 监控熔断器状态) │
│ └──────────┘ │
│ │
│ ┌──────────┐ │
│ │3.3 Token │ ← 可独立,但建议在 2.3 之后(需要监控撤销操作) │
│ └──────────┘ │
│ │
│ ┌──────────┐ │
│ │3.6 OTEL │ ← 依赖 2.3(共享 tracing 初始化基础设施) │
│ └──────────┘ │
│ │
│ ┌──────────┐ │
│ │3.7 流式 │ ← 可独立 │
│ └──────────┘ │
│ │
│ ┌──────────┐ │
│ │3.1 Sticky│ ← 依赖 2.3(需要 per-instance metrics 验证) │
│ └──────────┘ │
└────────────────────────────────────────────────────────────────────┘
建议实施顺序:
| 顺序 | 项目 | 预估工时 | 依赖 |
|---|---|---|---|
| 1 | 2.5 请求体大小限制 | 0.5d | 无 |
| 2 | 2.1 优雅停机 | 1d | 无 |
| 3 | 2.3 Prometheus Metrics | 2d | 无 |
| 4 | 2.2 Per-Tenant 限流 | 1d | 无 |
| 5 | 2.4 PostgreSQL | 3d | 无 |
| 6 | 3.4 cleanup_stale 锁优化 | 0.5d | 无 |
| 7 | 3.5 NamespaceManager 分片 | 1d | 无 |
| 8 | 3.2 Meta Client 熔断器 | 2d | 2.3 |
| 9 | 3.3 Token 撤销 | 1.5d | 无 |
| 10 | 3.7 流式大文件 | 2d | 无 |
| 11 | 3.6 OpenTelemetry | 2d | 2.3 |
| 12 | 3.1 Sticky Session | 1d | 2.3 |
| 总计 | ~17.5d |
# Rate limiting
governor = { version = "0.7", features = ["dashmap"] }
# Metrics
metrics = "0.24"
metrics-exporter-prometheus = "0.16"
# Namespace sharding
dashmap = "6"
# Streaming response
async-stream = "0.3"
# OpenTelemetry (Phase 2)
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
tracing-opentelemetry = "0.28"# 已有 postgres feature,无需新增依赖
# OpenTelemetry (Phase 2, 同 server)
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
tracing-opentelemetry = "0.28"所有新增配置项及其默认值:
server:
# Phase 1
shutdown_timeout_secs: 30
max_body_size_bytes: 2097152 # 2MB
max_write_size_bytes: 268435456 # 256MB
rate_limit:
namespace_qps: 1000
user_qps: 100
metrics:
enabled: true
path: "/metrics"
# Phase 2
meta:
circuit_breaker:
failure_threshold: 5
recovery_timeout_secs: 30
retry:
max_attempts: 3
base_delay_ms: 100
auth:
refresh_grace_period_hours: 4
revocation_capacity: 500000
tracing:
otlp_endpoint: "http://localhost:4317"
enabled: true
sample_rate: 0.1
database:
# meta 服务
dsn: "postgres://..."
max_connections: 50
min_connections: 5-
make check通过(lint + test) -
SIGTERM后优雅停机,无资源泄漏 -
GET /metrics返回 Prometheus 格式指标 - 单租户超限后返回 429,不影响其他租户
- fs9-meta 可使用 PostgreSQL 后端运行所有 E2E 测试
- 超大 body 返回 413
- fs9-meta 不可用时,server 在 5 次失败后自动熔断,无 10s 延迟
- 被撤销 token 即时失效(< 1s)
-
cleanup_stale期间其他 handle 操作无延迟抖动 - Jaeger 中可查看跨服务 trace
- 100MB 文件读取时内存增长 < 10MB
- 多实例部署下同一 namespace 稳定路由到同一实例
| 指标 | 目标 |
|---|---|
| 单实例 QPS (read) | > 10,000 |
| P99 延迟 (stat) | < 10ms |
| 并发 namespace | > 10,000 |
| 并发 handle | > 100,000 |
| Token 验证延迟 (cached) | < 1ms |
| Token 验证延迟 (meta) | < 50ms |
| 优雅停机时间 | < 30s |