Documentation
¶
Index ¶
- type BoundedQueue
- func (q *BoundedQueue) Capacity() int
- func (q *BoundedQueue) Produce(item interface{}) bool
- func (q *BoundedQueue) Resize(capacity int) bool
- func (q *BoundedQueue) Size() int
- func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{}) error)
- func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer)
- func (q *BoundedQueue) Stop()
- func (q *BoundedQueue) StopConsumers()
- type Consumer
- type ConsumerFunc
- type Queue
- type QueueSpec
- type RetryConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BoundedQueue ¶ added in v2.1.1
type BoundedQueue struct {
// contains filtered or unexported fields
}
BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue, where the queue is bounded and if it fills up due to slow consumers, the new items written by the producer force the earliest items to be dropped. The implementation is actually based on channels, with a special Reaper goroutine that wakes up when the queue is full and consumers the items from the top of the queue until its size drops back to maxSize
func NewBoundedQueue ¶ added in v2.1.1
func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *BoundedQueue
NewBoundedQueue constructs the new queue of specified capacity, and with an optional callback for dropped items (e.g. useful to emit metrics).
func NewBoundedQueueWithRetry ¶ added in v2.1.1
func NewBoundedQueueWithRetry(capacity int, onDroppedItem func(item interface{}), retryMaxCount uint32, retryDelay time.Duration) *BoundedQueue
NewBoundedQueue constructs the new queue of specified capacity, and with an optional callback for dropped items (e.g. useful to emit metrics).
func (*BoundedQueue) Capacity ¶ added in v2.1.1
func (q *BoundedQueue) Capacity() int
Capacity returns capacity of the queue
func (*BoundedQueue) Produce ¶ added in v2.1.1
func (q *BoundedQueue) Produce(item interface{}) bool
Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (*BoundedQueue) Resize ¶ added in v2.1.1
func (q *BoundedQueue) Resize(capacity int) bool
Resize changes the capacity of the queue, returning whether the action was successful
func (*BoundedQueue) Size ¶ added in v2.1.1
func (q *BoundedQueue) Size() int
Size returns the current size of the queue
func (*BoundedQueue) StartConsumers ¶ added in v2.1.1
func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{}) error)
StartConsumers starts a given number of goroutines consuming items from the queue and passing them into the consumer callback.
func (*BoundedQueue) StartConsumersWithFactory ¶ added in v2.1.1
func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer)
StartConsumersWithFactory creates a given number of consumers consuming items from the queue in separate goroutines.
func (*BoundedQueue) Stop ¶ added in v2.1.1
func (q *BoundedQueue) Stop()
Stop stops all consumers, as well as the length reporter if started, and releases the items channel. It blocks until all consumers have stopped.
func (*BoundedQueue) StopConsumers ¶ added in v2.1.1
func (q *BoundedQueue) StopConsumers()
StopConsumers stops all the consumers should be started again with StartConsumers func
type Consumer ¶ added in v2.1.1
type Consumer interface {
Consume(item interface{}) error
}
Consumer consumes data from a bounded queue
type ConsumerFunc ¶ added in v2.1.1
type ConsumerFunc func(item interface{}) error
ConsumerFunc is an adapter to allow the use of a consume function callback as a Consumer.
func (ConsumerFunc) Consume ¶ added in v2.1.1
func (c ConsumerFunc) Consume(item interface{}) error
Consume calls c(item)
type Queue ¶
type Queue struct {
Name string
Queue *BoundedQueue
Limit int
Workers int
// contains filtered or unexported fields
}
Queue to hold items
func New ¶
func New(logger *zap.Logger, name string, limit int, consumer func(item interface{}) error, workers int) *Queue
New returns brandnew queue
func NewWithRetry ¶ added in v2.1.1
func NewWithRetry(logger *zap.Logger, name string, limit int, consumer func(item interface{}) error, workers int, isRetryEnabled bool, maxRetryCount uint32, retryDelay time.Duration) *Queue
New returns brandnew queue with retry options
type RetryConfig ¶ added in v2.1.1
type RetryConfig struct {
// contains filtered or unexported fields
}
RetryConfig holds configuration for retrying failed items