Documentation ¶
Overview ¶
Package strego provides a distributed task queue for Go applications.
Index ¶
- Constants
- Variables
- func Unmarshal[T any](t *Task, v *T) error
- type Client
- func (c *Client) Cancel(ctx context.Context, taskID string) error
- func (c *Client) Close() error
- func (c *Client) Enqueue(ctx context.Context, task *Task) (*types.TaskInfo, error)
- func (c *Client) EnqueueUnique(ctx context.Context, task *Task, uniqueKey string, ttl time.Duration) (*types.TaskInfo, error)
- func (c *Client) GetQueueInfo(ctx context.Context, queue string) (*types.QueueInfo, error)
- func (c *Client) GetQueues(ctx context.Context) ([]string, error)
- func (c *Client) GetTask(ctx context.Context, taskID string) (*Task, error)
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) Schedule(ctx context.Context, task *Task, processAt time.Time) (*types.TaskInfo, error)
- type ClientOption
- type Handler
- type HandlerFunc
- type Logger
- type QueueInfo
- type ServeMux
- type Server
- type ServerOption
- func WithConcurrency(n int) ServerOption
- func WithProcessedTTL(d time.Duration) ServerOption
- func WithQueues(queues ...string) ServerOption
- func WithRetryConfig(base, max time.Duration) ServerOption
- func WithServerLogger(logger Logger) ServerOption
- func WithServerStore(store Store) ServerOption
- func WithShutdownTimeout(d time.Duration) ServerOption
- type Store
- type Task
- func (t *Task) ID() string
- func (t *Task) Labels() map[string]string
- func (t *Task) LastError() string
- func (t *Task) MaxRetry() int
- func (t *Task) Payload() []byte
- func (t *Task) Proto() *types.TaskProto
- func (t *Task) Queue() string
- func (t *Task) RetryCount() int
- func (t *Task) State() types.TaskState
- func (t *Task) Type() string
- type TaskInfo
- type TaskMetadata
- type TaskOption
- func WithLabel(key, value string) TaskOption
- func WithLabels(labels map[string]string) TaskOption
- func WithMaxRetry(n int) TaskOption
- func WithPriority(priority int) TaskOption
- func WithProcessAt(at time.Time) TaskOption
- func WithProcessIn(d time.Duration) TaskOption
- func WithQueue(queue string) TaskOption
- func WithTimeout(d time.Duration) TaskOption
- func WithUniqueKey(key string, ttl time.Duration) TaskOption
- type TaskOptions
- type TaskProto
- type TaskState
Constants ¶
const (
	DefaultConcurrency       = 10
	DefaultShutdownTimeout   = 30 * time.Second
	DefaultProcessedTTL      = 24 * time.Hour
	DefaultSchedulerTick     = 1 * time.Second
	DefaultRetryBaseDuration = 1 * time.Second
	DefaultRetryMaxDuration  = 10 * time.Minute
)
Default configuration values.
const (
	TaskStateUnspecified = types.TaskStateUnspecified
	TaskStatePending     = types.TaskStatePending
	TaskStateScheduled   = types.TaskStateScheduled
	TaskStateActive      = types.TaskStateActive
	TaskStateCompleted   = types.TaskStateCompleted
	TaskStateFailed      = types.TaskStateFailed
	TaskStateRetry       = types.TaskStateRetry
	TaskStateDead        = types.TaskStateDead
	TaskStateCancelled   = types.TaskStateCancelled
)
Task state constants.
Variables ¶
var ErrDuplicateTask = errors.New("duplicate task: unique key already exists")
ErrDuplicateTask is returned when a unique task already exists.
var ErrTaskDead = errors.New("task moved to dead letter queue")
ErrTaskDead is returned when a task has been moved to the dead letter queue. It is a control signal rather than a real error: it indicates that the task should not be ACKed.
var ErrTaskRetried = errors.New("task scheduled for retry")
ErrTaskRetried is returned when a task has been scheduled for retry. It is a control signal rather than a real error: it indicates that the task should not be ACKed.
Functions ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is used to enqueue tasks for processing.
func NewClient ¶
func NewClient(b broker.Broker, opts ...ClientOption) *Client
NewClient creates a new task queue client.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, task *Task) (*types.TaskInfo, error)
Enqueue adds a task to the queue for processing. If the task has a ProcessAt option set, it will be scheduled for later.
func (*Client) EnqueueUnique ¶
func (c *Client) EnqueueUnique(ctx context.Context, task *Task, uniqueKey string, ttl time.Duration) (*types.TaskInfo, error)
EnqueueUnique adds a task with a unique key for deduplication. If a task with the same key already exists within the TTL, ErrDuplicateTask is returned.
func (*Client) GetQueueInfo ¶
func (c *Client) GetQueueInfo(ctx context.Context, queue string) (*types.QueueInfo, error)
GetQueueInfo returns statistics for a queue.
func (*Client) GetTask ¶
func (c *Client) GetTask(ctx context.Context, taskID string) (*Task, error)
GetTask retrieves information about a task. Requires a store to be configured.
type ClientOption ¶
type ClientOption func(*Client)
ClientOption configures the client.
func WithClientLogger ¶
func WithClientLogger(logger Logger) ClientOption
WithClientLogger sets the logger for the client.
func WithStore ¶
func WithStore(store Store) ClientOption
WithStore sets the optional store for task persistence.
type HandlerFunc ¶
HandlerFunc is an adapter to allow the use of ordinary functions as handlers.
func (HandlerFunc) ProcessTask ¶
func (f HandlerFunc) ProcessTask(ctx context.Context, task *Task) error
ProcessTask calls f(ctx, task).
type Logger ¶
Logger is the interface for logging in strego. It uses logrus.FieldLogger which is implemented by both *logrus.Logger and *logrus.Entry.
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux is a task handler multiplexer. It matches the type of each incoming task against a list of registered patterns and calls the handler that matches.
func (*ServeMux) HandleFunc ¶
HandleFunc registers the handler function for the given task type.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server processes tasks from queues.
func NewServer ¶
func NewServer(b broker.Broker, opts ...ServerOption) *Server
NewServer creates a new task processing server.
func (*Server) HandleFunc ¶
HandleFunc registers a handler function for the given task type.
type ServerOption ¶
type ServerOption func(*Server)
ServerOption configures the server.
func WithConcurrency ¶
func WithConcurrency(n int) ServerOption
WithConcurrency sets the number of concurrent workers.
func WithProcessedTTL ¶
func WithProcessedTTL(d time.Duration) ServerOption
WithProcessedTTL sets how long to remember processed tasks for idempotency.
func WithQueues ¶
func WithQueues(queues ...string) ServerOption
WithQueues sets the queues to process. If not set, only the "default" queue is processed.
func WithRetryConfig ¶
func WithRetryConfig(base, max time.Duration) ServerOption
WithRetryConfig sets the retry backoff configuration.
func WithServerLogger ¶
func WithServerLogger(logger Logger) ServerOption
WithServerLogger sets the logger for the server.
func WithServerStore ¶
func WithServerStore(store Store) ServerOption
WithServerStore sets the optional store for task persistence.
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) ServerOption
WithShutdownTimeout sets the graceful shutdown timeout.
type Store ¶
type Store interface {
// CreateTask saves a new task to the store.
CreateTask(ctx context.Context, task *types.TaskProto) error
// UpdateTask updates an existing task.
UpdateTask(ctx context.Context, task *types.TaskProto) error
// UpdateTaskState updates only the state and error of a task.
UpdateTaskState(ctx context.Context, taskID, state, errMsg string) error
// GetTask retrieves a task by ID.
GetTask(ctx context.Context, taskID string) (*types.TaskProto, error)
// Close closes the store connection.
Close() error
}
Store defines the interface for task persistence. This is optional and used for task history, search, and UI features. See pkg/store for the full interface and pkg/store/postgres for the PostgreSQL implementation.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task represents a unit of work to be processed.
func NewTask ¶
func NewTask[T any](taskType string, payload T, opts ...TaskOption) (*Task, error)
NewTask creates a new task with a typed payload. The payload will be JSON-encoded.
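The typed-payload round trip can be sketched with encoding/json and generics. The helpers encodePayload and decodePayload and the emailPayload type are hypothetical, introduced only for illustration. They show the JSON step that NewTask describes and the kind of decoding suggested by the signature of Unmarshal[T], not strego's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// emailPayload is an example payload type invented for this sketch.
type emailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// encodePayload JSON-encodes a typed payload, as NewTask is documented to do.
func encodePayload[T any](payload T) ([]byte, error) {
	return json.Marshal(payload)
}

// decodePayload decodes raw JSON bytes back into a typed value.
func decodePayload[T any](data []byte, v *T) error {
	return json.Unmarshal(data, v)
}

func main() {
	raw, err := encodePayload(emailPayload{To: "a@example.com", Subject: "hi"})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	fmt.Println(string(raw))

	var decoded emailPayload
	if err := decodePayload(raw, &decoded); err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println(decoded.To, decoded.Subject)
}
```

Handlers that receive the raw bytes from Payload() would decode them into the same type that the producer passed in.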
func NewTaskFromBytes ¶
func NewTaskFromBytes(taskType string, payload []byte, opts ...TaskOption) *Task
NewTaskFromBytes creates a new task from a payload of raw JSON bytes.
func TaskFromProto ¶
TaskFromProto creates a Task from a TaskProto.
func (*Task) RetryCount ¶
func (t *Task) RetryCount() int
RetryCount returns the number of retry attempts.
type TaskOption ¶
type TaskOption func(*Task)
TaskOption configures a task.
func WithLabel ¶
func WithLabel(key, value string) TaskOption
WithLabel adds a single label to the task.
func WithLabels ¶
func WithLabels(labels map[string]string) TaskOption
WithLabels sets custom labels on the task.
func WithMaxRetry ¶
func WithMaxRetry(n int) TaskOption
WithMaxRetry sets the maximum retry attempts.
func WithPriority ¶
func WithPriority(priority int) TaskOption
WithPriority sets the task priority (0-10, higher = more important).
func WithProcessAt ¶
func WithProcessAt(at time.Time) TaskOption
WithProcessAt sets when the task should be processed.
func WithProcessIn ¶
func WithProcessIn(d time.Duration) TaskOption
WithProcessIn sets the delay before processing.
func WithTimeout ¶
func WithTimeout(d time.Duration) TaskOption
WithTimeout sets the task processing timeout.
func WithUniqueKey ¶
func WithUniqueKey(key string, ttl time.Duration) TaskOption
WithUniqueKey sets a unique key for deduplication.
Directories ¶
| Path | Synopsis |
|---|---|
| | Package broker defines the interface for task queue backends. |
| redis | Package redis provides a Redis Streams implementation of the Broker interface. |
| examples | |
| basic (command) | Package main demonstrates basic usage of strego. |
| production (command) | Package main demonstrates strego with PostgreSQL - production-like example. |
| with-postgres (command) | Package main demonstrates strego with PostgreSQL for task history and full UI features. |
| | Package store defines the interface for task persistence. |
| postgres | Package postgres provides a PostgreSQL implementation of the Store interface. |
| | Package types provides core types for the strego task queue system. |
| | Package ui provides a built-in web dashboard for strego. |