Documentation
¶
Index ¶
- Constants
- func CompressPageTransparent(data []byte, compressionType CompressionType) ([]byte, error)
- func DecompressPage(cp *CompressedPage) ([]byte, error)
- func DecompressPageTransparent(data []byte) ([]byte, error)
- func IsCompressedPage(data []byte) bool
- func IsErrorCode(err error, code ErrorCode) bool
- func ParseSegmentID(filename string) (uint32, error)
- func SerializeCompressedPage(cp *CompressedPage) ([]byte, error)
- type ARCReplacer
- type AccessPattern
- type AdaptiveFlushConfig
- type AdaptiveFlushStats
- type AdaptiveFlusher
- func (af *AdaptiveFlusher) GetConfig() AdaptiveFlushConfig
- func (af *AdaptiveFlusher) GetStats() AdaptiveFlushStats
- func (af *AdaptiveFlusher) IsRunning() bool
- func (af *AdaptiveFlusher) SetMaxDirtyRatio(ratio float64) error
- func (af *AdaptiveFlusher) SetTargetDirtyRatio(ratio float64) error
- func (af *AdaptiveFlusher) Start() error
- func (af *AdaptiveFlusher) Stop() error
- func (af *AdaptiveFlusher) TriggerFlush(maxPages int) int
- type AdviceType
- type BPTreeEntry
- type BPTreeIterator
- type BPTreeNode
- type BPTreeNodeType
- type BPlusTree
- func (tree *BPlusTree) BulkLoad(entries []BPTreeEntry) error
- func (tree *BPlusTree) Delete(key int64) error
- func (tree *BPlusTree) GetParallelScanStats(startKey, endKey int64, config ParallelScanConfig) (ParallelRangeScanStats, error)
- func (tree *BPlusTree) Insert(key int64, value int64) error
- func (tree *BPlusTree) Iterator() (*BPTreeIterator, error)
- func (tree *BPlusTree) ParallelRangeScan(startKey, endKey int64, config ParallelScanConfig) (<-chan ScanResult, <-chan error, error)
- func (tree *BPlusTree) ParallelRangeScanCount(startKey, endKey int64, config ParallelScanConfig) (int64, error)
- func (tree *BPlusTree) ParallelRangeScanOrdered(startKey, endKey int64, config ParallelScanConfig) ([]BPTreeEntry, error)
- func (tree *BPlusTree) Search(key int64) (int64, bool, error)
- type BloomFilter
- func (bf *BloomFilter) Clear()
- func (bf *BloomFilter) Clone() *BloomFilter
- func (bf *BloomFilter) EstimateFalsePositiveRate() float64
- func (bf *BloomFilter) GetBytes() []byte
- func (bf *BloomFilter) GetFillRatio() float64
- func (bf *BloomFilter) GetNumBits() uint32
- func (bf *BloomFilter) GetNumHashes() uint32
- func (bf *BloomFilter) GetNumInserts() uint32
- func (bf *BloomFilter) Insert(key []byte)
- func (bf *BloomFilter) Intersect(other *BloomFilter) error
- func (bf *BloomFilter) MayContain(key []byte) bool
- func (bf *BloomFilter) Serialize() []byte
- func (bf *BloomFilter) Union(other *BloomFilter) error
- type BloomFilterConfig
- type BufferPoolManager
- func (bpm *BufferPoolManager) FetchPage(pageId uint32) (*Page, error)
- func (bpm *BufferPoolManager) FlushAllPages() error
- func (bpm *BufferPoolManager) FlushAllPagesParallel(workers int) error
- func (bpm *BufferPoolManager) FlushPage(pageId uint32) error
- func (bpm *BufferPoolManager) GetCapacity() int
- func (bpm *BufferPoolManager) GetDirtyPageCount() int
- func (bpm *BufferPoolManager) GetDirtyPages(maxPages int) []uint32
- func (bpm *BufferPoolManager) GetMetrics() *Metrics
- func (bpm *BufferPoolManager) GetPoolSize() uint32
- func (bpm *BufferPoolManager) IsPageInPool(pageID uint32) bool
- func (bpm *BufferPoolManager) NewPage() (*Page, error)
- func (bpm *BufferPoolManager) SetLogManager(logManager *LogManager)
- func (bpm *BufferPoolManager) UnpinPage(pageId uint32, isDirty bool) error
- type ClockProReplacer
- type ClockProStats
- type CompressedLogRecord
- type CompressedPage
- type CompressionType
- type Config
- type DiskManager
- type ErrorCode
- type FlushableBufferPool
- type GroupCommitManager
- type GroupCommitStats
- type Histogram
- func (h *Histogram) Count() int
- func (h *Histogram) Max() float64
- func (h *Histogram) Mean() float64
- func (h *Histogram) Min() float64
- func (h *Histogram) Percentile(p float64) float64
- func (h *Histogram) Record(latencyUs float64)
- func (h *Histogram) Reset()
- func (h *Histogram) Snapshot() HistogramSnapshot
- type HistogramSnapshot
- type KeyValue
- type LRUNode
- type LRUReplacer
- type LockFreeLogBuffer
- func (lb *LockFreeLogBuffer) Append(lsn uint64, data []byte) (uint64, uint64, error)
- func (lb *LockFreeLogBuffer) Available() uint64
- func (lb *LockFreeLogBuffer) Deserialize(buf []byte) (uint64, []byte, int, error)
- func (lb *LockFreeLogBuffer) Free() uint64
- func (lb *LockFreeLogBuffer) IsEmpty() bool
- func (lb *LockFreeLogBuffer) IsFull() bool
- func (lb *LockFreeLogBuffer) Read() (uint64, uint64, []byte, error)
- func (lb *LockFreeLogBuffer) ReadBatch(maxRecords int) ([]LogBufferRecord, error)
- func (lb *LockFreeLogBuffer) Reset()
- func (lb *LockFreeLogBuffer) Serialize(lsn uint64, data []byte) []byte
- func (lb *LockFreeLogBuffer) Stats() LogBufferStats
- type LockFreeSkipList
- func (sl *LockFreeSkipList) Clear()
- func (sl *LockFreeSkipList) CompactMemory() int
- func (sl *LockFreeSkipList) Delete(key []byte) bool
- func (sl *LockFreeSkipList) Insert(key []byte, value []byte) bool
- func (sl *LockFreeSkipList) Length() int64
- func (sl *LockFreeSkipList) MaxKey() ([]byte, bool)
- func (sl *LockFreeSkipList) MinKey() ([]byte, bool)
- func (sl *LockFreeSkipList) Range(start, end []byte, callback func(key, value []byte) bool)
- func (sl *LockFreeSkipList) RangeScan(start, end []byte) []KeyValue
- func (sl *LockFreeSkipList) Search(key []byte) ([]byte, bool)
- func (sl *LockFreeSkipList) Stats() SkipListStats
- type LogBufferConfig
- type LogBufferRecord
- type LogBufferStats
- type LogManager
- func (lm *LogManager) AppendLog(record *LogRecord) (uint64, error)
- func (lm *LogManager) Close() error
- func (lm *LogManager) Flush() error
- func (lm *LogManager) FlushToLSN(lsn uint64) error
- func (lm *LogManager) GetCurrentLSN() uint64
- func (lm *LogManager) GetFlushedLSN() uint64
- func (lm *LogManager) ReadAllLogs() ([]*LogRecord, error)
- type LogRecord
- type LogRecycler
- func (lr *LogRecycler) Checkpoint(checkpointLSN uint64) error
- func (lr *LogRecycler) Close() error
- func (lr *LogRecycler) DeleteOldSegments(beforeLSN uint64) (int, error)
- func (lr *LogRecycler) GetOrCreateActiveSegment() (*LogSegment, error)
- func (lr *LogRecycler) GetSegmentFilename(segmentID uint32) string
- func (lr *LogRecycler) GetStats() SegmentStats
- func (lr *LogRecycler) ListSegments() ([]string, error)
- func (lr *LogRecycler) LoadExistingSegments() error
- func (lr *LogRecycler) RecycleArchivedSegments() (int, error)
- func (lr *LogRecycler) SetMaxSegmentSize(maxSize int64)
- func (lr *LogRecycler) WriteToSegment(data []byte, lsn uint64) error
- type LogSegment
- type LogType
- type Metrics
- func (m *Metrics) GetBPTreeInsertLatency() HistogramSnapshot
- func (m *Metrics) GetBPTreeSearchLatency() HistogramSnapshot
- func (m *Metrics) GetCacheHitRate() float64
- func (m *Metrics) GetCacheHits() uint64
- func (m *Metrics) GetCacheMisses() uint64
- func (m *Metrics) GetDirtyPageFlushes() uint64
- func (m *Metrics) GetPageEvictions() uint64
- func (m *Metrics) GetPageFetchLatency() HistogramSnapshot
- func (m *Metrics) GetPageFlushLatency() HistogramSnapshot
- func (m *Metrics) GetRecoveries() uint64
- func (m *Metrics) GetRedoOps() uint64
- func (m *Metrics) GetTxnCommitLatency() HistogramSnapshot
- func (m *Metrics) GetTxnsAborted() uint64
- func (m *Metrics) GetTxnsCommitted() uint64
- func (m *Metrics) GetTxnsStarted() uint64
- func (m *Metrics) GetUndoOps() uint64
- func (m *Metrics) GetUptime() time.Duration
- func (m *Metrics) LogMetrics(logger *slog.Logger)
- func (m *Metrics) RecordBPTreeInsertLatency(duration time.Duration)
- func (m *Metrics) RecordBPTreeSearchLatency(duration time.Duration)
- func (m *Metrics) RecordCacheHit()
- func (m *Metrics) RecordCacheMiss()
- func (m *Metrics) RecordDirtyPageFlush()
- func (m *Metrics) RecordPageEviction()
- func (m *Metrics) RecordPageFetchLatency(duration time.Duration)
- func (m *Metrics) RecordPageFlushLatency(duration time.Duration)
- func (m *Metrics) RecordRecovery()
- func (m *Metrics) RecordRedoOp()
- func (m *Metrics) RecordTxnAbort()
- func (m *Metrics) RecordTxnCommit()
- func (m *Metrics) RecordTxnCommitLatency(duration time.Duration)
- func (m *Metrics) RecordTxnStart()
- func (m *Metrics) RecordUndoOp()
- func (m *Metrics) Reset()
- type MmapDiskManager
- func (dm *MmapDiskManager) Advise(pageId uint32, advice AdviceType) error
- func (dm *MmapDiskManager) AllocatePage() (uint32, error)
- func (dm *MmapDiskManager) Close() error
- func (dm *MmapDiskManager) Flush() error
- func (dm *MmapDiskManager) FlushPage(pageId uint32) error
- func (dm *MmapDiskManager) FlushPages(pageIds []uint32) error
- func (dm *MmapDiskManager) GetFileSize() int64
- func (dm *MmapDiskManager) GetNextPageId() uint32
- func (dm *MmapDiskManager) GetStats() MmapStats
- func (dm *MmapDiskManager) ReadPage(pageId uint32) ([]byte, error)
- func (dm *MmapDiskManager) ReadPageCopy(pageId uint32) ([]byte, error)
- func (dm *MmapDiskManager) WritePage(pageId uint32, data []byte) error
- func (dm *MmapDiskManager) WritePagesV(writes []PageWrite) error
- type MmapStats
- type Page
- type PageBloomFilter
- type PageBloomFilterStats
- type PageCompressionStats
- type PageDependencyGraph
- type PageHeader
- type PageLatch
- func (p *PageLatch) GetData() *SlottedPage
- func (p *PageLatch) GetLatchStats() RWLatchStats
- func (p *PageLatch) GetPageId() uint32
- func (p *PageLatch) GetPinCount() int32
- func (p *PageLatch) IsDirty() bool
- func (p *PageLatch) Pin()
- func (p *PageLatch) RLock()
- func (p *PageLatch) RUnlock()
- func (p *PageLatch) SetDirty(dirty bool)
- func (p *PageLatch) TryRLock() bool
- func (p *PageLatch) TryWLock() bool
- func (p *PageLatch) Unpin()
- func (p *PageLatch) WLock()
- func (p *PageLatch) WUnlock()
- type PageTableShard
- type PageWrite
- type ParallelLogManager
- func (plm *ParallelLogManager) AppendLog(record *LogRecord) (uint64, error)
- func (plm *ParallelLogManager) Close() error
- func (plm *ParallelLogManager) Flush() error
- func (plm *ParallelLogManager) FlushToLSN(lsn uint64) error
- func (plm *ParallelLogManager) GetBufferStats() int
- func (plm *ParallelLogManager) GetCurrentLSN() uint64
- func (plm *ParallelLogManager) GetFlushedLSN() uint64
- func (plm *ParallelLogManager) ReadAllLogs() ([]*LogRecord, error)
- type ParallelRangeScanStats
- type ParallelRecoveryConfig
- type ParallelRecoveryManager
- func (rm *ParallelRecoveryManager) AnalysisPass() (map[uint64]bool, map[uint64]bool, error)
- func (rm *ParallelRecoveryManager) BuildDependencyGraphs(committedTxns map[uint64]bool, uncommittedTxns map[uint64]bool, ...) error
- func (rm *ParallelRecoveryManager) GetMetrics() *Metrics
- func (rm *ParallelRecoveryManager) GetNumWorkers() int
- func (rm *ParallelRecoveryManager) GetRecoveryStats() RecoveryStats
- func (rm *ParallelRecoveryManager) ParallelRecover() error
- func (rm *ParallelRecoveryManager) ParallelRedoPass(graph *PageDependencyGraph) (int64, error)
- func (rm *ParallelRecoveryManager) ParallelUndoPass(graph *PageDependencyGraph) (int64, error)
- type ParallelScanConfig
- type PipelineCommitManager
- func (pcm *PipelineCommitManager) Commit(txnID uint64, lsn uint64) error
- func (pcm *PipelineCommitManager) GetAverageBatchSize() float64
- func (pcm *PipelineCommitManager) GetAverageFsyncTime() int64
- func (pcm *PipelineCommitManager) GetAverageValidationTime() int64
- func (pcm *PipelineCommitManager) GetAverageWriteTime() int64
- func (pcm *PipelineCommitManager) GetPipelineEfficiency() float64
- func (pcm *PipelineCommitManager) GetStats() PipelineStatsSnapshot
- func (pcm *PipelineCommitManager) GetTotalPipelineTime() int64
- func (pcm *PipelineCommitManager) Shutdown()
- type PipelineStage
- type PipelineStats
- type PipelineStatsSnapshot
- type PrefetchStats
- type Prefetcher
- func (p *Prefetcher) Cleanup()
- func (p *Prefetcher) ClearPattern(contextID uint64)
- func (p *Prefetcher) Configure(detectionThreshold, prefetchDistance int)
- func (p *Prefetcher) GetStats() PrefetchStats
- func (p *Prefetcher) RecordAccess(contextID uint64, pageID uint32)
- func (p *Prefetcher) ResetStats()
- func (p *Prefetcher) SetEnabled(enabled bool)
- func (p *Prefetcher) StartCleanupWorker(stopChan <-chan struct{})
- type RWLatch
- func (rw *RWLatch) GetReaderCount() uint32
- func (rw *RWLatch) GetStats() RWLatchStats
- func (rw *RWLatch) GetWriterWaitingCount() uint32
- func (rw *RWLatch) IsWriterActive() bool
- func (rw *RWLatch) Lock()
- func (rw *RWLatch) RLock()
- func (rw *RWLatch) RUnlock()
- func (rw *RWLatch) TryLock() bool
- func (rw *RWLatch) TryRLock() bool
- func (rw *RWLatch) Unlock()
- type RWLatchStats
- type RecoveryManager
- func (rm *RecoveryManager) AnalysisPass() (map[uint64]bool, map[uint64]bool, error)
- func (rm *RecoveryManager) CreateCheckpoint() error
- func (rm *RecoveryManager) GetMetrics() *Metrics
- func (rm *RecoveryManager) GetRecoveryStats() RecoveryStats
- func (rm *RecoveryManager) Recover() error
- func (rm *RecoveryManager) RedoPass(committedTxns map[uint64]bool) (int, error)
- func (rm *RecoveryManager) UndoPass(uncommittedTxns map[uint64]bool) (int, error)
- type RecoveryStats
- type Replacer
- type ScanResult
- type SegmentStats
- type ShardedPageTable
- func (spt *ShardedPageTable) Clear()
- func (spt *ShardedPageTable) Delete(pageId uint32)
- func (spt *ShardedPageTable) ForEach(fn func(pageId uint32, page *Page) bool)
- func (spt *ShardedPageTable) Get(pageId uint32) (*Page, bool)
- func (spt *ShardedPageTable) GetAll() []*Page
- func (spt *ShardedPageTable) Put(pageId uint32, page *Page)
- func (spt *ShardedPageTable) Size() int
- type SkipListConfig
- type SkipListStats
- type SlotEntry
- type SlottedPage
- func (sp *SlottedPage) DeleteTuple(slotId uint16) error
- func (sp *SlottedPage) GetFreeSpaceSize() uint16
- func (sp *SlottedPage) GetTupleCount() uint16
- func (sp *SlottedPage) InsertTuple(tuple *Tuple) (uint16, error)
- func (sp *SlottedPage) ReadTuple(slotId uint16) (*Tuple, error)
- func (sp *SlottedPage) Serialize() []byte
- func (sp *SlottedPage) UpdateTuple(slotId uint16, newTuple *Tuple) error
- type Snapshot
- type StorageError
- func ErrDiskOperation(op string, err error) *StorageError
- func ErrDuplicateKey(op string, key int) *StorageError
- func ErrInvalidTxnState(op string, txnID uint64, state string) *StorageError
- func ErrKeyNotFound(op string, key int) *StorageError
- func ErrLogCorrupted(op string, lsn uint64) *StorageError
- func ErrNoFreePages(op string) *StorageError
- func ErrPageFull(op string, pageID uint32) *StorageError
- func ErrPageNotFound(op string, pageID uint32) *StorageError
- func ErrPagePinned(op string, pageID uint32, pinCount int) *StorageError
- func ErrTxnNotFound(op string, txnID uint64) *StorageError
- func NewStorageError(code ErrorCode, op, message string, err error) *StorageError
- type Transaction
- type TransactionManager
- func (tm *TransactionManager) Abort(txnID uint64) error
- func (tm *TransactionManager) Begin() (*Transaction, error)
- func (tm *TransactionManager) Close() error
- func (tm *TransactionManager) Commit(txnID uint64) error
- func (tm *TransactionManager) GetActiveTxns() []uint64
- func (tm *TransactionManager) GetGroupCommitStats() *GroupCommitStats
- func (tm *TransactionManager) GetMetrics() *Metrics
- func (tm *TransactionManager) GetSnapshot(txnID uint64) *Snapshot
- func (tm *TransactionManager) GetTransaction(txnID uint64) (*Transaction, bool)
- func (tm *TransactionManager) RecordUndo(txnID uint64, lsn uint64) error
- func (tm *TransactionManager) SetBufferPool(bpm *BufferPoolManager)
- type Tuple
- type TwoQReplacer
- type TwoQStats
- type TxnState
- type VacuumManager
- func (vm *VacuumManager) GetStats() VacuumStats
- func (vm *VacuumManager) ResetStats()
- func (vm *VacuumManager) StartAutoVacuum(interval time.Duration)
- func (vm *VacuumManager) StopAutoVacuum()
- func (vm *VacuumManager) UpdateHorizon() uint64
- func (vm *VacuumManager) VacuumAll() (*VacuumStats, error)
- func (vm *VacuumManager) VacuumPage(pageID uint32) (int, error)
- type VacuumStats
Constants ¶
const ( // B+ Tree configuration BPTreeOrder = 4 // Maximum number of children per node BPTreeMaxKeys = BPTreeOrder - 1 BPTreeMinKeys = (BPTreeOrder+1)/2 - 1 BPTreeLeafOrder = 4 BPTreeMaxLeafKeys = BPTreeLeafOrder - 1 )
const ( // DefaultSlotSize is the default size for each log slot (16KB) DefaultSlotSize = 16 * 1024 // DefaultLogBufferCapacity is the number of slots (must be power of 2) DefaultLogBufferCapacity = 256 // 256 * 16KB = 4MB total // MinLogBufferCapacity is the minimum buffer capacity MinLogBufferCapacity = 16 // MaxLogBufferCapacity is the maximum buffer capacity MaxLogBufferCapacity = 16384 // 16K slots * 16KB = 256MB )
const ( DefaultMaxSegmentSize = 64 * 1024 * 1024 // 64MB MaxRecyclePoolSize = 10 // Keep max 10 recycled segments )
const ( // Initial file size: 1GB (256K pages * 4KB) InitialFileSize = 1024 * 1024 * 1024 // Grow by 256MB when we run out of space FileGrowSize = 256 * 1024 * 1024 )
const ( PageHeaderSize = 8 // 4 * uint16 = 8 bytes SlotEntrySize = 4 // 2 * uint16 = 4 bytes )
const ( CompressedPageMagic = 0xC0DE CompressedHeaderSize = 12 MinCompressionThreshold = 100 // Minimum bytes saved to use compression )
const DefaultLogBufferSize = 4096 // 4KB buffer
const (
PageSize = 4096 // 4KB pages
)
const (
ParallelLogBufferSize = 16384 // 16KB buffer
)
Variables ¶
This section is empty.
Functions ¶
func CompressPageTransparent ¶
func CompressPageTransparent(data []byte, compressionType CompressionType) ([]byte, error)
CompressPageTransparent compresses a page and returns serialized form (compression + serialization).
func DecompressPage ¶
func DecompressPage(cp *CompressedPage) ([]byte, error)
DecompressPage decompresses a compressed page
func DecompressPageTransparent ¶
DecompressPageTransparent detects if page is compressed and decompresses if needed Returns original data if not compressed
func IsCompressedPage ¶
IsCompressedPage checks if the page data represents a compressed page
func IsErrorCode ¶
IsErrorCode checks if an error has a specific error code
func ParseSegmentID ¶
Helper function to parse segment ID from filename
func SerializeCompressedPage ¶
func SerializeCompressedPage(cp *CompressedPage) ([]byte, error)
SerializeCompressedPage serializes a compressed page to bytes
Types ¶
type ARCReplacer ¶
type ARCReplacer struct {
// contains filtered or unexported fields
}
ARCReplacer implements the Adaptive Replacement Cache algorithm ARC maintains four LRU lists: - T1: Recent cache hits (recency) - T2: Frequent cache hits (frequency) - B1: Ghost entries evicted from T1 (recent evictions) - B2: Ghost entries evicted from T2 (frequent evictions)
The algorithm adaptively adjusts the target size (p) between T1 and T2 based on cache hit patterns to optimize for the current workload.
func NewARCReplacer ¶
func NewARCReplacer(capacity int) *ARCReplacer
NewARCReplacer creates a new ARC cache replacer
func (*ARCReplacer) GetStats ¶
func (arc *ARCReplacer) GetStats() map[string]int
GetStats returns ARC-specific statistics
func (*ARCReplacer) Size ¶
func (arc *ARCReplacer) Size() uint32
Size returns the number of evictable pages
func (*ARCReplacer) Unpin ¶
func (arc *ARCReplacer) Unpin(frameID uint32)
Unpin marks a page as evictable and records the access
func (*ARCReplacer) Victim ¶
func (arc *ARCReplacer) Victim() (uint32, bool)
Victim selects a page for eviction
type AccessPattern ¶
type AccessPattern struct {
StartPageID uint32
LastPageID uint32
Stride int32 // Distance between consecutive accesses
AccessCount int // Number of accesses matching this stride
Confidence float64 // Confidence score (0.0 - 1.0)
LastAccessTime time.Time
History []uint32 // Recent page accesses for pattern analysis
HistorySize int // Max history size
}
AccessPattern represents a detected access pattern with stride
type AdaptiveFlushConfig ¶
type AdaptiveFlushConfig struct {
// Target dirty page ratio (0.0 - 1.0)
TargetDirtyRatio float64
// Maximum dirty ratio before aggressive flushing (0.0 - 1.0)
MaxDirtyRatio float64
// Flush check interval
CheckInterval time.Duration
// Minimum pages to flush per interval
MinFlushPages int
// Maximum pages to flush per interval
MaxFlushPages int
// PID controller gains
Kp float64 // Proportional gain
Ki float64 // Integral gain
Kd float64 // Derivative gain
// Adaptive parameters
EnableAdaptive bool // Enable adaptive adjustment
WriteRateThreshold float64 // Pages/sec to trigger adaptive mode
CheckpointInterval time.Duration
}
AdaptiveFlushConfig contains configuration for adaptive flushing
func DefaultAdaptiveFlushConfig ¶
func DefaultAdaptiveFlushConfig() AdaptiveFlushConfig
DefaultAdaptiveFlushConfig returns default configuration
type AdaptiveFlushStats ¶
type AdaptiveFlushStats struct {
FlushesIssued uint64
PagesFlushed uint64
CurrentRate float64 // Pages per second
DirtyRatio float64
AvgFlushTime time.Duration
LastAdjustment time.Time
}
AdaptiveFlushStats contains statistics about adaptive flushing
type AdaptiveFlusher ¶
type AdaptiveFlusher struct {
// contains filtered or unexported fields
}
func NewAdaptiveFlusher ¶
func NewAdaptiveFlusher(bp FlushableBufferPool, config AdaptiveFlushConfig) *AdaptiveFlusher
NewAdaptiveFlusher creates a new adaptive flusher
func (*AdaptiveFlusher) GetConfig ¶
func (af *AdaptiveFlusher) GetConfig() AdaptiveFlushConfig
GetConfig returns the current configuration
func (*AdaptiveFlusher) GetStats ¶
func (af *AdaptiveFlusher) GetStats() AdaptiveFlushStats
GetStats returns current statistics
func (*AdaptiveFlusher) IsRunning ¶
func (af *AdaptiveFlusher) IsRunning() bool
IsRunning returns whether the flusher is currently running
func (*AdaptiveFlusher) SetMaxDirtyRatio ¶
func (af *AdaptiveFlusher) SetMaxDirtyRatio(ratio float64) error
SetMaxDirtyRatio dynamically adjusts the maximum dirty ratio
func (*AdaptiveFlusher) SetTargetDirtyRatio ¶
func (af *AdaptiveFlusher) SetTargetDirtyRatio(ratio float64) error
SetTargetDirtyRatio dynamically adjusts the target dirty ratio
func (*AdaptiveFlusher) Start ¶
func (af *AdaptiveFlusher) Start() error
Start starts the adaptive flusher background goroutine
func (*AdaptiveFlusher) Stop ¶
func (af *AdaptiveFlusher) Stop() error
Stop stops the adaptive flusher
func (*AdaptiveFlusher) TriggerFlush ¶
func (af *AdaptiveFlusher) TriggerFlush(maxPages int) int
TriggerFlush manually triggers a flush cycle
type AdviceType ¶
type AdviceType int
AdviceType represents memory access advice
const ( AdviceNormal AdviceType = 0 // No special treatment AdviceRandom AdviceType = 1 // Random access pattern AdviceSequential AdviceType = 2 // Sequential access pattern AdviceWillNeed AdviceType = 3 // Will need these pages soon (prefetch) AdviceDontNeed AdviceType = 4 // Won't need these pages (can evict) )
type BPTreeEntry ¶
BPTreeEntry represents a key-value pair in the B+ Tree
type BPTreeIterator ¶
type BPTreeIterator struct {
// contains filtered or unexported fields
}
BPTreeIterator represents an iterator for B+ Tree
func (*BPTreeIterator) HasNext ¶
func (iter *BPTreeIterator) HasNext() bool
HasNext returns true if there are more elements
type BPTreeNode ¶
type BPTreeNode struct {
// contains filtered or unexported fields
}
BPTreeNode represents a node in the B+ Tree
func NewInternalNode ¶
func NewInternalNode() *BPTreeNode
NewInternalNode creates a new internal node
type BPTreeNodeType ¶
type BPTreeNodeType int
BPTreeNodeType represents the type of B+ Tree node
const ( LeafNode BPTreeNodeType = iota InternalNode )
type BPlusTree ¶
type BPlusTree struct {
// contains filtered or unexported fields
}
BPlusTree represents a B+ Tree index
func NewBPlusTree ¶
func NewBPlusTree(bpm *BufferPoolManager) (*BPlusTree, error)
NewBPlusTree creates a new B+ Tree
func (*BPlusTree) BulkLoad ¶
func (tree *BPlusTree) BulkLoad(entries []BPTreeEntry) error
BulkLoad efficiently builds a B+ Tree from sorted key-value pairs. Much faster than inserting keys one by one. Requirements: entries must be sorted by key in ascending order.
func (*BPlusTree) GetParallelScanStats ¶
func (tree *BPlusTree) GetParallelScanStats(startKey, endKey int64, config ParallelScanConfig) (ParallelRangeScanStats, error)
GetParallelScanStats analyzes a range scan and returns statistics
func (*BPlusTree) Insert ¶
Insert inserts a key-value pair into the B+ Tree Uses write locks for safe concurrent modifications
func (*BPlusTree) Iterator ¶
func (tree *BPlusTree) Iterator() (*BPTreeIterator, error)
Iterator creates a new iterator for the B+ Tree
func (*BPlusTree) ParallelRangeScan ¶
func (tree *BPlusTree) ParallelRangeScan(startKey, endKey int64, config ParallelScanConfig) (<-chan ScanResult, <-chan error, error)
ParallelRangeScan performs a parallel range scan on the B+ Tree Returns results through a channel, allowing streaming of large result sets
func (*BPlusTree) ParallelRangeScanCount ¶
func (tree *BPlusTree) ParallelRangeScanCount(startKey, endKey int64, config ParallelScanConfig) (int64, error)
ParallelRangeScanCount counts entries in a range using parallel workers. Faster than scan-and-count since we don't return values.
func (*BPlusTree) ParallelRangeScanOrdered ¶
func (tree *BPlusTree) ParallelRangeScanOrdered(startKey, endKey int64, config ParallelScanConfig) ([]BPTreeEntry, error)
ParallelRangeScanOrdered performs a parallel range scan with ordered results. Slower than unordered scan but maintains key order.
type BloomFilter ¶
type BloomFilter struct {
// contains filtered or unexported fields
}
BloomFilter is a space-efficient probabilistic data structure for membership testing Used at page level to quickly reject negative lookups without reading page data
func DeserializeBloomFilter ¶
func DeserializeBloomFilter(data []byte) (*BloomFilter, error)
DeserializeBloomFilter reconstructs a Bloom filter from serialized bytes
func NewBloomFilter ¶
func NewBloomFilter(config BloomFilterConfig) *BloomFilter
NewBloomFilter creates a new Bloom filter with the given configuration
func NewBloomFilterFromBytes ¶
func NewBloomFilterFromBytes(data []byte, numHashes uint32) *BloomFilter
NewBloomFilterFromBytes reconstructs a Bloom filter from serialized bytes
func (*BloomFilter) Clear ¶
func (bf *BloomFilter) Clear()
Clear resets the Bloom filter to empty state
func (*BloomFilter) Clone ¶
func (bf *BloomFilter) Clone() *BloomFilter
Clone creates a deep copy of the Bloom filter
func (*BloomFilter) EstimateFalsePositiveRate ¶
func (bf *BloomFilter) EstimateFalsePositiveRate() float64
EstimateFalsePositiveRate calculates the current false positive rate Based on: p ≈ (1 - e^(-kn/m))^k where k = num hashes, n = num inserts, m = num bits
func (*BloomFilter) GetBytes ¶
func (bf *BloomFilter) GetBytes() []byte
GetBytes returns the raw byte array (for serialization)
func (*BloomFilter) GetFillRatio ¶
func (bf *BloomFilter) GetFillRatio() float64
GetFillRatio returns the fraction of bits set to 1
func (*BloomFilter) GetNumBits ¶
func (bf *BloomFilter) GetNumBits() uint32
GetNumBits returns the total number of bits in the filter
func (*BloomFilter) GetNumHashes ¶
func (bf *BloomFilter) GetNumHashes() uint32
GetNumHashes returns the number of hash functions used
func (*BloomFilter) GetNumInserts ¶
func (bf *BloomFilter) GetNumInserts() uint32
GetNumInserts returns the number of elements inserted
func (*BloomFilter) Insert ¶
func (bf *BloomFilter) Insert(key []byte)
Insert adds an element to the Bloom filter
func (*BloomFilter) Intersect ¶
func (bf *BloomFilter) Intersect(other *BloomFilter) error
Intersect performs bitwise AND with another Bloom filter Both filters must have the same configuration (numBits, numHashes)
func (*BloomFilter) MayContain ¶
func (bf *BloomFilter) MayContain(key []byte) bool
MayContain checks if an element might be in the set Returns true if element might exist (could be false positive) Returns false if element definitely does not exist (no false negatives)
func (*BloomFilter) Serialize ¶
func (bf *BloomFilter) Serialize() []byte
Serialize converts the Bloom filter to a byte slice for storage Format: [numBits:4][numHashes:4][numInserts:4][bits:variable]
func (*BloomFilter) Union ¶
func (bf *BloomFilter) Union(other *BloomFilter) error
Union merges another Bloom filter into this one (bitwise OR) Both filters must have the same configuration (numBits, numHashes)
type BloomFilterConfig ¶
type BloomFilterConfig struct {
ExpectedElements uint32 // Expected number of elements to insert
FalsePositiveRate float64 // Target false positive rate (e.g., 0.01 = 1%)
}
BloomFilterConfig holds configuration for creating a Bloom filter
func DefaultBloomFilterConfig ¶
func DefaultBloomFilterConfig() BloomFilterConfig
DefaultBloomFilterConfig returns a default configuration suitable for page-level filtering Assumes ~100 keys per page with 1% false positive rate
type BufferPoolManager ¶
type BufferPoolManager struct {
// contains filtered or unexported fields
}
BufferPoolManager manages a pool of pages in memory
func NewBufferPoolManager ¶
func NewBufferPoolManager(poolSize uint32, diskManager *DiskManager) (*BufferPoolManager, error)
NewBufferPoolManager creates a new buffer pool manager
func NewBufferPoolManagerWithReplacer ¶
func NewBufferPoolManagerWithReplacer(poolSize uint32, diskManager *DiskManager, replacerAlg string) (*BufferPoolManager, error)
NewBufferPoolManagerWithReplacer creates a buffer pool with a specific replacement policy
func (*BufferPoolManager) FetchPage ¶
func (bpm *BufferPoolManager) FetchPage(pageId uint32) (*Page, error)
FetchPage fetches a page from disk if not in buffer pool, or returns existing page
func (*BufferPoolManager) FlushAllPages ¶
func (bpm *BufferPoolManager) FlushAllPages() error
FlushAllPages flushes all dirty pages to disk using batch writes
func (*BufferPoolManager) FlushAllPagesParallel ¶
func (bpm *BufferPoolManager) FlushAllPagesParallel(workers int) error
FlushAllPagesParallel flushes all dirty pages concurrently with configurable parallelism
func (*BufferPoolManager) FlushPage ¶
func (bpm *BufferPoolManager) FlushPage(pageId uint32) error
FlushPage explicitly flushes a page to disk
func (*BufferPoolManager) GetCapacity ¶
func (bpm *BufferPoolManager) GetCapacity() int
GetCapacity returns the total capacity of the buffer pool
func (*BufferPoolManager) GetDirtyPageCount ¶
func (bpm *BufferPoolManager) GetDirtyPageCount() int
GetDirtyPageCount returns the number of dirty pages in the buffer pool
func (*BufferPoolManager) GetDirtyPages ¶
func (bpm *BufferPoolManager) GetDirtyPages(maxPages int) []uint32
GetDirtyPages returns up to maxPages dirty page IDs
func (*BufferPoolManager) GetMetrics ¶
func (bpm *BufferPoolManager) GetMetrics() *Metrics
GetMetrics returns the buffer pool metrics
func (*BufferPoolManager) GetPoolSize ¶
func (bpm *BufferPoolManager) GetPoolSize() uint32
GetPoolSize returns the pool size
func (*BufferPoolManager) IsPageInPool ¶
func (bpm *BufferPoolManager) IsPageInPool(pageID uint32) bool
IsPageInPool checks if a page is already in the buffer pool to avoid redundant prefetches.
func (*BufferPoolManager) NewPage ¶
func (bpm *BufferPoolManager) NewPage() (*Page, error)
NewPage creates a new page, allocates it on disk, and brings it into the buffer pool
func (*BufferPoolManager) SetLogManager ¶
func (bpm *BufferPoolManager) SetLogManager(logManager *LogManager)
SetLogManager sets the log manager for WAL integration
type ClockProReplacer ¶
type ClockProReplacer struct {
// contains filtered or unexported fields
}
ClockProReplacer implements the Clock-Pro cache replacement algorithm Clock-Pro is a scan-resistant algorithm that improves upon Clock by tracking both recency and frequency. It maintains three "hands" that move through a circular list:
1. Hand_cold: Points to the oldest cold page (candidate for eviction) 2. Hand_hot: Points to the oldest hot page (for demoting to cold) 3. Hand_test: Points to the oldest test page (for pruning history)
Pages can be in three states: - Cold: Recently accessed once (on probation) - Hot: Accessed multiple times (protected from eviction) - Test: Ghost entry tracking recent evictions
The algorithm adaptively adjusts the sizes of hot and cold regions based on the workload, making it effective for both recency and frequency patterns.
func NewClockProReplacer ¶
func NewClockProReplacer(capacity int) *ClockProReplacer
NewClockProReplacer creates a new Clock-Pro cache replacer
func (*ClockProReplacer) GetStats ¶
func (cp *ClockProReplacer) GetStats() ClockProStats
GetStats returns statistics about the Clock-Pro replacer
func (*ClockProReplacer) Pin ¶
func (cp *ClockProReplacer) Pin(frameID uint32)
Pin marks a page as in-use
func (*ClockProReplacer) Size ¶
func (cp *ClockProReplacer) Size() int
Size returns the number of evictable pages
func (*ClockProReplacer) Unpin ¶
func (cp *ClockProReplacer) Unpin(frameID uint32)
Unpin marks a page as evictable and records the access
func (*ClockProReplacer) Victim ¶
func (cp *ClockProReplacer) Victim() (uint32, bool)
Victim selects a page for eviction using the Clock-Pro algorithm
type ClockProStats ¶
type ClockProStats struct {
Capacity int // Total cache capacity
HotSize int // Current hot pages
HotMax int // Maximum hot pages (adaptive)
ColdSize int // Current cold pages
ColdMax int // Maximum cold pages
TestSize int // Current test (ghost) entries
TestMax int // Maximum test entries
TotalPages int // Total cached pages (hot + cold)
}
ClockProStats contains statistics about the Clock-Pro replacer
type CompressedLogRecord ¶
type CompressedLogRecord struct {
LSN uint64
PrevLSN uint64
TxnID uint64
Type LogType
PageID uint32
// Delta compression fields
DataLength uint16 // Total length of the data region
ChangeMask []byte // Bitmap indicating which bytes changed
DeltaData []byte // Only the changed bytes
// For non-update operations, store full data
FullData []byte
}
CompressedLogRecord represents a WAL entry with delta compression. Stores only changed bytes instead of full before/after images (5-10x compression).
func CompressLogRecord ¶
func CompressLogRecord(record *LogRecord) *CompressedLogRecord
CompressLogRecord compresses a standard log record using delta encoding
func DeserializeCompressedLogRecord ¶
func DeserializeCompressedLogRecord(data []byte) (*CompressedLogRecord, error)
DeserializeCompressedLogRecord creates CompressedLogRecord from bytes
func (*CompressedLogRecord) CompressionRatio ¶
func (clr *CompressedLogRecord) CompressionRatio() float64
CompressionRatio returns the compression ratio achieved
func (*CompressedLogRecord) Serialize ¶
func (clr *CompressedLogRecord) Serialize() []byte
Serialize converts CompressedLogRecord to bytes Format: LSN(8) | PrevLSN(8) | TxnID(8) | Type(1) | PageID(4) | DataLength(2) |
MaskLen(2) | ChangeMask | DeltaLen(2) | DeltaData | FullDataLen(2) | FullData
type CompressedPage ¶
type CompressedPage struct {
CompressionType CompressionType
UncompressedSize uint16
CompressedSize uint16
CompressedData []byte
OriginalChecksum uint32 // CRC32 of original data
}
CompressedPage represents a compressed page with metadata
func ChooseBestCompression ¶
func ChooseBestCompression(data []byte) (*CompressedPage, error)
ChooseBestCompression tries all algorithms and returns the best one
func CompressPage ¶
func CompressPage(data []byte, compressionType CompressionType) (*CompressedPage, error)
CompressPage compresses a page using the specified algorithm
func DeserializeCompressedPage ¶
func DeserializeCompressedPage(data []byte) (*CompressedPage, error)
DeserializeCompressedPage deserializes a compressed page from bytes
func (*CompressedPage) GetCompressionRatio ¶
func (cp *CompressedPage) GetCompressionRatio() float64
GetCompressionRatio returns the compression ratio (original size / compressed size)
func (*CompressedPage) GetSpaceSavings ¶
func (cp *CompressedPage) GetSpaceSavings() int
GetSpaceSavings returns bytes saved by compression
type CompressionType ¶
type CompressionType uint8
CompressionType represents the compression algorithm used
const ( CompressionNone CompressionType = 0 CompressionLZ4 CompressionType = 1 CompressionSnappy CompressionType = 2 )
type Config ¶
type Config struct {
// Buffer Pool Configuration
BufferPoolSize uint32 `json:"buffer_pool_size"` // Number of pages in buffer pool
CacheReplacer string `json:"cache_replacer"` // Cache replacement policy (lru, 2q, arc)
EnablePrefetching bool `json:"enable_prefetching"` // Enable sequential prefetching
// Disk Configuration
DataDirectory string `json:"data_directory"` // Directory for data files
PageSize uint32 `json:"page_size"` // Page size in bytes (default: 4096)
// WAL Configuration
WALDirectory string `json:"wal_directory"` // Directory for WAL files
WALEnabled bool `json:"wal_enabled"` // Whether WAL is enabled
WALParallel bool `json:"wal_parallel"` // Use parallel WAL for better concurrency
WALCompression bool `json:"wal_compression"` // Enable WAL compression
WALCompressionAlg string `json:"wal_compression_alg"` // Compression algorithm (delta, snappy, none)
// Transaction Configuration
MaxTransactions uint32 `json:"max_transactions"` // Maximum concurrent transactions
// Performance Configuration
EnableMetrics bool `json:"enable_metrics"` // Whether to collect performance metrics
LogLevel string `json:"log_level"` // Log level (debug, info, warn, error)
// Recovery Configuration
AutoRecovery bool `json:"auto_recovery"` // Whether to perform recovery on startup
}
Config holds storage engine configuration
func LoadConfigFromEnv ¶
func LoadConfigFromEnv() *Config
LoadConfigFromEnv loads configuration from environment variables Falls back to default values if environment variables are not set
func LoadConfigFromFile ¶
LoadConfigFromFile loads configuration from a JSON file
func (*Config) SaveToFile ¶
SaveToFile saves the configuration to a JSON file
type DiskManager ¶
type DiskManager struct {
// contains filtered or unexported fields
}
func NewDiskManager ¶
func NewDiskManager(fileName string) (*DiskManager, error)
NewDiskManager creates a new disk manager that manages pages in a file
func (*DiskManager) AllocatePage ¶
func (dm *DiskManager) AllocatePage() uint32
AllocatePage allocates a new page and returns its page ID
func (*DiskManager) Close ¶
func (dm *DiskManager) Close() error
Close closes the disk manager and its underlying file
func (*DiskManager) ReadPage ¶
func (dm *DiskManager) ReadPage(pageId uint32) ([]byte, error)
ReadPage reads a page from disk given its page ID
func (*DiskManager) WritePage ¶
func (dm *DiskManager) WritePage(pageId uint32, data []byte) error
WritePage writes a page to disk at the specified page ID
func (*DiskManager) WritePagesV ¶
func (dm *DiskManager) WritePagesV(writes []PageWrite) error
WritePagesV writes multiple pages in a single batch operation. More efficient than writing pages one-at-a-time.
type ErrorCode ¶
type ErrorCode int
ErrorCode represents different types of storage errors
const ( // Generic errors ErrCodeUnknown ErrorCode = iota ErrCodeInternal // Page errors ErrCodePageNotFound ErrCodePageFull ErrCodeInvalidPageID ErrCodePageCorrupted // Buffer pool errors ErrCodeNoFreePages ErrCodePagePinned ErrCodeInvalidPin // Transaction errors ErrCodeTxnNotFound ErrCodeTxnAlreadyCommitted ErrCodeTxnAlreadyAborted ErrCodeInvalidTxnState // Recovery errors ErrCodeRecoveryFailed ErrCodeLogCorrupted ErrCodeCheckpointFailed // B+ Tree errors ErrCodeKeyNotFound ErrCodeDuplicateKey ErrCodeInvalidKey // Disk errors ErrCodeDiskFull ErrCodeDiskReadFailed ErrCodeDiskWriteFailed ErrCodeFileNotFound )
func GetErrorCode ¶
GetErrorCode returns the error code from an error, or ErrCodeUnknown
type FlushableBufferPool ¶
type FlushableBufferPool interface {
GetDirtyPageCount() int
GetCapacity() int
GetDirtyPages(maxPages int) []uint32
FlushPage(pageID uint32) error
}
FlushableBufferPool is the interface required by adaptive flusher
type GroupCommitManager ¶
type GroupCommitManager struct {
// contains filtered or unexported fields
}
GroupCommitManager batches multiple transaction commits into a single fsync, reducing I/O overhead. Instead of fsyncing on every commit, commits are grouped and persisted together.
func NewGroupCommitManager ¶
func NewGroupCommitManager(logManager *LogManager, maxBatchSize int, maxBatchDelay time.Duration) *GroupCommitManager
NewGroupCommitManager creates a new group commit manager
func (*GroupCommitManager) Commit ¶
func (gcm *GroupCommitManager) Commit(lsn uint64) error
Commit submits a transaction commit request and waits for it to be persisted
func (*GroupCommitManager) Shutdown ¶
func (gcm *GroupCommitManager) Shutdown()
Shutdown gracefully shuts down the group commit manager
func (*GroupCommitManager) Stats ¶
func (gcm *GroupCommitManager) Stats() GroupCommitStats
Stats returns statistics about group commit performance
type GroupCommitStats ¶
type GroupCommitStats struct {
TotalCommits uint64
TotalBatches uint64
TotalFsyncs uint64
AverageBatchSize float64
}
GroupCommitStats contains statistics about group commit performance
type Histogram ¶
type Histogram struct {
// contains filtered or unexported fields
}
Histogram tracks latency distribution with percentile support
func NewHistogram ¶
NewHistogram creates a new histogram with a max sample size
func (*Histogram) Percentile ¶
Percentile calculates the given percentile (0-100)
func (*Histogram) Snapshot ¶
func (h *Histogram) Snapshot() HistogramSnapshot
Snapshot captures current histogram statistics
type HistogramSnapshot ¶
type HistogramSnapshot struct {
Count int
Min float64
Max float64
Mean float64
P50 float64 // Median
P95 float64
P99 float64
P999 float64
}
Snapshot returns current percentile statistics
type LRUNode ¶
type LRUNode struct {
// contains filtered or unexported fields
}
LRUNode represents a node in the LRU list
type LRUReplacer ¶
type LRUReplacer struct {
// contains filtered or unexported fields
}
LRUReplacer implements LRU (Least Recently Used) replacement policy
func NewLRUReplacer ¶
func NewLRUReplacer(capacity uint32) *LRUReplacer
NewLRUReplacer creates a new LRU replacer
func (*LRUReplacer) Pin ¶
func (lru *LRUReplacer) Pin(frameID uint32)
Pin removes a frame from the LRU replacer Called when a page is pinned (in use, not evictable)
func (*LRUReplacer) Size ¶
func (lru *LRUReplacer) Size() uint32
Size returns the number of evictable frames
func (*LRUReplacer) Unpin ¶
func (lru *LRUReplacer) Unpin(frameID uint32)
Unpin adds a frame to the LRU replacer Called when a page is unpinned (available for eviction)
func (*LRUReplacer) Victim ¶
func (lru *LRUReplacer) Victim() (uint32, bool)
Victim selects a frame to evict using LRU policy Returns the frame ID and true if a victim was found, or 0 and false if no victim available
type LockFreeLogBuffer ¶
type LockFreeLogBuffer struct {
// contains filtered or unexported fields
}
LockFreeLogBuffer is a lock-free circular buffer for WAL records Uses CAS operations for thread-safe concurrent append without blocking
Design: - Fixed-size circular buffer with power-of-2 capacity - Two atomic counters: head (read position) and tail (write position) - Writers reserve space via atomic tail increment - Readers wait for writers to finish via sequence numbers - Supports concurrent appends with linearizability
Memory Layout: Each slot: [sequence(8) | lsn(8) | size(4) | data(var)]
func NewLockFreeLogBuffer ¶
func NewLockFreeLogBuffer(config LogBufferConfig) (*LockFreeLogBuffer, error)
NewLockFreeLogBuffer creates a new lock-free log buffer
func (*LockFreeLogBuffer) Append ¶
Append adds a log record to the buffer using lock-free CAS. Returns (slotIndex, sequence, error).
func (*LockFreeLogBuffer) Available ¶
func (lb *LockFreeLogBuffer) Available() uint64
Available returns the number of records available to read
func (*LockFreeLogBuffer) Deserialize ¶
Deserialize deserializes a log record Returns (lsn, data, bytesRead, error)
func (*LockFreeLogBuffer) Free ¶
func (lb *LockFreeLogBuffer) Free() uint64
Free returns the number of free slots available for writing
func (*LockFreeLogBuffer) IsEmpty ¶
func (lb *LockFreeLogBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty
func (*LockFreeLogBuffer) IsFull ¶
func (lb *LockFreeLogBuffer) IsFull() bool
IsFull returns true if the buffer is full
func (*LockFreeLogBuffer) Read ¶
func (lb *LockFreeLogBuffer) Read() (uint64, uint64, []byte, error)
Read reads the next available log record Returns (slotIndex, lsn, data, error) Returns nil data when no records available
func (*LockFreeLogBuffer) ReadBatch ¶
func (lb *LockFreeLogBuffer) ReadBatch(maxRecords int) ([]LogBufferRecord, error)
ReadBatch reads multiple records in a batch (up to maxRecords) Returns slice of (lsn, data) tuples
func (*LockFreeLogBuffer) Reset ¶
func (lb *LockFreeLogBuffer) Reset()
Reset clears the buffer (not thread-safe, use only when no concurrent access)
func (*LockFreeLogBuffer) Serialize ¶
func (lb *LockFreeLogBuffer) Serialize(lsn uint64, data []byte) []byte
Serialize serializes a log record for storage Format: [lsn(8) | size(4) | data(var)]
func (*LockFreeLogBuffer) Stats ¶
func (lb *LockFreeLogBuffer) Stats() LogBufferStats
Stats returns buffer statistics
type LockFreeSkipList ¶
type LockFreeSkipList struct {
// contains filtered or unexported fields
}
LockFreeSkipList implements a lock-free skip list using CAS operations Based on the algorithm by Herlihy, Lev, Luchangco, and Shavit
func NewLockFreeSkipList ¶
func NewLockFreeSkipList(config SkipListConfig) *LockFreeSkipList
NewLockFreeSkipList creates a new lock-free skip list
func (*LockFreeSkipList) Clear ¶
func (sl *LockFreeSkipList) Clear()
Clear removes all elements (not lock-free, for testing only)
func (*LockFreeSkipList) CompactMemory ¶
func (sl *LockFreeSkipList) CompactMemory() int
CompactMemory helps GC by clearing deleted nodes (not lock-free)
func (*LockFreeSkipList) Delete ¶
func (sl *LockFreeSkipList) Delete(key []byte) bool
Delete removes a key from the skip list
func (*LockFreeSkipList) Insert ¶
func (sl *LockFreeSkipList) Insert(key []byte, value []byte) bool
Insert adds a key-value pair to the skip list
func (*LockFreeSkipList) Length ¶
func (sl *LockFreeSkipList) Length() int64
Length returns the approximate number of elements
func (*LockFreeSkipList) MaxKey ¶
func (sl *LockFreeSkipList) MaxKey() ([]byte, bool)
MaxKey returns the largest key (if any)
func (*LockFreeSkipList) MinKey ¶
func (sl *LockFreeSkipList) MinKey() ([]byte, bool)
MinKey returns the smallest key (if any)
func (*LockFreeSkipList) Range ¶
func (sl *LockFreeSkipList) Range(start, end []byte, callback func(key, value []byte) bool)
Range iterates over keys in sorted order within [start, end]. Snapshot iteration may miss concurrent updates.
func (*LockFreeSkipList) RangeScan ¶
func (sl *LockFreeSkipList) RangeScan(start, end []byte) []KeyValue
RangeScan returns all key-value pairs in [start, end]
func (*LockFreeSkipList) Search ¶
func (sl *LockFreeSkipList) Search(key []byte) ([]byte, bool)
Search looks up a key in the skip list
func (*LockFreeSkipList) Stats ¶
func (sl *LockFreeSkipList) Stats() SkipListStats
Stats returns skip list statistics
type LogBufferConfig ¶
type LogBufferConfig struct {
SlotSize int // Size of each slot in bytes
Capacity int // Number of slots (must be power of 2)
FlushOnFull bool // Auto-flush when buffer is full
}
LogBufferConfig contains configuration for the lock-free log buffer
func DefaultLogBufferConfig ¶
func DefaultLogBufferConfig() LogBufferConfig
DefaultLogBufferConfig returns default configuration
type LogBufferRecord ¶
LogBufferRecord represents a single record from the buffer
type LogBufferStats ¶
type LogBufferStats struct {
Capacity uint64 // Total capacity (number of slots)
SlotSize uint64 // Size of each slot in bytes
Available uint64 // Number of records available to read
Free uint64 // Number of free slots
Appends uint64 // Total appends
Reads uint64 // Total reads
Wraps uint64 // Number of times buffer wrapped
Contentions uint64 // Number of CAS contentions
MaxOccupancy uint64 // Maximum occupancy observed
Utilization float64 // Current utilization percentage
}
LogBufferStats contains statistics about the log buffer
type LogManager ¶
type LogManager struct {
// contains filtered or unexported fields
}
LogManager manages the write-ahead log Now supports both serial and parallel modes for better concurrency
func NewLogManager ¶
func NewLogManager(logFileName string) (*LogManager, error)
NewLogManager creates a new log manager
func NewLogManagerWithConfig ¶
func NewLogManagerWithConfig(logFileName string, useParallel bool, useCompression bool, compressionAlg string) (*LogManager, error)
NewLogManagerWithConfig creates a log manager with specific configuration
func (*LogManager) AppendLog ¶
func (lm *LogManager) AppendLog(record *LogRecord) (uint64, error)
AppendLog adds a log record and returns its LSN
func (*LogManager) Flush ¶
func (lm *LogManager) Flush() error
Flush writes buffered log records to disk
func (*LogManager) FlushToLSN ¶
func (lm *LogManager) FlushToLSN(lsn uint64) error
FlushToLSN flushes all log records up to and including the specified LSN
func (*LogManager) GetCurrentLSN ¶
func (lm *LogManager) GetCurrentLSN() uint64
GetCurrentLSN returns the current LSN
func (*LogManager) GetFlushedLSN ¶
func (lm *LogManager) GetFlushedLSN() uint64
GetFlushedLSN returns the last flushed LSN
func (*LogManager) ReadAllLogs ¶
func (lm *LogManager) ReadAllLogs() ([]*LogRecord, error)
ReadAllLogs reads all log records from the file
type LogRecord ¶
type LogRecord struct {
LSN uint64 // Log Sequence Number (unique, monotonic)
PrevLSN uint64 // Previous LSN for this transaction
TxnID uint64 // Transaction ID
Type LogType // Type of operation
PageID uint32 // Affected page
Offset uint16 // Offset within page
Length uint16 // Length of data
BeforeData []byte // Old value (for UNDO)
AfterData []byte // New value (for REDO)
}
LogRecord represents a single WAL entry
func DecompressLogRecord ¶
func DecompressLogRecord(compressed *CompressedLogRecord, beforeData []byte) (*LogRecord, error)
DecompressLogRecord reconstructs the original log record
func DeserializeLogRecord ¶
DeserializeLogRecord creates LogRecord from bytes
type LogRecycler ¶
type LogRecycler struct {
// contains filtered or unexported fields
}
LogRecycler manages WAL segment recycling
func NewLogRecycler ¶
func NewLogRecycler(baseDir string, baseFilename string) *LogRecycler
NewLogRecycler creates a new log recycler
func (*LogRecycler) Checkpoint ¶
func (lr *LogRecycler) Checkpoint(checkpointLSN uint64) error
Checkpoint marks a checkpoint LSN, allowing older segments to be archived
func (*LogRecycler) DeleteOldSegments ¶
func (lr *LogRecycler) DeleteOldSegments(beforeLSN uint64) (int, error)
DeleteOldSegments permanently deletes segments older than the given LSN
func (*LogRecycler) GetOrCreateActiveSegment ¶
func (lr *LogRecycler) GetOrCreateActiveSegment() (*LogSegment, error)
GetOrCreateActiveSegment returns the current active segment, creating new one if needed
func (*LogRecycler) GetSegmentFilename ¶
func (lr *LogRecycler) GetSegmentFilename(segmentID uint32) string
GetSegmentFilename generates a segment filename
func (*LogRecycler) GetStats ¶
func (lr *LogRecycler) GetStats() SegmentStats
func (*LogRecycler) ListSegments ¶
func (lr *LogRecycler) ListSegments() ([]string, error)
ListSegments returns all segment files in the directory
func (*LogRecycler) LoadExistingSegments ¶
func (lr *LogRecycler) LoadExistingSegments() error
LoadExistingSegments scans directory and loads existing segments
func (*LogRecycler) RecycleArchivedSegments ¶
func (lr *LogRecycler) RecycleArchivedSegments() (int, error)
RecycleArchivedSegments moves archived segments to recycle pool
func (*LogRecycler) SetMaxSegmentSize ¶
func (lr *LogRecycler) SetMaxSegmentSize(maxSize int64)
SetMaxSegmentSize configures the maximum segment size
func (*LogRecycler) WriteToSegment ¶
func (lr *LogRecycler) WriteToSegment(data []byte, lsn uint64) error
WriteToSegment writes data to the current active segment
type LogSegment ¶
type LogSegment struct {
SegmentID uint32
FilePath string
File *os.File
Size int64
MinLSN uint64 // Minimum LSN in this segment
MaxLSN uint64 // Maximum LSN in this segment
IsActive bool // Currently being written to
IsArchived bool // Can be recycled after checkpoint
}
LogSegment represents a WAL segment file
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics tracks storage engine performance metrics
func (*Metrics) GetBPTreeInsertLatency ¶
func (m *Metrics) GetBPTreeInsertLatency() HistogramSnapshot
GetBPTreeInsertLatency returns snapshot of B+ tree insert latency distribution
func (*Metrics) GetBPTreeSearchLatency ¶
func (m *Metrics) GetBPTreeSearchLatency() HistogramSnapshot
GetBPTreeSearchLatency returns snapshot of B+ tree search latency distribution
func (*Metrics) GetCacheHitRate ¶
func (*Metrics) GetCacheHits ¶
func (*Metrics) GetCacheMisses ¶
func (*Metrics) GetDirtyPageFlushes ¶
func (*Metrics) GetPageEvictions ¶
func (*Metrics) GetPageFetchLatency ¶
func (m *Metrics) GetPageFetchLatency() HistogramSnapshot
GetPageFetchLatency returns snapshot of page fetch latency distribution
func (*Metrics) GetPageFlushLatency ¶
func (m *Metrics) GetPageFlushLatency() HistogramSnapshot
GetPageFlushLatency returns snapshot of page flush latency distribution
func (*Metrics) GetRecoveries ¶
func (*Metrics) GetRedoOps ¶
func (*Metrics) GetTxnCommitLatency ¶
func (m *Metrics) GetTxnCommitLatency() HistogramSnapshot
GetTxnCommitLatency returns snapshot of transaction commit latency distribution
func (*Metrics) GetTxnsAborted ¶
func (*Metrics) GetTxnsCommitted ¶
func (*Metrics) GetTxnsStarted ¶
func (*Metrics) GetUndoOps ¶
func (*Metrics) LogMetrics ¶
LogMetrics logs all metrics using structured logging
func (*Metrics) RecordBPTreeInsertLatency ¶
RecordBPTreeInsertLatency records the latency of a B+ tree insert
func (*Metrics) RecordBPTreeSearchLatency ¶
RecordBPTreeSearchLatency records the latency of a B+ tree search
func (*Metrics) RecordCacheHit ¶
func (m *Metrics) RecordCacheHit()
func (*Metrics) RecordCacheMiss ¶
func (m *Metrics) RecordCacheMiss()
func (*Metrics) RecordDirtyPageFlush ¶
func (m *Metrics) RecordDirtyPageFlush()
func (*Metrics) RecordPageEviction ¶
func (m *Metrics) RecordPageEviction()
func (*Metrics) RecordPageFetchLatency ¶
RecordPageFetchLatency records the latency of a page fetch operation
func (*Metrics) RecordPageFlushLatency ¶
RecordPageFlushLatency records the latency of a page flush operation
func (*Metrics) RecordRecovery ¶
func (m *Metrics) RecordRecovery()
func (*Metrics) RecordRedoOp ¶
func (m *Metrics) RecordRedoOp()
func (*Metrics) RecordTxnAbort ¶
func (m *Metrics) RecordTxnAbort()
func (*Metrics) RecordTxnCommit ¶
func (m *Metrics) RecordTxnCommit()
func (*Metrics) RecordTxnCommitLatency ¶
RecordTxnCommitLatency records the latency of a transaction commit
func (*Metrics) RecordTxnStart ¶
func (m *Metrics) RecordTxnStart()
func (*Metrics) RecordUndoOp ¶
func (m *Metrics) RecordUndoOp()
type MmapDiskManager ¶
type MmapDiskManager struct {
// contains filtered or unexported fields
}
MmapDiskManager provides zero-copy disk access using memory-mapped files
func NewMmapDiskManager ¶
func NewMmapDiskManager(fileName string) (*MmapDiskManager, error)
NewMmapDiskManager creates a new memory-mapped disk manager
func (*MmapDiskManager) Advise ¶
func (dm *MmapDiskManager) Advise(pageId uint32, advice AdviceType) error
Advise provides hints to the OS about memory access patterns
func (*MmapDiskManager) AllocatePage ¶
func (dm *MmapDiskManager) AllocatePage() (uint32, error)
AllocatePage allocates a new page and returns its page ID
func (*MmapDiskManager) Close ¶
func (dm *MmapDiskManager) Close() error
Close unmaps memory and closes the file
func (*MmapDiskManager) Flush ¶
func (dm *MmapDiskManager) Flush() error
Flush ensures all dirty pages are written to disk
func (*MmapDiskManager) FlushPage ¶
func (dm *MmapDiskManager) FlushPage(pageId uint32) error
FlushPage flushes a specific page to disk
func (*MmapDiskManager) FlushPages ¶
func (dm *MmapDiskManager) FlushPages(pageIds []uint32) error
FlushPages flushes multiple pages to disk
func (*MmapDiskManager) GetFileSize ¶
func (dm *MmapDiskManager) GetFileSize() int64
GetFileSize returns the current file size
func (*MmapDiskManager) GetNextPageId ¶
func (dm *MmapDiskManager) GetNextPageId() uint32
GetNextPageId returns the next page ID that will be allocated
func (*MmapDiskManager) GetStats ¶
func (dm *MmapDiskManager) GetStats() MmapStats
func (*MmapDiskManager) ReadPage ¶
func (dm *MmapDiskManager) ReadPage(pageId uint32) ([]byte, error)
ReadPage reads a page from the memory-mapped region (zero-copy)
func (*MmapDiskManager) ReadPageCopy ¶
func (dm *MmapDiskManager) ReadPageCopy(pageId uint32) ([]byte, error)
ReadPageCopy reads a page and returns a copy (safe for modification)
func (*MmapDiskManager) WritePage ¶
func (dm *MmapDiskManager) WritePage(pageId uint32, data []byte) error
WritePage writes a page to the memory-mapped region
func (*MmapDiskManager) WritePagesV ¶
func (dm *MmapDiskManager) WritePagesV(writes []PageWrite) error
WritePagesV writes multiple pages in a single batch operation
type MmapStats ¶
type MmapStats struct {
FileSize int64
MappedSize int64
NextPageId uint32
UsedPages uint32
AllocatedMB int64
UsedMB int64
}
Stats returns statistics about the mmap disk manager
type Page ¶
type Page struct {
// contains filtered or unexported fields
}
Page represents a page in memory with metadata
type PageBloomFilter ¶
type PageBloomFilter struct {
// contains filtered or unexported fields
}
PageBloomFilter attaches a Bloom filter to a page for key lookups
func NewPageBloomFilter ¶
func NewPageBloomFilter(pageID uint32, config BloomFilterConfig) *PageBloomFilter
NewPageBloomFilter creates a new page-level Bloom filter
func (*PageBloomFilter) GetFilter ¶
func (pbf *PageBloomFilter) GetFilter() *BloomFilter
GetFilter returns the underlying Bloom filter
func (*PageBloomFilter) GetPageID ¶
func (pbf *PageBloomFilter) GetPageID() uint32
GetPageID returns the page ID this filter belongs to
func (*PageBloomFilter) GetStats ¶
func (pbf *PageBloomFilter) GetStats() PageBloomFilterStats
GetStats returns statistics about the page Bloom filter
func (*PageBloomFilter) InsertKey ¶
func (pbf *PageBloomFilter) InsertKey(key []byte)
InsertKey adds a key to the page's Bloom filter
func (*PageBloomFilter) MayContainKey ¶
func (pbf *PageBloomFilter) MayContainKey(key []byte) bool
MayContainKey checks if a key might exist in the page
type PageBloomFilterStats ¶
type PageBloomFilterStats struct {
PageID uint32
NumInserts uint32
NumBits uint32
NumHashes uint32
FillRatio float64
EstimatedFPR float64
}
PageBloomFilterStats holds statistics for a page Bloom filter
type PageCompressionStats ¶
type PageCompressionStats struct {
TotalPages uint64
CompressedPages uint64
UncompressedPages uint64
TotalBytesOriginal uint64
TotalBytesStored uint64
LZ4Count uint64
SnappyCount uint64
NoneCount uint64
}
PageCompressionStats tracks compression statistics
func (*PageCompressionStats) AddCompression ¶
func (pcs *PageCompressionStats) AddCompression(cp *CompressedPage)
AddCompression updates stats for a compression operation
func (*PageCompressionStats) GetCompressionPercentage ¶
func (pcs *PageCompressionStats) GetCompressionPercentage() float64
GetCompressionPercentage returns percentage of pages compressed
func (*PageCompressionStats) GetCompressionRatio ¶
func (pcs *PageCompressionStats) GetCompressionRatio() float64
GetCompressionRatio returns overall compression ratio
func (*PageCompressionStats) GetSpaceSavings ¶
func (pcs *PageCompressionStats) GetSpaceSavings() uint64
GetSpaceSavings returns total bytes saved
type PageDependencyGraph ¶
type PageDependencyGraph struct {
// contains filtered or unexported fields
}
PageDependencyGraph tracks dependencies between log records
func NewPageDependencyGraph ¶
func NewPageDependencyGraph() *PageDependencyGraph
NewPageDependencyGraph creates a new dependency graph
func (*PageDependencyGraph) AddRecord ¶
func (g *PageDependencyGraph) AddRecord(record *LogRecord)
AddRecord adds a log record to the dependency graph
func (*PageDependencyGraph) GetAllPages ¶
func (g *PageDependencyGraph) GetAllPages() []uint32
GetAllPages returns all page IDs in the graph
func (*PageDependencyGraph) GetPageRecords ¶
func (g *PageDependencyGraph) GetPageRecords(pageID uint32) []*LogRecord
GetPageRecords returns all records for a page
type PageHeader ¶
type PageHeader struct {
SlotCount uint16 // Number of slots in the directory (including deleted)
TupleCount uint16 // Number of active (non-deleted) tuples
SlotDirEnd uint16 // End of slot directory (grows right)
TupleDataStart uint16 // Start of tuple data area (grows left from end)
}
PageHeader contains metadata about the page
type PageLatch ¶
type PageLatch struct {
// contains filtered or unexported fields
}
PageLatch wraps a page with a lock-free RWLatch This replaces the sync.RWMutex in the Page struct
func NewPageLatch ¶
NewPageLatch creates a new page with lock-free latching
func (*PageLatch) GetData ¶
func (p *PageLatch) GetData() *SlottedPage
GetData returns the slotted page data
func (*PageLatch) GetLatchStats ¶
func (p *PageLatch) GetLatchStats() RWLatchStats
GetLatchStats returns latch statistics
func (*PageLatch) GetPinCount ¶
GetPinCount returns the pin count (atomic read)
type PageTableShard ¶
type PageTableShard struct {
// contains filtered or unexported fields
}
PageTableShard represents a single shard with its own lock
type ParallelLogManager ¶
type ParallelLogManager struct {
// contains filtered or unexported fields
}
ParallelLogManager provides lock-free LSN allocation using atomic operations, allowing multiple threads to append log records with reduced contention.
func NewParallelLogManager ¶
func NewParallelLogManager(logFileName string) (*ParallelLogManager, error)
NewParallelLogManager creates a new parallel log manager
func (*ParallelLogManager) AppendLog ¶
func (plm *ParallelLogManager) AppendLog(record *LogRecord) (uint64, error)
AppendLog adds a log record using atomic LSN allocation
func (*ParallelLogManager) Close ¶
func (plm *ParallelLogManager) Close() error
Close closes the parallel log manager
func (*ParallelLogManager) Flush ¶
func (plm *ParallelLogManager) Flush() error
Flush writes all buffered log records to disk
func (*ParallelLogManager) FlushToLSN ¶
func (plm *ParallelLogManager) FlushToLSN(lsn uint64) error
FlushToLSN flushes all log records up to the specified LSN
func (*ParallelLogManager) GetBufferStats ¶
func (plm *ParallelLogManager) GetBufferStats() int
GetBufferStats returns statistics about buffer usage
func (*ParallelLogManager) GetCurrentLSN ¶
func (plm *ParallelLogManager) GetCurrentLSN() uint64
GetCurrentLSN returns the current LSN
func (*ParallelLogManager) GetFlushedLSN ¶
func (plm *ParallelLogManager) GetFlushedLSN() uint64
GetFlushedLSN returns the last flushed LSN
func (*ParallelLogManager) ReadAllLogs ¶
func (plm *ParallelLogManager) ReadAllLogs() ([]*LogRecord, error)
ReadAllLogs reads all log records from the file
type ParallelRangeScanStats ¶
type ParallelRangeScanStats struct {
NumChunks int
NumWorkers int
EntriesScanned int64
ChunkSizes []int
}
ParallelRangeScanStats contains statistics about a parallel scan
type ParallelRecoveryConfig ¶
type ParallelRecoveryConfig struct {
NumWorkers int // Number of parallel workers
}
ParallelRecoveryConfig configures parallel recovery
func DefaultParallelRecoveryConfig ¶
func DefaultParallelRecoveryConfig() ParallelRecoveryConfig
DefaultParallelRecoveryConfig returns default configuration
type ParallelRecoveryManager ¶
type ParallelRecoveryManager struct {
// contains filtered or unexported fields
}
ParallelRecoveryManager implements parallel ARIES recovery
func NewParallelRecoveryManager ¶
func NewParallelRecoveryManager(logManager *LogManager, config ParallelRecoveryConfig) *ParallelRecoveryManager
NewParallelRecoveryManager creates a new parallel recovery manager
func (*ParallelRecoveryManager) AnalysisPass ¶
AnalysisPass identifies committed and uncommitted transactions
func (*ParallelRecoveryManager) BuildDependencyGraphs ¶
func (rm *ParallelRecoveryManager) BuildDependencyGraphs( committedTxns map[uint64]bool, uncommittedTxns map[uint64]bool, redoGraph *PageDependencyGraph, undoGraph *PageDependencyGraph, ) error
BuildDependencyGraphs builds page-level dependency graphs
func (*ParallelRecoveryManager) GetMetrics ¶
func (rm *ParallelRecoveryManager) GetMetrics() *Metrics
GetMetrics returns the recovery manager metrics
func (*ParallelRecoveryManager) GetNumWorkers ¶
func (rm *ParallelRecoveryManager) GetNumWorkers() int
GetNumWorkers returns the number of workers
func (*ParallelRecoveryManager) GetRecoveryStats ¶
func (rm *ParallelRecoveryManager) GetRecoveryStats() RecoveryStats
GetRecoveryStats returns recovery statistics
func (*ParallelRecoveryManager) ParallelRecover ¶
func (rm *ParallelRecoveryManager) ParallelRecover() error
ParallelRecover performs parallel ARIES recovery
func (*ParallelRecoveryManager) ParallelRedoPass ¶
func (rm *ParallelRecoveryManager) ParallelRedoPass(graph *PageDependencyGraph) (int64, error)
ParallelRedoPass performs parallel redo
func (*ParallelRecoveryManager) ParallelUndoPass ¶
func (rm *ParallelRecoveryManager) ParallelUndoPass(graph *PageDependencyGraph) (int64, error)
ParallelUndoPass performs parallel undo
type ParallelScanConfig ¶
type ParallelScanConfig struct {
NumWorkers int // Number of parallel workers
ChunkSize int // Number of entries per chunk
OrderedResults bool // Whether to maintain order (slower)
EnablePrefetching bool // Whether to prefetch next chunks
}
ParallelScanConfig contains configuration for parallel range scans
func DefaultParallelScanConfig ¶
func DefaultParallelScanConfig() ParallelScanConfig
DefaultParallelScanConfig returns a sensible default configuration
type PipelineCommitManager ¶
type PipelineCommitManager struct {
// contains filtered or unexported fields
}
PipelineCommitManager implements a pipelined commit protocol that overlaps validation, WAL writes, and fsync operations across consecutive batches. While one batch waits for fsync, the next batch is validated and written.
func NewPipelineCommitManager ¶
func NewPipelineCommitManager(logManager *LogManager, txnManager *TransactionManager, maxBatchSize int, maxBatchDelay time.Duration) *PipelineCommitManager
NewPipelineCommitManager creates a new pipelined commit manager
func (*PipelineCommitManager) Commit ¶
func (pcm *PipelineCommitManager) Commit(txnID uint64, lsn uint64) error
Commit submits a transaction for pipelined commit
func (*PipelineCommitManager) GetAverageBatchSize ¶
func (pcm *PipelineCommitManager) GetAverageBatchSize() float64
GetAverageBatchSize returns average commits per batch
func (*PipelineCommitManager) GetAverageFsyncTime ¶
func (pcm *PipelineCommitManager) GetAverageFsyncTime() int64
GetAverageFsyncTime returns average fsync time per batch (nanoseconds)
func (*PipelineCommitManager) GetAverageValidationTime ¶
func (pcm *PipelineCommitManager) GetAverageValidationTime() int64
GetAverageValidationTime returns average validation time per batch (nanoseconds)
func (*PipelineCommitManager) GetAverageWriteTime ¶
func (pcm *PipelineCommitManager) GetAverageWriteTime() int64
GetAverageWriteTime returns average write time per batch (nanoseconds)
func (*PipelineCommitManager) GetPipelineEfficiency ¶
func (pcm *PipelineCommitManager) GetPipelineEfficiency() float64
GetPipelineEfficiency returns ratio of actual work time to elapsed time Higher is better - indicates pipeline is keeping stages busy
func (*PipelineCommitManager) GetStats ¶
func (pcm *PipelineCommitManager) GetStats() PipelineStatsSnapshot
GetStats returns current pipeline statistics Returns a snapshot with atomic loads to avoid race conditions
func (*PipelineCommitManager) GetTotalPipelineTime ¶
func (pcm *PipelineCommitManager) GetTotalPipelineTime() int64
GetTotalPipelineTime returns total time spent in all stages (nanoseconds)
func (*PipelineCommitManager) Shutdown ¶
func (pcm *PipelineCommitManager) Shutdown()
Shutdown stops the pipeline and waits for in-flight batches
type PipelineStage ¶
type PipelineStage int
PipelineStage represents a stage in the transaction commit pipeline
const ( StageValidation PipelineStage = iota // Check transaction can commit StageWrite // Write commit record to log buffer StageFsync // Flush log to disk )
type PipelineStats ¶
type PipelineStats struct {
TotalCommits atomic.Uint64 // Total commits processed
TotalBatches atomic.Uint64 // Total batches processed
TotalFsyncs atomic.Uint64 // Total fsync operations
ValidationTime atomic.Int64 // Total time in validation (ns)
WriteTime atomic.Int64 // Total time in write (ns)
FsyncTime atomic.Int64 // Total time in fsync (ns)
PipelineUtilization atomic.Int64 // Percentage of time with work in flight
}
PipelineStats tracks pipeline performance metrics
type PipelineStatsSnapshot ¶ added in v2.9.0
type PipelineStatsSnapshot struct {
TotalCommits uint64 // Total commits processed
TotalBatches uint64 // Total batches processed
TotalFsyncs uint64 // Total fsync operations
ValidationTime int64 // Total time in validation (ns)
WriteTime int64 // Total time in write (ns)
FsyncTime int64 // Total time in fsync (ns)
PipelineUtilization int64 // Percentage of time with work in flight
}
PipelineStatsSnapshot is a plain-value snapshot of pipeline statistics
type PrefetchStats ¶
type PrefetchStats struct {
PatternsDetected uint64
PagesPrefetched uint64
PrefetchHits uint64 // Pages used before eviction
PrefetchMisses uint64 // Pages evicted before use
PrefetchQueueFull uint64
AvgConfidence float64 // Average confidence of triggered prefetches
StridesDetected uint64 // Number of non-sequential strides detected
}
PrefetchStats tracks prefetching effectiveness
type Prefetcher ¶
type Prefetcher struct {
// contains filtered or unexported fields
}
Prefetcher detects sequential access patterns and prefetches pages
func NewPrefetcher ¶
func NewPrefetcher(bpm *BufferPoolManager) *Prefetcher
NewPrefetcher creates a new prefetcher for the given buffer pool
func (*Prefetcher) Cleanup ¶
func (p *Prefetcher) Cleanup()
Cleanup removes stale patterns (called periodically)
func (*Prefetcher) ClearPattern ¶
func (p *Prefetcher) ClearPattern(contextID uint64)
ClearPattern removes pattern tracking for a context (e.g., after transaction commit)
func (*Prefetcher) Configure ¶
func (p *Prefetcher) Configure(detectionThreshold, prefetchDistance int)
Configure sets prefetcher parameters
func (*Prefetcher) GetStats ¶
func (p *Prefetcher) GetStats() PrefetchStats
GetStats returns current prefetching statistics
func (*Prefetcher) RecordAccess ¶
func (p *Prefetcher) RecordAccess(contextID uint64, pageID uint32)
RecordAccess records a page access and detects patterns with stride and confidence contextID should be transaction ID or goroutine ID
func (*Prefetcher) ResetStats ¶
func (p *Prefetcher) ResetStats()
ResetStats resets prefetching statistics
func (*Prefetcher) SetEnabled ¶
func (p *Prefetcher) SetEnabled(enabled bool)
Enable or disable prefetching
func (*Prefetcher) StartCleanupWorker ¶
func (p *Prefetcher) StartCleanupWorker(stopChan <-chan struct{})
StartCleanupWorker starts a background goroutine to clean up stale patterns
type RWLatch ¶
type RWLatch struct {
// contains filtered or unexported fields
}
RWLatch provides lock-free reader-writer synchronization
func (*RWLatch) GetReaderCount ¶
GetReaderCount returns the current number of active readers (for testing)
func (*RWLatch) GetStats ¶
func (rw *RWLatch) GetStats() RWLatchStats
GetStats returns statistics about the latch state
func (*RWLatch) GetWriterWaitingCount ¶
GetWriterWaitingCount returns the number of writers waiting (for testing)
func (*RWLatch) IsWriterActive ¶
IsWriterActive returns true if a writer currently holds the latch (for testing)
func (*RWLatch) Lock ¶
func (rw *RWLatch) Lock()
Lock acquires a write lock. Only one writer can hold the latch, and no readers can be active.
func (*RWLatch) RLock ¶
func (rw *RWLatch) RLock()
RLock acquires a read lock Multiple readers can hold the latch simultaneously
func (*RWLatch) TryLock ¶
TryLock attempts to acquire a write lock without blocking Returns true if successful, false otherwise
type RWLatchStats ¶
type RWLatchStats struct {
ReaderCount uint32 // Number of active readers
WriterActive bool // Is a writer active
WriterWaitingCount uint32 // Number of writers waiting
}
RWLatchStats contains statistics about a RWLatch
type RecoveryManager ¶
type RecoveryManager struct {
// contains filtered or unexported fields
}
RecoveryManager implements the ARIES recovery algorithm
func NewRecoveryManager ¶
func NewRecoveryManager(logManager *LogManager) *RecoveryManager
NewRecoveryManager creates a new recovery manager
func (*RecoveryManager) AnalysisPass ¶
AnalysisPass identifies committed and uncommitted transactions
func (*RecoveryManager) CreateCheckpoint ¶
func (rm *RecoveryManager) CreateCheckpoint() error
CreateCheckpoint creates a checkpoint in the log
func (*RecoveryManager) GetMetrics ¶
func (rm *RecoveryManager) GetMetrics() *Metrics
GetMetrics returns the recovery manager metrics
func (*RecoveryManager) GetRecoveryStats ¶
func (rm *RecoveryManager) GetRecoveryStats() RecoveryStats
GetRecoveryStats returns recovery statistics
func (*RecoveryManager) Recover ¶
func (rm *RecoveryManager) Recover() error
Recover performs full ARIES recovery (Analysis, Redo, Undo)
type RecoveryStats ¶
type RecoveryStats struct {
CommittedTxns int
UncommittedTxns int
RedoOperations int
UndoOperations int
}
RecoveryStats tracks recovery statistics
type Replacer ¶
type Replacer interface {
// Victim selects a frame to evict
// Returns the frame ID and true if a victim was found, false otherwise
Victim() (uint32, bool)
// Pin marks a frame as in-use (not evictable)
Pin(frameID uint32)
// Unpin marks a frame as available for eviction
Unpin(frameID uint32)
// Size returns the number of evictable frames
Size() uint32
}
Replacer interface for page replacement policies Allows different algorithms (LRU, 2Q, ARC, etc.)
func NewReplacer ¶
NewReplacer creates a replacer based on the specified algorithm
type ScanResult ¶
type ScanResult struct {
Key int64
Value int64
Chunk int // Chunk number for ordering
Index int // Index within chunk
}
ScanResult represents a result from a parallel scan
type SegmentStats ¶
type SegmentStats struct {
TotalSegments int
ActiveSegments int
ArchivedSegments int
RecycledSegments int
TotalSize int64
OldestLSN uint64
NewestLSN uint64
}
GetSegmentStats returns statistics about segments
type ShardedPageTable ¶
type ShardedPageTable struct {
// contains filtered or unexported fields
}
ShardedPageTable provides a thread-safe, sharded hash table for pages Reduces lock contention by partitioning the page table into multiple shards
func NewShardedPageTable ¶
func NewShardedPageTable(numShards uint32) *ShardedPageTable
NewShardedPageTable creates a new sharded page table numShards should be a power of 2 for efficient modulo operations Recommended: 64-256 shards for good parallelism
func (*ShardedPageTable) Clear ¶
func (spt *ShardedPageTable) Clear()
Clear removes all pages from all shards
func (*ShardedPageTable) Delete ¶
func (spt *ShardedPageTable) Delete(pageId uint32)
Delete removes a page from the table
func (*ShardedPageTable) ForEach ¶
func (spt *ShardedPageTable) ForEach(fn func(pageId uint32, page *Page) bool)
ForEach executes a function for each page in the table The function is called while holding the shard lock, so it should be fast
func (*ShardedPageTable) Get ¶
func (spt *ShardedPageTable) Get(pageId uint32) (*Page, bool)
Get retrieves a page from the table
func (*ShardedPageTable) GetAll ¶
func (spt *ShardedPageTable) GetAll() []*Page
GetAll returns all pages (useful for iteration) This acquires all locks and should be used sparingly
func (*ShardedPageTable) Put ¶
func (spt *ShardedPageTable) Put(pageId uint32, page *Page)
Put adds or updates a page in the table
func (*ShardedPageTable) Size ¶
func (spt *ShardedPageTable) Size() int
Size returns the total number of pages across all shards
type SkipListConfig ¶
type SkipListConfig struct {
MaxLevel int32 // Maximum number of levels (default: 16)
P float64 // Probability for level generation (default: 0.5)
}
SkipListConfig configures the skip list
func DefaultSkipListConfig ¶
func DefaultSkipListConfig() SkipListConfig
DefaultSkipListConfig returns default configuration
type SkipListStats ¶
SkipListStats contains skip list statistics
type SlotEntry ¶
type SlotEntry struct {
Offset uint16 // Offset of the tuple in the page
Length uint16 // Length of the tuple
}
SlotEntry represents an entry in the slot directory
type SlottedPage ¶
type SlottedPage struct {
// contains filtered or unexported fields
}
SlottedPage represents a page with slotted page layout
func DeserializeSlottedPage ¶
func DeserializeSlottedPage(data []byte) (*SlottedPage, error)
DeserializeSlottedPage reconstructs a SlottedPage from bytes
func (*SlottedPage) DeleteTuple ¶
func (sp *SlottedPage) DeleteTuple(slotId uint16) error
DeleteTuple marks a tuple as deleted
func (*SlottedPage) GetFreeSpaceSize ¶
func (sp *SlottedPage) GetFreeSpaceSize() uint16
GetFreeSpaceSize returns the amount of free space in the page
func (*SlottedPage) GetTupleCount ¶
func (sp *SlottedPage) GetTupleCount() uint16
GetTupleCount returns the number of tuples in the page
func (*SlottedPage) InsertTuple ¶
func (sp *SlottedPage) InsertTuple(tuple *Tuple) (uint16, error)
InsertTuple inserts a new tuple into the page and returns its slot ID
func (*SlottedPage) ReadTuple ¶
func (sp *SlottedPage) ReadTuple(slotId uint16) (*Tuple, error)
ReadTuple reads a tuple from the page given its slot ID
func (*SlottedPage) Serialize ¶
func (sp *SlottedPage) Serialize() []byte
Serialize converts the SlottedPage to a byte array
func (*SlottedPage) UpdateTuple ¶
func (sp *SlottedPage) UpdateTuple(slotId uint16, newTuple *Tuple) error
UpdateTuple updates an existing tuple
type Snapshot ¶
type Snapshot struct {
XminSnapshot uint64 // Oldest active transaction
XmaxSnapshot uint64 // Next transaction ID
ActiveTxns []uint64 // Active transaction IDs at snapshot time
}
Snapshot represents a point-in-time view for MVCC
type StorageError ¶
type StorageError struct {
Code ErrorCode
Message string
Op string // Operation that failed
Err error // Underlying error (if any)
}
StorageError represents a storage engine error with context
func ErrDiskOperation ¶
func ErrDiskOperation(op string, err error) *StorageError
func ErrDuplicateKey ¶
func ErrDuplicateKey(op string, key int) *StorageError
func ErrInvalidTxnState ¶
func ErrInvalidTxnState(op string, txnID uint64, state string) *StorageError
func ErrKeyNotFound ¶
func ErrKeyNotFound(op string, key int) *StorageError
func ErrLogCorrupted ¶
func ErrLogCorrupted(op string, lsn uint64) *StorageError
func ErrNoFreePages ¶
func ErrNoFreePages(op string) *StorageError
func ErrPageFull ¶
func ErrPageFull(op string, pageID uint32) *StorageError
func ErrPageNotFound ¶
func ErrPageNotFound(op string, pageID uint32) *StorageError
func ErrPagePinned ¶
func ErrPagePinned(op string, pageID uint32, pinCount int) *StorageError
func ErrTxnNotFound ¶
func ErrTxnNotFound(op string, txnID uint64) *StorageError
func NewStorageError ¶
func NewStorageError(code ErrorCode, op, message string, err error) *StorageError
NewStorageError creates a new storage error
func (*StorageError) Error ¶
func (e *StorageError) Error() string
Error implements the error interface
func (*StorageError) Is ¶
func (e *StorageError) Is(target error) bool
Is checks if the error matches a specific error code
func (*StorageError) Unwrap ¶
func (e *StorageError) Unwrap() error
Unwrap returns the underlying error
type Transaction ¶
type Transaction struct {
TxnID uint64
StartTime time.Time
LastLSN uint64 // Last log record for this transaction
UndoLog []uint64 // LSNs for undo operations
// contains filtered or unexported fields
}
Transaction represents a database transaction
func (*Transaction) CompareAndSwapState ¶
func (t *Transaction) CompareAndSwapState(oldState, newState TxnState) bool
CompareAndSwapState atomically compares and swaps the transaction state Returns true if the swap was successful
func (*Transaction) GetState ¶
func (t *Transaction) GetState() TxnState
GetState returns the current transaction state (thread-safe)
func (*Transaction) SetState ¶
func (t *Transaction) SetState(newState TxnState)
SetState sets the transaction state atomically
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager manages database transactions
func NewTransactionManager ¶
func NewTransactionManager(logManager *LogManager) *TransactionManager
NewTransactionManager creates a new transaction manager
func (*TransactionManager) Abort ¶
func (tm *TransactionManager) Abort(txnID uint64) error
Abort aborts a transaction and undoes all its changes
func (*TransactionManager) Begin ¶
func (tm *TransactionManager) Begin() (*Transaction, error)
Begin starts a new transaction
func (*TransactionManager) Close ¶
func (tm *TransactionManager) Close() error
Close gracefully shuts down the transaction manager
func (*TransactionManager) Commit ¶
func (tm *TransactionManager) Commit(txnID uint64) error
Commit commits a transaction
func (*TransactionManager) GetActiveTxns ¶
func (tm *TransactionManager) GetActiveTxns() []uint64
GetActiveTxns returns all active transaction IDs
func (*TransactionManager) GetGroupCommitStats ¶
func (tm *TransactionManager) GetGroupCommitStats() *GroupCommitStats
GetGroupCommitStats returns group commit statistics if enabled
func (*TransactionManager) GetMetrics ¶
func (tm *TransactionManager) GetMetrics() *Metrics
GetMetrics returns the transaction manager metrics
func (*TransactionManager) GetSnapshot ¶
func (tm *TransactionManager) GetSnapshot(txnID uint64) *Snapshot
GetSnapshot creates a snapshot for MVCC visibility checks
func (*TransactionManager) GetTransaction ¶
func (tm *TransactionManager) GetTransaction(txnID uint64) (*Transaction, bool)
GetTransaction returns a transaction by ID (for internal use)
func (*TransactionManager) RecordUndo ¶
func (tm *TransactionManager) RecordUndo(txnID uint64, lsn uint64) error
RecordUndo adds an LSN to the transaction's undo log
func (*TransactionManager) SetBufferPool ¶ added in v2.9.0
func (tm *TransactionManager) SetBufferPool(bpm *BufferPoolManager)
SetBufferPool sets the buffer pool manager for page modifications This allows the transaction manager to perform actual page updates during undo operations
type Tuple ¶
type Tuple struct {
// contains filtered or unexported fields
}
Tuple represents a tuple with header and data Includes MVCC fields for snapshot isolation
func NewTupleWithMVCC ¶
NewTupleWithMVCC creates a new tuple with MVCC transaction ID
type TwoQReplacer ¶
type TwoQReplacer struct {
// contains filtered or unexported fields
}
TwoQReplacer implements the 2Q cache replacement algorithm 2Q is simpler than ARC but more effective than LRU for many workloads It maintains two queues: - A1 (Am): First-time access queue (FIFO) - A2 (A1in): Frequently accessed queue (LRU) Pages graduate from A1 to A2 on second access
func NewTwoQReplacer ¶
func NewTwoQReplacer(capacity int) *TwoQReplacer
NewTwoQReplacer creates a new 2Q replacer with the given capacity Recommended size ratios (from 2Q paper): - A1: 25% of capacity (first-time pages) - A2: 75% of capacity (frequent pages) - A1out: 50% of capacity (ghost entries)
func (*TwoQReplacer) GetStats ¶
func (r *TwoQReplacer) GetStats() TwoQStats
GetStats returns statistics about the 2Q cache
func (*TwoQReplacer) Remove ¶
func (r *TwoQReplacer) Remove(frameID uint32)
Remove explicitly removes a frame from all queues
func (*TwoQReplacer) Size ¶
func (r *TwoQReplacer) Size() uint32
Size returns the number of frames being tracked
func (*TwoQReplacer) Unpin ¶
func (r *TwoQReplacer) Unpin(frameID uint32)
Unpin removes a frame from consideration
func (*TwoQReplacer) Victim ¶
func (r *TwoQReplacer) Victim() (uint32, bool)
Victim selects a frame to evict
type TwoQStats ¶
type TwoQStats struct {
A1Size int // Current pages in A1 (probationary)
A1MaxSize int // Max size of A1
A2Size int // Current pages in A2 (protected)
A2MaxSize int // Max size of A2
A1outSize int // Current ghost entries
A1outMaxSize int // Max ghost entries
TotalPages int // Total pages tracked
Capacity int // Total capacity
}
TwoQStats contains statistics about the 2Q cache state
type TxnState ¶
type TxnState int32 // Changed to int32 for atomic operations
TxnState represents the state of a transaction
type VacuumManager ¶
type VacuumManager struct {
// contains filtered or unexported fields
}
VacuumManager handles garbage collection of old MVCC tuple versions
func NewVacuumManager ¶
func NewVacuumManager(bpm *BufferPoolManager, txnMgr *TransactionManager) *VacuumManager
NewVacuumManager creates a new vacuum manager
func (*VacuumManager) GetStats ¶
func (vm *VacuumManager) GetStats() VacuumStats
GetStats returns current vacuum statistics
func (*VacuumManager) ResetStats ¶
func (vm *VacuumManager) ResetStats()
ResetStats resets all vacuum statistics
func (*VacuumManager) StartAutoVacuum ¶
func (vm *VacuumManager) StartAutoVacuum(interval time.Duration)
StartAutoVacuum starts automatic vacuum in the background
func (*VacuumManager) StopAutoVacuum ¶
func (vm *VacuumManager) StopAutoVacuum()
StopAutoVacuum stops the automatic vacuum process
func (*VacuumManager) UpdateHorizon ¶
func (vm *VacuumManager) UpdateHorizon() uint64
UpdateHorizon updates the minimum transaction ID horizon Tuples with xmax < minHorizon can be safely reclaimed
func (*VacuumManager) VacuumAll ¶
func (vm *VacuumManager) VacuumAll() (*VacuumStats, error)
VacuumAll performs garbage collection on all pages in the buffer pool
func (*VacuumManager) VacuumPage ¶
func (vm *VacuumManager) VacuumPage(pageID uint32) (int, error)
VacuumPage performs garbage collection on a single page Returns the number of tuples reclaimed
Source Files
¶
- adaptive_flusher.go
- arc_replacer.go
- bloom_filter.go
- bptree.go
- bptree_parallel.go
- buffer_pool_manager.go
- clockpro_replacer.go
- compressed_log.go
- config.go
- constants.go
- disk_manager.go
- errors.go
- group_commit.go
- lock_free_log_buffer.go
- lock_free_skiplist.go
- log_manager.go
- log_recycler.go
- lru_replacer.go
- metrics.go
- mmap_disk_manager.go
- page.go
- page_compression.go
- parallel_log_manager.go
- parallel_recovery.go
- pipeline_commit.go
- prefetcher.go
- recovery_manager.go
- replacer.go
- rwlatch.go
- sharded_page_table.go
- transaction_manager.go
- twoq_replacer.go
- vacuum.go