store

package
v9.0.32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2025 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const BUFFER_SIZE int64 = 1024 * 8
View Source
const CONCURRENT_BUFFER_SIZE_BYTES = 6 * 1024 * 1024

Buffer_Sizes (must be 5MiB+ for AWS minimum chunk size)

View Source
const EXPIRY_POLICY_PREFIX = "BackupExpiryFor-"

NOTE - changing this will have negative effects on any existing system that uses these rules.

View Source
const MAX_BUFFERED_READER_BYTES = 1024 * 1024

Size of the buffered reader's buffer (1MB).

View Source
const MAX_CONSECUTIVE_EOF = 1

One is all that is required; if you see EOF more than twice, it's the end of the actual file, not just a blob.

View Source
const MAX_FILE_BYTES_BEFORE_CONCURRENT_UPLOAD = 50 * 1024 * 1024

Perform a concurrent upload at 50MiB

View Source
const MILLISECONDS_IN_A_DAY = 1000 * 60 * 60 * 24
View Source
const NO_LIFECYCLE_ERROR_CODE = "NoSuchLifecycleConfiguration"
View Source
const NUM_CONCURRENT_UPLOAD_THREADS = 10

Concurrent upload threads

View Source
const XOR_FILE_EXT = ".xor"

Variables

View Source
var STORAGE_KEY = [16]byte{0x69, 0x6c, 0x6f, 0x76, 0x65, 0x61, 0x7a, 0x75, 0x6c, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37}

Azul's XOR key (16 bytes): "iloveazul1234567"

Functions

func BaseBenchmarkReadStore

func BaseBenchmarkReadStore(b *testing.B, fs FileStorage)

func BaseBenchmarkWriteStore

func BaseBenchmarkWriteStore(b *testing.B, fs FileStorage)

func StoreImplementationBaseTests

func StoreImplementationBaseTests(t *testing.T, fs FileStorage)

Generic tests that anything implementing the store interface should pass.

func StoreImplementationListBaseTests

func StoreImplementationListBaseTests(t *testing.T, fs FileStorage)

Test the list functionality of stores

Types

type AccessError

type AccessError struct {
	// contains filtered or unexported fields
}

func (*AccessError) Error

func (e *AccessError) Error() string

type AutomaticAgeOffSettings

type AutomaticAgeOffSettings struct {
	// Create ageoff rules in S3 that removes data older than the age-off of a source (based on last modified dates).
	EnableAutomaticAgeOff bool
	// Remove any rules that were created by the automatic ageoff policies.
	EnableCleanupAutoAgeOff bool
	// Configuration of Azul sources used to determine what ageoff should be set to.
	SourceConf *models.SourcesConf
}

type CloseWrapper

type CloseWrapper struct {
	// contains filtered or unexported fields
}

func NewCloseWrapper

func NewCloseWrapper(reader io.Reader, closer io.Closer) *CloseWrapper

Create a new CloseWrapper which wraps a reader with no closer and the parent stream that is being read from.

func (*CloseWrapper) Close

func (cw *CloseWrapper) Close() error

func (*CloseWrapper) Read

func (cw *CloseWrapper) Read(p []byte) (n int, err error)

type DataSlice

type DataSlice struct {
	DataReader         io.ReadCloser
	Start, Size, Avail int64
}

func NewDataSlice

func NewDataSlice() DataSlice

Create an empty DataSlice with a reader that has no bytes.

type FileStorage

type FileStorage interface {
	// Readcloser with the open file or stream and fileSize if known otherwise provide -1 for file size.
	Put(source, label, id string, data io.ReadCloser, fileSize int64) error
	// Fetch file from offset to size, if offset is 0 fetch from start, if size is -1 fetch to the end of the file.
	Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)
	// Check a file exists in the filestore.
	Exists(source, label, id string) (bool, error)
	// Delete deletes the specified key if older than supplied unix timestamp in seconds.
	Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)
	// Copy within the S3 store from old to new location
	Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error
	// List all objects in the S3 store, the provided context must be cancelled when list is no longer needed.
	List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo
}

func NewAzureStore

func NewAzureStore(endpoint string, containerName string, storageAccount string, accessKey string, operationMetricHistogram *prometheus.HistogramVec) (FileStorage, error)

NewAzureStore instantiates a new StoreAzure FileStore instance. The storage account name is specified by endpoint and must be provided in the format: "https://<storage-account-name>.blob.core.windows.net/". Files will be stored in the container named containerName. storageAccount is optional, and if empty the name will be extracted from the endpoint. accessKey: use an empty string if a service principal with secret is being used, otherwise provide a valid access key. operationMetricHistogram is optional; if left as nil, no metrics are collected.

func NewDataCache

func NewDataCache(maxsize, ttl, shards int, store FileStorage, metricCollector StoreCacheMetricCollectors) (FileStorage, error)

func NewEmptyLocalStore

func NewEmptyLocalStore(root string) (FileStorage, error)

NewEmptyLocalStore returns a FileStorage with no data. Intended for testing, as aborted tests may otherwise leave files on disk.

func NewLocalStore

func NewLocalStore(root string) (FileStorage, error)

func NewS3Store

func NewS3Store(endpoint string, accessKey string, secretKey string, secure bool, bucket string, region string, promStreamsOperationDuration *prometheus.HistogramVec, autoAgeoffSettings AutomaticAgeOffSettings) (FileStorage, error)

Creates a new S3 store with static credentials.

func NewS3StoreIAM

func NewS3StoreIAM(endpoint string, secure bool, bucket string, region string, promStreamsOperationDuration *prometheus.HistogramVec, autoAgeoffSettings AutomaticAgeOffSettings) (FileStorage, error)

Creates a new S3 store using IAM credentials.

func NewXORStore

func NewXORStore(Backend FileStorage, enabled bool) FileStorage

Creates a new XOR wrapper.

type FileStorageDeleteOption

type FileStorageDeleteOption func(*FileStorageDeleteOptions)

func WithDeleteIfOlderThan

func WithDeleteIfOlderThan(ifOlderThan int64) FileStorageDeleteOption

Set the option to only delete files if they are older than a certain date time (expressed as seconds since epoch).

type FileStorageDeleteOptions

type FileStorageDeleteOptions struct {
	IfOlderThan int64
}

-------------------------- Delete options --------------------------

func NewFileStorageDeleteOptions

func NewFileStorageDeleteOptions(opts ...FileStorageDeleteOption) *FileStorageDeleteOptions

Default constructor for file delete options

type FileStorageFetchOption

type FileStorageFetchOption func(*FileStorageFetchOptions)

func WithOffsetAndSize

func WithOffsetAndSize(offset int64, size int64) FileStorageFetchOption

Set the offset and size for a fetch operation

type FileStorageFetchOptions

type FileStorageFetchOptions struct {
	Offset int64
	Size   int64
}

-------------------------- Fetch options --------------------------

func NewFileStorageFetchOptions

func NewFileStorageFetchOptions(opts ...FileStorageFetchOption) *FileStorageFetchOptions

Default constructor for file fetch

type FileStorageObjectListInfo

type FileStorageObjectListInfo struct {
	Key string
	// Expected form of the key is: 'Source/Label/Id' (if it isn't use the key directly.)
	// Source section of the object
	Source string
	// Label section of the object
	Label string
	// Id section of the object.
	Id string
}

type MockFileStorage

type MockFileStorage struct {
	mock.Mock
}

MockFileStorage is an autogenerated mock type for the FileStorage type

func NewMockFileStorage

func NewMockFileStorage(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockFileStorage

NewMockFileStorage creates a new instance of MockFileStorage. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockFileStorage) Copy

func (_m *MockFileStorage) Copy(sourceOld string, labelOld string, idOld string, sourceNew string, labelNew string, idNew string) error

Copy provides a mock function with given fields: sourceOld, labelOld, idOld, sourceNew, labelNew, idNew

func (*MockFileStorage) Delete

func (_m *MockFileStorage) Delete(source string, label string, id string, opts ...FileStorageDeleteOption) (bool, error)

Delete provides a mock function with given fields: source, label, id, opts

func (*MockFileStorage) EXPECT

func (*MockFileStorage) Exists

func (_m *MockFileStorage) Exists(source string, label string, id string) (bool, error)

Exists provides a mock function with given fields: source, label, id

func (*MockFileStorage) Fetch

func (_m *MockFileStorage) Fetch(source string, label string, id string, opts ...FileStorageFetchOption) (DataSlice, error)

Fetch provides a mock function with given fields: source, label, id, opts

func (*MockFileStorage) List

func (_m *MockFileStorage) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

List provides a mock function with given fields: ctx, prefix, startAfter

func (*MockFileStorage) Put

func (_m *MockFileStorage) Put(source string, label string, id string, data io.ReadCloser, fileSize int64) error

Put provides a mock function with given fields: source, label, id, data, fileSize

type MockFileStorage_Copy_Call

type MockFileStorage_Copy_Call struct {
	*mock.Call
}

MockFileStorage_Copy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Copy'

func (*MockFileStorage_Copy_Call) Return

func (*MockFileStorage_Copy_Call) Run

func (_c *MockFileStorage_Copy_Call) Run(run func(sourceOld string, labelOld string, idOld string, sourceNew string, labelNew string, idNew string)) *MockFileStorage_Copy_Call

func (*MockFileStorage_Copy_Call) RunAndReturn

type MockFileStorage_Delete_Call

type MockFileStorage_Delete_Call struct {
	*mock.Call
}

MockFileStorage_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete'

func (*MockFileStorage_Delete_Call) Return

func (*MockFileStorage_Delete_Call) Run

func (*MockFileStorage_Delete_Call) RunAndReturn

type MockFileStorage_Exists_Call

type MockFileStorage_Exists_Call struct {
	*mock.Call
}

MockFileStorage_Exists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exists'

func (*MockFileStorage_Exists_Call) Return

func (*MockFileStorage_Exists_Call) Run

func (_c *MockFileStorage_Exists_Call) Run(run func(source string, label string, id string)) *MockFileStorage_Exists_Call

func (*MockFileStorage_Exists_Call) RunAndReturn

type MockFileStorage_Expecter

type MockFileStorage_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockFileStorage_Expecter) Copy

func (_e *MockFileStorage_Expecter) Copy(sourceOld interface{}, labelOld interface{}, idOld interface{}, sourceNew interface{}, labelNew interface{}, idNew interface{}) *MockFileStorage_Copy_Call

Copy is a helper method to define mock.On call

  • sourceOld string
  • labelOld string
  • idOld string
  • sourceNew string
  • labelNew string
  • idNew string

func (*MockFileStorage_Expecter) Delete

func (_e *MockFileStorage_Expecter) Delete(source interface{}, label interface{}, id interface{}, opts ...interface{}) *MockFileStorage_Delete_Call

Delete is a helper method to define mock.On call

  • source string
  • label string
  • id string
  • opts ...FileStorageDeleteOption

func (*MockFileStorage_Expecter) Exists

func (_e *MockFileStorage_Expecter) Exists(source interface{}, label interface{}, id interface{}) *MockFileStorage_Exists_Call

Exists is a helper method to define mock.On call

  • source string
  • label string
  • id string

func (*MockFileStorage_Expecter) Fetch

func (_e *MockFileStorage_Expecter) Fetch(source interface{}, label interface{}, id interface{}, opts ...interface{}) *MockFileStorage_Fetch_Call

Fetch is a helper method to define mock.On call

  • source string
  • label string
  • id string
  • opts ...FileStorageFetchOption

func (*MockFileStorage_Expecter) List

func (_e *MockFileStorage_Expecter) List(ctx interface{}, prefix interface{}, startAfter interface{}) *MockFileStorage_List_Call

List is a helper method to define mock.On call

  • ctx context.Context
  • prefix string
  • startAfter string

func (*MockFileStorage_Expecter) Put

func (_e *MockFileStorage_Expecter) Put(source interface{}, label interface{}, id interface{}, data interface{}, fileSize interface{}) *MockFileStorage_Put_Call

Put is a helper method to define mock.On call

  • source string
  • label string
  • id string
  • data io.ReadCloser
  • fileSize int64

type MockFileStorage_Fetch_Call

type MockFileStorage_Fetch_Call struct {
	*mock.Call
}

MockFileStorage_Fetch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Fetch'

func (*MockFileStorage_Fetch_Call) Return

func (*MockFileStorage_Fetch_Call) Run

func (*MockFileStorage_Fetch_Call) RunAndReturn

type MockFileStorage_List_Call

type MockFileStorage_List_Call struct {
	*mock.Call
}

MockFileStorage_List_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'List'

func (*MockFileStorage_List_Call) Return

func (*MockFileStorage_List_Call) Run

func (_c *MockFileStorage_List_Call) Run(run func(ctx context.Context, prefix string, startAfter string)) *MockFileStorage_List_Call

func (*MockFileStorage_List_Call) RunAndReturn

type MockFileStorage_Put_Call

type MockFileStorage_Put_Call struct {
	*mock.Call
}

MockFileStorage_Put_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Put'

func (*MockFileStorage_Put_Call) Return

func (*MockFileStorage_Put_Call) Run

func (_c *MockFileStorage_Put_Call) Run(run func(source string, label string, id string, data io.ReadCloser, fileSize int64)) *MockFileStorage_Put_Call

func (*MockFileStorage_Put_Call) RunAndReturn

type NotFoundError

type NotFoundError struct{}

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

type OffsetAfterEnd

type OffsetAfterEnd struct {
	// contains filtered or unexported fields
}

func (*OffsetAfterEnd) Error

func (r *OffsetAfterEnd) Error() string

type ReadError

type ReadError struct {
	// contains filtered or unexported fields
}

func (*ReadError) Error

func (e *ReadError) Error() string

type RetryReaderWrapper

type RetryReaderWrapper struct {
	// contains filtered or unexported fields
}

func (*RetryReaderWrapper) Close

func (rrw *RetryReaderWrapper) Close() error

func (*RetryReaderWrapper) Read

func (rrw *RetryReaderWrapper) Read(p []byte) (n int, err error)

Continually retry reading the retryReader until we are confident we have all the data from azure.

This wrapper is required because the azure RetryReader will return EOF at the end of each chunk of a blob rather than at the end of the file. So even if an EOF is received, you have to try the read again and verify it's the actual EOF and not just the end of a blob.

type StoreAzure

type StoreAzure struct {
	// contains filtered or unexported fields
}

StoreAzure is a FileStorage implementation to store files via an Azure blob store.

func (*StoreAzure) Copy

func (s *StoreAzure) Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error

func (*StoreAzure) Delete

func (s *StoreAzure) Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)

Delete marks the specified file for deletion. The file is later deleted during garbage collection. Note that deleting a file also deletes all its snapshots. For more information, see https://docs.microsoft.com/rest/api/storageservices/delete-blob.

If onlyIfOlderThan is a zero or negative value, then the file is always deleted. If onlyIfOlderThan is a positive value, the file is only deleted if it has not been modified since the given time (expressed as seconds since epoch).

A successful Delete will return true. If the file or container do not exist then Delete will return NotFoundError. In the event of an error, a false value will be returned alongside err == AccessError.

func (*StoreAzure) Exists

func (s *StoreAzure) Exists(source, label, id string) (bool, error)

Exists reports whether the file given by id exists in the filestore.

A successful Exists check will return true if the file exists and false if the file does not exist. In the event of an error, a false value will be returned alongside err == the error.

func (*StoreAzure) Fetch

func (s *StoreAzure) Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)

Fetch downloads the requested file given by the id and returns the data in a DataSlice. Partial files can be retrieved by specifying the offset and/or size.

A zero or negative size indicates the whole file. A zero offset indicates the start of a file. A negative offset is treated as relative to the end of the file and if the negative offset is larger than the max filesize then the offset will be set to the start of the file. If an offset has a value in combination with a zero value size, this indicates from the offset to the end of the file.

An offset that is equal to or greater than the size of the file will result in an OffsetAfterEnd error. A file that does not exist will return a NotFoundError error.

Warning: If the size + offset is larger than the filesize then the data from the given offset to the end of file will be given instead.

func (*StoreAzure) List

func (s *StoreAzure) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

List with the provided context, context must be cancelled when list is no longer needed.

func (*StoreAzure) Put

func (s *StoreAzure) Put(source, label, id string, data io.ReadCloser, fileSize int64) error

Put uploads a file from the data buffer to the blob store.

It only returns the first error encountered, if any.

type StoreCache

type StoreCache struct {
	// contains filtered or unexported fields
}

func (*StoreCache) Copy

func (c *StoreCache) Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error

func (*StoreCache) Delete

func (c *StoreCache) Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)

func (*StoreCache) Exists

func (c *StoreCache) Exists(source, label, id string) (bool, error)

func (*StoreCache) Fetch

func (c *StoreCache) Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)

func (*StoreCache) List

func (c *StoreCache) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

func (*StoreCache) Put

func (c *StoreCache) Put(source, label, id string, data io.ReadCloser, fileSize int64) error

type StoreCacheMetricCollectors

type StoreCacheMetricCollectors struct {
	CacheLookup                  prometheus.Counter
	CacheHits                    prometheus.Counter
	PromStreamsOperationDuration *prometheus.HistogramVec
}

type StoreFilesystem

type StoreFilesystem struct {
	// contains filtered or unexported fields
}

Store file on local filesystem.

func (*StoreFilesystem) Copy

func (s *StoreFilesystem) Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error

func (*StoreFilesystem) Delete

func (s *StoreFilesystem) Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)

func (*StoreFilesystem) Exists

func (s *StoreFilesystem) Exists(source, label, id string) (bool, error)

func (*StoreFilesystem) Fetch

func (s *StoreFilesystem) Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)

func (*StoreFilesystem) GetRootPath

func (s *StoreFilesystem) GetRootPath() string

func (*StoreFilesystem) List

func (s *StoreFilesystem) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

func (*StoreFilesystem) Put

func (s *StoreFilesystem) Put(source, label, id string, data io.ReadCloser, fileSize int64) error

type StoreMem

type StoreMem struct {
	Data map[string][]byte
	// contains filtered or unexported fields
}

In memory version of store for testing, storage of files into an s3 provider.

func NewStoreMem

func NewStoreMem() *StoreMem

func (*StoreMem) Copy

func (s *StoreMem) Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error

func (*StoreMem) Delete

func (s *StoreMem) Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)

func (*StoreMem) Exists

func (s *StoreMem) Exists(source, label, id string) (bool, error)

func (*StoreMem) Fetch

func (s *StoreMem) Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)

func (*StoreMem) List

func (s *StoreMem) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

List the contents of the current S3 Bucket.

func (*StoreMem) Put

func (s *StoreMem) Put(source, label, id string, data io.ReadCloser, fileSize int64) error

type StoreS3

type StoreS3 struct {
	// contains filtered or unexported fields
}

Store files via s3 provider.

func (*StoreS3) Copy

func (s *StoreS3) Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error

func (*StoreS3) Delete

func (s *StoreS3) Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)

func (*StoreS3) Exists

func (s *StoreS3) Exists(source, label, id string) (bool, error)

func (*StoreS3) Fetch

func (s *StoreS3) Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)

func (*StoreS3) List

func (s *StoreS3) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

func (*StoreS3) Put

func (s *StoreS3) Put(source, label, id string, data io.ReadCloser, fileSize int64) error

type StoreXOR

type StoreXOR struct {
	Backend FileStorage
	// contains filtered or unexported fields
}

Store files with a XOR cipher.

func (*StoreXOR) Copy

func (s *StoreXOR) Copy(sourceOld, labelOld, idOld, sourceNew, labelNew, idNew string) error

func (*StoreXOR) Delete

func (s *StoreXOR) Delete(source, label, id string, opts ...FileStorageDeleteOption) (bool, error)

func (*StoreXOR) Exists

func (s *StoreXOR) Exists(source, label, id string) (bool, error)

func (*StoreXOR) Fetch

func (s *StoreXOR) Fetch(source, label, id string, opts ...FileStorageFetchOption) (DataSlice, error)

func (*StoreXOR) List

func (s *StoreXOR) List(ctx context.Context, prefix string, startAfter string) <-chan FileStorageObjectListInfo

func (*StoreXOR) Put

func (s *StoreXOR) Put(source, label, id string, data io.ReadCloser, fileSize int64) error

type XORWrapper

type XORWrapper struct {
	// contains filtered or unexported fields
}

Wrapper used to XOR encode as data passes through

func (XORWrapper) Close

func (w XORWrapper) Close() error

func (*XORWrapper) Read

func (w *XORWrapper) Read(buffer []byte) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL