storage

package
v2.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// B+ Tree configuration
	BPTreeOrder       = 4 // Maximum number of children per node
	BPTreeMaxKeys     = BPTreeOrder - 1
	BPTreeMinKeys     = (BPTreeOrder+1)/2 - 1
	BPTreeLeafOrder   = 4
	BPTreeMaxLeafKeys = BPTreeLeafOrder - 1
)
View Source
const (
	// DefaultSlotSize is the default size for each log slot (16KB)
	DefaultSlotSize = 16 * 1024

	// DefaultLogBufferCapacity is the number of slots (must be power of 2)
	DefaultLogBufferCapacity = 256 // 256 * 16KB = 4MB total

	// MinLogBufferCapacity is the minimum buffer capacity
	MinLogBufferCapacity = 16

	// MaxLogBufferCapacity is the maximum buffer capacity
	MaxLogBufferCapacity = 16384 // 16K slots * 16KB = 256MB
)
View Source
const (
	DefaultMaxSegmentSize = 64 * 1024 * 1024 // 64MB
	MaxRecyclePoolSize    = 10               // Keep max 10 recycled segments
)
View Source
const (
	// Initial file size: 1GB (256K pages * 4KB)
	InitialFileSize = 1024 * 1024 * 1024
	// Grow by 256MB when we run out of space
	FileGrowSize = 256 * 1024 * 1024
)
View Source
const (
	PageHeaderSize = 8 // 4 * uint16 = 8 bytes
	SlotEntrySize  = 4 // 2 * uint16 = 4 bytes
)
View Source
const (
	CompressedPageMagic     = 0xC0DE
	CompressedHeaderSize    = 12
	MinCompressionThreshold = 100 // Minimum bytes saved to use compression
)
View Source
const DefaultLogBufferSize = 4096 // 4KB buffer
View Source
const (
	PageSize = 4096 // 4KB pages
)
View Source
const (
	ParallelLogBufferSize = 16384 // 16KB buffer
)

Variables

This section is empty.

Functions

func CompressPageTransparent

func CompressPageTransparent(data []byte, compressionType CompressionType) ([]byte, error)

CompressPageTransparent compresses a page and returns serialized form (compression + serialization).

func DecompressPage

func DecompressPage(cp *CompressedPage) ([]byte, error)

DecompressPage decompresses a compressed page

func DecompressPageTransparent

func DecompressPageTransparent(data []byte) ([]byte, error)

DecompressPageTransparent detects if page is compressed and decompresses if needed Returns original data if not compressed

func IsCompressedPage

func IsCompressedPage(data []byte) bool

IsCompressedPage checks if the page data represents a compressed page

func IsErrorCode

func IsErrorCode(err error, code ErrorCode) bool

IsErrorCode checks if an error has a specific error code

func ParseSegmentID

func ParseSegmentID(filename string) (uint32, error)

Helper function to parse segment ID from filename

func SerializeCompressedPage

func SerializeCompressedPage(cp *CompressedPage) ([]byte, error)

SerializeCompressedPage serializes a compressed page to bytes

Types

type ARCReplacer

type ARCReplacer struct {
	// contains filtered or unexported fields
}

ARCReplacer implements the Adaptive Replacement Cache algorithm ARC maintains four LRU lists: - T1: Recent cache hits (recency) - T2: Frequent cache hits (frequency) - B1: Ghost entries evicted from T1 (recent evictions) - B2: Ghost entries evicted from T2 (frequent evictions)

The algorithm adaptively adjusts the target size (p) between T1 and T2 based on cache hit patterns to optimize for the current workload.

func NewARCReplacer

func NewARCReplacer(capacity int) *ARCReplacer

NewARCReplacer creates a new ARC cache replacer

func (*ARCReplacer) GetStats

func (arc *ARCReplacer) GetStats() map[string]int

GetStats returns ARC-specific statistics

func (*ARCReplacer) Pin

func (arc *ARCReplacer) Pin(frameID uint32)

Pin marks a page as in-use

func (*ARCReplacer) Size

func (arc *ARCReplacer) Size() uint32

Size returns the number of evictable pages

func (*ARCReplacer) Unpin

func (arc *ARCReplacer) Unpin(frameID uint32)

Unpin marks a page as evictable and records the access

func (*ARCReplacer) Victim

func (arc *ARCReplacer) Victim() (uint32, bool)

Victim selects a page for eviction

type AccessPattern

type AccessPattern struct {
	StartPageID    uint32
	LastPageID     uint32
	Stride         int32   // Distance between consecutive accesses
	AccessCount    int     // Number of accesses matching this stride
	Confidence     float64 // Confidence score (0.0 - 1.0)
	LastAccessTime time.Time
	History        []uint32 // Recent page accesses for pattern analysis
	HistorySize    int      // Max history size
}

AccessPattern represents a detected access pattern with stride

type AdaptiveFlushConfig

type AdaptiveFlushConfig struct {
	// Target dirty page ratio (0.0 - 1.0)
	TargetDirtyRatio float64

	// Maximum dirty ratio before aggressive flushing (0.0 - 1.0)
	MaxDirtyRatio float64

	// Flush check interval
	CheckInterval time.Duration

	// Minimum pages to flush per interval
	MinFlushPages int

	// Maximum pages to flush per interval
	MaxFlushPages int

	// PID controller gains
	Kp float64 // Proportional gain
	Ki float64 // Integral gain
	Kd float64 // Derivative gain

	// Adaptive parameters
	EnableAdaptive     bool    // Enable adaptive adjustment
	WriteRateThreshold float64 // Pages/sec to trigger adaptive mode
	CheckpointInterval time.Duration
}

AdaptiveFlushConfig contains configuration for adaptive flushing

func DefaultAdaptiveFlushConfig

func DefaultAdaptiveFlushConfig() AdaptiveFlushConfig

DefaultAdaptiveFlushConfig returns default configuration

type AdaptiveFlushStats

type AdaptiveFlushStats struct {
	FlushesIssued  uint64
	PagesFlushed   uint64
	CurrentRate    float64 // Pages per second
	DirtyRatio     float64
	AvgFlushTime   time.Duration
	LastAdjustment time.Time
}

AdaptiveFlushStats contains statistics about adaptive flushing

type AdaptiveFlusher

type AdaptiveFlusher struct {
	// contains filtered or unexported fields
}

func NewAdaptiveFlusher

func NewAdaptiveFlusher(bp FlushableBufferPool, config AdaptiveFlushConfig) *AdaptiveFlusher

NewAdaptiveFlusher creates a new adaptive flusher

func (*AdaptiveFlusher) GetConfig

func (af *AdaptiveFlusher) GetConfig() AdaptiveFlushConfig

GetConfig returns the current configuration

func (*AdaptiveFlusher) GetStats

func (af *AdaptiveFlusher) GetStats() AdaptiveFlushStats

GetStats returns current statistics

func (*AdaptiveFlusher) IsRunning

func (af *AdaptiveFlusher) IsRunning() bool

IsRunning returns whether the flusher is currently running

func (*AdaptiveFlusher) SetMaxDirtyRatio

func (af *AdaptiveFlusher) SetMaxDirtyRatio(ratio float64) error

SetMaxDirtyRatio dynamically adjusts the maximum dirty ratio

func (*AdaptiveFlusher) SetTargetDirtyRatio

func (af *AdaptiveFlusher) SetTargetDirtyRatio(ratio float64) error

SetTargetDirtyRatio dynamically adjusts the target dirty ratio

func (*AdaptiveFlusher) Start

func (af *AdaptiveFlusher) Start() error

Start starts the adaptive flusher background goroutine

func (*AdaptiveFlusher) Stop

func (af *AdaptiveFlusher) Stop() error

Stop stops the adaptive flusher

func (*AdaptiveFlusher) TriggerFlush

func (af *AdaptiveFlusher) TriggerFlush(maxPages int) int

TriggerFlush manually triggers a flush cycle

type AdviceType

type AdviceType int

AdviceType represents memory access advice

const (
	AdviceNormal     AdviceType = 0 // No special treatment
	AdviceRandom     AdviceType = 1 // Random access pattern
	AdviceSequential AdviceType = 2 // Sequential access pattern
	AdviceWillNeed   AdviceType = 3 // Will need these pages soon (prefetch)
	AdviceDontNeed   AdviceType = 4 // Won't need these pages (can evict)
)

type BPTreeEntry

type BPTreeEntry struct {
	Key   int64
	Value int64
}

BPTreeEntry represents a key-value pair in the B+ Tree

type BPTreeIterator

type BPTreeIterator struct {
	// contains filtered or unexported fields
}

BPTreeIterator represents an iterator for B+ Tree

func (*BPTreeIterator) HasNext

func (iter *BPTreeIterator) HasNext() bool

HasNext returns true if there are more elements

func (*BPTreeIterator) Next

func (iter *BPTreeIterator) Next() (int64, int64, error)

Next returns the next key-value pair

type BPTreeNode

type BPTreeNode struct {
	// contains filtered or unexported fields
}

BPTreeNode represents a node in the B+ Tree

func NewInternalNode

func NewInternalNode() *BPTreeNode

NewInternalNode creates a new internal node

func NewLeafNode

func NewLeafNode() *BPTreeNode

NewLeafNode creates a new leaf node

type BPTreeNodeType

type BPTreeNodeType int

BPTreeNodeType represents the type of B+ Tree node

const (
	LeafNode BPTreeNodeType = iota
	InternalNode
)

type BPlusTree

type BPlusTree struct {
	// contains filtered or unexported fields
}

BPlusTree represents a B+ Tree index

func NewBPlusTree

func NewBPlusTree(bpm *BufferPoolManager) (*BPlusTree, error)

NewBPlusTree creates a new B+ Tree

func (*BPlusTree) BulkLoad

func (tree *BPlusTree) BulkLoad(entries []BPTreeEntry) error

BulkLoad efficiently builds a B+ Tree from sorted key-value pairs. Much faster than inserting keys one by one. Requirements: entries must be sorted by key in ascending order.

func (*BPlusTree) Delete

func (tree *BPlusTree) Delete(key int64) error

Delete removes a key from the B+ Tree

func (*BPlusTree) GetParallelScanStats

func (tree *BPlusTree) GetParallelScanStats(startKey, endKey int64, config ParallelScanConfig) (ParallelRangeScanStats, error)

GetParallelScanStats analyzes a range scan and returns statistics

func (*BPlusTree) Insert

func (tree *BPlusTree) Insert(key int64, value int64) error

Insert inserts a key-value pair into the B+ Tree Uses write locks for safe concurrent modifications

func (*BPlusTree) Iterator

func (tree *BPlusTree) Iterator() (*BPTreeIterator, error)

Iterator creates a new iterator for the B+ Tree

func (*BPlusTree) ParallelRangeScan

func (tree *BPlusTree) ParallelRangeScan(startKey, endKey int64, config ParallelScanConfig) (<-chan ScanResult, <-chan error, error)

ParallelRangeScan performs a parallel range scan on the B+ Tree Returns results through a channel, allowing streaming of large result sets

func (*BPlusTree) ParallelRangeScanCount

func (tree *BPlusTree) ParallelRangeScanCount(startKey, endKey int64, config ParallelScanConfig) (int64, error)

ParallelRangeScanCount counts entries in a range using parallel workers. Faster than scan-and-count since we don't return values.

func (*BPlusTree) ParallelRangeScanOrdered

func (tree *BPlusTree) ParallelRangeScanOrdered(startKey, endKey int64, config ParallelScanConfig) ([]BPTreeEntry, error)

ParallelRangeScanOrdered performs a parallel range scan with ordered results. Slower than unordered scan but maintains key order.

func (*BPlusTree) Search

func (tree *BPlusTree) Search(key int64) (int64, bool, error)

Search searches for a key in the B+ Tree and returns its value Uses read locks for high concurrency

type BloomFilter

type BloomFilter struct {
	// contains filtered or unexported fields
}

BloomFilter is a space-efficient probabilistic data structure for membership testing Used at page level to quickly reject negative lookups without reading page data

func DeserializeBloomFilter

func DeserializeBloomFilter(data []byte) (*BloomFilter, error)

DeserializeBloomFilter reconstructs a Bloom filter from serialized bytes

func NewBloomFilter

func NewBloomFilter(config BloomFilterConfig) *BloomFilter

NewBloomFilter creates a new Bloom filter with the given configuration

func NewBloomFilterFromBytes

func NewBloomFilterFromBytes(data []byte, numHashes uint32) *BloomFilter

NewBloomFilterFromBytes reconstructs a Bloom filter from serialized bytes

func (*BloomFilter) Clear

func (bf *BloomFilter) Clear()

Clear resets the Bloom filter to empty state

func (*BloomFilter) Clone

func (bf *BloomFilter) Clone() *BloomFilter

Clone creates a deep copy of the Bloom filter

func (*BloomFilter) EstimateFalsePositiveRate

func (bf *BloomFilter) EstimateFalsePositiveRate() float64

EstimateFalsePositiveRate calculates the current false positive rate Based on: p ≈ (1 - e^(-kn/m))^k where k = num hashes, n = num inserts, m = num bits

func (*BloomFilter) GetBytes

func (bf *BloomFilter) GetBytes() []byte

GetBytes returns the raw byte array (for serialization)

func (*BloomFilter) GetFillRatio

func (bf *BloomFilter) GetFillRatio() float64

GetFillRatio returns the fraction of bits set to 1

func (*BloomFilter) GetNumBits

func (bf *BloomFilter) GetNumBits() uint32

GetNumBits returns the total number of bits in the filter

func (*BloomFilter) GetNumHashes

func (bf *BloomFilter) GetNumHashes() uint32

GetNumHashes returns the number of hash functions used

func (*BloomFilter) GetNumInserts

func (bf *BloomFilter) GetNumInserts() uint32

GetNumInserts returns the number of elements inserted

func (*BloomFilter) Insert

func (bf *BloomFilter) Insert(key []byte)

Insert adds an element to the Bloom filter

func (*BloomFilter) Intersect

func (bf *BloomFilter) Intersect(other *BloomFilter) error

Intersect performs bitwise AND with another Bloom filter Both filters must have the same configuration (numBits, numHashes)

func (*BloomFilter) MayContain

func (bf *BloomFilter) MayContain(key []byte) bool

MayContain checks if an element might be in the set Returns true if element might exist (could be false positive) Returns false if element definitely does not exist (no false negatives)

func (*BloomFilter) Serialize

func (bf *BloomFilter) Serialize() []byte

Serialize converts the Bloom filter to a byte slice for storage Format: [numBits:4][numHashes:4][numInserts:4][bits:variable]

func (*BloomFilter) Union

func (bf *BloomFilter) Union(other *BloomFilter) error

Union merges another Bloom filter into this one (bitwise OR) Both filters must have the same configuration (numBits, numHashes)

type BloomFilterConfig

type BloomFilterConfig struct {
	ExpectedElements  uint32  // Expected number of elements to insert
	FalsePositiveRate float64 // Target false positive rate (e.g., 0.01 = 1%)
}

BloomFilterConfig holds configuration for creating a Bloom filter

func DefaultBloomFilterConfig

func DefaultBloomFilterConfig() BloomFilterConfig

DefaultBloomFilterConfig returns a default configuration suitable for page-level filtering Assumes ~100 keys per page with 1% false positive rate

type BufferPoolManager

type BufferPoolManager struct {
	// contains filtered or unexported fields
}

BufferPoolManager manages a pool of pages in memory

func NewBufferPoolManager

func NewBufferPoolManager(poolSize uint32, diskManager *DiskManager) (*BufferPoolManager, error)

NewBufferPoolManager creates a new buffer pool manager

func NewBufferPoolManagerWithReplacer

func NewBufferPoolManagerWithReplacer(poolSize uint32, diskManager *DiskManager, replacerAlg string) (*BufferPoolManager, error)

NewBufferPoolManagerWithReplacer creates a buffer pool with a specific replacement policy

func (*BufferPoolManager) FetchPage

func (bpm *BufferPoolManager) FetchPage(pageId uint32) (*Page, error)

FetchPage fetches a page from disk if not in buffer pool, or returns existing page

func (*BufferPoolManager) FlushAllPages

func (bpm *BufferPoolManager) FlushAllPages() error

FlushAllPages flushes all dirty pages to disk using batch writes

func (*BufferPoolManager) FlushAllPagesParallel

func (bpm *BufferPoolManager) FlushAllPagesParallel(workers int) error

FlushAllPagesParallel flushes all dirty pages concurrently with configurable parallelism

func (*BufferPoolManager) FlushPage

func (bpm *BufferPoolManager) FlushPage(pageId uint32) error

FlushPage explicitly flushes a page to disk

func (*BufferPoolManager) GetCapacity

func (bpm *BufferPoolManager) GetCapacity() int

GetCapacity returns the total capacity of the buffer pool

func (*BufferPoolManager) GetDirtyPageCount

func (bpm *BufferPoolManager) GetDirtyPageCount() int

GetDirtyPageCount returns the number of dirty pages in the buffer pool

func (*BufferPoolManager) GetDirtyPages

func (bpm *BufferPoolManager) GetDirtyPages(maxPages int) []uint32

GetDirtyPages returns up to maxPages dirty page IDs

func (*BufferPoolManager) GetMetrics

func (bpm *BufferPoolManager) GetMetrics() *Metrics

GetMetrics returns the buffer pool metrics

func (*BufferPoolManager) GetPoolSize

func (bpm *BufferPoolManager) GetPoolSize() uint32

GetPoolSize returns the pool size

func (*BufferPoolManager) IsPageInPool

func (bpm *BufferPoolManager) IsPageInPool(pageID uint32) bool

IsPageInPool checks if a page is already in the buffer pool to avoid redundant prefetches.

func (*BufferPoolManager) NewPage

func (bpm *BufferPoolManager) NewPage() (*Page, error)

NewPage creates a new page, allocates it on disk, and brings it into the buffer pool

func (*BufferPoolManager) SetLogManager

func (bpm *BufferPoolManager) SetLogManager(logManager *LogManager)

SetLogManager sets the log manager for WAL integration

func (*BufferPoolManager) UnpinPage

func (bpm *BufferPoolManager) UnpinPage(pageId uint32, isDirty bool) error

UnpinPage unpins a page and optionally marks it as dirty

type ClockProReplacer

type ClockProReplacer struct {
	// contains filtered or unexported fields
}

ClockProReplacer implements the Clock-Pro cache replacement algorithm Clock-Pro is a scan-resistant algorithm that improves upon Clock by tracking both recency and frequency. It maintains three "hands" that move through a circular list:

1. Hand_cold: Points to the oldest cold page (candidate for eviction) 2. Hand_hot: Points to the oldest hot page (for demoting to cold) 3. Hand_test: Points to the oldest test page (for pruning history)

Pages can be in three states: - Cold: Recently accessed once (on probation) - Hot: Accessed multiple times (protected from eviction) - Test: Ghost entry tracking recent evictions

The algorithm adaptively adjusts the sizes of hot and cold regions based on the workload, making it effective for both recency and frequency patterns.

func NewClockProReplacer

func NewClockProReplacer(capacity int) *ClockProReplacer

NewClockProReplacer creates a new Clock-Pro cache replacer

func (*ClockProReplacer) GetStats

func (cp *ClockProReplacer) GetStats() ClockProStats

GetStats returns statistics about the Clock-Pro replacer

func (*ClockProReplacer) Pin

func (cp *ClockProReplacer) Pin(frameID uint32)

Pin marks a page as in-use

func (*ClockProReplacer) Size

func (cp *ClockProReplacer) Size() int

Size returns the number of evictable pages

func (*ClockProReplacer) Unpin

func (cp *ClockProReplacer) Unpin(frameID uint32)

Unpin marks a page as evictable and records the access

func (*ClockProReplacer) Victim

func (cp *ClockProReplacer) Victim() (uint32, bool)

Victim selects a page for eviction using the Clock-Pro algorithm

type ClockProStats

type ClockProStats struct {
	Capacity   int // Total cache capacity
	HotSize    int // Current hot pages
	HotMax     int // Maximum hot pages (adaptive)
	ColdSize   int // Current cold pages
	ColdMax    int // Maximum cold pages
	TestSize   int // Current test (ghost) entries
	TestMax    int // Maximum test entries
	TotalPages int // Total cached pages (hot + cold)
}

ClockProStats contains statistics about the Clock-Pro replacer

type CompressedLogRecord

type CompressedLogRecord struct {
	LSN     uint64
	PrevLSN uint64
	TxnID   uint64
	Type    LogType
	PageID  uint32

	// Delta compression fields
	DataLength uint16 // Total length of the data region
	ChangeMask []byte // Bitmap indicating which bytes changed
	DeltaData  []byte // Only the changed bytes

	// For non-update operations, store full data
	FullData []byte
}

CompressedLogRecord represents a WAL entry with delta compression. Stores only changed bytes instead of full before/after images (5-10x compression).

func CompressLogRecord

func CompressLogRecord(record *LogRecord) *CompressedLogRecord

CompressLogRecord compresses a standard log record using delta encoding

func DeserializeCompressedLogRecord

func DeserializeCompressedLogRecord(data []byte) (*CompressedLogRecord, error)

DeserializeCompressedLogRecord creates CompressedLogRecord from bytes

func (*CompressedLogRecord) CompressionRatio

func (clr *CompressedLogRecord) CompressionRatio() float64

CompressionRatio returns the compression ratio achieved

func (*CompressedLogRecord) Serialize

func (clr *CompressedLogRecord) Serialize() []byte

Serialize converts CompressedLogRecord to bytes Format: LSN(8) | PrevLSN(8) | TxnID(8) | Type(1) | PageID(4) | DataLength(2) |

MaskLen(2) | ChangeMask | DeltaLen(2) | DeltaData | FullDataLen(2) | FullData

type CompressedPage

type CompressedPage struct {
	CompressionType  CompressionType
	UncompressedSize uint16
	CompressedSize   uint16
	CompressedData   []byte
	OriginalChecksum uint32 // CRC32 of original data
}

CompressedPage represents a compressed page with metadata

func ChooseBestCompression

func ChooseBestCompression(data []byte) (*CompressedPage, error)

ChooseBestCompression tries all algorithms and returns the best one

func CompressPage

func CompressPage(data []byte, compressionType CompressionType) (*CompressedPage, error)

CompressPage compresses a page using the specified algorithm

func DeserializeCompressedPage

func DeserializeCompressedPage(data []byte) (*CompressedPage, error)

DeserializeCompressedPage deserializes a compressed page from bytes

func (*CompressedPage) GetCompressionRatio

func (cp *CompressedPage) GetCompressionRatio() float64

GetCompressionRatio returns the compression ratio (original size / compressed size)

func (*CompressedPage) GetSpaceSavings

func (cp *CompressedPage) GetSpaceSavings() int

GetSpaceSavings returns bytes saved by compression

type CompressionType

type CompressionType uint8

CompressionType represents the compression algorithm used

const (
	CompressionNone   CompressionType = 0
	CompressionLZ4    CompressionType = 1
	CompressionSnappy CompressionType = 2
)

type Config

type Config struct {
	// Buffer Pool Configuration
	BufferPoolSize    uint32 `json:"buffer_pool_size"`   // Number of pages in buffer pool
	CacheReplacer     string `json:"cache_replacer"`     // Cache replacement policy (lru, 2q, arc)
	EnablePrefetching bool   `json:"enable_prefetching"` // Enable sequential prefetching

	// Disk Configuration
	DataDirectory string `json:"data_directory"` // Directory for data files
	PageSize      uint32 `json:"page_size"`      // Page size in bytes (default: 4096)

	// WAL Configuration
	WALDirectory      string `json:"wal_directory"`       // Directory for WAL files
	WALEnabled        bool   `json:"wal_enabled"`         // Whether WAL is enabled
	WALParallel       bool   `json:"wal_parallel"`        // Use parallel WAL for better concurrency
	WALCompression    bool   `json:"wal_compression"`     // Enable WAL compression
	WALCompressionAlg string `json:"wal_compression_alg"` // Compression algorithm (delta, snappy, none)

	// Transaction Configuration
	MaxTransactions uint32 `json:"max_transactions"` // Maximum concurrent transactions

	// Performance Configuration
	EnableMetrics bool   `json:"enable_metrics"` // Whether to collect performance metrics
	LogLevel      string `json:"log_level"`      // Log level (debug, info, warn, error)

	// Recovery Configuration
	AutoRecovery bool `json:"auto_recovery"` // Whether to perform recovery on startup
}

Config holds storage engine configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default configuration

func LoadConfigFromEnv

func LoadConfigFromEnv() *Config

LoadConfigFromEnv loads configuration from environment variables Falls back to default values if environment variables are not set

func LoadConfigFromFile

func LoadConfigFromFile(path string) (*Config, error)

LoadConfigFromFile loads configuration from a JSON file

func (*Config) Clone

func (c *Config) Clone() *Config

Clone creates a deep copy of the configuration

func (*Config) SaveToFile

func (c *Config) SaveToFile(path string) error

SaveToFile saves the configuration to a JSON file

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type DiskManager

type DiskManager struct {
	// contains filtered or unexported fields
}

func NewDiskManager

func NewDiskManager(fileName string) (*DiskManager, error)

NewDiskManager creates a new disk manager that manages pages in a file

func (*DiskManager) AllocatePage

func (dm *DiskManager) AllocatePage() uint32

AllocatePage allocates a new page and returns its page ID

func (*DiskManager) Close

func (dm *DiskManager) Close() error

Close closes the disk manager and its underlying file

func (*DiskManager) ReadPage

func (dm *DiskManager) ReadPage(pageId uint32) ([]byte, error)

ReadPage reads a page from disk given its page ID

func (*DiskManager) WritePage

func (dm *DiskManager) WritePage(pageId uint32, data []byte) error

WritePage writes a page to disk at the specified page ID

func (*DiskManager) WritePagesV

func (dm *DiskManager) WritePagesV(writes []PageWrite) error

WritePagesV writes multiple pages in a single batch operation. More efficient than writing pages one-at-a-time.

type ErrorCode

type ErrorCode int

ErrorCode represents different types of storage errors

const (
	// Generic errors
	ErrCodeUnknown ErrorCode = iota
	ErrCodeInternal

	// Page errors
	ErrCodePageNotFound
	ErrCodePageFull
	ErrCodeInvalidPageID
	ErrCodePageCorrupted

	// Buffer pool errors
	ErrCodeNoFreePages
	ErrCodePagePinned
	ErrCodeInvalidPin

	// Transaction errors
	ErrCodeTxnNotFound
	ErrCodeTxnAlreadyCommitted
	ErrCodeTxnAlreadyAborted
	ErrCodeInvalidTxnState

	// Recovery errors
	ErrCodeRecoveryFailed
	ErrCodeLogCorrupted
	ErrCodeCheckpointFailed

	// B+ Tree errors
	ErrCodeKeyNotFound
	ErrCodeDuplicateKey
	ErrCodeInvalidKey

	// Disk errors
	ErrCodeDiskFull
	ErrCodeDiskReadFailed
	ErrCodeDiskWriteFailed
	ErrCodeFileNotFound
)

func GetErrorCode

func GetErrorCode(err error) ErrorCode

GetErrorCode returns the error code from an error, or ErrCodeUnknown

type FlushableBufferPool

type FlushableBufferPool interface {
	GetDirtyPageCount() int
	GetCapacity() int
	GetDirtyPages(maxPages int) []uint32
	FlushPage(pageID uint32) error
}

FlushableBufferPool is the interface required by adaptive flusher

type GroupCommitManager

type GroupCommitManager struct {
	// contains filtered or unexported fields
}

GroupCommitManager batches multiple transaction commits into a single fsync, reducing I/O overhead. Instead of fsyncing on every commit, commits are grouped and persisted together.

func NewGroupCommitManager

func NewGroupCommitManager(logManager *LogManager, maxBatchSize int, maxBatchDelay time.Duration) *GroupCommitManager

NewGroupCommitManager creates a new group commit manager

func (*GroupCommitManager) Commit

func (gcm *GroupCommitManager) Commit(lsn uint64) error

Commit submits a transaction commit request and waits for it to be persisted

func (*GroupCommitManager) Shutdown

func (gcm *GroupCommitManager) Shutdown()

Shutdown gracefully shuts down the group commit manager

func (*GroupCommitManager) Stats

func (gcm *GroupCommitManager) Stats() GroupCommitStats

Stats returns statistics about group commit performance

type GroupCommitStats

type GroupCommitStats struct {
	TotalCommits     uint64
	TotalBatches     uint64
	TotalFsyncs      uint64
	AverageBatchSize float64
}

GroupCommitStats contains statistics about group commit performance

type Histogram

type Histogram struct {
	// contains filtered or unexported fields
}

Histogram tracks latency distribution with percentile support

func NewHistogram

func NewHistogram(maxSize int) *Histogram

NewHistogram creates a new histogram with a max sample size

func (*Histogram) Count

func (h *Histogram) Count() int

Count returns the number of samples

func (*Histogram) Max

func (h *Histogram) Max() float64

Max returns the maximum latency

func (*Histogram) Mean

func (h *Histogram) Mean() float64

Mean calculates the average latency

func (*Histogram) Min

func (h *Histogram) Min() float64

Min returns the minimum latency

func (*Histogram) Percentile

func (h *Histogram) Percentile(p float64) float64

Percentile calculates the given percentile (0-100)

func (*Histogram) Record

func (h *Histogram) Record(latencyUs float64)

Record adds a latency sample (in microseconds)

func (*Histogram) Reset

func (h *Histogram) Reset()

Reset clears all samples

func (*Histogram) Snapshot

func (h *Histogram) Snapshot() HistogramSnapshot

Snapshot captures current histogram statistics

type HistogramSnapshot

type HistogramSnapshot struct {
	Count int
	Min   float64
	Max   float64
	Mean  float64
	P50   float64 // Median
	P95   float64
	P99   float64
	P999  float64
}

Snapshot returns current percentile statistics

type KeyValue

type KeyValue struct {
	Key   []byte
	Value []byte
}

KeyValue represents a key-value pair

type LRUNode

type LRUNode struct {
	// contains filtered or unexported fields
}

LRUNode represents a node in the LRU list

type LRUReplacer

type LRUReplacer struct {
	// contains filtered or unexported fields
}

LRUReplacer implements LRU (Least Recently Used) replacement policy

func NewLRUReplacer

func NewLRUReplacer(capacity uint32) *LRUReplacer

NewLRUReplacer creates a new LRU replacer

func (*LRUReplacer) Pin

func (lru *LRUReplacer) Pin(frameID uint32)

Pin removes a frame from the LRU replacer Called when a page is pinned (in use, not evictable)

func (*LRUReplacer) Size

func (lru *LRUReplacer) Size() uint32

Size returns the number of evictable frames

func (*LRUReplacer) Unpin

func (lru *LRUReplacer) Unpin(frameID uint32)

Unpin adds a frame to the LRU replacer Called when a page is unpinned (available for eviction)

func (*LRUReplacer) Victim

func (lru *LRUReplacer) Victim() (uint32, bool)

Victim selects a frame to evict using LRU policy Returns the frame ID and true if a victim was found, or 0 and false if no victim available

type LockFreeLogBuffer

type LockFreeLogBuffer struct {
	// contains filtered or unexported fields
}

LockFreeLogBuffer is a lock-free circular buffer for WAL records Uses CAS operations for thread-safe concurrent append without blocking

Design: - Fixed-size circular buffer with power-of-2 capacity - Two atomic counters: head (read position) and tail (write position) - Writers reserve space via atomic tail increment - Readers wait for writers to finish via sequence numbers - Supports concurrent appends with linearizability

Memory Layout: Each slot: [sequence(8) | lsn(8) | size(4) | data(var)]

func NewLockFreeLogBuffer

func NewLockFreeLogBuffer(config LogBufferConfig) (*LockFreeLogBuffer, error)

NewLockFreeLogBuffer creates a new lock-free log buffer

func (*LockFreeLogBuffer) Append

func (lb *LockFreeLogBuffer) Append(lsn uint64, data []byte) (uint64, uint64, error)

Append adds a log record to the buffer using lock-free CAS. Returns (slotIndex, sequence, error).

func (*LockFreeLogBuffer) Available

func (lb *LockFreeLogBuffer) Available() uint64

Available returns the number of records available to read

func (*LockFreeLogBuffer) Deserialize

func (lb *LockFreeLogBuffer) Deserialize(buf []byte) (uint64, []byte, int, error)

Deserialize deserializes a log record Returns (lsn, data, bytesRead, error)

func (*LockFreeLogBuffer) Free

func (lb *LockFreeLogBuffer) Free() uint64

Free returns the number of free slots available for writing

func (*LockFreeLogBuffer) IsEmpty

func (lb *LockFreeLogBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty

func (*LockFreeLogBuffer) IsFull

func (lb *LockFreeLogBuffer) IsFull() bool

IsFull returns true if the buffer is full

func (*LockFreeLogBuffer) Read

func (lb *LockFreeLogBuffer) Read() (uint64, uint64, []byte, error)

Read reads the next available log record Returns (slotIndex, lsn, data, error) Returns nil data when no records available

func (*LockFreeLogBuffer) ReadBatch

func (lb *LockFreeLogBuffer) ReadBatch(maxRecords int) ([]LogBufferRecord, error)

ReadBatch reads multiple records in a batch (up to maxRecords) Returns slice of (lsn, data) tuples

func (*LockFreeLogBuffer) Reset

func (lb *LockFreeLogBuffer) Reset()

Reset clears the buffer (not thread-safe, use only when no concurrent access)

func (*LockFreeLogBuffer) Serialize

func (lb *LockFreeLogBuffer) Serialize(lsn uint64, data []byte) []byte

Serialize serializes a log record for storage Format: [lsn(8) | size(4) | data(var)]

func (*LockFreeLogBuffer) Stats

func (lb *LockFreeLogBuffer) Stats() LogBufferStats

Stats returns buffer statistics

type LockFreeSkipList

type LockFreeSkipList struct {
	// contains filtered or unexported fields
}

LockFreeSkipList implements a lock-free skip list using CAS operations Based on the algorithm by Herlihy, Lev, Luchangco, and Shavit

func NewLockFreeSkipList

func NewLockFreeSkipList(config SkipListConfig) *LockFreeSkipList

NewLockFreeSkipList creates a new lock-free skip list

func (*LockFreeSkipList) Clear

func (sl *LockFreeSkipList) Clear()

Clear removes all elements (not lock-free, for testing only)

func (*LockFreeSkipList) CompactMemory

func (sl *LockFreeSkipList) CompactMemory() int

CompactMemory helps GC by clearing deleted nodes (not lock-free)

func (*LockFreeSkipList) Delete

func (sl *LockFreeSkipList) Delete(key []byte) bool

Delete removes a key from the skip list

func (*LockFreeSkipList) Insert

func (sl *LockFreeSkipList) Insert(key []byte, value []byte) bool

Insert adds a key-value pair to the skip list

func (*LockFreeSkipList) Length

func (sl *LockFreeSkipList) Length() int64

Length returns the approximate number of elements

func (*LockFreeSkipList) MaxKey

func (sl *LockFreeSkipList) MaxKey() ([]byte, bool)

MaxKey returns the largest key (if any)

func (*LockFreeSkipList) MinKey

func (sl *LockFreeSkipList) MinKey() ([]byte, bool)

MinKey returns the smallest key (if any)

func (*LockFreeSkipList) Range

func (sl *LockFreeSkipList) Range(start, end []byte, callback func(key, value []byte) bool)

Range iterates over keys in sorted order within [start, end]. Snapshot iteration may miss concurrent updates.

func (*LockFreeSkipList) RangeScan

func (sl *LockFreeSkipList) RangeScan(start, end []byte) []KeyValue

RangeScan returns all key-value pairs in [start, end]

func (*LockFreeSkipList) Search

func (sl *LockFreeSkipList) Search(key []byte) ([]byte, bool)

Search looks up a key in the skip list

func (*LockFreeSkipList) Stats

func (sl *LockFreeSkipList) Stats() SkipListStats

Stats returns skip list statistics

type LogBufferConfig

type LogBufferConfig struct {
	SlotSize    int  // Size of each slot in bytes
	Capacity    int  // Number of slots (must be power of 2)
	FlushOnFull bool // Auto-flush when buffer is full
}

LogBufferConfig contains configuration for the lock-free log buffer

func DefaultLogBufferConfig

func DefaultLogBufferConfig() LogBufferConfig

DefaultLogBufferConfig returns default configuration

type LogBufferRecord

type LogBufferRecord struct {
	LSN  uint64
	Data []byte
}

LogBufferRecord represents a single record from the buffer

type LogBufferStats

type LogBufferStats struct {
	Capacity     uint64  // Total capacity (number of slots)
	SlotSize     uint64  // Size of each slot in bytes
	Available    uint64  // Number of records available to read
	Free         uint64  // Number of free slots
	Appends      uint64  // Total appends
	Reads        uint64  // Total reads
	Wraps        uint64  // Number of times buffer wrapped
	Contentions  uint64  // Number of CAS contentions
	MaxOccupancy uint64  // Maximum occupancy observed
	Utilization  float64 // Current utilization percentage
}

LogBufferStats contains statistics about the log buffer

type LogManager

type LogManager struct {
	// contains filtered or unexported fields
}

LogManager manages the write-ahead log Now supports both serial and parallel modes for better concurrency

func NewLogManager

func NewLogManager(logFileName string) (*LogManager, error)

NewLogManager creates a new log manager

func NewLogManagerWithConfig

func NewLogManagerWithConfig(logFileName string, useParallel bool, useCompression bool, compressionAlg string) (*LogManager, error)

NewLogManagerWithConfig creates a log manager with specific configuration

func (*LogManager) AppendLog

func (lm *LogManager) AppendLog(record *LogRecord) (uint64, error)

AppendLog adds a log record and returns its LSN

func (*LogManager) Close

func (lm *LogManager) Close() error

Close closes the log manager

func (*LogManager) Flush

func (lm *LogManager) Flush() error

Flush writes buffered log records to disk

func (*LogManager) FlushToLSN

func (lm *LogManager) FlushToLSN(lsn uint64) error

FlushToLSN flushes all log records up to and including the specified LSN

func (*LogManager) GetCurrentLSN

func (lm *LogManager) GetCurrentLSN() uint64

GetCurrentLSN returns the current LSN

func (*LogManager) GetFlushedLSN

func (lm *LogManager) GetFlushedLSN() uint64

GetFlushedLSN returns the last flushed LSN

func (*LogManager) ReadAllLogs

func (lm *LogManager) ReadAllLogs() ([]*LogRecord, error)

ReadAllLogs reads all log records from the file

type LogRecord

type LogRecord struct {
	LSN        uint64  // Log Sequence Number (unique, monotonic)
	PrevLSN    uint64  // Previous LSN for this transaction
	TxnID      uint64  // Transaction ID
	Type       LogType // Type of operation
	PageID     uint32  // Affected page
	Offset     uint16  // Offset within page
	Length     uint16  // Length of data
	BeforeData []byte  // Old value (for UNDO)
	AfterData  []byte  // New value (for REDO)
}

LogRecord represents a single WAL entry

func DecompressLogRecord

func DecompressLogRecord(compressed *CompressedLogRecord, beforeData []byte) (*LogRecord, error)

DecompressLogRecord reconstructs the original log record

func DeserializeLogRecord

func DeserializeLogRecord(data []byte) (*LogRecord, error)

DeserializeLogRecord creates LogRecord from bytes

func (*LogRecord) Serialize

func (lr *LogRecord) Serialize() []byte

Serialize converts LogRecord to bytes Format: LSN(8) | PrevLSN(8) | TxnID(8) | Type(1) | PageID(4) | Offset(2) | Length(2) |

BeforeDataLen(2) | BeforeData | AfterDataLen(2) | AfterData

type LogRecycler

type LogRecycler struct {
	// contains filtered or unexported fields
}

LogRecycler manages WAL segment recycling

func NewLogRecycler

func NewLogRecycler(baseDir string, baseFilename string) *LogRecycler

NewLogRecycler creates a new log recycler

func (*LogRecycler) Checkpoint

func (lr *LogRecycler) Checkpoint(checkpointLSN uint64) error

Checkpoint marks a checkpoint LSN, allowing older segments to be archived

func (*LogRecycler) Close

func (lr *LogRecycler) Close() error

Close closes all open segments

func (*LogRecycler) DeleteOldSegments

func (lr *LogRecycler) DeleteOldSegments(beforeLSN uint64) (int, error)

DeleteOldSegments permanently deletes segments older than the given LSN

func (*LogRecycler) GetOrCreateActiveSegment

func (lr *LogRecycler) GetOrCreateActiveSegment() (*LogSegment, error)

GetOrCreateActiveSegment returns the current active segment, creating new one if needed

func (*LogRecycler) GetSegmentFilename

func (lr *LogRecycler) GetSegmentFilename(segmentID uint32) string

GetSegmentFilename generates a segment filename

func (*LogRecycler) GetStats

func (lr *LogRecycler) GetStats() SegmentStats

func (*LogRecycler) ListSegments

func (lr *LogRecycler) ListSegments() ([]string, error)

ListSegments returns all segment files in the directory

func (*LogRecycler) LoadExistingSegments

func (lr *LogRecycler) LoadExistingSegments() error

LoadExistingSegments scans directory and loads existing segments

func (*LogRecycler) RecycleArchivedSegments

func (lr *LogRecycler) RecycleArchivedSegments() (int, error)

RecycleArchivedSegments moves archived segments to recycle pool

func (*LogRecycler) SetMaxSegmentSize

func (lr *LogRecycler) SetMaxSegmentSize(maxSize int64)

SetMaxSegmentSize configures the maximum segment size

func (*LogRecycler) WriteToSegment

func (lr *LogRecycler) WriteToSegment(data []byte, lsn uint64) error

WriteToSegment writes data to the current active segment

type LogSegment

type LogSegment struct {
	SegmentID  uint32
	FilePath   string
	File       *os.File
	Size       int64
	MinLSN     uint64 // Minimum LSN in this segment
	MaxLSN     uint64 // Maximum LSN in this segment
	IsActive   bool   // Currently being written to
	IsArchived bool   // Can be recycled after checkpoint
}

LogSegment represents a WAL segment file

type LogType

type LogType byte

LogType represents the type of log record

const (
	LogInsert LogType = iota
	LogDelete
	LogUpdate
	LogCommit
	LogAbort
	LogCheckpoint
)

func (LogType) String

func (lt LogType) String() string

String returns string representation of LogType

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics tracks storage engine performance metrics

func NewMetrics

func NewMetrics() *Metrics

NewMetrics creates a new metrics tracker

func (*Metrics) GetBPTreeInsertLatency

func (m *Metrics) GetBPTreeInsertLatency() HistogramSnapshot

GetBPTreeInsertLatency returns snapshot of B+ tree insert latency distribution

func (*Metrics) GetBPTreeSearchLatency

func (m *Metrics) GetBPTreeSearchLatency() HistogramSnapshot

GetBPTreeSearchLatency returns snapshot of B+ tree search latency distribution

func (*Metrics) GetCacheHitRate

func (m *Metrics) GetCacheHitRate() float64

func (*Metrics) GetCacheHits

func (m *Metrics) GetCacheHits() uint64

func (*Metrics) GetCacheMisses

func (m *Metrics) GetCacheMisses() uint64

func (*Metrics) GetDirtyPageFlushes

func (m *Metrics) GetDirtyPageFlushes() uint64

func (*Metrics) GetPageEvictions

func (m *Metrics) GetPageEvictions() uint64

func (*Metrics) GetPageFetchLatency

func (m *Metrics) GetPageFetchLatency() HistogramSnapshot

GetPageFetchLatency returns snapshot of page fetch latency distribution

func (*Metrics) GetPageFlushLatency

func (m *Metrics) GetPageFlushLatency() HistogramSnapshot

GetPageFlushLatency returns snapshot of page flush latency distribution

func (*Metrics) GetRecoveries

func (m *Metrics) GetRecoveries() uint64

func (*Metrics) GetRedoOps

func (m *Metrics) GetRedoOps() uint64

func (*Metrics) GetTxnCommitLatency

func (m *Metrics) GetTxnCommitLatency() HistogramSnapshot

GetTxnCommitLatency returns snapshot of transaction commit latency distribution

func (*Metrics) GetTxnsAborted

func (m *Metrics) GetTxnsAborted() uint64

func (*Metrics) GetTxnsCommitted

func (m *Metrics) GetTxnsCommitted() uint64

func (*Metrics) GetTxnsStarted

func (m *Metrics) GetTxnsStarted() uint64

func (*Metrics) GetUndoOps

func (m *Metrics) GetUndoOps() uint64

func (*Metrics) GetUptime

func (m *Metrics) GetUptime() time.Duration

func (*Metrics) LogMetrics

func (m *Metrics) LogMetrics(logger *slog.Logger)

LogMetrics logs all metrics using structured logging

func (*Metrics) RecordBPTreeInsertLatency

func (m *Metrics) RecordBPTreeInsertLatency(duration time.Duration)

RecordBPTreeInsertLatency records the latency of a B+ tree insert

func (*Metrics) RecordBPTreeSearchLatency

func (m *Metrics) RecordBPTreeSearchLatency(duration time.Duration)

RecordBPTreeSearchLatency records the latency of a B+ tree search

func (*Metrics) RecordCacheHit

func (m *Metrics) RecordCacheHit()

func (*Metrics) RecordCacheMiss

func (m *Metrics) RecordCacheMiss()

func (*Metrics) RecordDirtyPageFlush

func (m *Metrics) RecordDirtyPageFlush()

func (*Metrics) RecordPageEviction

func (m *Metrics) RecordPageEviction()

func (*Metrics) RecordPageFetchLatency

func (m *Metrics) RecordPageFetchLatency(duration time.Duration)

RecordPageFetchLatency records the latency of a page fetch operation

func (*Metrics) RecordPageFlushLatency

func (m *Metrics) RecordPageFlushLatency(duration time.Duration)

RecordPageFlushLatency records the latency of a page flush operation

func (*Metrics) RecordRecovery

func (m *Metrics) RecordRecovery()

func (*Metrics) RecordRedoOp

func (m *Metrics) RecordRedoOp()

func (*Metrics) RecordTxnAbort

func (m *Metrics) RecordTxnAbort()

func (*Metrics) RecordTxnCommit

func (m *Metrics) RecordTxnCommit()

func (*Metrics) RecordTxnCommitLatency

func (m *Metrics) RecordTxnCommitLatency(duration time.Duration)

RecordTxnCommitLatency records the latency of a transaction commit

func (*Metrics) RecordTxnStart

func (m *Metrics) RecordTxnStart()

func (*Metrics) RecordUndoOp

func (m *Metrics) RecordUndoOp()

func (*Metrics) Reset

func (m *Metrics) Reset()

Reset resets all metrics (useful for testing)

type MmapDiskManager

type MmapDiskManager struct {
	// contains filtered or unexported fields
}

MmapDiskManager provides zero-copy disk access using memory-mapped files

func NewMmapDiskManager

func NewMmapDiskManager(fileName string) (*MmapDiskManager, error)

NewMmapDiskManager creates a new memory-mapped disk manager

func (*MmapDiskManager) Advise

func (dm *MmapDiskManager) Advise(pageId uint32, advice AdviceType) error

Advise provides hints to the OS about memory access patterns

func (*MmapDiskManager) AllocatePage

func (dm *MmapDiskManager) AllocatePage() (uint32, error)

AllocatePage allocates a new page and returns its page ID

func (*MmapDiskManager) Close

func (dm *MmapDiskManager) Close() error

Close unmaps memory and closes the file

func (*MmapDiskManager) Flush

func (dm *MmapDiskManager) Flush() error

Flush ensures all dirty pages are written to disk

func (*MmapDiskManager) FlushPage

func (dm *MmapDiskManager) FlushPage(pageId uint32) error

FlushPage flushes a specific page to disk

func (*MmapDiskManager) FlushPages

func (dm *MmapDiskManager) FlushPages(pageIds []uint32) error

FlushPages flushes multiple pages to disk

func (*MmapDiskManager) GetFileSize

func (dm *MmapDiskManager) GetFileSize() int64

GetFileSize returns the current file size

func (*MmapDiskManager) GetNextPageId

func (dm *MmapDiskManager) GetNextPageId() uint32

GetNextPageId returns the next page ID that will be allocated

func (*MmapDiskManager) GetStats

func (dm *MmapDiskManager) GetStats() MmapStats

func (*MmapDiskManager) ReadPage

func (dm *MmapDiskManager) ReadPage(pageId uint32) ([]byte, error)

ReadPage reads a page from the memory-mapped region (zero-copy)

func (*MmapDiskManager) ReadPageCopy

func (dm *MmapDiskManager) ReadPageCopy(pageId uint32) ([]byte, error)

ReadPageCopy reads a page and returns a copy (safe for modification)

func (*MmapDiskManager) WritePage

func (dm *MmapDiskManager) WritePage(pageId uint32, data []byte) error

WritePage writes a page to the memory-mapped region

func (*MmapDiskManager) WritePagesV

func (dm *MmapDiskManager) WritePagesV(writes []PageWrite) error

WritePagesV writes multiple pages in a single batch operation

type MmapStats

type MmapStats struct {
	FileSize    int64
	MappedSize  int64
	NextPageId  uint32
	UsedPages   uint32
	AllocatedMB int64
	UsedMB      int64
}

Stats returns statistics about the mmap disk manager

type Page

type Page struct {
	// contains filtered or unexported fields
}

Page represents a page in memory with metadata

func NewPage

func NewPage(pageId uint32) *Page

NewPage creates a new page with the given page ID

func (*Page) GetData

func (p *Page) GetData() *SlottedPage

GetData returns the slotted page data

func (*Page) GetPageId

func (p *Page) GetPageId() uint32

GetPageId returns the page ID

func (*Page) GetPinCount

func (p *Page) GetPinCount() int32

GetPinCount returns the pin count

func (*Page) IsDirty

func (p *Page) IsDirty() bool

IsDirty returns whether the page is dirty

func (*Page) Pin

func (p *Page) Pin()

Pin increments the pin count

func (*Page) SetDirty

func (p *Page) SetDirty(dirty bool)

SetDirty sets the dirty flag

func (*Page) Unpin

func (p *Page) Unpin()

Unpin decrements the pin count

type PageBloomFilter

type PageBloomFilter struct {
	// contains filtered or unexported fields
}

PageBloomFilter attaches a Bloom filter to a page for key lookups

func NewPageBloomFilter

func NewPageBloomFilter(pageID uint32, config BloomFilterConfig) *PageBloomFilter

NewPageBloomFilter creates a new page-level Bloom filter

func (*PageBloomFilter) GetFilter

func (pbf *PageBloomFilter) GetFilter() *BloomFilter

GetFilter returns the underlying Bloom filter

func (*PageBloomFilter) GetPageID

func (pbf *PageBloomFilter) GetPageID() uint32

GetPageID returns the page ID this filter belongs to

func (*PageBloomFilter) GetStats

func (pbf *PageBloomFilter) GetStats() PageBloomFilterStats

GetStats returns statistics about the page Bloom filter

func (*PageBloomFilter) InsertKey

func (pbf *PageBloomFilter) InsertKey(key []byte)

InsertKey adds a key to the page's Bloom filter

func (*PageBloomFilter) MayContainKey

func (pbf *PageBloomFilter) MayContainKey(key []byte) bool

MayContainKey checks if a key might exist in the page

type PageBloomFilterStats

type PageBloomFilterStats struct {
	PageID       uint32
	NumInserts   uint32
	NumBits      uint32
	NumHashes    uint32
	FillRatio    float64
	EstimatedFPR float64
}

PageBloomFilterStats holds statistics for a page Bloom filter

type PageCompressionStats

type PageCompressionStats struct {
	TotalPages         uint64
	CompressedPages    uint64
	UncompressedPages  uint64
	TotalBytesOriginal uint64
	TotalBytesStored   uint64
	LZ4Count           uint64
	SnappyCount        uint64
	NoneCount          uint64
}

PageCompressionStats tracks compression statistics

func (*PageCompressionStats) AddCompression

func (pcs *PageCompressionStats) AddCompression(cp *CompressedPage)

AddCompression updates stats for a compression operation

func (*PageCompressionStats) GetCompressionPercentage

func (pcs *PageCompressionStats) GetCompressionPercentage() float64

GetCompressionPercentage returns percentage of pages compressed

func (*PageCompressionStats) GetCompressionRatio

func (pcs *PageCompressionStats) GetCompressionRatio() float64

GetCompressionRatio returns overall compression ratio

func (*PageCompressionStats) GetSpaceSavings

func (pcs *PageCompressionStats) GetSpaceSavings() uint64

GetSpaceSavings returns total bytes saved

type PageDependencyGraph

type PageDependencyGraph struct {
	// contains filtered or unexported fields
}

PageDependencyGraph tracks dependencies between log records

func NewPageDependencyGraph

func NewPageDependencyGraph() *PageDependencyGraph

NewPageDependencyGraph creates a new dependency graph

func (*PageDependencyGraph) AddRecord

func (g *PageDependencyGraph) AddRecord(record *LogRecord)

AddRecord adds a log record to the dependency graph

func (*PageDependencyGraph) GetAllPages

func (g *PageDependencyGraph) GetAllPages() []uint32

GetAllPages returns all page IDs in the graph

func (*PageDependencyGraph) GetPageRecords

func (g *PageDependencyGraph) GetPageRecords(pageID uint32) []*LogRecord

GetPageRecords returns all records for a page

type PageHeader struct {
	SlotCount      uint16 // Number of slots in the directory (including deleted)
	TupleCount     uint16 // Number of active (non-deleted) tuples
	SlotDirEnd     uint16 // End of slot directory (grows right)
	TupleDataStart uint16 // Start of tuple data area (grows left from end)
}

PageHeader contains metadata about the page

type PageLatch

type PageLatch struct {
	// contains filtered or unexported fields
}

PageLatch wraps a page with a lock-free RWLatch This replaces the sync.RWMutex in the Page struct

func NewPageLatch

func NewPageLatch(pageId uint32) *PageLatch

NewPageLatch creates a new page with lock-free latching

func (*PageLatch) GetData

func (p *PageLatch) GetData() *SlottedPage

GetData returns the slotted page data

func (*PageLatch) GetLatchStats

func (p *PageLatch) GetLatchStats() RWLatchStats

GetLatchStats returns latch statistics

func (*PageLatch) GetPageId

func (p *PageLatch) GetPageId() uint32

GetPageId returns the page ID

func (*PageLatch) GetPinCount

func (p *PageLatch) GetPinCount() int32

GetPinCount returns the pin count (atomic read)

func (*PageLatch) IsDirty

func (p *PageLatch) IsDirty() bool

IsDirty returns whether the page is dirty (atomic read)

func (*PageLatch) Pin

func (p *PageLatch) Pin()

Pin increments the pin count (atomic)

func (*PageLatch) RLock

func (p *PageLatch) RLock()

RLock acquires a read lock on the page

func (*PageLatch) RUnlock

func (p *PageLatch) RUnlock()

RUnlock releases a read lock on the page

func (*PageLatch) SetDirty

func (p *PageLatch) SetDirty(dirty bool)

SetDirty sets the dirty flag (atomic write)

func (*PageLatch) TryRLock

func (p *PageLatch) TryRLock() bool

TryRLock attempts to acquire a read lock without blocking

func (*PageLatch) TryWLock

func (p *PageLatch) TryWLock() bool

TryWLock attempts to acquire a write lock without blocking

func (*PageLatch) Unpin

func (p *PageLatch) Unpin()

Unpin decrements the pin count (atomic)

func (*PageLatch) WLock

func (p *PageLatch) WLock()

WLock acquires a write lock on the page

func (*PageLatch) WUnlock

func (p *PageLatch) WUnlock()

WUnlock releases a write lock on the page

type PageTableShard

type PageTableShard struct {
	// contains filtered or unexported fields
}

PageTableShard represents a single shard with its own lock

type PageWrite

type PageWrite struct {
	PageID uint32
	Data   []byte
}

PageWrite represents a single page write operation

type ParallelLogManager

type ParallelLogManager struct {
	// contains filtered or unexported fields
}

ParallelLogManager provides lock-free LSN allocation using atomic operations, allowing multiple threads to append log records with reduced contention.

func NewParallelLogManager

func NewParallelLogManager(logFileName string) (*ParallelLogManager, error)

NewParallelLogManager creates a new parallel log manager

func (*ParallelLogManager) AppendLog

func (plm *ParallelLogManager) AppendLog(record *LogRecord) (uint64, error)

AppendLog adds a log record using atomic LSN allocation

func (*ParallelLogManager) Close

func (plm *ParallelLogManager) Close() error

Close closes the parallel log manager

func (*ParallelLogManager) Flush

func (plm *ParallelLogManager) Flush() error

Flush writes all buffered log records to disk

func (*ParallelLogManager) FlushToLSN

func (plm *ParallelLogManager) FlushToLSN(lsn uint64) error

FlushToLSN flushes all log records up to the specified LSN

func (*ParallelLogManager) GetBufferStats

func (plm *ParallelLogManager) GetBufferStats() int

GetBufferStats returns statistics about buffer usage

func (*ParallelLogManager) GetCurrentLSN

func (plm *ParallelLogManager) GetCurrentLSN() uint64

GetCurrentLSN returns the current LSN

func (*ParallelLogManager) GetFlushedLSN

func (plm *ParallelLogManager) GetFlushedLSN() uint64

GetFlushedLSN returns the last flushed LSN

func (*ParallelLogManager) ReadAllLogs

func (plm *ParallelLogManager) ReadAllLogs() ([]*LogRecord, error)

ReadAllLogs reads all log records from the file

type ParallelRangeScanStats

type ParallelRangeScanStats struct {
	NumChunks      int
	NumWorkers     int
	EntriesScanned int64
	ChunkSizes     []int
}

ParallelRangeScanStats contains statistics about a parallel scan

type ParallelRecoveryConfig

type ParallelRecoveryConfig struct {
	NumWorkers int // Number of parallel workers
}

ParallelRecoveryConfig configures parallel recovery

func DefaultParallelRecoveryConfig

func DefaultParallelRecoveryConfig() ParallelRecoveryConfig

DefaultParallelRecoveryConfig returns default configuration

type ParallelRecoveryManager

type ParallelRecoveryManager struct {
	// contains filtered or unexported fields
}

ParallelRecoveryManager implements parallel ARIES recovery

func NewParallelRecoveryManager

func NewParallelRecoveryManager(logManager *LogManager, config ParallelRecoveryConfig) *ParallelRecoveryManager

NewParallelRecoveryManager creates a new parallel recovery manager

func (*ParallelRecoveryManager) AnalysisPass

func (rm *ParallelRecoveryManager) AnalysisPass() (map[uint64]bool, map[uint64]bool, error)

AnalysisPass identifies committed and uncommitted transactions

func (*ParallelRecoveryManager) BuildDependencyGraphs

func (rm *ParallelRecoveryManager) BuildDependencyGraphs(
	committedTxns map[uint64]bool,
	uncommittedTxns map[uint64]bool,
	redoGraph *PageDependencyGraph,
	undoGraph *PageDependencyGraph,
) error

BuildDependencyGraphs builds page-level dependency graphs

func (*ParallelRecoveryManager) GetMetrics

func (rm *ParallelRecoveryManager) GetMetrics() *Metrics

GetMetrics returns the recovery manager metrics

func (*ParallelRecoveryManager) GetNumWorkers

func (rm *ParallelRecoveryManager) GetNumWorkers() int

GetNumWorkers returns the number of workers

func (*ParallelRecoveryManager) GetRecoveryStats

func (rm *ParallelRecoveryManager) GetRecoveryStats() RecoveryStats

GetRecoveryStats returns recovery statistics

func (*ParallelRecoveryManager) ParallelRecover

func (rm *ParallelRecoveryManager) ParallelRecover() error

ParallelRecover performs parallel ARIES recovery

func (*ParallelRecoveryManager) ParallelRedoPass

func (rm *ParallelRecoveryManager) ParallelRedoPass(graph *PageDependencyGraph) (int64, error)

ParallelRedoPass performs parallel redo

func (*ParallelRecoveryManager) ParallelUndoPass

func (rm *ParallelRecoveryManager) ParallelUndoPass(graph *PageDependencyGraph) (int64, error)

ParallelUndoPass performs parallel undo

type ParallelScanConfig

type ParallelScanConfig struct {
	NumWorkers        int  // Number of parallel workers
	ChunkSize         int  // Number of entries per chunk
	OrderedResults    bool // Whether to maintain order (slower)
	EnablePrefetching bool // Whether to prefetch next chunks
}

ParallelScanConfig contains configuration for parallel range scans

func DefaultParallelScanConfig

func DefaultParallelScanConfig() ParallelScanConfig

DefaultParallelScanConfig returns a sensible default configuration

type PipelineCommitManager

type PipelineCommitManager struct {
	// contains filtered or unexported fields
}

PipelineCommitManager implements a pipelined commit protocol that overlaps validation, WAL writes, and fsync operations across consecutive batches. While one batch waits for fsync, the next batch is validated and written.

func NewPipelineCommitManager

func NewPipelineCommitManager(logManager *LogManager, txnManager *TransactionManager, maxBatchSize int, maxBatchDelay time.Duration) *PipelineCommitManager

NewPipelineCommitManager creates a new pipelined commit manager

func (*PipelineCommitManager) Commit

func (pcm *PipelineCommitManager) Commit(txnID uint64, lsn uint64) error

Commit submits a transaction for pipelined commit

func (*PipelineCommitManager) GetAverageBatchSize

func (pcm *PipelineCommitManager) GetAverageBatchSize() float64

GetAverageBatchSize returns average commits per batch

func (*PipelineCommitManager) GetAverageFsyncTime

func (pcm *PipelineCommitManager) GetAverageFsyncTime() int64

GetAverageFsyncTime returns average fsync time per batch (nanoseconds)

func (*PipelineCommitManager) GetAverageValidationTime

func (pcm *PipelineCommitManager) GetAverageValidationTime() int64

GetAverageValidationTime returns average validation time per batch (nanoseconds)

func (*PipelineCommitManager) GetAverageWriteTime

func (pcm *PipelineCommitManager) GetAverageWriteTime() int64

GetAverageWriteTime returns average write time per batch (nanoseconds)

func (*PipelineCommitManager) GetPipelineEfficiency

func (pcm *PipelineCommitManager) GetPipelineEfficiency() float64

GetPipelineEfficiency returns ratio of actual work time to elapsed time Higher is better - indicates pipeline is keeping stages busy

func (*PipelineCommitManager) GetStats

GetStats returns current pipeline statistics Returns a snapshot with atomic loads to avoid race conditions

func (*PipelineCommitManager) GetTotalPipelineTime

func (pcm *PipelineCommitManager) GetTotalPipelineTime() int64

GetTotalPipelineTime returns total time spent in all stages (nanoseconds)

func (*PipelineCommitManager) Shutdown

func (pcm *PipelineCommitManager) Shutdown()

Shutdown stops the pipeline and waits for in-flight batches

type PipelineStage

type PipelineStage int

PipelineStage represents a stage in the transaction commit pipeline

const (
	StageValidation PipelineStage = iota // Check transaction can commit
	StageWrite                           // Write commit record to log buffer
	StageFsync                           // Flush log to disk
)

type PipelineStats

type PipelineStats struct {
	TotalCommits        atomic.Uint64 // Total commits processed
	TotalBatches        atomic.Uint64 // Total batches processed
	TotalFsyncs         atomic.Uint64 // Total fsync operations
	ValidationTime      atomic.Int64  // Total time in validation (ns)
	WriteTime           atomic.Int64  // Total time in write (ns)
	FsyncTime           atomic.Int64  // Total time in fsync (ns)
	PipelineUtilization atomic.Int64  // Percentage of time with work in flight
}

PipelineStats tracks pipeline performance metrics

type PipelineStatsSnapshot added in v2.9.0

type PipelineStatsSnapshot struct {
	TotalCommits        uint64 // Total commits processed
	TotalBatches        uint64 // Total batches processed
	TotalFsyncs         uint64 // Total fsync operations
	ValidationTime      int64  // Total time in validation (ns)
	WriteTime           int64  // Total time in write (ns)
	FsyncTime           int64  // Total time in fsync (ns)
	PipelineUtilization int64  // Percentage of time with work in flight
}

PipelineStatsSnapshot is a plain-value snapshot of pipeline statistics

type PrefetchStats

type PrefetchStats struct {
	PatternsDetected  uint64
	PagesPrefetched   uint64
	PrefetchHits      uint64 // Pages used before eviction
	PrefetchMisses    uint64 // Pages evicted before use
	PrefetchQueueFull uint64
	AvgConfidence     float64 // Average confidence of triggered prefetches
	StridesDetected   uint64  // Number of non-sequential strides detected
}

PrefetchStats tracks prefetching effectiveness

type Prefetcher

type Prefetcher struct {
	// contains filtered or unexported fields
}

Prefetcher detects sequential access patterns and prefetches pages

func NewPrefetcher

func NewPrefetcher(bpm *BufferPoolManager) *Prefetcher

NewPrefetcher creates a new prefetcher for the given buffer pool

func (*Prefetcher) Cleanup

func (p *Prefetcher) Cleanup()

Cleanup removes stale patterns (called periodically)

func (*Prefetcher) ClearPattern

func (p *Prefetcher) ClearPattern(contextID uint64)

ClearPattern removes pattern tracking for a context (e.g., after transaction commit)

func (*Prefetcher) Configure

func (p *Prefetcher) Configure(detectionThreshold, prefetchDistance int)

Configure sets prefetcher parameters

func (*Prefetcher) GetStats

func (p *Prefetcher) GetStats() PrefetchStats

GetStats returns current prefetching statistics

func (*Prefetcher) RecordAccess

func (p *Prefetcher) RecordAccess(contextID uint64, pageID uint32)

RecordAccess records a page access and detects patterns with stride and confidence contextID should be transaction ID or goroutine ID

func (*Prefetcher) ResetStats

func (p *Prefetcher) ResetStats()

ResetStats resets prefetching statistics

func (*Prefetcher) SetEnabled

func (p *Prefetcher) SetEnabled(enabled bool)

Enable or disable prefetching

func (*Prefetcher) StartCleanupWorker

func (p *Prefetcher) StartCleanupWorker(stopChan <-chan struct{})

StartCleanupWorker starts a background goroutine to clean up stale patterns

type RWLatch

type RWLatch struct {
	// contains filtered or unexported fields
}

RWLatch provides lock-free reader-writer synchronization

func NewRWLatch

func NewRWLatch() *RWLatch

NewRWLatch creates a new lock-free RWLatch

func (*RWLatch) GetReaderCount

func (rw *RWLatch) GetReaderCount() uint32

GetReaderCount returns the current number of active readers (for testing)

func (*RWLatch) GetStats

func (rw *RWLatch) GetStats() RWLatchStats

GetStats returns statistics about the latch state

func (*RWLatch) GetWriterWaitingCount

func (rw *RWLatch) GetWriterWaitingCount() uint32

GetWriterWaitingCount returns the number of writers waiting (for testing)

func (*RWLatch) IsWriterActive

func (rw *RWLatch) IsWriterActive() bool

IsWriterActive returns true if a writer currently holds the latch (for testing)

func (*RWLatch) Lock

func (rw *RWLatch) Lock()

Lock acquires a write lock. Only one writer can hold the latch, and no readers can be active.

func (*RWLatch) RLock

func (rw *RWLatch) RLock()

RLock acquires a read lock Multiple readers can hold the latch simultaneously

func (*RWLatch) RUnlock

func (rw *RWLatch) RUnlock()

RUnlock releases a read lock

func (*RWLatch) TryLock

func (rw *RWLatch) TryLock() bool

TryLock attempts to acquire a write lock without blocking Returns true if successful, false otherwise

func (*RWLatch) TryRLock

func (rw *RWLatch) TryRLock() bool

TryRLock attempts to acquire a read lock without blocking Returns true if successful, false otherwise

func (*RWLatch) Unlock

func (rw *RWLatch) Unlock()

Unlock releases a write lock

type RWLatchStats

type RWLatchStats struct {
	ReaderCount        uint32 // Number of active readers
	WriterActive       bool   // Is a writer active
	WriterWaitingCount uint32 // Number of writers waiting
}

RWLatchStats contains statistics about a RWLatch

type RecoveryManager

type RecoveryManager struct {
	// contains filtered or unexported fields
}

RecoveryManager implements the ARIES recovery algorithm

func NewRecoveryManager

func NewRecoveryManager(logManager *LogManager) *RecoveryManager

NewRecoveryManager creates a new recovery manager

func (*RecoveryManager) AnalysisPass

func (rm *RecoveryManager) AnalysisPass() (map[uint64]bool, map[uint64]bool, error)

AnalysisPass identifies committed and uncommitted transactions

func (*RecoveryManager) CreateCheckpoint

func (rm *RecoveryManager) CreateCheckpoint() error

CreateCheckpoint creates a checkpoint in the log

func (*RecoveryManager) GetMetrics

func (rm *RecoveryManager) GetMetrics() *Metrics

GetMetrics returns the recovery manager metrics

func (*RecoveryManager) GetRecoveryStats

func (rm *RecoveryManager) GetRecoveryStats() RecoveryStats

GetRecoveryStats returns recovery statistics

func (*RecoveryManager) Recover

func (rm *RecoveryManager) Recover() error

Recover performs full ARIES recovery (Analysis, Redo, Undo)

func (*RecoveryManager) RedoPass

func (rm *RecoveryManager) RedoPass(committedTxns map[uint64]bool) (int, error)

RedoPass replays all operations from committed transactions

func (*RecoveryManager) UndoPass

func (rm *RecoveryManager) UndoPass(uncommittedTxns map[uint64]bool) (int, error)

UndoPass rolls back operations from uncommitted transactions

type RecoveryStats

type RecoveryStats struct {
	CommittedTxns   int
	UncommittedTxns int
	RedoOperations  int
	UndoOperations  int
}

RecoveryStats tracks recovery statistics

type Replacer

type Replacer interface {
	// Victim selects a frame to evict
	// Returns the frame ID and true if a victim was found, false otherwise
	Victim() (uint32, bool)

	// Pin marks a frame as in-use (not evictable)
	Pin(frameID uint32)

	// Unpin marks a frame as available for eviction
	Unpin(frameID uint32)

	// Size returns the number of evictable frames
	Size() uint32
}

Replacer interface for page replacement policies Allows different algorithms (LRU, 2Q, ARC, etc.)

func NewReplacer

func NewReplacer(algorithm string, capacity uint32) Replacer

NewReplacer creates a replacer based on the specified algorithm

type ScanResult

type ScanResult struct {
	Key   int64
	Value int64
	Chunk int // Chunk number for ordering
	Index int // Index within chunk
}

ScanResult represents a result from a parallel scan

type SegmentStats

type SegmentStats struct {
	TotalSegments    int
	ActiveSegments   int
	ArchivedSegments int
	RecycledSegments int
	TotalSize        int64
	OldestLSN        uint64
	NewestLSN        uint64
}

GetSegmentStats returns statistics about segments

type ShardedPageTable

type ShardedPageTable struct {
	// contains filtered or unexported fields
}

ShardedPageTable provides a thread-safe, sharded hash table for pages Reduces lock contention by partitioning the page table into multiple shards

func NewShardedPageTable

func NewShardedPageTable(numShards uint32) *ShardedPageTable

NewShardedPageTable creates a new sharded page table numShards should be a power of 2 for efficient modulo operations Recommended: 64-256 shards for good parallelism

func (*ShardedPageTable) Clear

func (spt *ShardedPageTable) Clear()

Clear removes all pages from all shards

func (*ShardedPageTable) Delete

func (spt *ShardedPageTable) Delete(pageId uint32)

Delete removes a page from the table

func (*ShardedPageTable) ForEach

func (spt *ShardedPageTable) ForEach(fn func(pageId uint32, page *Page) bool)

ForEach executes a function for each page in the table The function is called while holding the shard lock, so it should be fast

func (*ShardedPageTable) Get

func (spt *ShardedPageTable) Get(pageId uint32) (*Page, bool)

Get retrieves a page from the table

func (*ShardedPageTable) GetAll

func (spt *ShardedPageTable) GetAll() []*Page

GetAll returns all pages (useful for iteration) This acquires all locks and should be used sparingly

func (*ShardedPageTable) Put

func (spt *ShardedPageTable) Put(pageId uint32, page *Page)

Put adds or updates a page in the table

func (*ShardedPageTable) Size

func (spt *ShardedPageTable) Size() int

Size returns the total number of pages across all shards

type SkipListConfig

type SkipListConfig struct {
	MaxLevel int32   // Maximum number of levels (default: 16)
	P        float64 // Probability for level generation (default: 0.5)
}

SkipListConfig configures the skip list

func DefaultSkipListConfig

func DefaultSkipListConfig() SkipListConfig

DefaultSkipListConfig returns default configuration

type SkipListStats

type SkipListStats struct {
	Length      int64
	MaxLevel    int32
	LevelCounts []int32
}

SkipListStats contains skip list statistics

type SlotEntry

type SlotEntry struct {
	Offset uint16 // Offset of the tuple in the page
	Length uint16 // Length of the tuple
}

SlotEntry represents an entry in the slot directory

type SlottedPage

type SlottedPage struct {
	// contains filtered or unexported fields
}

SlottedPage represents a page with slotted page layout

func DeserializeSlottedPage

func DeserializeSlottedPage(data []byte) (*SlottedPage, error)

DeserializeSlottedPage reconstructs a SlottedPage from bytes

func NewSlottedPage

func NewSlottedPage() *SlottedPage

NewSlottedPage creates a new slotted page

func (*SlottedPage) DeleteTuple

func (sp *SlottedPage) DeleteTuple(slotId uint16) error

DeleteTuple marks a tuple as deleted

func (*SlottedPage) GetFreeSpaceSize

func (sp *SlottedPage) GetFreeSpaceSize() uint16

GetFreeSpaceSize returns the amount of free space in the page

func (*SlottedPage) GetTupleCount

func (sp *SlottedPage) GetTupleCount() uint16

GetTupleCount returns the number of tuples in the page

func (*SlottedPage) InsertTuple

func (sp *SlottedPage) InsertTuple(tuple *Tuple) (uint16, error)

InsertTuple inserts a new tuple into the page and returns its slot ID

func (*SlottedPage) ReadTuple

func (sp *SlottedPage) ReadTuple(slotId uint16) (*Tuple, error)

ReadTuple reads a tuple from the page given its slot ID

func (*SlottedPage) Serialize

func (sp *SlottedPage) Serialize() []byte

Serialize converts the SlottedPage to a byte array

func (*SlottedPage) UpdateTuple

func (sp *SlottedPage) UpdateTuple(slotId uint16, newTuple *Tuple) error

UpdateTuple updates an existing tuple

type Snapshot

type Snapshot struct {
	XminSnapshot uint64   // Oldest active transaction
	XmaxSnapshot uint64   // Next transaction ID
	ActiveTxns   []uint64 // Active transaction IDs at snapshot time
}

Snapshot represents a point-in-time view for MVCC

func (*Snapshot) IsVisible

func (s *Snapshot) IsVisible(tuple *Tuple, currentTxnID uint64) bool

IsVisible checks if a tuple is visible to a transaction

type StorageError

type StorageError struct {
	Code    ErrorCode
	Message string
	Op      string // Operation that failed
	Err     error  // Underlying error (if any)
}

StorageError represents a storage engine error with context

func ErrDiskOperation

func ErrDiskOperation(op string, err error) *StorageError

func ErrDuplicateKey

func ErrDuplicateKey(op string, key int) *StorageError

func ErrInvalidTxnState

func ErrInvalidTxnState(op string, txnID uint64, state string) *StorageError

func ErrKeyNotFound

func ErrKeyNotFound(op string, key int) *StorageError

func ErrLogCorrupted

func ErrLogCorrupted(op string, lsn uint64) *StorageError

func ErrNoFreePages

func ErrNoFreePages(op string) *StorageError

func ErrPageFull

func ErrPageFull(op string, pageID uint32) *StorageError

func ErrPageNotFound

func ErrPageNotFound(op string, pageID uint32) *StorageError

func ErrPagePinned

func ErrPagePinned(op string, pageID uint32, pinCount int) *StorageError

func ErrTxnNotFound

func ErrTxnNotFound(op string, txnID uint64) *StorageError

func NewStorageError

func NewStorageError(code ErrorCode, op, message string, err error) *StorageError

NewStorageError creates a new storage error

func (*StorageError) Error

func (e *StorageError) Error() string

Error implements the error interface

func (*StorageError) Is

func (e *StorageError) Is(target error) bool

Is checks if the error matches a specific error code

func (*StorageError) Unwrap

func (e *StorageError) Unwrap() error

Unwrap returns the underlying error

type Transaction

type Transaction struct {
	TxnID uint64

	StartTime time.Time
	LastLSN   uint64   // Last log record for this transaction
	UndoLog   []uint64 // LSNs for undo operations
	// contains filtered or unexported fields
}

Transaction represents a database transaction

func (*Transaction) CompareAndSwapState

func (t *Transaction) CompareAndSwapState(oldState, newState TxnState) bool

CompareAndSwapState atomically compares and swaps the transaction state Returns true if the swap was successful

func (*Transaction) GetState

func (t *Transaction) GetState() TxnState

GetState returns the current transaction state (thread-safe)

func (*Transaction) Reset

func (t *Transaction) Reset()

Reset resets transaction for reuse

func (*Transaction) SetState

func (t *Transaction) SetState(newState TxnState)

SetState sets the transaction state atomically

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager manages database transactions

func NewTransactionManager

func NewTransactionManager(logManager *LogManager) *TransactionManager

NewTransactionManager creates a new transaction manager

func (*TransactionManager) Abort

func (tm *TransactionManager) Abort(txnID uint64) error

Abort aborts a transaction and undoes all its changes

func (*TransactionManager) Begin

func (tm *TransactionManager) Begin() (*Transaction, error)

Begin starts a new transaction

func (*TransactionManager) Close

func (tm *TransactionManager) Close() error

Close gracefully shuts down the transaction manager

func (*TransactionManager) Commit

func (tm *TransactionManager) Commit(txnID uint64) error

Commit commits a transaction

func (*TransactionManager) GetActiveTxns

func (tm *TransactionManager) GetActiveTxns() []uint64

GetActiveTxns returns all active transaction IDs

func (*TransactionManager) GetGroupCommitStats

func (tm *TransactionManager) GetGroupCommitStats() *GroupCommitStats

GetGroupCommitStats returns group commit statistics if enabled

func (*TransactionManager) GetMetrics

func (tm *TransactionManager) GetMetrics() *Metrics

GetMetrics returns the transaction manager metrics

func (*TransactionManager) GetSnapshot

func (tm *TransactionManager) GetSnapshot(txnID uint64) *Snapshot

GetSnapshot creates a snapshot for MVCC visibility checks

func (*TransactionManager) GetTransaction

func (tm *TransactionManager) GetTransaction(txnID uint64) (*Transaction, bool)

GetTransaction returns a transaction by ID (for internal use)

func (*TransactionManager) RecordUndo

func (tm *TransactionManager) RecordUndo(txnID uint64, lsn uint64) error

RecordUndo adds an LSN to the transaction's undo log

func (*TransactionManager) SetBufferPool added in v2.9.0

func (tm *TransactionManager) SetBufferPool(bpm *BufferPoolManager)

SetBufferPool sets the buffer pool manager for page modifications This allows the transaction manager to perform actual page updates during undo operations

type Tuple

type Tuple struct {
	// contains filtered or unexported fields
}

Tuple represents a tuple with header and data Includes MVCC fields for snapshot isolation

func NewTuple

func NewTuple(data []byte) *Tuple

NewTuple creates a new tuple with the given data

func NewTupleWithMVCC

func NewTupleWithMVCC(data []byte, xmin uint64) *Tuple

NewTupleWithMVCC creates a new tuple with MVCC transaction ID

func (*Tuple) GetData

func (t *Tuple) GetData() []byte

GetData returns the tuple's data

func (*Tuple) GetSize

func (t *Tuple) GetSize() uint16

GetSize returns the size of the tuple

func (*Tuple) GetXmax

func (t *Tuple) GetXmax() uint64

GetXmax returns the deleting transaction ID (0 if not deleted)

func (*Tuple) GetXmin

func (t *Tuple) GetXmin() uint64

GetXmin returns the creating transaction ID

func (*Tuple) IsDeleted

func (t *Tuple) IsDeleted() bool

IsDeleted returns true if the tuple has been marked as deleted

func (*Tuple) SetXmax

func (t *Tuple) SetXmax(xmax uint64)

SetXmax marks the tuple as deleted by the given transaction

type TwoQReplacer

type TwoQReplacer struct {
	// contains filtered or unexported fields
}

TwoQReplacer implements the 2Q cache replacement algorithm 2Q is simpler than ARC but more effective than LRU for many workloads It maintains two queues: - A1 (Am): First-time access queue (FIFO) - A2 (A1in): Frequently accessed queue (LRU) Pages graduate from A1 to A2 on second access

func NewTwoQReplacer

func NewTwoQReplacer(capacity int) *TwoQReplacer

NewTwoQReplacer creates a new 2Q replacer with the given capacity Recommended size ratios (from 2Q paper): - A1: 25% of capacity (first-time pages) - A2: 75% of capacity (frequent pages) - A1out: 50% of capacity (ghost entries)

func (*TwoQReplacer) GetStats

func (r *TwoQReplacer) GetStats() TwoQStats

GetStats returns statistics about the 2Q cache

func (*TwoQReplacer) Pin

func (r *TwoQReplacer) Pin(frameID uint32)

Pin marks a frame as accessed

func (*TwoQReplacer) Remove

func (r *TwoQReplacer) Remove(frameID uint32)

Remove explicitly removes a frame from all queues

func (*TwoQReplacer) Size

func (r *TwoQReplacer) Size() uint32

Size returns the number of frames being tracked

func (*TwoQReplacer) Unpin

func (r *TwoQReplacer) Unpin(frameID uint32)

Unpin removes a frame from consideration

func (*TwoQReplacer) Victim

func (r *TwoQReplacer) Victim() (uint32, bool)

Victim selects a frame to evict

type TwoQStats

type TwoQStats struct {
	A1Size       int // Current pages in A1 (probationary)
	A1MaxSize    int // Max size of A1
	A2Size       int // Current pages in A2 (protected)
	A2MaxSize    int // Max size of A2
	A1outSize    int // Current ghost entries
	A1outMaxSize int // Max ghost entries
	TotalPages   int // Total pages tracked
	Capacity     int // Total capacity
}

TwoQStats contains statistics about the 2Q cache state

type TxnState

type TxnState int32 // Changed to int32 for atomic operations

TxnState represents the state of a transaction

const (
	TxnRunning   TxnState = 0
	TxnCommitted TxnState = 1
	TxnAborted   TxnState = 2
)

type VacuumManager

type VacuumManager struct {
	// contains filtered or unexported fields
}

VacuumManager handles garbage collection of old MVCC tuple versions

func NewVacuumManager

func NewVacuumManager(bpm *BufferPoolManager, txnMgr *TransactionManager) *VacuumManager

NewVacuumManager creates a new vacuum manager

func (*VacuumManager) GetStats

func (vm *VacuumManager) GetStats() VacuumStats

GetStats returns current vacuum statistics

func (*VacuumManager) ResetStats

func (vm *VacuumManager) ResetStats()

ResetStats resets all vacuum statistics

func (*VacuumManager) StartAutoVacuum

func (vm *VacuumManager) StartAutoVacuum(interval time.Duration)

StartAutoVacuum starts automatic vacuum in the background

func (*VacuumManager) StopAutoVacuum

func (vm *VacuumManager) StopAutoVacuum()

StopAutoVacuum stops the automatic vacuum process

func (*VacuumManager) UpdateHorizon

func (vm *VacuumManager) UpdateHorizon() uint64

UpdateHorizon updates the minimum transaction ID horizon Tuples with xmax < minHorizon can be safely reclaimed

func (*VacuumManager) VacuumAll

func (vm *VacuumManager) VacuumAll() (*VacuumStats, error)

VacuumAll performs garbage collection on all pages in the buffer pool

func (*VacuumManager) VacuumPage

func (vm *VacuumManager) VacuumPage(pageID uint32) (int, error)

VacuumPage performs garbage collection on a single page Returns the number of tuples reclaimed

type VacuumStats

type VacuumStats struct {
	TotalScanned   uint64
	TotalReclaimed uint64
	TotalRuns      uint64
	LastRunTime    time.Time
	IsRunning      bool
}

VacuumStats contains statistics about vacuum operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL