Skip to content

Instantly share code, notes, and snippets.

@corylanou
Created September 23, 2025 15:21
Show Gist options
  • Select an option

  • Save corylanou/4c51f8dc384f63171fdb2c1fd70cb5cf to your computer and use it in GitHub Desktop.

Select an option

Save corylanou/4c51f8dc384f63171fdb2c1fd70cb5cf to your computer and use it in GitHub Desktop.
Litestream #752 reproduction and fix
package litestream
import (
"bytes"
"context"
"database/sql"
"encoding/binary"
"errors"
"fmt"
"hash/crc64"
"io"
"log/slog"
"os"
"path/filepath"
"slices"
"strconv"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/superfly/ltx"
"modernc.org/sqlite"
"github.com/benbjohnson/litestream/internal"
)
// Default DB settings.
const (
DefaultMonitorInterval = 1 * time.Second
DefaultCheckpointInterval = 1 * time.Minute
DefaultBusyTimeout = 1 * time.Second
DefaultMinCheckpointPageN = 1000
DefaultMaxCheckpointPageN = 10000
DefaultTruncatePageN = 500000
)
// DB represents a managed instance of a SQLite database in the file system.
type DB struct {
mu sync.RWMutex
path string // part to database
metaPath string // Path to the database metadata.
db *sql.DB // target database
f *os.File // long-running db file descriptor
rtx *sql.Tx // long running read transaction
pageSize int // page size, in bytes
notify chan struct{} // closes on WAL change
chkMu sync.RWMutex // checkpoint lock
// last file info for each level
maxLTXFileInfos struct {
sync.Mutex
m map[int]*ltx.FileInfo
}
fileInfo os.FileInfo // db info cached during init
dirInfo os.FileInfo // parent dir info cached during init
ctx context.Context
cancel func()
wg sync.WaitGroup
// Metrics
dbSizeGauge prometheus.Gauge
walSizeGauge prometheus.Gauge
totalWALBytesCounter prometheus.Counter
txIDGauge prometheus.Gauge
syncNCounter prometheus.Counter
syncErrorNCounter prometheus.Counter
syncSecondsCounter prometheus.Counter
checkpointNCounterVec *prometheus.CounterVec
checkpointErrorNCounterVec *prometheus.CounterVec
checkpointSecondsCounterVec *prometheus.CounterVec
// Minimum threshold of WAL size, in pages, before a passive checkpoint.
// A passive checkpoint will attempt a checkpoint but fail if there are
// active transactions occurring at the same time.
MinCheckpointPageN int
// Maximum threshold of WAL size, in pages, before a forced checkpoint.
// A forced checkpoint will block new transactions and wait for existing
// transactions to finish before issuing a checkpoint and resetting the WAL.
//
// If zero, no checkpoints are forced. This can cause the WAL to grow
// unbounded if there are always read transactions occurring.
MaxCheckpointPageN int
// Threshold of WAL size, in pages, before a forced truncation checkpoint.
// A forced truncation checkpoint will block new transactions and wait for
// existing transactions to finish before issuing a checkpoint and
// truncating the WAL.
//
// If zero, no truncates are forced. This can cause the WAL to grow
// unbounded if there's a sudden spike of changes between other
// checkpoints.
TruncatePageN int
// Time between automatic checkpoints in the WAL. This is done to allow
// more fine-grained WAL files so that restores can be performed with
// better precision.
CheckpointInterval time.Duration
// Frequency at which to perform db sync.
MonitorInterval time.Duration
// The timeout to wait for EBUSY from SQLite.
BusyTimeout time.Duration
// Remote replica for the database.
// Must be set before calling Open().
Replica *Replica
// Where to send log messages, defaults to global slog with database epath.
Logger *slog.Logger
}
// NewDB returns a new instance of DB for a given path.
func NewDB(path string) *DB {
dir, file := filepath.Split(path)
db := &DB{
path: path,
metaPath: filepath.Join(dir, "."+file+MetaDirSuffix),
notify: make(chan struct{}),
MinCheckpointPageN: DefaultMinCheckpointPageN,
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
TruncatePageN: DefaultTruncatePageN,
CheckpointInterval: DefaultCheckpointInterval,
MonitorInterval: DefaultMonitorInterval,
BusyTimeout: DefaultBusyTimeout,
Logger: slog.With("db", filepath.Base(path)),
}
db.maxLTXFileInfos.m = make(map[int]*ltx.FileInfo)
db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
db.walSizeGauge = walSizeGaugeVec.WithLabelValues(db.path)
db.totalWALBytesCounter = totalWALBytesCounterVec.WithLabelValues(db.path)
db.txIDGauge = txIDIndexGaugeVec.WithLabelValues(db.path)
db.syncNCounter = syncNCounterVec.WithLabelValues(db.path)
db.syncErrorNCounter = syncErrorNCounterVec.WithLabelValues(db.path)
db.syncSecondsCounter = syncSecondsCounterVec.WithLabelValues(db.path)
db.checkpointNCounterVec = checkpointNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.checkpointErrorNCounterVec = checkpointErrorNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.checkpointSecondsCounterVec = checkpointSecondsCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.ctx, db.cancel = context.WithCancel(context.Background())
return db
}
// SQLDB returns a reference to the underlying sql.DB connection.
func (db *DB) SQLDB() *sql.DB {
return db.db
}
// Path returns the path to the database.
func (db *DB) Path() string {
return db.path
}
// WALPath returns the path to the database's WAL file.
func (db *DB) WALPath() string {
return db.path + "-wal"
}
// MetaPath returns the path to the database metadata.
func (db *DB) MetaPath() string {
return db.metaPath
}
// SetMetaPath sets the path to database metadata.
func (db *DB) SetMetaPath(path string) {
db.metaPath = path
}
// LTXDir returns path of the root LTX directory.
func (db *DB) LTXDir() string {
return filepath.Join(db.metaPath, "ltx")
}
// LTXLevelDir returns path of the given LTX compaction level.
// Panics if level is negative.
func (db *DB) LTXLevelDir(level int) string {
return filepath.Join(db.LTXDir(), strconv.Itoa(level))
}
// LTXPath returns the local path of a single LTX file.
// Panics if level or either txn ID is negative.
func (db *DB) LTXPath(level int, minTXID, maxTXID ltx.TXID) string {
assert(level >= 0, "level cannot be negative")
return filepath.Join(db.LTXLevelDir(level), ltx.FormatFilename(minTXID, maxTXID))
}
// MaxLTX returns the last LTX file written to level 0.
func (db *DB) MaxLTX() (minTXID, maxTXID ltx.TXID, err error) {
ents, err := os.ReadDir(db.LTXLevelDir(0))
if os.IsNotExist(err) {
return 0, 0, nil // no LTX files written
} else if err != nil {
return 0, 0, err
}
// Find highest txn ID.
for _, ent := range ents {
if min, max, err := ltx.ParseFilename(ent.Name()); err != nil {
continue // invalid LTX filename
} else if max > maxTXID {
minTXID, maxTXID = min, max
}
}
return minTXID, maxTXID, nil
}
// FileInfo returns the cached file stats for the database file when it was initialized.
func (db *DB) FileInfo() os.FileInfo {
return db.fileInfo
}
// DirInfo returns the cached file stats for the parent directory of the database file when it was initialized.
func (db *DB) DirInfo() os.FileInfo {
return db.dirInfo
}
// Pos returns the current replication position of the database.
func (db *DB) Pos() (ltx.Pos, error) {
minTXID, maxTXID, err := db.MaxLTX()
if err != nil {
return ltx.Pos{}, err
} else if minTXID == 0 {
return ltx.Pos{}, nil // no replication yet
}
f, err := os.Open(db.LTXPath(0, minTXID, maxTXID))
if err != nil {
return ltx.Pos{}, err
}
defer func() { _ = f.Close() }()
dec := ltx.NewDecoder(f)
if err := dec.Verify(); err != nil {
return ltx.Pos{}, fmt.Errorf("ltx verification failed: %w", err)
}
return dec.PostApplyPos(), nil
}
// Notify returns a channel that closes when the shadow WAL changes.
func (db *DB) Notify() <-chan struct{} {
db.mu.RLock()
defer db.mu.RUnlock()
return db.notify
}
// PageSize returns the page size of the underlying database.
// Only valid after database exists & Init() has successfully run.
func (db *DB) PageSize() int {
db.mu.RLock()
defer db.mu.RUnlock()
return db.pageSize
}
// Open initializes the background monitoring goroutine.
func (db *DB) Open() (err error) {
// Validate fields on database.
if db.MinCheckpointPageN <= 0 {
return fmt.Errorf("minimum checkpoint page count required")
}
// Clear old temporary files that my have been left from a crash.
if err := removeTmpFiles(db.metaPath); err != nil {
return fmt.Errorf("cannot remove tmp files: %w", err)
}
// Start monitoring SQLite database in a separate goroutine.
if db.MonitorInterval > 0 {
db.wg.Add(1)
go func() { defer db.wg.Done(); db.monitor() }()
}
return nil
}
// Close flushes outstanding WAL writes to replicas, releases the read lock,
// and closes the database. Takes a context for final sync.
func (db *DB) Close(ctx context.Context) (err error) {
db.cancel()
db.wg.Wait()
// Perform a final db sync, if initialized.
if db.db != nil {
if e := db.Sync(ctx); e != nil {
err = e
}
}
// Ensure replicas perform a final sync and stop replicating.
if db.Replica != nil {
if db.db != nil {
if e := db.Replica.Sync(ctx); e != nil && err == nil {
err = e
}
}
db.Replica.Stop(true)
}
// Release the read lock to allow other applications to handle checkpointing.
if db.rtx != nil {
if e := db.releaseReadLock(); e != nil && err == nil {
err = e
}
}
if db.db != nil {
if e := db.db.Close(); e != nil && err == nil {
err = e
}
}
if db.f != nil {
if e := db.f.Close(); e != nil && err == nil {
err = e
}
}
return err
}
// setPersistWAL sets the PERSIST_WAL file control on the database connection.
// This prevents SQLite from removing the WAL file when connections close.
func (db *DB) setPersistWAL(ctx context.Context) error {
conn, err := db.db.Conn(ctx)
if err != nil {
return fmt.Errorf("get connection: %w", err)
}
defer conn.Close()
return conn.Raw(func(driverConn interface{}) error {
fc, ok := driverConn.(sqlite.FileControl)
if !ok {
return fmt.Errorf("driver does not implement FileControl")
}
_, err := fc.FileControlPersistWAL("main", 1)
if err != nil {
return fmt.Errorf("FileControlPersistWAL: %w", err)
}
return nil
})
}
// init initializes the connection to the database.
// Skipped if already initialized or if the database file does not exist.
func (db *DB) init(ctx context.Context) (err error) {
// Exit if already initialized.
if db.db != nil {
return nil
}
// Exit if no database file exists.
fi, err := os.Stat(db.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
db.fileInfo = fi
// Obtain permissions for parent directory.
if fi, err = os.Stat(filepath.Dir(db.path)); err != nil {
return err
}
db.dirInfo = fi
dsn := db.path
dsn += fmt.Sprintf("?_busy_timeout=%d", db.BusyTimeout.Milliseconds())
if db.db, err = sql.Open("sqlite", dsn); err != nil {
return err
}
// Set PERSIST_WAL to prevent WAL file removal when database connections close.
if err := db.setPersistWAL(ctx); err != nil {
return fmt.Errorf("set PERSIST_WAL: %w", err)
}
// Open long-running database file descriptor. Required for non-OFD locks.
if db.f, err = os.Open(db.path); err != nil {
return fmt.Errorf("open db file descriptor: %w", err)
}
// Ensure database is closed if init fails.
// Initialization can retry on next sync.
defer func() {
if err != nil {
_ = db.releaseReadLock()
db.db.Close()
db.f.Close()
db.db, db.f = nil, nil
}
}()
// Enable WAL and ensure it is set. New mode should be returned on success:
// https://www.sqlite.org/pragma.html#pragma_journal_mode
var mode string
if err := db.db.QueryRowContext(ctx, `PRAGMA journal_mode = wal;`).Scan(&mode); err != nil {
return err
} else if mode != "wal" {
return fmt.Errorf("enable wal failed, mode=%q", mode)
}
// Disable autocheckpoint for litestream's connection.
if _, err := db.db.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0;`); err != nil {
return fmt.Errorf("disable autocheckpoint: %w", err)
}
// Create a table to force writes to the WAL when empty.
// There should only ever be one row with id=1.
if _, err := db.db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS _litestream_seq (id INTEGER PRIMARY KEY, seq INTEGER);`); err != nil {
return fmt.Errorf("create _litestream_seq table: %w", err)
}
// Create a lock table to force write locks during sync.
// The sync write transaction always rolls back so no data should be in this table.
if _, err := db.db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS _litestream_lock (id INTEGER);`); err != nil {
return fmt.Errorf("create _litestream_lock table: %w", err)
}
// Start a long-running read transaction to prevent other transactions
// from checkpointing.
if err := db.acquireReadLock(ctx); err != nil {
return fmt.Errorf("acquire read lock: %w", err)
}
// Read page size.
if err := db.db.QueryRowContext(ctx, `PRAGMA page_size;`).Scan(&db.pageSize); err != nil {
return fmt.Errorf("read page size: %w", err)
} else if db.pageSize <= 0 {
return fmt.Errorf("invalid db page size: %d", db.pageSize)
}
// Ensure meta directory structure exists.
if err := internal.MkdirAll(db.metaPath, db.dirInfo); err != nil {
return err
}
// Ensure WAL has at least one frame in it.
if err := db.ensureWALExists(ctx); err != nil {
return fmt.Errorf("ensure wal exists: %w", err)
}
// If we have an existing replication files, ensure the headers match.
// if err := db.verifyHeadersMatch(); err != nil {
// return fmt.Errorf("cannot determine last wal position: %w", err)
// }
// TODO(gen): Generate diff of current LTX snapshot and save as next LTX file.
// Start replication.
if db.Replica != nil {
db.Replica.Start(db.ctx)
}
return nil
}
/*
// verifyHeadersMatch returns an error if
func (db *DB) verifyHeadersMatch() error {
pos, err := db.Pos()
if err != nil {
return false, fmt.Errorf("cannot determine position: %w", err)
} else if pos.TXID == 0 {
return true, nil // no replication performed yet
}
hdr0, err := readWALHeader(db.WALPath())
if os.IsNotExist(err) {
return false, fmt.Errorf("no wal: %w", err)
} else if err != nil {
return false, fmt.Errorf("read wal header: %w", err)
}
salt1 := binary.BigEndian.Uint32(hdr0[16:])
salt2 := binary.BigEndian.Uint32(hdr0[20:])
ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
f, err := os.Open(ltxPath)
if err != nil {
return false, fmt.Errorf("open ltx path: %w", err)
}
defer func() { _ = f.Close() }()
dec := ltx.NewDecoder(f)
if err := dec.DecodeHeader(); err != nil {
return false, fmt.Errorf("decode ltx header: %w", err)
}
hdr1 := dec.Header()
if salt1 != hdr1.WALSalt1 || salt2 != hdr1.WALSalt2 {
db.Logger.Log(internal.LevelTrace, "salt mismatch",
"path", ltxPath,
"wal", [2]uint32{salt1, salt2},
"ltx", [2]uint32{hdr1.WALSalt1, hdr1.WALSalt2})
return false, nil
}
return true, nil
}
*/
// acquireReadLock begins a read transaction on the database to prevent checkpointing.
func (db *DB) acquireReadLock(ctx context.Context) error {
if db.rtx != nil {
return nil
}
// Start long running read-transaction to prevent checkpoints.
tx, err := db.db.BeginTx(ctx, nil)
if err != nil {
return err
}
// Execute read query to obtain read lock.
if _, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
_ = tx.Rollback()
return err
}
// Track transaction so we can release it later before checkpoint.
db.rtx = tx
return nil
}
// releaseReadLock rolls back the long-running read transaction.
func (db *DB) releaseReadLock() error {
// Ignore if we do not have a read lock.
if db.rtx == nil {
return nil
}
// Rollback & clear read transaction.
err := db.rtx.Rollback()
db.rtx = nil
return err
}
// Sync copies pending data from the WAL to the shadow WAL.
func (db *DB) Sync(ctx context.Context) (err error) {
db.mu.Lock()
defer db.mu.Unlock()
// Initialize database, if necessary. Exit if no DB exists.
if err := db.init(ctx); err != nil {
return err
} else if db.db == nil {
db.Logger.Debug("sync: no database found")
return nil
}
// Track total sync metrics.
t := time.Now()
defer func() {
db.syncNCounter.Inc()
if err != nil {
db.syncErrorNCounter.Inc()
}
db.syncSecondsCounter.Add(float64(time.Since(t).Seconds()))
}()
// Ensure WAL has at least one frame in it.
if err := db.ensureWALExists(ctx); err != nil {
return fmt.Errorf("ensure wal exists: %w", err)
}
if err := db.verifyAndSync(ctx, false); err != nil {
return err
}
/*
// TODO(ltx): Move checkpointing into its own goroutine?
// If WAL size is greater than max threshold, force checkpoint.
// If WAL size is greater than min threshold, attempt checkpoint.
var checkpoint bool
checkpointMode := CheckpointModePassive
if db.TruncatePageN > 0 && origWALSize >= calcWALSize(db.pageSize, db.TruncatePageN) {
checkpoint, checkpointMode = true, CheckpointModeTruncate
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
checkpoint, checkpointMode = true, CheckpointModeRestart
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
checkpoint = true
} else if db.CheckpointInterval > 0 && !info.dbModTime.IsZero() && time.Since(info.dbModTime) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) {
checkpoint = true
}
// Issue the checkpoint.
if checkpoint {
changed = true
if err := db.checkpoint(ctx, checkpointMode); err != nil {
return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
}
}
*/
// Compute current index and total shadow WAL size.
pos, err := db.Pos()
if err != nil {
return fmt.Errorf("pos: %w", err)
}
db.txIDGauge.Set(float64(pos.TXID))
// db.shadowWALSizeGauge.Set(float64(size))
// Notify replicas of WAL changes.
// if changed {
close(db.notify)
db.notify = make(chan struct{})
// }
return nil
}
func (db *DB) verifyAndSync(ctx context.Context, checkpointing bool) error {
// Verify our last sync matches the current state of the WAL.
// This ensures that the last sync position of the real WAL hasn't
// been overwritten by another process.
info, err := db.verify(ctx)
if err != nil {
return fmt.Errorf("cannot verify wal state: %w", err)
}
if err := db.sync(ctx, checkpointing, info); err != nil {
return fmt.Errorf("sync: %w", err)
}
return nil
}
// ensureWALExists checks that the real WAL exists and has a header.
func (db *DB) ensureWALExists(ctx context.Context) (err error) {
// Exit early if WAL header exists.
if fi, err := os.Stat(db.WALPath()); err == nil && fi.Size() >= WALHeaderSize {
return nil
}
// Otherwise create transaction that updates the internal litestream table.
_, err = db.db.ExecContext(ctx, `INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`)
return err
}
// verify ensures the current LTX state matches where it left off from
// the real WAL. Check info.ok if verification was successful.
func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
info.snapshotting = true
pos, err := db.Pos()
if err != nil {
return info, fmt.Errorf("pos: %w", err)
} else if pos.TXID == 0 {
info.offset = WALHeaderSize
return info, nil // first sync
}
// Determine last WAL offset we save from.
ltxFile, err := os.Open(db.LTXPath(0, pos.TXID, pos.TXID))
if err != nil {
return info, fmt.Errorf("open ltx file: %w", err)
}
defer func() { _ = ltxFile.Close() }()
dec := ltx.NewDecoder(ltxFile)
if err := dec.DecodeHeader(); err != nil {
return info, fmt.Errorf("decode ltx file: %w", err)
}
info.offset = dec.Header().WALOffset + dec.Header().WALSize
info.salt1 = dec.Header().WALSalt1
info.salt2 = dec.Header().WALSalt2
// If LTX WAL offset is larger than real WAL then the WAL has been truncated
// so we cannot determine our last state.
if fi, err := os.Stat(db.WALPath()); err != nil {
return info, fmt.Errorf("open wal file: %w", err)
} else if info.offset > fi.Size() {
info.reason = "wal truncated by another process"
return info, nil
}
// Compare WAL headers. Restart from beginning of WAL if different.
hdr0, err := readWALHeader(db.WALPath())
if err != nil {
return info, fmt.Errorf("cannot read wal header: %w", err)
}
salt1 := binary.BigEndian.Uint32(hdr0[16:])
salt2 := binary.BigEndian.Uint32(hdr0[20:])
if salt1 != dec.Header().WALSalt1 || salt2 != dec.Header().WALSalt2 {
db.Logger.Log(ctx, internal.LevelTrace, "wal restarted",
"salt1", salt1,
"salt2", salt2)
info.offset = WALHeaderSize
info.salt1, info.salt2 = salt1, salt2
info.reason = "wal salt changed"
return info, nil
}
// If offset is at the beginning of the first page, we can't check for previous page.
frameSize := int64(db.pageSize + WALFrameHeaderSize)
prevWALOffset := info.offset - frameSize
if prevWALOffset <= 0 {
info.reason = "wal offset before first frame"
return info, nil
}
// Verify last page exists in latest LTX file.
buf, err := readWALFileAt(db.WALPath(), prevWALOffset, frameSize)
if err != nil {
return info, fmt.Errorf("cannot read last synced wal page: %w", err)
}
pgno := binary.BigEndian.Uint32(buf[0:])
fsalt1 := binary.BigEndian.Uint32(buf[8:])
fsalt2 := binary.BigEndian.Uint32(buf[12:])
if fsalt1 != dec.Header().WALSalt1 || fsalt2 != dec.Header().WALSalt2 {
info.reason = "frame salt mismatch, wal overwritten by another process"
return info, nil
}
// Verify that the last page in the WAL exists in the last LTX file.
if ok, err := db.ltxDecoderContains(dec, pgno, buf[WALFrameHeaderSize:]); err != nil {
return info, fmt.Errorf("ltx contains: %w", err)
} else if !ok {
db.Logger.Log(ctx, internal.LevelTrace, "cannot find last page in last ltx file", "pgno", pgno, "offset", prevWALOffset)
info.reason = "last page does not exist in last ltx file, wal overwritten by another process"
return info, nil
}
info.snapshotting = false
return info, nil
}
func (db *DB) ltxDecoderContains(dec *ltx.Decoder, pgno uint32, data []byte) (bool, error) {
buf := make([]byte, dec.Header().PageSize)
for {
var hdr ltx.PageHeader
if err := dec.DecodePage(&hdr, buf); errors.Is(err, io.EOF) {
return false, nil
} else if err != nil {
return false, fmt.Errorf("decode ltx page: %w", err)
}
if pgno != hdr.Pgno {
continue
}
if !bytes.Equal(data, buf) {
continue
}
return true, nil
}
}
type syncInfo struct {
offset int64 // end of the previous LTX read
salt1 uint32
salt2 uint32
snapshotting bool // if true, a full snapshot is required
reason string // reason for snapshot
}
// sync copies pending bytes from the real WAL to LTX.
func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error {
// Determine the next sequential transaction ID.
pos, err := db.Pos()
if err != nil {
return fmt.Errorf("pos: %w", err)
}
txID := pos.TXID + 1
filename := db.LTXPath(0, txID, txID)
logArgs := []any{
"txid", txID.String(),
"offset", info.offset,
}
if checkpointing {
logArgs = append(logArgs, "chkpt", "true")
}
if info.snapshotting {
logArgs = append(logArgs, "snap", "true")
}
if info.reason != "" {
logArgs = append(logArgs, "reason", info.reason)
}
db.Logger.Debug("sync", logArgs...)
// Prevent internal checkpoints during sync. Ignore if already in a checkpoint.
if !checkpointing {
db.chkMu.RLock()
defer db.chkMu.RUnlock()
}
fi, err := db.f.Stat()
if err != nil {
return err
}
mode := fi.Mode()
commit := uint32(fi.Size() / int64(db.pageSize))
walFile, err := os.Open(db.WALPath())
if err != nil {
return err
}
defer walFile.Close()
var rd *WALReader
if info.offset == WALHeaderSize {
if rd, err = NewWALReader(walFile, db.Logger); err != nil {
return fmt.Errorf("new wal reader: %w", err)
}
} else {
// If we cannot verify the previous frame
var pfmError *PrevFrameMismatchError
if rd, err = NewWALReaderWithOffset(ctx, walFile, info.offset, info.salt1, info.salt2, db.Logger); errors.As(err, &pfmError) {
db.Logger.Log(ctx, internal.LevelTrace, "prev frame mismatch, snapshotting", "err", pfmError.Err)
info.offset = WALHeaderSize
if rd, err = NewWALReader(walFile, db.Logger); err != nil {
return fmt.Errorf("new wal reader, after reset")
}
} else if err != nil {
return fmt.Errorf("new wal reader with offset: %w", err)
}
}
// Build a mapping of changed page numbers and their latest content.
pageMap, maxOffset, walCommit, err := rd.PageMap(ctx)
if err != nil {
return fmt.Errorf("page map: %w", err)
}
if walCommit > 0 {
commit = walCommit
}
var sz int64
if maxOffset > 0 {
sz = maxOffset - info.offset
}
assert(sz >= 0, fmt.Sprintf("wal size must be positive: sz=%d, maxOffset=%d, info.offset=%d", sz, maxOffset, info.offset))
// Exit if we have no new WAL pages and we aren't snapshotting.
if !info.snapshotting && sz == 0 {
db.Logger.Log(ctx, internal.LevelTrace, "sync: skip", "reason", "no new wal pages")
return nil
}
tmpFilename := filename + ".tmp"
if err := internal.MkdirAll(filepath.Dir(tmpFilename), db.dirInfo); err != nil {
return err
}
ltxFile, err := os.OpenFile(tmpFilename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
if err != nil {
return fmt.Errorf("open temp ltx file: %w", err)
}
defer func() { _ = os.Remove(tmpFilename) }()
defer func() { _ = ltxFile.Close() }()
uid, gid := internal.Fileinfo(db.fileInfo)
_ = os.Chown(tmpFilename, uid, gid)
db.Logger.Log(ctx, internal.LevelTrace, "encode header",
"txid", txID.String(),
"commit", commit,
"walOffset", info.offset,
"walSize", sz,
"salt1", rd.salt1,
"salt2", rd.salt2)
timestamp := time.Now()
enc, err := ltx.NewEncoder(ltxFile)
if err != nil {
return fmt.Errorf("new ltx encoder: %w", err)
}
if err := enc.EncodeHeader(ltx.Header{
Version: ltx.Version,
Flags: ltx.HeaderFlagNoChecksum,
PageSize: uint32(db.pageSize),
Commit: commit,
MinTXID: txID,
MaxTXID: txID,
Timestamp: timestamp.UnixMilli(),
WALOffset: info.offset,
WALSize: sz,
WALSalt1: rd.salt1,
WALSalt2: rd.salt2,
}); err != nil {
return fmt.Errorf("encode ltx header: %w", err)
}
// If we need a full snapshot, then copy from the database & WAL.
// Otherwise, just copy incrementally from the WAL.
if info.snapshotting {
if err := db.writeLTXFromDB(ctx, enc, walFile, commit, pageMap); err != nil {
return fmt.Errorf("write ltx from db: %w", err)
}
} else {
if err := db.writeLTXFromWAL(ctx, enc, walFile, pageMap); err != nil {
return fmt.Errorf("write ltx from db: %w", err)
}
}
// Encode final trailer to the end of the LTX file.
if err := enc.Close(); err != nil {
return fmt.Errorf("close ltx encoder: %w", err)
}
// Sync & close LTX file.
if err := ltxFile.Sync(); err != nil {
return fmt.Errorf("sync ltx file: %w", err)
}
if err := ltxFile.Close(); err != nil {
return fmt.Errorf("close ltx file: %w", err)
}
// Atomically rename file to final path.
if err := os.Rename(tmpFilename, filename); err != nil {
db.maxLTXFileInfos.Lock()
delete(db.maxLTXFileInfos.m, 0) // clear cache if in unknown state
db.maxLTXFileInfos.Unlock()
return fmt.Errorf("rename ltx file: %w", err)
}
// Update file info cache for L0.
db.maxLTXFileInfos.Lock()
db.maxLTXFileInfos.m[0] = &ltx.FileInfo{
Level: 0,
MinTXID: txID,
MaxTXID: txID,
CreatedAt: time.Now(),
Size: enc.N(),
}
db.maxLTXFileInfos.Unlock()
db.Logger.Debug("db sync", "status", "ok")
return nil
}
func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.File, commit uint32, pageMap map[uint32]int64) error {
lockPgno := ltx.LockPgno(uint32(db.pageSize))
data := make([]byte, db.pageSize)
for pgno := uint32(1); pgno <= commit; pgno++ {
if pgno == lockPgno {
continue
}
// Check if the caller has canceled during processing.
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
// If page exists in the WAL, read from there.
if offset, ok := pageMap[pgno]; ok {
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno, "type", "db+wal")
if n, err := walFile.ReadAt(data, offset+WALFrameHeaderSize); err != nil {
return fmt.Errorf("read page %d @ %d: %w", pgno, offset, err)
} else if n != len(data) {
return fmt.Errorf("short read page %d @ %d", pgno, offset)
}
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, data); err != nil {
return fmt.Errorf("encode ltx frame (pgno=%d): %w", pgno, err)
}
continue
}
offset := int64(pgno-1) * int64(db.pageSize)
db.Logger.Log(ctx, internal.LevelTrace, "encode page from database", "offset", offset, "pgno", pgno)
// Otherwise read directly from the database file.
if _, err := db.f.ReadAt(data, offset); err != nil {
return fmt.Errorf("read database page %d: %w", pgno, err)
}
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, data); err != nil {
return fmt.Errorf("encode ltx frame (pgno=%d): %w", pgno, err)
}
}
return nil
}
func (db *DB) writeLTXFromWAL(ctx context.Context, enc *ltx.Encoder, walFile *os.File, pageMap map[uint32]int64) error {
// Create an ordered list of page numbers since the LTX encoder requires it.
pgnos := make([]uint32, 0, len(pageMap))
for pgno := range pageMap {
pgnos = append(pgnos, pgno)
}
slices.Sort(pgnos)
data := make([]byte, db.pageSize)
for _, pgno := range pgnos {
offset := pageMap[pgno]
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno, "type", "walonly")
// Read source page using page map.
if n, err := walFile.ReadAt(data, offset+WALFrameHeaderSize); err != nil {
return fmt.Errorf("read page %d @ %d: %w", pgno, offset, err)
} else if n != len(data) {
return fmt.Errorf("short read page %d @ %d", pgno, offset)
}
// Write page to LTX encoder.
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, data); err != nil {
return fmt.Errorf("encode ltx frame (pgno=%d): %w", pgno, err)
}
}
return nil
}
// Checkpoint performs a checkpoint on the WAL file.
func (db *DB) Checkpoint(ctx context.Context, mode string) (err error) {
db.mu.Lock()
defer db.mu.Unlock()
return db.checkpoint(ctx, mode)
}
// checkpoint performs a checkpoint on the WAL file and initializes a
// new shadow WAL file.
func (db *DB) checkpoint(ctx context.Context, mode string) error {
// Try getting a checkpoint lock, will fail during snapshots.
if !db.chkMu.TryLock() {
return nil
}
defer db.chkMu.Unlock()
// Read WAL header before checkpoint to check if it has been restarted.
hdr, err := readWALHeader(db.WALPath())
if err != nil {
return err
}
// Copy end of WAL before checkpoint to copy as much as possible.
if err := db.verifyAndSync(ctx, true); err != nil {
return fmt.Errorf("cannot copy wal before checkpoint: %w", err)
}
// Execute checkpoint and immediately issue a write to the WAL to ensure
// a new page is written.
if err := db.execCheckpoint(ctx, mode); err != nil {
return err
} else if _, err = db.db.ExecContext(ctx, `INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
return err
}
// If WAL hasn't been restarted, exit.
if other, err := readWALHeader(db.WALPath()); err != nil {
return err
} else if bytes.Equal(hdr, other) {
return nil
}
// Start a transaction. This will be promoted immediately after.
tx, err := db.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer func() { _ = rollback(tx) }()
// Insert into the lock table to promote to a write tx. The lock table
// insert will never actually occur because our tx will be rolled back,
// however, it will ensure our tx grabs the write lock. Unfortunately,
// we can't call "BEGIN IMMEDIATE" as we are already in a transaction.
if _, err := tx.ExecContext(ctx, `INSERT INTO _litestream_lock (id) VALUES (1);`); err != nil {
return fmt.Errorf("_litestream_lock: %w", err)
}
// Copy anything that may have occurred after the checkpoint.
if err := db.verifyAndSync(ctx, true); err != nil {
return fmt.Errorf("cannot copy wal after checkpoint: %w", err)
}
// Release write lock before exiting.
if err := tx.Rollback(); err != nil {
return fmt.Errorf("rollback post-checkpoint tx: %w", err)
}
return nil
}
func (db *DB) execCheckpoint(ctx context.Context, mode string) (err error) {
// Ignore if there is no underlying database.
if db.db == nil {
return nil
}
// Track checkpoint metrics.
t := time.Now()
defer func() {
labels := prometheus.Labels{"mode": mode}
db.checkpointNCounterVec.With(labels).Inc()
if err != nil {
db.checkpointErrorNCounterVec.With(labels).Inc()
}
db.checkpointSecondsCounterVec.With(labels).Add(float64(time.Since(t).Seconds()))
}()
// Ensure the read lock has been removed before issuing a checkpoint.
// We defer the re-acquire to ensure it occurs even on an early return.
if err := db.releaseReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err)
}
defer func() { _ = db.acquireReadLock(ctx) }()
// A non-forced checkpoint is issued as "PASSIVE". This will only checkpoint
// if there are not pending transactions. A forced checkpoint ("RESTART")
// will wait for pending transactions to end & block new transactions before
// forcing the checkpoint and restarting the WAL.
//
// See: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
rawsql := `PRAGMA wal_checkpoint(` + mode + `);`
var row [3]int
if err := db.db.QueryRowContext(ctx, rawsql).Scan(&row[0], &row[1], &row[2]); err != nil {
return err
}
db.Logger.Debug("checkpoint", "mode", mode, "result", fmt.Sprintf("%d,%d,%d", row[0], row[1], row[2]))
// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(ctx); err != nil {
return fmt.Errorf("reacquire read lock: %w", err)
}
return nil
}
// SnapshotReader returns the current position of the database & a reader that contains a full database snapshot.
func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
if db.PageSize() == 0 {
return ltx.Pos{}, nil, fmt.Errorf("page size not initialized yet")
}
pos, err := db.Pos()
if err != nil {
return pos, nil, fmt.Errorf("pos: %w", err)
}
db.Logger.Debug("snapshot", "txid", pos.TXID.String())
// Prevent internal checkpoints during sync.
db.chkMu.RLock()
defer db.chkMu.RUnlock()
// TODO(ltx): Read database size from database header.
fi, err := db.f.Stat()
if err != nil {
return pos, nil, err
}
commit := uint32(fi.Size() / int64(db.pageSize))
// Execute encoding in a separate goroutine so the caller can initialize before reading.
pr, pw := io.Pipe()
go func() {
walFile, err := os.Open(db.WALPath())
if err != nil {
pw.CloseWithError(err)
return
}
defer walFile.Close()
rd, err := NewWALReader(walFile, db.Logger)
if err != nil {
pw.CloseWithError(fmt.Errorf("new wal reader: %w", err))
return
}
// Build a mapping of changed page numbers and their latest content.
pageMap, maxOffset, walCommit, err := rd.PageMap(ctx)
if err != nil {
pw.CloseWithError(fmt.Errorf("page map: %w", err))
return
}
if walCommit > 0 {
commit = walCommit
}
var sz int64
if maxOffset > 0 {
sz = maxOffset - rd.Offset()
}
db.Logger.Debug("encode snapshot header",
"txid", pos.TXID.String(),
"commit", commit,
"walOffset", rd.Offset(),
"walSize", sz,
"salt1", rd.salt1,
"salt2", rd.salt2)
enc, err := ltx.NewEncoder(pw)
if err != nil {
pw.CloseWithError(fmt.Errorf("new ltx encoder: %w", err))
return
}
if err := enc.EncodeHeader(ltx.Header{
Version: ltx.Version,
Flags: ltx.HeaderFlagNoChecksum,
PageSize: uint32(db.pageSize),
Commit: commit,
MinTXID: 1,
MaxTXID: pos.TXID,
Timestamp: time.Now().UnixMilli(),
WALOffset: rd.Offset(),
WALSize: sz,
WALSalt1: rd.salt1,
WALSalt2: rd.salt2,
}); err != nil {
pw.CloseWithError(fmt.Errorf("encode ltx snapshot header: %w", err))
return
}
if err := db.writeLTXFromDB(ctx, enc, walFile, commit, pageMap); err != nil {
pw.CloseWithError(fmt.Errorf("write snapshot ltx: %w", err))
return
}
if err := enc.Close(); err != nil {
pw.CloseWithError(fmt.Errorf("close ltx snapshot encoder: %w", err))
return
}
_ = pw.Close()
}()
return pos, pr, nil
}
// Compact performs a compaction of the LTX file at the previous level into dstLevel.
// Returns metadata for the newly written compaction file. Returns ErrNoCompaction
// if no new files are available to be compacted.
func (db *DB) Compact(ctx context.Context, dstLevel int) (*ltx.FileInfo, error) {
srcLevel := dstLevel - 1
prevMaxInfo, err := db.Replica.MaxLTXFileInfo(ctx, dstLevel)
if err != nil {
return nil, fmt.Errorf("cannot determine max ltx file for destination level: %w", err)
}
seekTXID := prevMaxInfo.MaxTXID + 1
// Collect files after last compaction.
itr, err := db.Replica.Client.LTXFiles(ctx, srcLevel, seekTXID)
if err != nil {
return nil, fmt.Errorf("source ltx files after %s: %w", seekTXID, err)
}
defer itr.Close()
// Ensure all readers are closed by the end, even if an error occurs.
var rdrs []io.Reader
defer func() {
for _, rd := range rdrs {
if closer, ok := rd.(io.Closer); ok {
_ = closer.Close()
}
}
}()
// Build a list of input files to compact from.
var minTXID, maxTXID ltx.TXID
for itr.Next() {
info := itr.Item()
// Track TXID bounds of all files being compacted.
if minTXID == 0 || info.MinTXID < minTXID {
minTXID = info.MinTXID
}
if maxTXID == 0 || info.MaxTXID > maxTXID {
maxTXID = info.MaxTXID
}
f, err := db.Replica.Client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, 0, 0)
if err != nil {
return nil, fmt.Errorf("open ltx file: %w", err)
}
rdrs = append(rdrs, f)
}
if len(rdrs) == 0 {
return nil, ErrNoCompaction
}
// Stream compaction to destination in level.
pr, pw := io.Pipe()
go func() {
c, err := ltx.NewCompactor(pw, rdrs)
if err != nil {
pw.CloseWithError(fmt.Errorf("new ltx compactor: %w", err))
return
}
c.HeaderFlags = ltx.HeaderFlagNoChecksum
_ = pw.CloseWithError(c.Compact(ctx))
}()
info, err := db.Replica.Client.WriteLTXFile(ctx, dstLevel, minTXID, maxTXID, pr)
if err != nil {
return nil, fmt.Errorf("write ltx file: %w", err)
}
// Cache last metadata for the level.
db.maxLTXFileInfos.Lock()
db.maxLTXFileInfos.m[dstLevel] = info
db.maxLTXFileInfos.Unlock()
// If this is L1, clean up L0 files that are below the minTXID.
if dstLevel == 1 {
if err := db.EnforceRetentionByTXID(ctx, 0, maxTXID); err != nil {
// Don't log context cancellation errors during shutdown
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
db.Logger.Error("enforce L0 retention", "error", err)
}
}
}
return info, nil
}
// SnapshotDB writes a snapshot to the replica for the current position of the database.
func (db *DB) Snapshot(ctx context.Context) (*ltx.FileInfo, error) {
pos, r, err := db.SnapshotReader(ctx)
if err != nil {
return nil, err
}
info, err := db.Replica.Client.WriteLTXFile(ctx, SnapshotLevel, 1, pos.TXID, r)
if err != nil {
return info, err
}
db.maxLTXFileInfos.Lock()
db.maxLTXFileInfos.m[SnapshotLevel] = info
db.maxLTXFileInfos.Unlock()
return info, nil
}
// EnforceSnapshotRetention enforces retention of the snapshot level in the database by timestamp.
func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time) (minSnapshotTXID ltx.TXID, err error) {
db.Logger.Debug("enforcing snapshot retention", "timestamp", timestamp)
itr, err := db.Replica.Client.LTXFiles(ctx, SnapshotLevel, 0)
if err != nil {
return 0, fmt.Errorf("fetch ltx files: %w", err)
}
defer itr.Close()
var deleted []*ltx.FileInfo
var lastInfo *ltx.FileInfo
for itr.Next() {
info := itr.Item()
lastInfo = info
// If this snapshot is before the retention timestamp, mark it for deletion.
if info.CreatedAt.Before(timestamp) {
deleted = append(deleted, info)
continue
}
// Track the lowest snapshot TXID so we can enforce retention in lower levels.
// This is only tracked for snapshots not marked for deletion.
if minSnapshotTXID == 0 || info.MaxTXID < minSnapshotTXID {
minSnapshotTXID = info.MaxTXID
}
}
// If this is the snapshot level, we need to ensure that at least one snapshot exists.
if len(deleted) > 0 && deleted[len(deleted)-1] == lastInfo {
deleted = deleted[:len(deleted)-1]
}
// Remove all files marked for deletion.
for _, info := range deleted {
db.Logger.Info("deleting ltx file", "level", SnapshotLevel, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID)
}
if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil {
return 0, fmt.Errorf("remove ltx files: %w", err)
}
return minSnapshotTXID, nil
}
// EnforceRetentionByTXID enforces retention so that any LTX files below
// the target TXID are deleted. Always keep at least one file.
func (db *DB) EnforceRetentionByTXID(ctx context.Context, level int, txID ltx.TXID) (err error) {
db.Logger.Debug("enforcing retention", "level", level, "txid", txID)
itr, err := db.Replica.Client.LTXFiles(ctx, level, 0)
if err != nil {
return fmt.Errorf("fetch ltx files: %w", err)
}
defer itr.Close()
var deleted []*ltx.FileInfo
var lastInfo *ltx.FileInfo
for itr.Next() {
info := itr.Item()
lastInfo = info
// If this file's maxTXID is below the target TXID, mark it for deletion.
if info.MaxTXID < txID {
deleted = append(deleted, info)
continue
}
}
// Ensure we don't delete the last file.
if len(deleted) > 0 && deleted[len(deleted)-1] == lastInfo {
deleted = deleted[:len(deleted)-1]
}
// Remove all files marked for deletion.
for _, info := range deleted {
db.Logger.Info("deleting ltx file", "level", level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID)
}
if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil {
return fmt.Errorf("remove ltx files: %w", err)
}
return nil
}
// monitor runs in a separate goroutine and monitors the database & WAL.
func (db *DB) monitor() {
ticker := time.NewTicker(db.MonitorInterval)
defer ticker.Stop()
for {
// Wait for ticker or context close.
select {
case <-db.ctx.Done():
return
case <-ticker.C:
}
// Sync the database to the shadow WAL.
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
db.Logger.Error("sync error", "error", err)
}
}
}
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
//
// This function obtains a read lock so it prevents syncs from occurring until
// the operation is complete. The database will still be usable but it will be
// unable to checkpoint during this time.
//
// If dst is set, the database file is copied to that location before checksum.
func (db *DB) CRC64(ctx context.Context) (uint64, ltx.Pos, error) {
db.mu.Lock()
defer db.mu.Unlock()
if err := db.init(ctx); err != nil {
return 0, ltx.Pos{}, err
} else if db.db == nil {
return 0, ltx.Pos{}, os.ErrNotExist
}
// Force a RESTART checkpoint to ensure the database is at the start of the WAL.
if err := db.checkpoint(ctx, CheckpointModeRestart); err != nil {
return 0, ltx.Pos{}, err
}
// Obtain current position. Clear the offset since we are only reading the
// DB and not applying the current WAL.
pos, err := db.Pos()
if err != nil {
return 0, pos, err
}
// Seek to the beginning of the db file descriptor and checksum whole file.
h := crc64.New(crc64.MakeTable(crc64.ISO))
if _, err := db.f.Seek(0, io.SeekStart); err != nil {
return 0, pos, err
} else if _, err := io.Copy(h, db.f); err != nil {
return 0, pos, err
}
return h.Sum64(), pos, nil
}
// MaxLTXFileInfo returns the metadata for the last LTX file in a level.
// If cached, it will returned the local copy. Otherwise, it fetches from the replica.
func (db *DB) MaxLTXFileInfo(ctx context.Context, level int) (ltx.FileInfo, error) {
db.maxLTXFileInfos.Lock()
defer db.maxLTXFileInfos.Unlock()
info, ok := db.maxLTXFileInfos.m[level]
if ok {
return *info, nil
}
remoteInfo, err := db.Replica.MaxLTXFileInfo(ctx, level)
if err != nil {
return ltx.FileInfo{}, fmt.Errorf("cannot determine L%d max ltx file for %q: %w", level, db.Path(), err)
}
db.maxLTXFileInfos.m[level] = &remoteInfo
return remoteInfo, nil
}
// DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const DefaultRestoreParallelism = 8
// RestoreOptions represents options for DB.Restore().
type RestoreOptions struct {
// Target path to restore into.
// If blank, the original DB path is used.
OutputPath string
// Specific transaction to restore to.
// If zero, TXID is ignored.
TXID ltx.TXID
// Point-in-time to restore database.
// If zero, database restore to most recent state available.
Timestamp time.Time
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
}
// NewRestoreOptions returns a new instance of RestoreOptions with defaults.
func NewRestoreOptions() RestoreOptions {
return RestoreOptions{
Parallelism: DefaultRestoreParallelism,
}
}
// Database metrics.
var (
dbSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "litestream_db_size",
Help: "The current size of the real DB",
}, []string{"db"})
walSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "litestream_wal_size",
Help: "The current size of the real WAL",
}, []string{"db"})
totalWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_total_wal_bytes",
Help: "Total number of bytes written to shadow WAL",
}, []string{"db"})
txIDIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "litestream_txid",
Help: "The current transaction ID",
}, []string{"db"})
syncNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_sync_count",
Help: "Number of sync operations performed",
}, []string{"db"})
syncErrorNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_sync_error_count",
Help: "Number of sync errors that have occurred",
}, []string{"db"})
syncSecondsCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_sync_seconds",
Help: "Time spent syncing shadow WAL, in seconds",
}, []string{"db"})
checkpointNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_checkpoint_count",
Help: "Number of checkpoint operations performed",
}, []string{"db", "mode"})
checkpointErrorNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_checkpoint_error_count",
Help: "Number of checkpoint errors that have occurred",
}, []string{"db", "mode"})
checkpointSecondsCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "litestream_checkpoint_seconds",
Help: "Time spent checkpointing WAL, in seconds",
}, []string{"db", "mode"})
)
package litestream_test
import (
"context"
"database/sql"
"io"
"log/slog"
"os"
"path/filepath"
"testing"
"time"
_ "modernc.org/sqlite"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/file"
)
// openTestDB initializes a Litestream DB with a file replica rooted at replicaPath.
// A corresponding *sql.DB (configured for WAL) is returned for issuing SQL writes.
func openTestDB(tb testing.TB, dbPath, replicaPath string) (*litestream.DB, *sql.DB) {
tb.Helper()
if err := os.MkdirAll(filepath.Dir(dbPath), 0o755); err != nil {
tb.Fatalf("mkdir %s: %v", filepath.Dir(dbPath), err)
}
db := litestream.NewDB(dbPath)
db.Logger = slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
db.MonitorInterval = 0
db.Replica = litestream.NewReplica(db)
client := file.NewReplicaClient(replicaPath)
db.Replica.Client = client
db.Replica.MonitorEnabled = false
if err := db.Open(); err != nil {
tb.Fatalf("open litestream db: %v", err)
}
client.Replica = db.Replica
sqldb, err := sql.Open("sqlite", dbPath)
if err != nil {
tb.Fatalf("open sql db: %v", err)
}
if _, err := sqldb.ExecContext(context.Background(), `PRAGMA journal_mode = wal;`); err != nil {
tb.Fatalf("set journal_mode=wal: %v", err)
}
return db, sqldb
}
// TestRestoreFailsAfterFullCheckpointWhileDown codifies bug #752 by asserting
// that the current restore path fails once the real WAL is truncated while
// Litestream is offline. The test is skipped so it can serve as a focused
// regression harness until the bug is fixed.
func TestRestoreFailsAfterFullCheckpointWhileDown(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
dbPath := filepath.Join(dir, "db.sqlite")
replicaPath := filepath.Join(dir, "replica")
db, sqldb := openTestDB(t, dbPath, replicaPath)
defer func() { _ = db.Close(ctx) }()
defer func() { _ = sqldb.Close() }()
if _, err := sqldb.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, data BLOB);`); err != nil {
t.Fatalf("create table: %v", err)
}
for i := 0; i < 128; i++ {
if _, err := sqldb.ExecContext(ctx, `INSERT INTO t(data) VALUES (randomblob(1024));`); err != nil {
t.Fatalf("prime insert %d: %v", i, err)
}
}
if err := db.Sync(ctx); err != nil {
t.Fatalf("initial sync: %v", err)
}
if err := db.Replica.Sync(ctx); err != nil {
t.Fatalf("initial replica sync: %v", err)
}
if err := db.Close(ctx); err != nil {
t.Fatalf("close before downtime: %v", err)
}
for i := 0; i < 64; i++ {
if _, err := sqldb.ExecContext(ctx, `INSERT INTO t(data) VALUES (randomblob(1024));`); err != nil {
t.Fatalf("post-downtime insert %d: %v", i, err)
}
}
var wantRows int
if err := sqldb.QueryRowContext(ctx, `SELECT COUNT(*) FROM t;`).Scan(&wantRows); err != nil {
t.Fatalf("count source rows: %v", err)
}
if _, err := sqldb.ExecContext(ctx, `PRAGMA wal_checkpoint(FULL);`); err != nil {
t.Fatalf("checkpoint FULL: %v", err)
}
if err := sqldb.Close(); err != nil {
t.Fatalf("close sql db before restart: %v", err)
}
db2, sqldb2 := openTestDB(t, dbPath, replicaPath)
defer func() { _ = db2.Close(ctx) }()
defer func() { _ = sqldb2.Close() }()
if err := db2.Sync(ctx); err != nil {
t.Fatalf("sync after restart: %v", err)
}
if err := db2.Replica.Sync(ctx); err != nil {
t.Fatalf("replica sync after restart: %v", err)
}
if plan, err := litestream.CalcRestorePlan(ctx, db2.Replica.Client, 0, time.Time{}, db2.Logger); err == nil {
for i, info := range plan {
t.Logf("restore plan[%d]: level=%d min=%s max=%s size=%d", i, info.Level, info.MinTXID, info.MaxTXID, info.Size)
}
}
restorePath := filepath.Join(dir, "restore.sqlite")
if err := db2.Replica.Restore(ctx, litestream.RestoreOptions{OutputPath: restorePath}); err != nil {
t.Fatalf("restore returned error: %v", err)
}
restoredDB, err := sql.Open("sqlite", restorePath)
if err != nil {
t.Fatalf("open restored db: %v", err)
}
defer restoredDB.Close()
var gotRows int
if err := restoredDB.QueryRowContext(ctx, `SELECT COUNT(*) FROM t;`).Scan(&gotRows); err != nil {
t.Fatalf("count restored rows: %v", err)
}
if gotRows != wantRows {
t.Fatalf("restored row count mismatch: got %d want %d", gotRows, wantRows)
}
}
// TestRestoreLosesRowsAfterAutoCheckpointWhileDown captures the row-loss mode
// where SQLite's automatic checkpoint runs while Litestream is offline. The
// restored database currently comes back with fewer rows even though restore
// reports success. Skipped until bug #752 is addressed.
func TestRestoreLosesRowsAfterAutoCheckpointWhileDown(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
dbPath := filepath.Join(dir, "db.sqlite")
replicaPath := filepath.Join(dir, "replica")
db, sqldb := openTestDB(t, dbPath, replicaPath)
defer func() { _ = db.Close(ctx) }()
defer func() { _ = sqldb.Close() }()
if _, err := sqldb.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, data BLOB);`); err != nil {
t.Fatalf("create table: %v", err)
}
for i := 0; i < 64; i++ {
if _, err := sqldb.ExecContext(ctx, `INSERT INTO t(data) VALUES (randomblob(2048));`); err != nil {
t.Fatalf("prime insert %d: %v", i, err)
}
}
if err := db.Sync(ctx); err != nil {
t.Fatalf("initial sync: %v", err)
}
if err := db.Replica.Sync(ctx); err != nil {
t.Fatalf("initial replica sync: %v", err)
}
if _, err := sqldb.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 1;`); err != nil {
t.Fatalf("set wal_autocheckpoint: %v", err)
}
if err := db.Close(ctx); err != nil {
t.Fatalf("close before downtime: %v", err)
}
for i := 0; i < 512; i++ {
if _, err := sqldb.ExecContext(ctx, `INSERT INTO t(data) VALUES (randomblob(4096));`); err != nil {
t.Fatalf("downtime insert %d: %v", i, err)
}
}
time.Sleep(50 * time.Millisecond)
for i := 0; i < 128; i++ {
if _, err := sqldb.ExecContext(ctx, `INSERT INTO t(data) VALUES (randomblob(512));`); err != nil {
t.Fatalf("post-checkpoint insert %d: %v", i, err)
}
}
var wantRows int
if err := sqldb.QueryRowContext(ctx, `SELECT COUNT(*) FROM t;`).Scan(&wantRows); err != nil {
t.Fatalf("count source rows: %v", err)
}
if err := sqldb.Close(); err != nil {
t.Fatalf("close sql db before restart: %v", err)
}
db2, sqldb2 := openTestDB(t, dbPath, replicaPath)
defer func() { _ = db2.Close(ctx) }()
defer func() { _ = sqldb2.Close() }()
if err := db2.Sync(ctx); err != nil {
t.Fatalf("sync after restart: %v", err)
}
if err := db2.Replica.Sync(ctx); err != nil {
t.Fatalf("replica sync after restart: %v", err)
}
if plan, err := litestream.CalcRestorePlan(ctx, db2.Replica.Client, 0, time.Time{}, db2.Logger); err == nil {
for i, info := range plan {
t.Logf("restore plan[%d]: level=%d min=%s max=%s size=%d", i, info.Level, info.MinTXID, info.MaxTXID, info.Size)
}
}
restorePath := filepath.Join(dir, "restore.sqlite")
if err := db2.Replica.Restore(ctx, litestream.RestoreOptions{OutputPath: restorePath}); err != nil {
t.Fatalf("restore returned error: %v", err)
}
restoredDB, err := sql.Open("sqlite", restorePath)
if err != nil {
t.Fatalf("open restored db: %v", err)
}
defer restoredDB.Close()
var gotRows int
if err := restoredDB.QueryRowContext(ctx, `SELECT COUNT(*) FROM t;`).Scan(&gotRows); err != nil {
t.Fatalf("count restored rows: %v", err)
}
if gotRows != wantRows {
t.Fatalf("restored row count mismatch: got %d want %d", gotRows, wantRows)
}
}
============================================
Litestream v0.5.0 Critical Bug Reproduction
============================================
This demonstrates a data loss scenario where restore fails after:
1. Litestream is killed (simulating crash)
2. Writes continue and a checkpoint occurs
3. Litestream is restarted
[SETUP] Cleaning up previous test files...
Using local build: ./bin/litestream
Using local litestream-test: ./bin/litestream-test
Versions:
(development build)
[STEP 1] Creating test database (50MB)...
time=2025-09-23T10:19:16.683-05:00 level=INFO msg="Starting database population" db=/tmp/critical-bug-test.db target_size=50MB row_size=1024 batch_size=1000 table_count=2 page_size=4096
time=2025-09-23T10:19:16.688-05:00 level=INFO msg="Populating database" target_bytes=52428800 total_rows=51200 rows_per_table=25600
time=2025-09-23T10:19:17.014-05:00 level=INFO msg=Progress table=test_table_0 current_size_mb=35 progress_percent=70.8
time=2025-09-23T10:19:17.210-05:00 level=INFO msg=Progress table=test_table_1 current_size_mb=68 progress_percent=136.6
time=2025-09-23T10:19:17.210-05:00 level=INFO msg="Population complete" duration=521.989208ms final_size_mb=68 throughput_mb_per_sec=130.83
time=2025-09-23T10:19:17.217-05:00 level=INFO msg="Database population complete" db=/tmp/critical-bug-test.db
✓ Database created: 67M
[STEP 2] Starting Litestream replication...
✓ Litestream running (PID: 85775)
[STEP 3] Starting continuous writes...
✓ Write load started (PID: 85953)
[STEP 4] Running normally for 20 seconds...
✓ Rows written before interruption: 1568
[STEP 5] Killing Litestream (simulating crash)...
✓ Litestream killed
[STEP 6] Continuing writes for 15 seconds (Litestream is down)...
/tmp/litestream-pr748/cmd/litestream-test/scripts/reproduce-critical-bug.sh: line 109: 85775 Killed: 9 ./bin/litestream replicate "$DB" "file://$REPLICA" > /tmp/litestream.log 2>&1
[STEP 7] Executing FULL checkpoint while Litestream is down...
✓ Checkpoint result: 0|552|552
✓ Rows after checkpoint: 2784
[STEP 8] Resuming Litestream...
✓ Litestream restarted (PID: 88603)
[STEP 9] Letting Litestream catch up for 20 seconds...
✓ Writes stopped
✓ Final row count in source database: 4620
[STEP 10] Attempting to restore database...
==========================================
✓ SUCCESS: Restore completed successfully
- Restored row count: 4620
- Integrity check: ok
- Data integrity: ✓ VERIFIED (no data loss)
==========================================
Test artifacts saved in:
- Source database: /tmp/critical-bug-test.db
- Replica files: /tmp/critical-bug-replica/
- Litestream log: /tmp/litestream.log
- Restore output: /tmp/restore-output.log
Test complete.
#!/bin/bash
set -euo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
BIN="$ROOT/bin/litestream"
TEST="$ROOT/bin/litestream-test"
SCRIPT="$ROOT/cmd/litestream-test/scripts/reproduce-critical-bug.sh"
LOG="$ROOT/reproduce.log"
rm -f /tmp/critical-bug-test.db*
rm -rf /tmp/critical-bug-replica /tmp/.critical-bug-test.db-litestream
"$SCRIPT" 2>&1 | tee "$LOG"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment