Skip to content

Instantly share code, notes, and snippets.

@justincormack
Created January 29, 2026 11:59
Show Gist options
  • Select an option

  • Save justincormack/59a7f3e781ca044f9194eaa624aaa0c0 to your computer and use it in GitHub Desktop.

Select an option

Save justincormack/59a7f3e781ca044f9194eaa624aaa0c0 to your computer and use it in GitHub Desktop.
rustfs conditional PUT concurrency issues
// Conditional PUT Race Condition Test
//
// This test verifies whether RustFS correctly handles concurrent conditional PUT
// requests from multiple nodes. With If-None-Match: "*", only ONE writer should
// succeed when creating a new object - all others should get 412 Precondition Failed.
//
// Usage:
// cargo run --bin conditional_put_race_test
//
// Or run the test directly:
// cargo test --package rustfs_e2e_test --lib conditional_put_race_test -- --nocapture
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::Client;
use bytes::Bytes;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Barrier;
const ACCESS_KEY: &str = "rustfsadmin";
const SECRET_KEY: &str = "rustfsadmin";
const BUCKET: &str = "test-bucket";
/// Create an S3 client pointing to a specific endpoint
async fn create_client(endpoint: &str) -> Client {
let creds = Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static");
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(Region::new("us-east-1"))
.credentials_provider(creds)
.endpoint_url(endpoint)
.load()
.await;
Client::from_conf(
aws_sdk_s3::Config::from(&config)
.to_builder()
.force_path_style(true)
.build(),
)
}
/// Ensure the test bucket exists
async fn ensure_bucket(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match client.create_bucket().bucket(BUCKET).send().await {
Ok(_) => Ok(()),
Err(SdkError::ServiceError(e)) => {
let code = e.err().meta().code().unwrap_or("");
if code == "BucketAlreadyExists" || code == "BucketAlreadyOwnedByYou" {
Ok(())
} else {
Err(e.into_err().into())
}
}
Err(e) => Err(e.into()),
}
}
/// Delete object if it exists
async fn cleanup_object(client: &Client, key: &str) {
let _ = client.delete_object().bucket(BUCKET).key(key).send().await;
}
/// Attempt conditional PUT with If-None-Match: "*"
/// Returns Ok(true) if succeeded, Ok(false) if got 412, Err for other errors
async fn conditional_put(
client: &Client,
key: &str,
data: &[u8],
client_id: usize,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
let result = client
.put_object()
.bucket(BUCKET)
.key(key)
.body(Bytes::from(data.to_vec()).into())
.if_none_match("*")
.send()
.await;
match result {
Ok(resp) => {
println!(
" Client {} SUCCEEDED - ETag: {}",
client_id,
resp.e_tag().unwrap_or("none")
);
Ok(true)
}
Err(SdkError::ServiceError(e)) => {
let code = e.err().meta().code().unwrap_or("");
if code == "PreconditionFailed" {
println!(" Client {} got 412 PreconditionFailed (correct rejection)", client_id);
Ok(false)
} else {
println!(" Client {} got error: {}", client_id, code);
Err(e.into_err().into())
}
}
Err(e) => {
println!(" Client {} got error: {}", client_id, e);
Err(e.into())
}
}
}
/// Run a single race test iteration
async fn run_race_test(
clients: &[Client],
test_key: &str,
iteration: usize,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
// Clean up any existing object
cleanup_object(&clients[0], test_key).await;
// Small delay to ensure deletion propagates
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Verify object doesn't exist
let head_result = clients[0]
.head_object()
.bucket(BUCKET)
.key(test_key)
.send()
.await;
if head_result.is_ok() {
println!("Warning: Object still exists after cleanup, skipping iteration {}", iteration);
return Ok(0);
}
println!("\n=== Iteration {} ===", iteration);
println!("Launching {} concurrent conditional PUTs to different nodes...", clients.len());
// Use a barrier to synchronize all clients
let barrier = Arc::new(Barrier::new(clients.len()));
let test_key = test_key.to_string();
let mut handles = vec![];
for (i, client) in clients.iter().enumerate() {
let client = client.clone();
let barrier = barrier.clone();
let key = test_key.clone();
let data = format!("data from client {}", i).into_bytes();
let handle = tokio::spawn(async move {
// Wait for all clients to be ready
barrier.wait().await;
// All clients fire simultaneously
conditional_put(&client, &key, &data, i).await
});
handles.push(handle);
}
// Collect results
let mut success_count = 0;
for handle in handles {
match handle.await {
Ok(Ok(true)) => success_count += 1,
Ok(Ok(false)) => {} // Expected rejection
Ok(Err(e)) => println!(" Error: {}", e),
Err(e) => println!(" Task error: {}", e),
}
}
println!("Result: {} out of {} succeeded", success_count, clients.len());
if success_count > 1 {
println!(">>> RACE CONDITION DETECTED! Multiple writers succeeded when only 1 should have.");
} else if success_count == 1 {
println!(">>> Correct behavior: exactly 1 writer succeeded.");
} else {
println!(">>> Unexpected: no writers succeeded.");
}
Ok(success_count)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Configure your endpoints here
let endpoints = vec![
"http://localhost:9000",
"http://localhost:9001",
"http://localhost:9002",
"http://localhost:9003",
];
println!("Conditional PUT Race Condition Test");
println!("====================================");
println!("Endpoints: {:?}", endpoints);
println!();
// Create clients for each endpoint
let mut clients = vec![];
for endpoint in &endpoints {
clients.push(create_client(endpoint).await);
}
// Ensure bucket exists (use first client)
println!("Ensuring test bucket '{}' exists...", BUCKET);
ensure_bucket(&clients[0]).await?;
println!("Bucket ready.\n");
// Run multiple iterations to increase chance of catching race
let iterations = 10;
let mut total_races_detected = 0;
let mut total_correct = 0;
let start = Instant::now();
for i in 1..=iterations {
let test_key = format!("race-test-{}-{}", std::process::id(), i);
match run_race_test(&clients, &test_key, i).await {
Ok(success_count) => {
if success_count > 1 {
total_races_detected += 1;
} else if success_count == 1 {
total_correct += 1;
}
}
Err(e) => {
println!("Iteration {} failed: {}", i, e);
}
}
// Cleanup
cleanup_object(&clients[0], &test_key).await;
// Small delay between iterations
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
let elapsed = start.elapsed();
println!("\n");
println!("====================================");
println!("SUMMARY");
println!("====================================");
println!("Total iterations: {}", iterations);
println!("Correct (1 winner): {}", total_correct);
println!("Race conditions: {}", total_races_detected);
println!("Time elapsed: {:?}", elapsed);
println!();
if total_races_detected > 0 {
println!("CONCLUSION: Race condition confirmed!");
println!("Conditional PUT is NOT safe for multi-node concurrent writes.");
println!("This affects Iceberg/Delta Lake table commits and similar use cases.");
} else {
println!("CONCLUSION: No race conditions detected in {} iterations.", iterations);
println!("This doesn't prove safety - try more iterations or adjust timing.");
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
#[ignore = "requires running RustFS cluster"]
async fn test_conditional_put_race() {
let endpoints = vec![
"http://localhost:9000",
"http://localhost:9001",
"http://localhost:9002",
"http://localhost:9003",
];
let mut clients = vec![];
for endpoint in &endpoints {
clients.push(create_client(endpoint).await);
}
ensure_bucket(&clients[0]).await.expect("Failed to create bucket");
let mut races_detected = 0;
for i in 1..=20 {
let test_key = format!("test-race-{}", i);
if let Ok(count) = run_race_test(&clients, &test_key, i).await {
if count > 1 {
races_detected += 1;
}
}
cleanup_object(&clients[0], &test_key).await;
}
// We expect races to be detected given the architecture
println!("\nRaces detected: {}/20", races_detected);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment