strego

package module
v0.3.5
Published: Jan 9, 2026 License: MIT Imports: 14 Imported by: 0

README

strego

A modern, production-ready distributed task queue library for Go.

Features

  • Redis Streams - Native consumer groups, at-least-once delivery, crash recovery
  • JSON - Human-readable, debuggable, no code generation needed
  • Exactly-once processing - At-least-once delivery made idempotent via Redis SET NX
  • Dead Letter Queue - Holds tasks that still fail after max retries
  • Scheduled tasks - Delayed and cron-based execution
  • Retry with backoff - Exponential backoff strategy
  • Multi-worker - Horizontal scaling with consumer groups
  • PostgreSQL (optional) - Task history, search, filtering for UI
  • Built-in Web UI - Dashboard with HTMX + Tailwind
  • Multiple Queues - Efficient queue isolation with minimal overhead

Installation

go get github.com/erennakbas/strego

Quick Start

Producer

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/redis/go-redis/v9"
    "github.com/erennakbas/strego/pkg/broker"
    brokerRedis "github.com/erennakbas/strego/pkg/broker/redis"
    "github.com/erennakbas/strego/pkg/strego"
)

func main() {
    // Connect to Redis
    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // Create broker with custom consumer config
    broker := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
        Group:         "strego-example",
        BatchSize:     10,
        BlockDuration: 5 * time.Second,
    }))
    client := strego.NewClient(broker)

    // Create and enqueue a task
    task := strego.NewTaskFromBytes("email:send",
        []byte(`{"to":"user@example.com","subject":"Welcome!"}`),
        strego.WithQueue("default"),
        strego.WithMaxRetry(3),
    )

    info, err := client.Enqueue(context.Background(), task)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Task enqueued: %s\n", info.ID)
}

Consumer

package main

import (
    "context"
    "log/slog"
    "time"
    
    "github.com/redis/go-redis/v9"
    "github.com/erennakbas/strego/pkg/broker"
    brokerRedis "github.com/erennakbas/strego/pkg/broker/redis"
    "github.com/erennakbas/strego/pkg/strego"
)

func main() {
    // Connect to Redis
    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // Create broker with consumer config
    broker := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
        Group:         "strego-example",
        BatchSize:     10,
        BlockDuration: 5 * time.Second,
    }))
    
    // Create server
    server := strego.NewServer(broker,
        strego.WithConcurrency(10),
        strego.WithQueues("default", "critical"),
    )

    // Register handlers
    server.HandleFunc("email:send", func(ctx context.Context, task *strego.Task) error {
        slog.Info("processing email", "payload", string(task.Payload()))
        // Your email sending logic here
        return nil
    })

    // Start processing (blocks until shutdown)
    if err := server.Start(); err != nil {
        panic(err)
    }
}

With Web UI

// Create UI server
uiServer, err := ui.NewServer(ui.Config{
    Addr:   ":8080",
    Broker: broker,
    Store:  pgStore, // optional PostgreSQL store
})
if err != nil {
    panic(err)
}

// Run the dashboard in the background
go uiServer.Start()
// Visit http://localhost:8080

Task Options

task := strego.NewTaskFromBytes("task:type", payload,
    strego.WithQueue("critical"),          // Queue name
    strego.WithMaxRetry(5),                // Max retry attempts
    strego.WithTimeout(30*time.Second),    // Processing timeout
    strego.WithProcessIn(10*time.Minute),  // Delay execution
    strego.WithPriority(5),                // Priority (0-10, higher = more important)
    strego.WithUniqueKey("key", 1*time.Hour), // Deduplication
    strego.WithLabels(map[string]string{   // Custom labels
        "user_id": "123",
    }),
)

Broker Configuration

The broker can be configured with consumer settings that control how tasks are consumed from Redis Streams:

broker := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
    Group:           "strego-workers",      // Consumer group name
    Consumer:        "",                    // Auto-generated if empty
    BatchSize:       10,                    // Tasks to fetch per batch
    BlockDuration:   5 * time.Second,       // Wait time for new messages
    ClaimStaleAfter: 5 * time.Minute,       // Claim stale pending messages
}))

Configuration Options:

  • Group: Consumer group name. Multiple workers in the same group share the workload.
  • Consumer: Consumer name within the group. Auto-generated if left empty.
  • BatchSize: Number of tasks fetched in a single Redis call. Higher values improve throughput but increase memory usage.
  • BlockDuration: Maximum time a read blocks while waiting for new messages. Only the calling consumer waits; other consumers keep reading in parallel.
  • ClaimStaleAfter: How long a message may stay pending before it counts as stale. Stale messages, typically left behind by crashed workers, are automatically claimed by other workers.
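
To make ClaimStaleAfter concrete, here is a minimal in-memory sketch of the staleness check. It is not strego's implementation: the pendingEntry type and the staleEntries function are hypothetical names, and the assumption is that the broker compares each pending entry's idle time against ClaimStaleAfter, much like the min-idle-time argument of Redis XAUTOCLAIM.

```go
package main

import (
	"fmt"
	"time"
)

// pendingEntry is a hypothetical view of one pending stream entry:
// which consumer holds it and how long it has gone unacknowledged.
type pendingEntry struct {
	ID       string
	Consumer string
	Idle     time.Duration
}

// staleEntries returns the IDs of entries that have been idle for at
// least claimStaleAfter and are therefore candidates for claiming.
func staleEntries(pending []pendingEntry, claimStaleAfter time.Duration) []string {
	var ids []string
	for _, p := range pending {
		if p.Idle >= claimStaleAfter {
			ids = append(ids, p.ID)
		}
	}
	return ids
}

func main() {
	pending := []pendingEntry{
		{ID: "1-0", Consumer: "worker-a", Idle: 30 * time.Second},
		{ID: "2-0", Consumer: "worker-b", Idle: 7 * time.Minute},
	}
	// Only the entry idle for longer than 5 minutes is selected.
	fmt.Println(staleEntries(pending, 5*time.Minute)) // prints [2-0]
}
```

A shorter ClaimStaleAfter recovers work from crashed workers sooner, but it should stay above the longest expected task duration so that slow, healthy workers do not have their tasks claimed.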

Server Options

server := strego.NewServer(broker,
    strego.WithConcurrency(10),                    // Worker count
    strego.WithQueues("default", "critical"),      // Queues to process
    strego.WithShutdownTimeout(30*time.Second),    // Graceful shutdown
    strego.WithProcessedTTL(24*time.Hour),         // Idempotency TTL
    strego.WithRetryConfig(1*time.Second, 10*time.Minute), // Backoff config
    strego.WithServerStore(pgStore),               // Optional PostgreSQL
    strego.WithServerLogger(logger),               // Custom logger
)

PostgreSQL Integration (Optional)

PostgreSQL is optional but enables task history, search, and full UI features.

import "github.com/erennakbas/strego/pkg/store/postgres"

store, err := postgres.New(postgres.Config{
    DSN: "postgres://user:pass@localhost/strego?sslmode=disable",
})
if err != nil {
    panic(err)
}

// Share the same store between client, server, and UI
client := strego.NewClient(broker, strego.WithStore(store))
server := strego.NewServer(broker, strego.WithServerStore(store))
uiServer, err := ui.NewServer(ui.Config{
    Broker: broker,
    Store:  store,
})
if err != nil {
    panic(err)
}

Queue Management

Multiple Queues

Strego efficiently supports multiple queues with minimal overhead. Each queue is isolated and can have different priorities, rate limits, and monitoring.

Queue Creation:

  • Queues are created lazily on first use (no upfront cost)
  • Each queue uses ~1-2KB memory when empty
  • All configured queues are consumed in a single XReadGroup call

Example with 20+ Queues:

// Efficient - all queues processed in one XReadGroup call
server := strego.NewServer(broker,
    strego.WithQueues(
        "email", "notification", "report", "analytics",
        "payment", "order", "inventory", "shipping",
        // ... 20+ queues
    ),
)

Benefits:

  • Task type isolation
  • Independent monitoring and stats
  • Separate priority and rate limiting
  • One queue failure doesn't affect others

Redis Key Structure

strego:stream:{queue}         # Stream - main queue
strego:dlq:{queue}            # Stream - dead letter queue
strego:retry                  # Sorted Set - delayed retries
strego:scheduled              # Sorted Set - scheduled tasks
strego:processed:{task_id}    # String + TTL - idempotency
strego:unique:{key}           # String + TTL - deduplication
strego:stats:{queue}          # Hash - counters
strego:queues                 # Set - all queue names
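
The key patterns above expand with plain string concatenation. The helpers below are a sketch with hypothetical function names; the source does not show strego's own key-building code, only the resulting layout.

```go
package main

import "fmt"

// prefix is the common namespace shown in the key structure above.
const prefix = "strego"

// streamKey returns the main stream key for a queue.
func streamKey(queue string) string { return prefix + ":stream:" + queue }

// dlqKey returns the dead letter stream key for a queue.
func dlqKey(queue string) string { return prefix + ":dlq:" + queue }

// processedKey returns the idempotency marker key for a task ID.
func processedKey(taskID string) string { return prefix + ":processed:" + taskID }

// uniqueKey returns the deduplication key for a caller-supplied unique key.
func uniqueKey(key string) string { return prefix + ":unique:" + key }

// statsKey returns the counters hash key for a queue.
func statsKey(queue string) string { return prefix + ":stats:" + queue }

func main() {
	fmt.Println(streamKey("default"))
	fmt.Println(dlqKey("default"))
	fmt.Println(processedKey("task-123"))
	fmt.Println(uniqueKey("welcome:123"))
	fmt.Println(statsKey("default"))
}
```

Knowing the layout is mostly useful for inspecting a running system with redis-cli, for example when checking the length of one queue's stream or its dead letter queue.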

Performance & Scaling

  • Horizontal Scaling: Add more workers to the same consumer group for automatic load balancing
  • Queue Isolation: Supports 200+ queues with minimal overhead (~400-600KB in total while the queues are empty)
  • Batch Processing: Configurable batch size for optimal throughput
  • Memory Efficient: Lazy queue creation, automatic cleanup of processed tasks
  • Low Latency: Blocking reads with configurable timeout for immediate task processing

Comparison

Feature          asynq       machinery   strego
Redis Backend    Lists       Lists       Streams
Serialization    JSON        JSON        JSON
Consumer Groups  Custom      Custom      Native
Crash Recovery   Limited     Limited     Built-in
PostgreSQL
Web UI           Separate                Built-in
Exactly-once     UniqueTask
Multiple Queues  Limited     Limited     Efficient

License

MIT

Documentation

Overview

Package strego provides a distributed task queue for Go applications.

Constants

const (
	DefaultConcurrency       = 10
	DefaultShutdownTimeout   = 30 * time.Second
	DefaultProcessedTTL      = 24 * time.Hour
	DefaultSchedulerTick     = 1 * time.Second
	DefaultRetryBaseDuration = 1 * time.Second
	DefaultRetryMaxDuration  = 10 * time.Minute
)

Default configuration values

const (
	TaskStateUnspecified = types.TaskStateUnspecified
	TaskStatePending     = types.TaskStatePending
	TaskStateScheduled   = types.TaskStateScheduled
	TaskStateActive      = types.TaskStateActive
	TaskStateCompleted   = types.TaskStateCompleted
	TaskStateFailed      = types.TaskStateFailed
	TaskStateRetry       = types.TaskStateRetry
	TaskStateDead        = types.TaskStateDead
	TaskStateCancelled   = types.TaskStateCancelled
)

Task state constants

Variables

var ErrDuplicateTask = errors.New("duplicate task: unique key already exists")

ErrDuplicateTask is returned when a unique task already exists.

var ErrTaskDead = errors.New("task moved to dead letter queue")

ErrTaskDead is returned when a task has been moved to the dead letter queue. This is not a real error - it signals that the task should not be ACKed.

var ErrTaskRetried = errors.New("task scheduled for retry")

ErrTaskRetried is returned when a task has been scheduled for retry. This is not a real error - it signals that the task should not be ACKed.
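
Sentinel values like these are normally checked with errors.Is, which also matches wrapped errors. The sketch below uses local stand-ins with the same messages as ErrTaskRetried and ErrTaskDead; the outcome type and the classify function are hypothetical and only illustrate how a caller might separate "do not ACK" signals from ordinary failures.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for strego.ErrTaskRetried and strego.ErrTaskDead.
var (
	errTaskRetried = errors.New("task scheduled for retry")
	errTaskDead    = errors.New("task moved to dead letter queue")
)

// outcome is a hypothetical classification of a processing result.
type outcome string

const (
	ack     outcome = "ack"      // success: acknowledge the message
	skipAck outcome = "skip-ack" // retried or dead: do not acknowledge
	failed  outcome = "failed"   // any other error
)

// classify maps a processing error to an outcome. errors.Is is used so
// that wrapped sentinel errors are still recognised.
func classify(err error) outcome {
	switch {
	case err == nil:
		return ack
	case errors.Is(err, errTaskRetried), errors.Is(err, errTaskDead):
		return skipAck
	default:
		return failed
	}
}

func main() {
	wrapped := fmt.Errorf("handler failed: %w", errTaskRetried)
	fmt.Println(classify(nil))
	fmt.Println(classify(wrapped))
	fmt.Println(classify(errors.New("connection reset")))
}
```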

Functions

func Unmarshal

func Unmarshal[T any](t *Task, v *T) error

Unmarshal extracts the payload into a typed struct.
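
A generic helper with this signature can be a thin wrapper around encoding/json. The sketch below assumes exactly that, which matches the README's statement that payloads are JSON but is not confirmed by the source. The task type is a local stand-in for strego.Task, and emailPayload and decodeEmail are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// task is a stand-in for strego.Task that only carries the raw payload.
type task struct{ payload []byte }

// Payload returns the raw JSON bytes, like (*strego.Task).Payload.
func (t *task) Payload() []byte { return t.payload }

// unmarshal mirrors the shape of strego.Unmarshal[T]: it decodes the
// task payload into the value that v points to.
func unmarshal[T any](t *task, v *T) error {
	return json.Unmarshal(t.Payload(), v)
}

// emailPayload is a hypothetical payload for an "email:send" task.
type emailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// decodeEmail shows the call site: the type parameter is inferred from v.
func decodeEmail(raw string) (emailPayload, error) {
	var p emailPayload
	err := unmarshal(&task{payload: []byte(raw)}, &p)
	return p, err
}

func main() {
	p, err := decodeEmail(`{"to":"user@example.com","subject":"Welcome!"}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```

Inside a handler, the same call replaces manual decoding of task.Payload() and keeps the payload type visible at the call site.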

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is used to enqueue tasks for processing.

func NewClient

func NewClient(b broker.Broker, opts ...ClientOption) *Client

NewClient creates a new task queue client.

func (*Client) Cancel

func (c *Client) Cancel(ctx context.Context, taskID string) error

Cancel cancels a pending or scheduled task.

func (*Client) Close

func (c *Client) Close() error

Close closes the client and releases resources.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, task *Task) (*types.TaskInfo, error)

Enqueue adds a task to the queue for processing. If the task has a ProcessAt option set, it will be scheduled for later.

func (*Client) EnqueueUnique

func (c *Client) EnqueueUnique(ctx context.Context, task *Task, uniqueKey string, ttl time.Duration) (*types.TaskInfo, error)

EnqueueUnique adds a task with a unique key for deduplication. If a task with the same key already exists within the TTL, ErrDuplicateTask is returned.
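
The deduplication rule can be modelled in memory as a "set if absent, with expiry" operation, which is what Redis SET with the NX and EX options provides. The sketch below is that model only: uniqueGuard, acquire, and demo are hypothetical names, and the assumption is that strego reserves the strego:unique:{key} entry in this way before enqueueing.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errDuplicate is a local stand-in for strego.ErrDuplicateTask.
var errDuplicate = errors.New("duplicate task: unique key already exists")

// uniqueGuard remembers when each unique key expires.
type uniqueGuard struct {
	mu      sync.Mutex
	expires map[string]time.Time
}

func newUniqueGuard() *uniqueGuard {
	return &uniqueGuard{expires: make(map[string]time.Time)}
}

// acquire reserves key for ttl. It fails with errDuplicate while an
// unexpired reservation exists, like SET key value NX EX ttl.
func (g *uniqueGuard) acquire(key string, ttl time.Duration, now time.Time) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if exp, ok := g.expires[key]; ok && now.Before(exp) {
		return errDuplicate
	}
	g.expires[key] = now.Add(ttl)
	return nil
}

// demo reserves the same key three times: immediately, one minute
// later (inside the TTL), and two hours later (after the TTL).
func demo() (first, second, afterExpiry error) {
	g := newUniqueGuard()
	now := time.Now()
	first = g.acquire("welcome:123", time.Hour, now)
	second = g.acquire("welcome:123", time.Hour, now.Add(time.Minute))
	afterExpiry = g.acquire("welcome:123", time.Hour, now.Add(2*time.Hour))
	return first, second, afterExpiry
}

func main() {
	first, second, afterExpiry := demo()
	fmt.Println("first:", first)
	fmt.Println("second:", second)
	fmt.Println("after expiry:", afterExpiry)
}
```

Callers of EnqueueUnique would typically treat ErrDuplicateTask as a benign result rather than a failure, since it means an equivalent task is already queued.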

func (*Client) GetQueueInfo

func (c *Client) GetQueueInfo(ctx context.Context, queue string) (*types.QueueInfo, error)

GetQueueInfo returns statistics for a queue.

func (*Client) GetQueues

func (c *Client) GetQueues(ctx context.Context) ([]string, error)

GetQueues returns all known queue names.

func (*Client) GetTask

func (c *Client) GetTask(ctx context.Context, taskID string) (*Task, error)

GetTask retrieves information about a task. Requires a store to be configured.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping checks if the broker is healthy.

func (*Client) Schedule

func (c *Client) Schedule(ctx context.Context, task *Task, processAt time.Time) (*types.TaskInfo, error)

Schedule adds a task to be processed at a specific time.
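
The key structure lists strego:scheduled as a sorted set, and DefaultSchedulerTick is 1 second, which suggests a scheduler that periodically moves due tasks into their queues. The sketch below models that idea in memory with the process time as the score. The schedule type and its methods are hypothetical, and the polling behaviour itself is an assumption about how strego works.

```go
package main

import (
	"fmt"
	"sort"
)

// scheduled pairs a task ID with its process time in Unix seconds,
// which plays the role of the sorted-set score.
type scheduled struct {
	TaskID    string
	ProcessAt int64
}

// schedule keeps entries ordered by ProcessAt, like a sorted set.
type schedule struct{ items []scheduled }

// add inserts an entry at its sorted position.
func (s *schedule) add(id string, processAt int64) {
	i := sort.Search(len(s.items), func(i int) bool {
		return s.items[i].ProcessAt > processAt
	})
	s.items = append(s.items, scheduled{})
	copy(s.items[i+1:], s.items[i:])
	s.items[i] = scheduled{TaskID: id, ProcessAt: processAt}
}

// popDue removes and returns the IDs of all entries whose process time
// is at or before now, in time order.
func (s *schedule) popDue(now int64) []string {
	n := sort.Search(len(s.items), func(i int) bool {
		return s.items[i].ProcessAt > now
	})
	due := make([]string, 0, n)
	for _, it := range s.items[:n] {
		due = append(due, it.TaskID)
	}
	s.items = s.items[n:]
	return due
}

func main() {
	s := &schedule{}
	s.add("report", 200)
	s.add("email", 100)
	s.add("cleanup", 300)

	// A scheduler tick at t=200 promotes the two tasks that are due.
	fmt.Println(s.popDue(200))
	// Nothing else is due until t=300.
	fmt.Println(s.popDue(250))
}
```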

type ClientOption

type ClientOption func(*Client)

ClientOption configures the client.

func WithClientLogger

func WithClientLogger(logger Logger) ClientOption

WithClientLogger sets the logger for the client.

func WithStore

func WithStore(store Store) ClientOption

WithStore sets the optional store for task persistence.

type Handler

type Handler interface {
	ProcessTask(ctx context.Context, task *Task) error
}

Handler processes tasks of a specific type.

type HandlerFunc

type HandlerFunc func(ctx context.Context, task *Task) error

HandlerFunc is an adapter to allow the use of ordinary functions as handlers.

func (HandlerFunc) ProcessTask

func (f HandlerFunc) ProcessTask(ctx context.Context, task *Task) error

ProcessTask calls f(ctx, task).
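
This is the same adapter pattern as http.HandlerFunc in the standard library: a function type gets a method so that it satisfies the interface. The sketch below reproduces the pattern with local stand-in types; task, run, and countingHandler are hypothetical names used only for the demonstration.

```go
package main

import (
	"context"
	"fmt"
)

// task is a stand-in for strego.Task.
type task struct{ typ string }

// handler mirrors the shape of strego.Handler.
type handler interface {
	ProcessTask(ctx context.Context, t *task) error
}

// handlerFunc mirrors strego.HandlerFunc: a function type that
// satisfies handler by calling itself.
type handlerFunc func(ctx context.Context, t *task) error

// ProcessTask calls f(ctx, t).
func (f handlerFunc) ProcessTask(ctx context.Context, t *task) error {
	return f(ctx, t)
}

// run accepts any handler, whether a struct or an adapted function.
func run(h handler, t *task) error {
	return h.ProcessTask(context.Background(), t)
}

// countingHandler adapts a closure that increments *n on every call.
func countingHandler(n *int) handler {
	return handlerFunc(func(ctx context.Context, t *task) error {
		*n++
		return nil
	})
}

func main() {
	calls := 0
	h := countingHandler(&calls)
	if err := run(h, &task{typ: "email:send"}); err != nil {
		panic(err)
	}
	fmt.Println("calls:", calls) // prints calls: 1
}
```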

type Logger

type Logger = types.Logger

Logger is the interface for logging in strego. It uses logrus.FieldLogger which is implemented by both *logrus.Logger and *logrus.Entry.

type QueueInfo

type QueueInfo = types.QueueInfo

Re-export types for convenience

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux is a task handler multiplexer. It matches the type of each incoming task against a list of registered patterns and calls the handler that matches.

func NewServeMux

func NewServeMux() *ServeMux

NewServeMux creates a new ServeMux.

func (*ServeMux) Handle

func (m *ServeMux) Handle(taskType string, handler Handler)

Handle registers the handler for the given task type.

func (*ServeMux) HandleFunc

func (m *ServeMux) HandleFunc(taskType string, handler func(ctx context.Context, task *Task) error)

HandleFunc registers the handler function for the given task type.

func (*ServeMux) Handler

func (m *ServeMux) Handler(taskType string) Handler

Handler returns the handler for the given task type. Returns nil if no handler is registered.

func (*ServeMux) Types

func (m *ServeMux) Types() []string

Types returns all registered task types.
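
A multiplexer of this kind can be sketched as a mutex-protected map from task type to handler. The version below is a simplified stand-in, not strego's ServeMux: it assumes exact string matching on the task type, uses a reduced handler signature that takes only the payload, and all names (mux, handle, lookup, types, noop) are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// handler is a simplified handler signature for this sketch.
type handler func(payload []byte) error

// mux maps task types to handlers and is safe for concurrent use.
type mux struct {
	mu       sync.RWMutex
	handlers map[string]handler
}

func newMux() *mux {
	return &mux{handlers: make(map[string]handler)}
}

// handle registers h for taskType, replacing any earlier registration.
func (m *mux) handle(taskType string, h handler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[taskType] = h
}

// lookup returns the handler for taskType, or nil if none is registered.
func (m *mux) lookup(taskType string) handler {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.handlers[taskType]
}

// types returns all registered task types in sorted order.
func (m *mux) types() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make([]string, 0, len(m.handlers))
	for t := range m.handlers {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

// noop is a handler that accepts any payload and does nothing.
func noop([]byte) error { return nil }

func main() {
	m := newMux()
	m.handle("report:build", noop)
	m.handle("email:send", noop)

	fmt.Println(m.types())
	fmt.Println(m.lookup("email:send") != nil)
	fmt.Println(m.lookup("unknown") != nil)
}
```

Returning nil for an unregistered type leaves the decision to the caller, which can then fail the task or route it to a fallback handler.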

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server processes tasks from queues.

func NewServer

func NewServer(b broker.Broker, opts ...ServerOption) *Server

NewServer creates a new task processing server.

func (*Server) Handle

func (s *Server) Handle(taskType string, handler Handler)

Handle registers a handler for the given task type.

func (*Server) HandleFunc

func (s *Server) HandleFunc(taskType string, fn func(ctx context.Context, task *Task) error)

HandleFunc registers a handler function for the given task type.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown gracefully stops the server.

func (*Server) Start

func (s *Server) Start() error

Start begins processing tasks. This method blocks until Shutdown is called or a termination signal is received.

type ServerOption

type ServerOption func(*Server)

ServerOption configures the server.

func WithConcurrency

func WithConcurrency(n int) ServerOption

WithConcurrency sets the number of concurrent workers.
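
A concurrency setting like this usually corresponds to a fixed pool of goroutines reading from a shared channel. The sketch below shows that general pattern with the standard library and also records the peak number of tasks running at once. It is an assumption that strego's server is structured this way, and runPool is a hypothetical name. The atomic.Int32 type requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool processes every task using n worker goroutines and returns
// the highest number of tasks that were running at the same time.
func runPool(n int, tasks []string, process func(string)) int32 {
	var (
		wg            sync.WaitGroup
		running, peak atomic.Int32
	)
	queue := make(chan string)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				cur := running.Add(1)
				// Raise peak to cur unless another worker already did.
				for {
					old := peak.Load()
					if cur <= old || peak.CompareAndSwap(old, cur) {
						break
					}
				}
				process(t)
				running.Add(-1)
			}
		}()
	}

	// Feed tasks to the workers, then wait for them to drain the queue.
	for _, t := range tasks {
		queue <- t
	}
	close(queue)
	wg.Wait()
	return peak.Load()
}

func main() {
	tasks := []string{"a", "b", "c", "d", "e", "f"}
	peak := runPool(3, tasks, func(string) {
		time.Sleep(10 * time.Millisecond) // simulate work
	})
	fmt.Println("peak concurrent tasks:", peak)
}
```

The peak never exceeds n, so the setting acts as an upper bound on how many handlers run at once within a single server process.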

func WithProcessedTTL

func WithProcessedTTL(d time.Duration) ServerOption

WithProcessedTTL sets how long to remember processed tasks for idempotency.

func WithQueues

func WithQueues(queues ...string) ServerOption

WithQueues sets the queues to process. If not set, only the "default" queue is processed.

func WithRetryConfig

func WithRetryConfig(base, max time.Duration) ServerOption

WithRetryConfig sets the retry backoff configuration.
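
The README describes the retry strategy as exponential backoff, and the defaults above are a 1 second base and a 10 minute maximum. A minimal sketch of such a schedule follows. The doubling factor, the absence of jitter, and the backoff function itself are assumptions for illustration; the source does not give strego's exact formula.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns base * 2^attempt, capped at max. The first retry is
// attempt 0. The halving check avoids overflow for large attempts.
func backoff(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		if d > max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// With a 1s base and 10m cap, the delay doubles until it hits the cap.
	for attempt := 0; attempt < 12; attempt++ {
		fmt.Printf("attempt %d: wait %s\n",
			attempt, backoff(time.Second, 10*time.Minute, attempt))
	}
}
```

Under these assumptions the delay grows 1s, 2s, 4s and so on, reaching the 10 minute cap at the eleventh retry and staying there.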

func WithServerLogger

func WithServerLogger(logger Logger) ServerOption

WithServerLogger sets the logger for the server.

func WithServerStore

func WithServerStore(store Store) ServerOption

WithServerStore sets the optional store for task persistence.

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) ServerOption

WithShutdownTimeout sets the graceful shutdown timeout.

type Store

type Store interface {
	// CreateTask saves a new task to the store.
	CreateTask(ctx context.Context, task *types.TaskProto) error

	// UpdateTask updates an existing task.
	UpdateTask(ctx context.Context, task *types.TaskProto) error

	// UpdateTaskState updates only the state and error of a task.
	UpdateTaskState(ctx context.Context, taskID, state, errMsg string) error

	// GetTask retrieves a task by ID.
	GetTask(ctx context.Context, taskID string) (*types.TaskProto, error)

	// Close closes the store connection.
	Close() error
}

Store defines the interface for task persistence. This is optional and used for task history, search, and UI features. See pkg/store for the full interface and pkg/store/postgres for the PostgreSQL implementation.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a unit of work to be processed.

func NewTask

func NewTask[T any](taskType string, payload T, opts ...TaskOption) (*Task, error)

NewTask creates a new task with a typed payload. The payload will be JSON-encoded.

func NewTaskFromBytes

func NewTaskFromBytes(taskType string, payload []byte, opts ...TaskOption) *Task

NewTaskFromBytes creates a new task with raw JSON bytes payload.

func TaskFromProto

func TaskFromProto(proto *types.TaskProto) *Task

TaskFromProto creates a Task from a TaskProto.

func (*Task) ID

func (t *Task) ID() string

ID returns the task ID.

func (*Task) Labels

func (t *Task) Labels() map[string]string

Labels returns the task labels.

func (*Task) LastError

func (t *Task) LastError() string

LastError returns the last error message.

func (*Task) MaxRetry

func (t *Task) MaxRetry() int

MaxRetry returns the maximum retry attempts.

func (*Task) Payload

func (t *Task) Payload() []byte

Payload returns the raw payload bytes.

func (*Task) Proto

func (t *Task) Proto() *types.TaskProto

Proto returns the underlying task proto structure.

func (*Task) Queue

func (t *Task) Queue() string

Queue returns the queue name.

func (*Task) RetryCount

func (t *Task) RetryCount() int

RetryCount returns the number of retry attempts.

func (*Task) State

func (t *Task) State() types.TaskState

State returns the current task state.

func (*Task) Type

func (t *Task) Type() string

Type returns the task type.

type TaskInfo

type TaskInfo = types.TaskInfo

Re-export types for convenience

type TaskMetadata

type TaskMetadata = types.TaskMetadata

Re-export types for convenience

type TaskOption

type TaskOption func(*Task)

TaskOption configures a task.
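
TaskOption follows the functional options pattern: each option is a function that mutates the task being built, and the constructor applies them in order over its defaults. The sketch below shows the pattern with a local stand-in type. The field names and the "default" queue used when no option is given are assumptions for the example, not confirmed details of strego.Task.

```go
package main

import "fmt"

// task is a stand-in for strego.Task with two configurable fields.
type task struct {
	typ      string
	queue    string
	maxRetry int
}

// taskOption mirrors strego.TaskOption: a function that configures a task.
type taskOption func(*task)

// withQueue sets the queue name.
func withQueue(queue string) taskOption {
	return func(t *task) { t.queue = queue }
}

// withMaxRetry sets the maximum retry attempts.
func withMaxRetry(n int) taskOption {
	return func(t *task) { t.maxRetry = n }
}

// newTask starts from defaults (assumed here) and applies each option
// in order, so later options override earlier ones.
func newTask(typ string, opts ...taskOption) *task {
	t := &task{typ: typ, queue: "default"}
	for _, opt := range opts {
		opt(t)
	}
	return t
}

func main() {
	plain := newTask("email:send")
	tuned := newTask("email:send", withQueue("critical"), withMaxRetry(5))
	fmt.Printf("%+v\n", *plain)
	fmt.Printf("%+v\n", *tuned)
}
```

The pattern keeps the constructor signature stable as options are added, which is why new With... functions can appear without breaking existing callers.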

func WithLabel

func WithLabel(key, value string) TaskOption

WithLabel adds a single label to the task.

func WithLabels

func WithLabels(labels map[string]string) TaskOption

WithLabels sets custom labels on the task.

func WithMaxRetry

func WithMaxRetry(n int) TaskOption

WithMaxRetry sets the maximum retry attempts.

func WithPriority

func WithPriority(priority int) TaskOption

WithPriority sets the task priority (0-10, higher = more important).

func WithProcessAt

func WithProcessAt(at time.Time) TaskOption

WithProcessAt sets when the task should be processed.

func WithProcessIn

func WithProcessIn(d time.Duration) TaskOption

WithProcessIn sets the delay before processing.

func WithQueue

func WithQueue(queue string) TaskOption

WithQueue sets the queue name.

func WithTimeout

func WithTimeout(d time.Duration) TaskOption

WithTimeout sets the task processing timeout.

func WithUniqueKey

func WithUniqueKey(key string, ttl time.Duration) TaskOption

WithUniqueKey sets a unique key for deduplication.

type TaskOptions

type TaskOptions = types.TaskOptions

Re-export types for convenience

type TaskProto

type TaskProto = types.TaskProto

Re-export types for convenience

type TaskState

type TaskState = types.TaskState

Re-export types for convenience

Directories

Path Synopsis
Package broker defines the interface for task queue backends.
redis
Package redis provides a Redis Streams implementation of the Broker interface.
examples
basic command
Package main demonstrates basic usage of strego.
production command
Package main demonstrates strego with PostgreSQL - production-like example.
with-postgres command
Package main demonstrates strego with PostgreSQL for task history and full UI features.
Package store defines the interface for task persistence.
postgres
Package postgres provides a PostgreSQL implementation of the Store interface.
Package types provides core types for the strego task queue system.
Package ui provides a built-in web dashboard for strego.