Skip to content

Instantly share code, notes, and snippets.

@c4pt0r
Created February 6, 2026 22:33
Show Gist options
  • Select an option

  • Save c4pt0r/59abb72a22df781d9eed72768f850b4f to your computer and use it in GitHub Desktop.

Select an option

Save c4pt0r/59abb72a22df781d9eed72768f850b4f to your computer and use it in GitHub Desktop.
FS9 百万用户规模改造设计文档

FS9 百万用户规模改造设计文档

版本: 1.0
日期: 2026-02-06
状态: DRAFT — 待评审


目录


1. 概述

目标

将 FS9 从"可运行的开发阶段"提升至"支撑百万级用户的生产系统"。
改造分两个阶段:

阶段 目标 改造项
Phase 1 (P0) 不做就不能上线 5 项
Phase 2 (P1) 百万用户前必须完成 7 项

当前架构快照

Client → [LB] → fs9-server (Axum) → VfsRouter → Provider (memfs/pagefs/localfs/plugin)
                     ↓
                fs9-meta (SQLite)

核心瓶颈:fs9-server 有状态(Handle 在内存)、无可观测性、无容错韧性。

设计原则

  1. 最小侵入: 优先改 middleware 层和配置,避免改动 FsProvider trait
  2. 渐进式: 每个改造项可独立合并、独立回滚
  3. 向后兼容: 不破坏现有 API 契约和 SDK 版本
  4. 可验证: 每项改造附带明确的验收测试

2. Phase 1 — P0 上线必需

2.1 优雅停机与信号处理

现状

server/src/main.rs:107:

axum::serve(listener, app).await.unwrap();

SIGTERM 立即终止进程。进行中的请求被中断,open 的 file handle 永远不会被 close,provider 侧资源泄漏。

设计

                    SIGTERM/SIGINT
                         │
                         ▼
              ┌─────────────────────┐
              │  Shutdown Signal    │
              │  (tokio::signal)    │
              └──────────┬──────────┘
                         │
              ┌──────────▼──────────┐
              │  Stop accepting     │
              │  new connections    │
              └──────────┬──────────┘
                         │
              ┌──────────▼──────────┐
              │  Drain in-flight    │
              │  requests (30s)     │
              └──────────┬──────────┘
                         │
              ┌──────────▼──────────┐
              │  Close all handles  │
              │  per namespace      │
              └──────────┬──────────┘
                         │
              ┌──────────▼──────────┐
              │  Unload plugins     │
              └──────────┬──────────┘
                         │
              ┌──────────▼──────────┐
              │  Exit(0)            │
              └─────────────────────┘

涉及文件

文件 改动
server/src/main.rs 添加 shutdown signal handler + graceful shutdown 逻辑
server/src/namespace.rs 新增 NamespaceManager::drain_all() 方法
core/src/handle.rs 新增 HandleRegistry::close_all() 方法
config/src/types.rs 新增 shutdown_timeout_secs 配置项

实现细节

server/src/main.rs:

// 替换现有的 axum::serve 调用
let shutdown_timeout = Duration::from_secs(
    config.server.shutdown_timeout_secs.unwrap_or(30)
);

axum::serve(listener, app)
    .with_graceful_shutdown(shutdown_signal(shutdown_timeout, state.clone()))
    .await
    .unwrap();

async fn shutdown_signal(timeout: Duration, state: Arc<AppState>) {
    let ctrl_c = async {
        tokio::signal::ctrl_c().await.expect("install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => tracing::info!("Received Ctrl+C, shutting down"),
        () = terminate => tracing::info!("Received SIGTERM, shutting down"),
    }

    // Drain phase: close all open handles across all namespaces
    tracing::info!("Draining open handles...");
    state.namespace_manager.drain_all().await;
    tracing::info!("Shutdown complete");
}

core/src/handle.rs — 新增 close_all:

/// Close all handles in the registry. Used during graceful shutdown.
/// Returns count of handles closed.
pub async fn close_all(&self) -> usize {
    let mut total = 0;
    for shard in &self.shards {
        let handles: Vec<(HandleId, HandleState)> = {
            let mut map = shard.handles.write().await;
            map.drain().collect()
        };
        // 锁已释放,逐个 close
        for (id, state) in handles {
            let _ = state.provider.close(state.provider_handle, true).await;
            total += 1;
            tracing::debug!(handle_id = id, path = %state.path, "Closed handle during shutdown");
        }
    }
    total
}

server/src/namespace.rs — 新增 drain_all:

impl NamespaceManager {
    /// Drain all namespaces: close all open handles. Used during graceful shutdown.
    pub async fn drain_all(&self) {
        let namespaces: Vec<Arc<Namespace>> = {
            self.namespaces.read().await
                .values()
                .map(|(ns, _)| ns.clone())
                .collect()
        };
        for ns in namespaces {
            let count = ns.handle_registry.close_all().await;
            if count > 0 {
                tracing::info!(namespace = %ns.name, closed = count, "Drained handles");
            }
        }
    }
}

配置

server:
  shutdown_timeout_secs: 30   # 新增,默认 30 秒

验收标准

  • kill -SIGTERM <pid> 后,正在执行的 read/write 请求能正常完成
  • 所有 open handle 被 close(通过日志验证 "Drained handles" 输出)
  • 新连接在 shutdown 阶段被拒绝(503)
  • 超过 shutdown_timeout_secs 后强制退出

2.2 Per-Tenant 限流

现状

server/src/main.rs:100 仅有全局并发限制:

.layer(ConcurrencyLimitLayer::new(max_concurrent))  // 默认 1000

任意单租户可耗尽所有 1000 并发槽,形成"吵闹邻居"问题。

设计

Request → Auth Middleware → Rate Limit Middleware → Handler
                │                    │
                │                    ├─ 全局: 总并发 ≤ N
                │                    ├─ Per-NS: 每 namespace QPS ≤ M
                │                    └─ Per-User: 每 user QPS ≤ K
                │
                └─ 提取 namespace + user_id

采用两层限流

  1. 全局层(已有): ConcurrencyLimitLayer 保护总体资源
  2. 租户层(新增): 基于 governor crate 的令牌桶算法,按 (namespace, user_id) 分桶

涉及文件

文件 改动
server/Cargo.toml 新增 governor 依赖
server/src/rate_limit.rs 新文件: 限流 middleware 实现
server/src/main.rs 插入限流 layer
server/src/lib.rs 导出新模块
config/src/types.rs 新增限流配置项

实现细节

server/src/rate_limit.rs(新文件):

use axum::{body::Body, extract::Request, http::StatusCode, middleware::Next, response::Response};
use governor::{DefaultKeyedRateLimiter, Quota, RateLimiter};
use governor::clock::DefaultClock;
use governor::state::keyed::DashMapStateStore;
use std::num::NonZeroU32;
use std::sync::Arc;

use crate::auth::RequestContext;

/// Composite key for rate limiting: (namespace, user_id)
type RateLimitKey = String;

#[derive(Clone)]
pub struct RateLimitState {
    /// Per-namespace limiter
    ns_limiter: Arc<DefaultKeyedRateLimiter<String>>,
    /// Per-user limiter (key = "ns:user")
    user_limiter: Arc<DefaultKeyedRateLimiter<String>>,
    enabled: bool,
}

impl RateLimitState {
    pub fn new(ns_qps: u32, user_qps: u32) -> Self {
        let ns_limiter = Arc::new(RateLimiter::dashmap(
            Quota::per_second(NonZeroU32::new(ns_qps).unwrap_or(NonZeroU32::new(1000).unwrap())),
        ));
        let user_limiter = Arc::new(RateLimiter::dashmap(
            Quota::per_second(NonZeroU32::new(user_qps).unwrap_or(NonZeroU32::new(100).unwrap())),
        ));
        Self {
            ns_limiter,
            user_limiter,
            enabled: true,
        }
    }

    pub fn disabled() -> Self {
        Self {
            ns_limiter: Arc::new(RateLimiter::dashmap(
                Quota::per_second(NonZeroU32::new(1).unwrap()),
            )),
            user_limiter: Arc::new(RateLimiter::dashmap(
                Quota::per_second(NonZeroU32::new(1).unwrap()),
            )),
            enabled: false,
        }
    }
}

pub async fn rate_limit_middleware(
    axum::extract::State(state): axum::extract::State<RateLimitState>,
    request: Request<Body>,
    next: Next,
) -> Response {
    if !state.enabled {
        return next.run(request).await;
    }

    // Skip rate limiting for health checks
    if request.uri().path() == "/health" {
        return next.run(request).await;
    }

    // Extract context (set by auth middleware which runs before this)
    let ctx = request.extensions().get::<RequestContext>();
    if let Some(ctx) = ctx {
        // Check namespace-level limit
        if state.ns_limiter.check_key(&ctx.ns).is_err() {
            return (
                StatusCode::TOO_MANY_REQUESTS,
                "Namespace rate limit exceeded",
            ).into_response();
        }

        // Check user-level limit
        let user_key = format!("{}:{}", ctx.ns, ctx.user_id);
        if state.user_limiter.check_key(&user_key).is_err() {
            return (
                StatusCode::TOO_MANY_REQUESTS,
                "User rate limit exceeded",
            ).into_response();
        }
    }

    next.run(request).await
}

server/src/main.rs — 限流 layer 插入位置(在 auth 之后):

// Layer 顺序(从外到内):
// TraceLayer → ConcurrencyLimit → Timeout → RateLimit → Auth → Handler
let rate_limit_state = RateLimitState::new(
    config.server.rate_limit.namespace_qps.unwrap_or(1000),
    config.server.rate_limit.user_qps.unwrap_or(100),
);

let app = api::create_router(state)
    .layer(middleware::from_fn_with_state(
        rate_limit_state,
        rate_limit::rate_limit_middleware,
    ))
    .layer(middleware::from_fn_with_state(
        auth_middleware_state,
        auth::auth_middleware,
    ))
    .layer(TimeoutLayer::new(request_timeout))
    .layer(ConcurrencyLimitLayer::new(max_concurrent))
    .layer(TraceLayer::new_for_http());

注意: Rate limit 层必须在 auth 层之上(即在请求流中 auth 先执行),因为限流需要 RequestContext 中的 namespace 和 user_id。

配置

server:
  rate_limit:
    namespace_qps: 1000    # 每 namespace 每秒最大请求数,默认 1000
    user_qps: 100           # 每 user 每秒最大请求数,默认 100

验收标准

  • 单用户超过 user_qps 后收到 429 Too Many Requests
  • 同 namespace 其他用户不受影响
  • Health endpoint 不受限流影响
  • 限流关闭时(配置不设置或 enabled: false)无性能开销

2.3 Prometheus Metrics 接入

现状

零可观测性指标。无法监控 QPS、延迟分位数、错误率、handle 数量等关键指标。

设计

采用 metrics facade + metrics-exporter-prometheus 方案。

                    ┌─────────────────────────┐
                    │      fs9-server          │
                    │                          │
  Request ──────►   │  ┌──────────────────┐    │   GET /metrics
                    │  │ Metrics Middleware│    │ ◄──── Prometheus
                    │  │  (per-request)   │    │
                    │  └──────┬───────────┘    │
                    │         │                │
                    │  ┌──────▼───────────┐    │
                    │  │ metrics crate    │    │
                    │  │ (facade/macros)  │    │
                    │  └──────┬───────────┘    │
                    │         │                │
                    │  ┌──────▼───────────┐    │
                    │  │ Prometheus       │    │
                    │  │ Exporter (HTTP)  │    │
                    │  └─────────────────┘    │
                    └─────────────────────────┘

指标清单

指标名 类型 Labels 说明
fs9_http_requests_total Counter method, path, status, namespace 请求总数
fs9_http_request_duration_seconds Histogram method, path, namespace 请求延迟
fs9_active_handles Gauge namespace 当前打开的 handle 数
fs9_active_namespaces Gauge 活跃 namespace 数
fs9_token_cache_hits_total Counter Token 缓存命中
fs9_token_cache_misses_total Counter Token 缓存未命中
fs9_token_cache_size Gauge Token 缓存条目数
fs9_meta_requests_total Counter endpoint, status 到 meta 服务的请求数
fs9_meta_request_duration_seconds Histogram endpoint 到 meta 服务的延迟
fs9_provider_operations_total Counter operation, provider, namespace Provider 操作计数
fs9_provider_operation_duration_seconds Histogram operation, provider Provider 操作延迟

涉及文件

文件 改动
server/Cargo.toml 新增 metrics, metrics-exporter-prometheus 依赖
server/src/metrics.rs 新文件: 指标定义 + metrics middleware + exporter 初始化
server/src/main.rs 初始化 exporter,挂载 /metrics endpoint
server/src/api/mod.rs 添加 /metrics route
server/src/token_cache.rs get/set 中埋点
server/src/meta_client.rs 在 HTTP 调用中埋点
server/src/lib.rs 导出新模块

实现细节

server/src/metrics.rs(新文件):

use axum::{body::Body, extract::Request, middleware::Next, response::Response};
use metrics::{counter, gauge, histogram};
use std::time::Instant;

use crate::auth::RequestContext;

/// Initialize the Prometheus metrics exporter.
/// Returns the PrometheusHandle for installing the /metrics endpoint.
pub fn init_exporter() -> metrics_exporter_prometheus::PrometheusHandle {
    let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
    builder
        .install_recorder()
        .expect("failed to install Prometheus recorder")
}

/// Axum middleware that records per-request metrics.
pub async fn metrics_middleware(request: Request<Body>, next: Next) -> Response {
    let method = request.method().clone();
    let path = request.uri().path().to_string();
    // Normalize path: strip dynamic segments for cardinality control
    let path_label = normalize_path(&path);

    let ns = request
        .extensions()
        .get::<RequestContext>()
        .map(|ctx| ctx.ns.clone())
        .unwrap_or_else(|| "unknown".to_string());

    let start = Instant::now();
    let response = next.run(request).await;
    let duration = start.elapsed().as_secs_f64();
    let status = response.status().as_u16().to_string();

    let labels = [
        ("method", method.to_string()),
        ("path", path_label),
        ("status", status),
        ("namespace", ns),
    ];

    counter!("fs9_http_requests_total", &labels).increment(1);
    histogram!("fs9_http_request_duration_seconds", &labels[..3]).record(duration);

    response
}

/// Normalize API paths to reduce label cardinality.
/// `/api/v1/namespaces/myns` → `/api/v1/namespaces/:ns`
fn normalize_path(path: &str) -> String {
    if path.starts_with("/api/v1/namespaces/") && path.matches('/').count() == 4 {
        return "/api/v1/namespaces/:ns".to_string();
    }
    path.to_string()
}

/// Handler for GET /metrics — returns Prometheus text format.
pub async fn metrics_handler(
    axum::extract::State(handle): axum::extract::State<metrics_exporter_prometheus::PrometheusHandle>,
) -> String {
    handle.render()
}

Token cache 埋点server/src/token_cache.rs:

// 在 get() 中:
pub async fn get(&self, token: &str) -> Option<CachedToken> {
    let result = self.cache.get(token).await.and_then(|entry| { ... });
    match &result {
        Some(_) => metrics::counter!("fs9_token_cache_hits_total").increment(1),
        None => metrics::counter!("fs9_token_cache_misses_total").increment(1),
    }
    result
}

配置

server:
  metrics:
    enabled: true           # 默认 true
    path: "/metrics"        # 默认 /metrics

验收标准

  • GET /metrics 返回 Prometheus text format
  • 发送 10 个请求后,fs9_http_requests_total 计数正确
  • 延迟直方图包含合理的 bucket 分布
  • /metrics 本身不被 metrics middleware 记录(避免递归)
  • Grafana 可正常导入并展示 dashboard

2.4 PostgreSQL 后端完成

现状

meta/src/db/mod.rs:43:

#[cfg(feature = "postgres")]
{
    todo!("PostgreSQL support not yet implemented");
}

SQLite 单节点最大 5 连接,无法支撑百万用户的 token 验证频率。

设计

实现 PostgresStore,与 SqliteStore 接口完全对齐。

MetaStore (enum)
├── Sqlite(SqliteStore)     ← 已有
└── Postgres(PostgresStore)  ← 新增

涉及文件

文件 改动
meta/Cargo.toml 确认 postgres feature 已声明(已有)
meta/src/db/postgres.rs 新文件: PostgresStore 完整实现
meta/src/db/mod.rs 添加 #[cfg(feature = "postgres")] mod postgres; + MetaStore 枚举分支

实现细节

meta/src/db/postgres.rs 核心结构:

use sqlx::{postgres::PgPoolOptions, PgPool};

pub struct PostgresStore {
    pool: PgPool,
}

impl PostgresStore {
    pub async fn connect(dsn: &str) -> Result<Self> {
        let pool = PgPoolOptions::new()
            .max_connections(50)          // 生产建议 50-200
            .min_connections(5)           // 保持最小连接避免冷启动
            .acquire_timeout(Duration::from_secs(5))
            .idle_timeout(Duration::from_secs(300))
            .connect(dsn)
            .await
            .map_err(|e| MetaError::Database(format!("PG connect failed: {e}")))?;

        Ok(Self { pool })
    }
}

SQL 差异处理:

操作 SQLite PostgreSQL
占位符 ? $1, $2, ...
自增 ID 不使用(UUID) 不使用(UUID)
时间类型 TEXT (ISO8601) TIMESTAMPTZ
布尔 INTEGER BOOLEAN
JSON TEXT JSONB

由于 FS9 的 meta service 使用 UUID 主键和文本时间戳,大部分 SQL 可以几乎原样复用。
主要差异在:

  1. 占位符语法 ?$N
  2. 时间列类型建议改为 TIMESTAMPTZ(但可继续用 TEXT 保持一致性,代价是无法用 PG 时间函数)

推荐方案: 时间列使用 TIMESTAMPTZ,利用 sqlxchrono::DateTime<Utc> 自动绑定。

meta/src/db/mod.rs — 扩展 MetaStore enum:

#[derive(Clone)]
pub enum MetaStore {
    #[cfg(feature = "sqlite")]
    Sqlite(SqliteStore),
    #[cfg(feature = "postgres")]
    Postgres(PostgresStore),
}

// 每个方法增加分支:
pub async fn create_namespace(&self, name: &str, created_by: &str) -> Result<Namespace> {
    match self {
        #[cfg(feature = "sqlite")]
        Self::Sqlite(store) => store.create_namespace(name, created_by).await,
        #[cfg(feature = "postgres")]
        Self::Postgres(store) => store.create_namespace(name, created_by).await,
    }
}

Migration: PostgresStore 的 migrate() 使用相同表结构,但使用 PG 原生类型:

CREATE TABLE IF NOT EXISTS namespaces (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT UNIQUE NOT NULL,
    status TEXT NOT NULL DEFAULT 'active',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by TEXT NOT NULL,
    updated_at TIMESTAMPTZ,
    deleted_at TIMESTAMPTZ
);

-- 索引与 SQLite 版本一致
CREATE INDEX IF NOT EXISTS idx_namespaces_name ON namespaces(name);

配置

# meta.yaml
database:
  # SQLite (默认,开发/测试)
  dsn: "sqlite:fs9-meta.db"

  # PostgreSQL (生产)
  # dsn: "postgres://fs9:password@pg-host:5432/fs9_meta"
  # max_connections: 50    # 新增
  # min_connections: 5     # 新增

验收标准

  • cargo build -p fs9-meta --features postgres 编译通过
  • 所有现有 SQLite 测试在 Postgres 后端上同样通过
  • 连接池配置可调
  • 使用 Postgres 时 fs9-meta 可承受 1000+ QPS 的 token validation

2.5 请求体大小限制

现状

Axum 默认 body limit 2MB(axum 0.7+ 默认开启),但未被显式配置,行为不透明。
写入大文件时用户得到的错误信息不友好。

设计

显式设置全局 body limit + write 端点单独的更大 limit。

涉及文件

文件 改动
server/src/main.rs 添加 DefaultBodyLimit layer
server/src/api/mod.rs write 路由单独设置更大 limit
config/src/types.rs 新增 max_body_size 配置

实现细节

server/src/main.rs:

use axum::extract::DefaultBodyLimit;

// 全局默认限制: 2MB(给 JSON API 请求用)
let default_body_limit = config.server.max_body_size_bytes.unwrap_or(2 * 1024 * 1024);
// Write 端点限制: 256MB(大文件上传)
let write_body_limit = config.server.max_write_size_bytes.unwrap_or(256 * 1024 * 1024);

let app = api::create_router(state, write_body_limit)
    .layer(DefaultBodyLimit::max(default_body_limit))
    // ... 其他 layers

server/src/api/mod.rs — write 路由单独 limit:

pub fn create_router(state: Arc<AppState>, write_body_limit: usize) -> Router {
    Router::new()
        // ... 其他路由
        .route("/api/v1/write", post(handlers::write)
            .layer(DefaultBodyLimit::max(write_body_limit)))
        // ...
        .with_state(state)
}

配置

server:
  max_body_size_bytes: 2097152        # 2MB,默认值,控制 JSON API 请求
  max_write_size_bytes: 268435456     # 256MB,默认值,控制文件写入

验收标准

  • 超过 limit 的请求返回 413 Payload Too Large
  • write 端点可以上传 > 2MB 但 ≤ 256MB 的文件
  • 其他端点(open/close/stat 等)body > 2MB 被拒绝

3. Phase 2 — 百万用户前必须完成

3.1 Sticky Session 方案

现状

HandleRegistryHandleMap 在内存中,多实例部署时 handle 不跨实例共享。

设计

不改应用代码。通过 LB 层 + 文档解决。

                ┌──────────────────┐
                │   Load Balancer  │
                │                  │
                │  hash(namespace) │──► 固定路由到同一实例
                │  或 cookie-based │
                └──────┬───────────┘
                       │
            ┌──────────┼──────────┐
            │          │          │
     ┌──────▼──┐ ┌─────▼───┐ ┌───▼──────┐
     │ Server1 │ │ Server2 │ │ Server3  │
     │ NS: a,b │ │ NS: c,d │ │ NS: e,f  │
     └─────────┘ └─────────┘ └──────────┘

路由策略: hash(JWT.ns claim) % instance_count

  • 优点: 零代码改动,同一 namespace 的所有请求路由到同一实例
  • 缺点: namespace 粒度的"热点"可能导致不均衡
  • 缓解: 监控每实例 handle 数(§2.3 metrics),配合 namespace 再分配

交付物

交付物 说明
docs/deployment/sticky-session.md 部署指南文档
docs/deployment/nginx-sticky.conf Nginx 配置示例
docs/deployment/envoy-sticky.yaml Envoy 配置示例
Health check 增强 /health 返回 instance_id 用于 debug

Nginx 配置示例

upstream fs9_servers {
    # 基于 $http_authorization 中 JWT 的 ns claim 做一致性 hash
    # 简化方案: 基于 Authorization header hash
    hash $http_authorization consistent;

    server fs9-1:9999;
    server fs9-2:9999;
    server fs9-3:9999;
}

验收标准

  • Nginx/Envoy 配置文档完成并验证
  • 同一 namespace 的请求稳定路由到同一实例(日志验证)
  • 实例下线后,LB 自动将 namespace 迁移到其他实例
  • Health endpoint 返回 instance_id

3.2 Meta Client 熔断器与重试

现状

server/src/meta_client.rs 硬编码 10s timeout,无重试,无熔断。
server/src/auth.rs:382-389 meta 不可用时 fallback 到本地 JWT,但每个请求仍尝试一次 HTTP 调用(10s timeout 浪费)。

设计

Token Validation Request
          │
          ▼
    ┌─────────────┐    CLOSED     ┌──────────────┐
    │ Circuit     │──────────────►│  Meta Client  │
    │ Breaker     │               │  HTTP Call    │
    │             │◄──────────────│              │
    │  OPEN:      │   success/    └──────────────┘
    │  skip HTTP, │   failure
    │  直接 local │
    │  JWT decode │
    └─────┬───────┘
          │ OPEN / HALF_OPEN
          ▼
    ┌─────────────┐
    │ Local JWT   │
    │ Decode      │
    │ (fallback)  │
    └─────────────┘

熔断器状态机:

  • CLOSED: 正常转发请求到 meta。连续 5 次失败 → OPEN
  • OPEN: 跳过 meta,直接 local JWT decode。持续 30s → HALF_OPEN
  • HALF_OPEN: 放过 1 个请求到 meta。成功 → CLOSED,失败 → OPEN

涉及文件

文件 改动
server/src/circuit_breaker.rs 新文件: 通用熔断器实现
server/src/meta_client.rs 集成熔断器 + 重试逻辑
server/src/auth.rs 使用熔断器包装的 meta client
server/src/state.rs AppState 持有 CircuitBreaker
server/src/lib.rs 导出新模块
config/src/types.rs 新增熔断器配置

实现细节

server/src/circuit_breaker.rs(新文件):

use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

pub struct CircuitBreaker {
    state: RwLock<CircuitState>,
    failure_count: AtomicU32,
    failure_threshold: u32,
    recovery_timeout: Duration,
    last_failure: RwLock<Option<Instant>>,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self { ... }
    pub async fn state(&self) -> CircuitState { ... }
    pub async fn record_success(&self) { ... }
    pub async fn record_failure(&self) { ... }
    pub async fn allow_request(&self) -> bool { ... }
}

重试策略: 在 MetaClient::validate_token 中:

impl MetaClient {
    pub async fn validate_token_with_retry(
        &self,
        token: &str,
        circuit_breaker: &CircuitBreaker,
    ) -> Result<ValidateResponse, MetaClientError> {
        if !circuit_breaker.allow_request().await {
            return Err(MetaClientError::CircuitOpen);
        }

        let mut last_err = None;
        for attempt in 0..3 {
            if attempt > 0 {
                // 指数退避: 100ms, 200ms
                tokio::time::sleep(Duration::from_millis(100 * (1 << attempt))).await;
            }
            match self.validate_token(token).await {
                Ok(resp) => {
                    circuit_breaker.record_success().await;
                    return Ok(resp);
                }
                Err(e) if e.is_transient() => {
                    last_err = Some(e);
                    continue;
                }
                Err(e) => {
                    // 非瞬态错误(如 401)不重试
                    circuit_breaker.record_failure().await;
                    return Err(e);
                }
            }
        }

        circuit_breaker.record_failure().await;
        Err(last_err.unwrap())
    }
}

配置

server:
  meta:
    circuit_breaker:
      failure_threshold: 5       # 连续失败 N 次后熔断
      recovery_timeout_secs: 30  # 熔断持续时间
    retry:
      max_attempts: 3            # 最大重试次数(含首次)
      base_delay_ms: 100         # 退避基础延迟

验收标准

  • Meta 服务关闭后,前 5 个请求逐个失败,第 6 个起直接 fallback(无 10s 等待)
  • 熔断 30s 后自动尝试恢复
  • Meta 恢复后,下一个 half-open 请求成功即恢复正常
  • 瞬态错误(网络超时)触发重试,非瞬态错误(401)不重试
  • Metrics 中可观测到 fs9_circuit_breaker_state 指标

3.3 Token 撤销与 Refresh 宽限期收紧

现状

  • auth.rs:208: 过期 token 7 天内可 refresh,被盗窗口过大
  • Token cache 无主动失效机制,被盗 token 在 TTL(5min)内持续有效
  • 无 token 黑名单

设计

Token 生命周期:
  ┌──────────────────────────────────────────────┐
  │ 签发(iat)          过期(exp)   宽限期结束     │
  │    │                  │          │            │
  │    │   正常使用        │ 可refresh │  彻底失效  │
  │    │◄────────────────►│◄────────►│            │
  │    │                  │  4小时    │            │
  └──────────────────────────────────────────────┘

Token 撤销:
  ┌──────────────┐  revoke  ┌──────────────┐
  │ Admin / User │────────►│ Revocation   │
  │              │          │ Set (moka)   │
  └──────────────┘          └──────┬───────┘
                                   │
                            ┌──────▼───────┐
                            │ Auth MW 检查  │
                            │ token 是否在  │
                            │ revocation   │
                            │ set 中       │
                            └──────────────┘

涉及文件

文件 改动
server/src/auth.rs 7 天 → 4 小时;增加 revocation set 检查
server/src/token_revocation.rs 新文件: Token 撤销集合
server/src/state.rs AppState 持有 RevocationSet
server/src/api/handlers.rs 新增 POST /api/v1/auth/revoke 端点
server/src/api/mod.rs 注册新路由

实现细节

宽限期收紧: auth.rs 中修改:

let grace_period = 4 * 60 * 60; // 7 天 → 4 小时

server/src/token_revocation.rs(新文件):

use moka::future::Cache;
use std::time::Duration;

/// In-memory set of revoked token JTI (JWT ID) or token hash.
/// TTL = max possible token lifetime (exp + grace period) 保证撤销生效。
pub struct RevocationSet {
    revoked: Cache<String, ()>,
}

impl RevocationSet {
    pub fn new() -> Self {
        Self {
            // 撤销记录保留 25 小时(24h max token TTL + 1h buffer)
            revoked: Cache::builder()
                .max_capacity(500_000)
                .time_to_live(Duration::from_secs(25 * 3600))
                .build(),
        }
    }

    pub async fn revoke(&self, token_hash: &str) {
        self.revoked.insert(token_hash.to_string(), ()).await;
    }

    pub async fn is_revoked(&self, token_hash: &str) -> bool {
        self.revoked.get(token_hash).await.is_some()
    }
}

auth middleware 增加检查:

// 在 token cache hit 和 JWT decode 成功后,都检查 revocation set
let token_hash = sha256_short(token);  // 取前 16 bytes 作为 key
if state.app_state.revocation_set.is_revoked(&token_hash).await {
    return unauthorized("Token has been revoked");
}

新增 API:

POST /api/v1/auth/revoke
Authorization: Bearer <admin-token>
Body: { "token": "<token-to-revoke>" }
Response: 204 No Content

配置

server:
  auth:
    refresh_grace_period_hours: 4    # 默认 4 小时(原 168 小时 = 7 天)
    revocation_capacity: 500000      # 撤销集合最大条目数

验收标准

  • 过期超过 4 小时的 token 无法 refresh
  • POST /api/v1/auth/revoke 后,被撤销 token 立即返回 401
  • Token cache 中的已撤销 token 也被拒绝
  • 撤销记录在 25 小时后自动过期(moka TTL)

3.4 cleanup_stale 锁优化

现状

core/src/handle.rs:177-185:

let mut handles = shard.handles.write().await;  // 获取写锁
for id in to_close {
    if let Some(state) = handles.remove(&id) {
        let _ = state.provider.close(state.provider_handle, false).await;  // 持锁 await!
        // ...
    }
}

写锁在 provider.close().await 期间持有。如果 provider(如 ProxyFs)close 慢(网络超时 10s),该 shard 的所有 handle 操作被阻塞。

设计

拆分为"收集"和"关闭"两步,锁只在收集阶段持有:

pub async fn cleanup_stale(&self) -> Vec<HandleId> {
    let now = Instant::now();
    let mut all_closed = Vec::new();

    for shard in &self.shards {
        // Step 1: 在读锁下识别过期 handle
        let stale_ids: Vec<HandleId> = {
            let handles = shard.handles.read().await;
            let mut ids = Vec::new();
            for (id, state) in handles.iter() {
                let last_access = *state.last_access.read().await;
                if now.duration_since(last_access) > self.ttl {
                    ids.push(*id);
                }
            }
            ids
        };
        // 读锁已释放

        if stale_ids.is_empty() {
            continue;
        }

        // Step 2: 在写锁下批量 remove(不 await close)
        let removed: Vec<(HandleId, HandleState)> = {
            let mut handles = shard.handles.write().await;
            stale_ids.iter()
                .filter_map(|id| handles.remove(id).map(|s| (*id, s)))
                .collect()
        };
        // 写锁已释放

        // Step 3: 无锁状态下逐个 close
        for (id, state) in removed {
            let _ = state.provider.close(state.provider_handle, false).await;
            all_closed.push(id);
        }
    }

    all_closed
}

涉及文件

文件 改动
core/src/handle.rs 重写 cleanup_stale 方法

验收标准

  • 现有 cleanup_stale 单元测试继续通过
  • 新增测试:在 cleanup 期间并发注册 handle 不被阻塞
  • Provider close 耗时不影响同 shard 的 handle 操作延迟

3.5 NamespaceManager 分片

现状

server/src/namespace.rs:103:

pub struct NamespaceManager {
    namespaces: RwLock<HashMap<String, (Arc<Namespace>, NamespaceInfo)>>,
    // ...
}

单个 RwLock<HashMap> 管理所有 namespace。百万租户每请求都需 read() 查找 namespace,高并发下读锁获取也有开销。

设计

替换为 dashmap::DashMap,零锁读取。

use dashmap::DashMap;

pub struct NamespaceManager {
    namespaces: DashMap<String, (Arc<Namespace>, NamespaceInfo)>,
    handle_ttl: Duration,
}

DashMap 内部使用分片 HashMap + 细粒度锁,读操作几乎无开销。

涉及文件

文件 改动
server/Cargo.toml 新增 dashmap 依赖
server/src/namespace.rs RwLock<HashMap>DashMap,API 全部改为同步(去掉 .read().await

实现细节

核心变更点:

// Before:
pub async fn get(&self, name: &str) -> Option<Arc<Namespace>> {
    self.namespaces.read().await
        .get(name).map(|(ns, _)| ns.clone())
}

// After:
pub fn get(&self, name: &str) -> Option<Arc<Namespace>> {
    self.namespaces.get(name).map(|r| r.value().0.clone())
}

注意: getasync fn 变为 fn。调用方(handlers.rs 中的 resolve_ns)需要同步更新,这是纯机械改动。

验收标准

  • 所有现有 namespace 相关测试通过
  • 并发创建 1000 个 namespace 无 panic
  • resolve_ns 调用延迟下降(benchmark 验证)

3.6 OpenTelemetry 分布式追踪

现状

TraceLayer 仅记录 HTTP 日志,无 trace context 跨 fs9-server → fs9-meta 传播。

设计

Client ──(trace-id in header)──► fs9-server ──(trace-id propagated)──► fs9-meta
          │                          │                                      │
          │    ┌─────────────┐       │    ┌─────────────┐                  │
          └───►│  Span:      │       └───►│  Span:      │                  │
               │  HTTP req   │            │  meta call  │                  │
               │  duration   │            │  duration   │                  │
               └─────────────┘            └─────────────┘
                                                │
                                    ┌───────────▼──────────┐
                                    │  OTLP Collector      │
                                    │  (Jaeger / Tempo)    │
                                    └──────────────────────┘

涉及文件

文件 改动
server/Cargo.toml 新增 opentelemetry, opentelemetry-otlp, tracing-opentelemetry
server/src/main.rs 初始化 OTLP exporter + tracing layer
server/src/meta_client.rs 注入 trace context headers(W3C TraceContext)
meta/Cargo.toml 新增同样的 OTEL 依赖
meta/src/main.rs 初始化 OTLP exporter

实现细节

server/src/main.rs — tracing 初始化变更:

use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;

fn init_tracing(config: &Fs9Config) {
    let otel_endpoint = config.server.tracing.otlp_endpoint
        .as_deref()
        .unwrap_or("http://localhost:4317");

    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(otel_endpoint)
        .build()
        .expect("OTLP exporter");

    let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();

    let tracer = provider.tracer("fs9-server");

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(tracing_subscriber::EnvFilter::new(&filter))
        .with(OpenTelemetryLayer::new(tracer))
        .init();
}

meta_client.rs — trace context 传播:

use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;

impl MetaClient {
    pub async fn validate_token(&self, token: &str) -> Result<ValidateResponse, MetaClientError> {
        let mut req = self.client.post(&url).json(&body);

        // 注入 W3C TraceContext headers
        let propagator = TraceContextPropagator::new();
        let cx = opentelemetry::Context::current();
        let mut headers = HashMap::new();
        propagator.inject_context(&cx, &mut headers);
        for (k, v) in headers {
            req = req.header(&k, &v);
        }

        req.send().await?
    }
}

配置

server:
  tracing:
    otlp_endpoint: "http://localhost:4317"   # gRPC OTLP collector
    enabled: true                             # 默认 true
    sample_rate: 0.1                          # 采样率,1.0 = 全量

验收标准

  • Jaeger/Tempo 中可看到 fs9-serverfs9-meta 的关联 trace
  • 单个请求的 span 包含: HTTP handler → meta validation → provider operation
  • 采样率可配置,0.1 时仅 10% 请求产生 trace
  • OTLP endpoint 不可达时不影响正常请求处理

3.7 流式大文件读写

现状

handlers.rs:160:

let data = ns.vfs.read(&Handle::new(handle_id), req.offset, req.size).await?;
Ok((StatusCode::OK, data))  // 整个 Bytes 一次性返回

读取 100MB 文件 = 100MB 内存分配。百万用户场景下几个大文件请求就能 OOM。

设计

分层改造,Phase 2 仅改 HTTP 层,不动 FsProvider trait。

                          Phase 2 (HTTP 层)
                    ┌──────────────────────────┐
   Client ◄────────│  Chunked Transfer-Encoding│
                    │  (Stream<Bytes>)          │
                    └──────────┬───────────────┘
                               │ 分块调用
                    ┌──────────▼───────────────┐
                    │  Loop:                    │
                    │    read(handle, offset,   │
                    │         CHUNK_SIZE)       │
                    │    yield chunk            │
                    │    offset += chunk.len()  │
                    │  Until: chunk.len() < CS  │
                    │         or offset >= size │
                    └──────────────────────────┘

CHUNK_SIZE: 256KB(兼顾吞吐和内存)

涉及文件

文件 改动
server/src/api/handlers.rs read handler 改为 streaming response
server/src/api/models.rs ReadRequest 增加可选 stream: bool 参数

实现细节

handlers.rs — streaming read:

use axum::body::Body;
use futures::stream;
use tokio_stream::StreamExt;

const STREAM_CHUNK_SIZE: usize = 256 * 1024; // 256KB

pub async fn read(
    State(state): State<Arc<AppState>>,
    Extension(ctx): Extension<RequestContext>,
    Json(req): Json<ReadRequest>,
) -> AppResult<Response> {
    let ns = resolve_ns(&state, &ctx).await?;
    let handle_id = ns.handle_map.read().await
        .get_id(&req.handle_id)
        .ok_or_else(|| FsError::invalid_argument("invalid handle_id"))?;

    let total_size = req.size;

    // 小请求(≤ 1MB): 保持原有行为,一次性返回
    if total_size <= 1024 * 1024 {
        let data = ns.vfs.read(&Handle::new(handle_id), req.offset, total_size).await?;
        return Ok((StatusCode::OK, data).into_response());
    }

    // 大请求: 流式返回
    let vfs = ns.vfs.clone();
    let handle = Handle::new(handle_id);
    let mut offset = req.offset;
    let end_offset = req.offset + total_size as u64;

    let stream = async_stream::stream! {
        while offset < end_offset {
            let remaining = (end_offset - offset) as usize;
            let chunk_size = remaining.min(STREAM_CHUNK_SIZE);

            match vfs.read(&handle, offset, chunk_size).await {
                Ok(data) => {
                    let len = data.len();
                    if len == 0 { break; }
                    offset += len as u64;
                    yield Ok::<_, std::io::Error>(data);
                    if len < chunk_size { break; }  // EOF
                }
                Err(e) => {
                    yield Err(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()));
                    break;
                }
            }
        }
    };

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header("transfer-encoding", "chunked")
        .body(Body::from_stream(stream))
        .unwrap())
}

向后兼容性

  • 请求 size ≤ 1MB: 行为不变(一次性 Bytes 返回)
  • 请求 size > 1MB: 自动切换为 chunked streaming
  • Client 端: reqwest 默认支持 chunked 解码,Rust/Python client 无需改动

验收标准

  • 读取 100MB 文件时,server 进程内存增长 < 10MB
  • 1MB 以下请求行为和延迟不变
  • Python client 和 Rust client 均能正常接收 streaming response
  • 中途断连时 stream 正确终止,无资源泄漏

4. 依赖关系与实施顺序

Phase 1(可并行):
  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
  │ 2.1 停机 │   │ 2.2 限流 │   │ 2.3 指标 │   │ 2.4 PG   │   │ 2.5 Body │
  └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘
       │              │              │              │              │
       └──────────────┴──────┬───────┴──────────────┴──────────────┘
                             │
Phase 2:                     │ 依赖
  ┌──────────────────────────▼────────────────────────────────────────┐
  │                                                                    │
  │  ┌──────────┐   ┌──────────┐                                      │
  │  │3.4 锁优化│   │3.5 NS分片│  ← 可立即开始,无依赖                 │
  │  └──────────┘   └──────────┘                                      │
  │                                                                    │
  │  ┌──────────┐                                                      │
  │  │3.2 熔断器│  ← 依赖 2.3(需要 metrics 监控熔断器状态)           │
  │  └──────────┘                                                      │
  │                                                                    │
  │  ┌──────────┐                                                      │
  │  │3.3 Token │  ← 可独立,但建议在 2.3 之后(需要监控撤销操作)     │
  │  └──────────┘                                                      │
  │                                                                    │
  │  ┌──────────┐                                                      │
  │  │3.6 OTEL  │  ← 依赖 2.3(共享 tracing 初始化基础设施)          │
  │  └──────────┘                                                      │
  │                                                                    │
  │  ┌──────────┐                                                      │
  │  │3.7 流式  │  ← 可独立                                           │
  │  └──────────┘                                                      │
  │                                                                    │
  │  ┌──────────┐                                                      │
  │  │3.1 Sticky│  ← 依赖 2.3(需要 per-instance metrics 验证)       │
  │  └──────────┘                                                      │
  └────────────────────────────────────────────────────────────────────┘

建议实施顺序:

顺序 项目 预估工时 依赖
1 2.5 请求体大小限制 0.5d
2 2.1 优雅停机 1d
3 2.3 Prometheus Metrics 2d
4 2.2 Per-Tenant 限流 1d
5 2.4 PostgreSQL 3d
6 3.4 cleanup_stale 锁优化 0.5d
7 3.5 NamespaceManager 分片 1d
8 3.2 Meta Client 熔断器 2d 2.3
9 3.3 Token 撤销 1.5d
10 3.7 流式大文件 2d
11 3.6 OpenTelemetry 2d 2.3
12 3.1 Sticky Session 1d 2.3
总计 ~17.5d

5. 新增依赖汇总

server/Cargo.toml

# Rate limiting
governor = { version = "0.7", features = ["dashmap"] }

# Metrics
metrics = "0.24"
metrics-exporter-prometheus = "0.16"

# Namespace sharding
dashmap = "6"

# Streaming response
async-stream = "0.3"

# OpenTelemetry (Phase 2)
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
tracing-opentelemetry = "0.28"

meta/Cargo.toml

# 已有 postgres feature,无需新增依赖
# OpenTelemetry (Phase 2, 同 server)
opentelemetry = "0.27"
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
tracing-opentelemetry = "0.28"

