queue

package
v2.1.1
Published: Dec 9, 2025 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BoundedQueue added in v2.1.1

type BoundedQueue struct {
	// contains filtered or unexported fields
}

BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue: the queue is bounded, and if it fills up because consumers are slow, new items written by the producer force the earliest items to be dropped. The implementation is based on channels, with a special Reaper goroutine that wakes up when the queue is full and consumes items from the top of the queue until its size drops back to maxSize.

func NewBoundedQueue added in v2.1.1

func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *BoundedQueue

NewBoundedQueue constructs a new queue of the specified capacity, with an optional callback for dropped items (useful, for example, to emit metrics).

func NewBoundedQueueWithRetry added in v2.1.1

func NewBoundedQueueWithRetry(capacity int, onDroppedItem func(item interface{}), retryMaxCount uint32, retryDelay time.Duration) *BoundedQueue

NewBoundedQueueWithRetry constructs a new queue of the specified capacity, with an optional callback for dropped items (useful, for example, to emit metrics) and the given retry settings (retryMaxCount and retryDelay).
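
The documentation does not spell out how retryMaxCount and retryDelay are applied. As a minimal sketch, assuming that a failed consumer callback is retried up to retryMaxCount more times with retryDelay between attempts, the behavior might look like the following. The helper consumeWithRetry and the variable errTemporary are hypothetical names used only for illustration; they are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failing consumer.
var errTemporary = errors.New("temporary failure")

// consumeWithRetry is a hypothetical helper, not the package's implementation.
// It calls consume once and, while consume keeps failing, retries up to
// maxRetries more times, sleeping for delay between attempts.
// It returns the number of attempts made and the last error (nil on success).
func consumeWithRetry(item interface{}, consume func(item interface{}) error, maxRetries uint32, delay time.Duration) (int, error) {
	attempts := 0
	for {
		attempts++
		err := consume(item)
		if err == nil {
			return attempts, nil
		}
		// attempts counts the initial call, so stop once retries are used up.
		if uint32(attempts) > maxRetries {
			return attempts, err
		}
		time.Sleep(delay)
	}
}

func main() {
	calls := 0
	// flaky fails on its first two calls and succeeds on the third.
	flaky := func(item interface{}) error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}
	attempts, err := consumeWithRetry("payload", flaky, 5, time.Millisecond)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Whether the real queue counts the first call as a retry, or drops the item after the last failure, is not stated in the documentation, so treat the counting rule above as an assumption.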

func (*BoundedQueue) Capacity added in v2.1.1

func (q *BoundedQueue) Capacity() int

Capacity returns the capacity of the queue.

func (*BoundedQueue) Produce added in v2.1.1

func (q *BoundedQueue) Produce(item interface{}) bool

Produce is used by the producer to submit a new item to the queue. It returns false in case of queue overflow.
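
To illustrate the overflow contract, here is a minimal sketch of a non-blocking enqueue on a buffered channel. The type sketchQueue and its methods are hypothetical stand-ins, not the package's code. Note that the BoundedQueue description talks about dropping the earliest items, while Produce is documented as returning false on overflow; this sketch models only the latter by rejecting the incoming item, which is an assumption about the actual behavior.

```go
package main

import "fmt"

// sketchQueue is a hypothetical, simplified stand-in for BoundedQueue.
type sketchQueue struct {
	items         chan interface{}
	onDroppedItem func(item interface{})
}

// newSketchQueue creates a queue backed by a buffered channel of the given capacity.
func newSketchQueue(capacity int, onDroppedItem func(item interface{})) *sketchQueue {
	return &sketchQueue{
		items:         make(chan interface{}, capacity),
		onDroppedItem: onDroppedItem,
	}
}

// produce tries to enqueue item without blocking. If the buffer is full it
// invokes the optional drop callback and reports false.
func (q *sketchQueue) produce(item interface{}) bool {
	select {
	case q.items <- item:
		return true
	default:
		if q.onDroppedItem != nil {
			q.onDroppedItem(item)
		}
		return false
	}
}

// size reports how many items are currently buffered.
func (q *sketchQueue) size() int { return len(q.items) }

func main() {
	dropped := 0
	q := newSketchQueue(2, func(item interface{}) { dropped++ })
	for i := 0; i < 3; i++ {
		fmt.Println("item", i, "accepted:", q.produce(i))
	}
	fmt.Println("size:", q.size(), "dropped:", dropped)
}
```

The select with a default branch is what keeps the producer from blocking when consumers fall behind; the drop callback is a natural place to increment a metric.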

func (*BoundedQueue) Resize added in v2.1.1

func (q *BoundedQueue) Resize(capacity int) bool

Resize changes the capacity of the queue and reports whether the change was successful.

func (*BoundedQueue) Size added in v2.1.1

func (q *BoundedQueue) Size() int

Size returns the current size of the queue.

func (*BoundedQueue) StartConsumers added in v2.1.1

func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{}) error)

StartConsumers starts a given number of goroutines consuming items from the queue and passing them into the consumer callback.
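
The consumer fan-out described here can be sketched with plain goroutines reading from a shared channel. The functions startConsumers and processAll below are hypothetical and only approximate the idea; how the real queue handles callback errors or shutdown is not covered by the documentation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// startConsumers is a hypothetical helper that launches num goroutines, each
// reading from items and passing every item to callback. The returned
// WaitGroup is done once items has been closed and drained.
func startConsumers(items <-chan interface{}, num int, callback func(item interface{}) error) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	wg.Add(num)
	for i := 0; i < num; i++ {
		go func() {
			defer wg.Done()
			for item := range items {
				// Error handling policy is intentionally left out of this sketch.
				_ = callback(item)
			}
		}()
	}
	return wg
}

// processAll pushes n items through the given number of workers and returns
// how many items the callback saw.
func processAll(n, workers int) int64 {
	items := make(chan interface{}, n)
	var processed int64
	wg := startConsumers(items, workers, func(item interface{}) error {
		atomic.AddInt64(&processed, 1)
		return nil
	})
	for i := 0; i < n; i++ {
		items <- i
	}
	close(items) // signals the consumers to exit once the channel is drained
	wg.Wait()    // blocks until every consumer goroutine has returned
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("processed:", processAll(10, 3))
}
```

Closing the channel and waiting on the WaitGroup is one common way to get the "blocks until all consumers have stopped" behavior that Stop documents, though the package may implement it differently.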

func (*BoundedQueue) StartConsumersWithFactory added in v2.1.1

func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer)

StartConsumersWithFactory creates a given number of consumers consuming items from the queue in separate goroutines.

func (*BoundedQueue) Stop added in v2.1.1

func (q *BoundedQueue) Stop()

Stop stops all consumers, as well as the length reporter if started, and releases the items channel. It blocks until all consumers have stopped.

func (*BoundedQueue) StopConsumers added in v2.1.1

func (q *BoundedQueue) StopConsumers()

StopConsumers stops all consumers. They can be started again with StartConsumers.

type Consumer added in v2.1.1

type Consumer interface {
	Consume(item interface{}) error
}

Consumer consumes data from a bounded queue.

type ConsumerFunc added in v2.1.1

type ConsumerFunc func(item interface{}) error

ConsumerFunc is an adapter to allow the use of a consume function callback as a Consumer.

func (ConsumerFunc) Consume added in v2.1.1

func (c ConsumerFunc) Consume(item interface{}) error

Consume calls c(item).
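
The adapter works the same way as http.HandlerFunc in the standard library: a plain function gains the Consume method and so satisfies Consumer. The sketch below redeclares Consumer and ConsumerFunc with the signatures shown above so that it compiles on its own; printString and errNotString are hypothetical names added for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Consumer and ConsumerFunc are redeclared locally, mirroring the documented
// signatures, so that this example does not depend on the package import path.
type Consumer interface {
	Consume(item interface{}) error
}

type ConsumerFunc func(item interface{}) error

// Consume calls c(item).
func (c ConsumerFunc) Consume(item interface{}) error { return c(item) }

// errNotString is a hypothetical error returned for unexpected item types.
var errNotString = errors.New("item is not a string")

// printString is a hypothetical callback that accepts only string items.
func printString(item interface{}) error {
	s, ok := item.(string)
	if !ok {
		return errNotString
	}
	fmt.Println("consumed:", s)
	return nil
}

func main() {
	// The conversion ConsumerFunc(printString) turns the function into a Consumer.
	var c Consumer = ConsumerFunc(printString)
	fmt.Println("string item error:", c.Consume("hello"))
	fmt.Println("int item error:", c.Consume(42))
}
```

A factory passed to StartConsumersWithFactory could presumably return such a value, for example func() Consumer { return ConsumerFunc(printString) }, when per-consumer state is not needed.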

type Queue

type Queue struct {
	Name    string
	Queue   *BoundedQueue
	Limit   int
	Workers int
	// contains filtered or unexported fields
}

Queue holds items. It wraps a BoundedQueue together with its name, item limit, and worker count.

func New

func New(logger *zap.Logger, name string, limit int, consumer func(item interface{}) error, workers int) *Queue

New returns a new queue.

func NewWithRetry added in v2.1.1

func NewWithRetry(logger *zap.Logger, name string, limit int, consumer func(item interface{}) error, workers int, isRetryEnabled bool, maxRetryCount uint32, retryDelay time.Duration) *Queue

NewWithRetry returns a new queue with retry options.

func (*Queue) Close

func (q *Queue) Close()

Close closes the queue.

func (*Queue) Produce

func (q *Queue) Produce(item interface{}) bool

Produce adds an item to the queue.

func (*Queue) Size

func (q *Queue) Size() int

Size returns the current size of the queue.

type QueueSpec

type QueueSpec struct {
	Topic          string
	SubscriptionId int64
	Queue          *Queue
}

QueueSpec holds a queue together with its topic and subscription ID.

func (*QueueSpec) Close

func (qs *QueueSpec) Close()

func (*QueueSpec) Produce

func (qs *QueueSpec) Produce(item interface{}) bool

func (*QueueSpec) Size

func (qs *QueueSpec) Size() int

type RetryConfig added in v2.1.1

type RetryConfig struct {
	// contains filtered or unexported fields
}

RetryConfig holds configuration for retrying failed items.