6. 配置变更汇总

所有新增配置项及其默认值:

server:
  # Phase 1
  shutdown_timeout_secs: 30
  max_body_size_bytes: 2097152        # 2MB
  max_write_size_bytes: 268435456     # 256MB

  rate_limit:
    namespace_qps: 1000
    user_qps: 100

  metrics:
    enabled: true
    path: "/metrics"

  # Phase 2
  meta:
    circuit_breaker:
      failure_threshold: 5
      recovery_timeout_secs: 30
    retry:
      max_attempts: 3
      base_delay_ms: 100

  auth:
    refresh_grace_period_hours: 4
    revocation_capacity: 500000

  tracing:
    otlp_endpoint: "http://localhost:4317"
    enabled: true
    sample_rate: 0.1

database:
  # meta 服务
  dsn: "postgres://..."
  max_connections: 50
  min_connections: 5

7. 验收标准

Phase 1 整体验收

  • make check 通过(lint + test)
  • SIGTERM 后优雅停机,无资源泄漏
  • GET /metrics 返回 Prometheus 格式指标
  • 单租户超限后返回 429,不影响其他租户
  • fs9-meta 可使用 PostgreSQL 后端运行所有 E2E 测试
  • 超大 body 返回 413

Phase 2 整体验收

  • fs9-meta 不可用时,server 在 5 次失败后自动熔断,无 10s 延迟
  • 被撤销 token 即时失效(< 1s)
  • cleanup_stale 期间其他 handle 操作无延迟抖动
  • Jaeger 中可查看跨服务 trace
  • 100MB 文件读取时内存增长 < 10MB
  • 多实例部署下同一 namespace 稳定路由到同一实例

性能基线(目标)

指标 目标
单实例 QPS (read) > 10,000
P99 延迟 (stat) < 10ms
并发 namespace > 10,000
并发 handle > 100,000
Token 验证延迟 (cached) < 1ms
Token 验证延迟 (meta) < 50ms
优雅停机时间 < 30s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment