jobqueue

package
v0.0.0-...-4a6608b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2026 License: AGPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Redis key prefixes
	JobKeyPrefix     = "job:"
	JobQueueKey      = "job_queue"
	JobProcessingKey = "job_processing"
	JobRetryKey      = "job_retry"
	JobStatsKey      = "job_stats"

	// Job settings
	DefaultMaxRetries = 3
	JobTTL            = 24 * time.Hour // Jobs expire after 24 hours
)

Variables

View Source
var ErrRequeue = fmt.Errorf("requeue job for another node")

Functions

func EnqueueImageProcessing

func EnqueueImageProcessing(image *models.Image) error

EnqueueImageProcessing enqueues an image processing job in the unified queue. This replaces the old imageprocessor.ProcessImage function.

func ProcessImageUnified

func ProcessImageUnified(image *models.Image) error

ProcessImageUnified is the new unified function that replaces imageprocessor.ProcessImage. This function should be used instead of the old imageprocessor.ProcessImage.

Types

type DeleteImageJobPayload

type DeleteImageJobPayload struct {
	ImageID       uint   `json:"image_id"`
	ImageUUID     string `json:"image_uuid"`
	FromReportID  *uint  `json:"from_report_id,omitempty"`
	InitiatedByID *uint  `json:"initiated_by_id,omitempty"`
}

DeleteImageJobPayload contains the payload for deleting an image and its variants/files asynchronously.

func DeleteImageJobPayloadFromMap

func DeleteImageJobPayloadFromMap(data map[string]interface{}) (*DeleteImageJobPayload, error)

func (DeleteImageJobPayload) ToMap

func (p DeleteImageJobPayload) ToMap() map[string]interface{}

type ImageProcessingJobPayload

type ImageProcessingJobPayload struct {
	ImageID   uint   `json:"image_id"`
	ImageUUID string `json:"image_uuid"`
	FilePath  string `json:"file_path"` // Original file path
	FileName  string `json:"file_name"` // Original file name
	FileType  string `json:"file_type"` // File extension (.jpg, .png, etc.)
	PoolID    uint   `json:"pool_id"`   // Storage pool ID (routing hint)
	NodeID    string `json:"node_id"`   // Optional node ID (routing hint)
}

ImageProcessingJobPayload contains the payload for image processing jobs

func ImageProcessingJobPayloadFromMap

func ImageProcessingJobPayloadFromMap(data map[string]interface{}) (*ImageProcessingJobPayload, error)

ImageProcessingJobPayloadFromMap creates a payload from a map.

func (ImageProcessingJobPayload) ToMap

func (p ImageProcessingJobPayload) ToMap() map[string]interface{}

ToMap converts the payload to a map for storage

type Job

type Job struct {
	ID          string                 `json:"id"`
	Type        JobType                `json:"type"`
	Status      JobStatus              `json:"status"`
	Payload     map[string]interface{} `json:"payload"`
	CreatedAt   time.Time              `json:"created_at"`
	UpdatedAt   time.Time              `json:"updated_at"`
	ProcessedAt *time.Time             `json:"processed_at,omitempty"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	ErrorMsg    string                 `json:"error_msg,omitempty"`
	RetryCount  int                    `json:"retry_count"`
	MaxRetries  int                    `json:"max_retries"`
}

Job represents a background job

func (*Job) IsRetryable

func (j *Job) IsRetryable() bool

IsRetryable checks if the job can be retried

func (*Job) MarkAsCompleted

func (j *Job) MarkAsCompleted()

MarkAsCompleted updates the job status to completed

func (*Job) MarkAsFailed

func (j *Job) MarkAsFailed(errorMsg string)

MarkAsFailed updates the job status to failed

func (*Job) MarkAsProcessing

func (j *Job) MarkAsProcessing()

MarkAsProcessing updates the job status to processing

func (*Job) MarkAsRetrying

func (j *Job) MarkAsRetrying()

MarkAsRetrying updates the job status to retrying

type JobStatus

type JobStatus string

JobStatus defines the status of a job

const (
	JobStatusPending    JobStatus = "pending"
	JobStatusProcessing JobStatus = "processing"
	JobStatusCompleted  JobStatus = "completed"
	JobStatusFailed     JobStatus = "failed"
	JobStatusRetrying   JobStatus = "retrying"
)

type JobType

type JobType string

JobType defines the type of job

const (
	JobTypeImageProcessing   JobType = "image_processing"
	JobTypePoolMoveEnqueue   JobType = "pool_move_enqueue"
	JobTypeMoveImage         JobType = "move_image"
	JobTypeDeleteImage       JobType = "delete_image"
	JobTypeReconcileVariants JobType = "reconcile_variants"
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the global job queue and background tasks

func GetManager

func GetManager() *Manager

GetManager returns the global job queue manager (singleton)

func (*Manager) GetQueue

func (m *Manager) GetQueue() *Queue

GetQueue returns the managed job queue

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns whether the manager is currently running

func (*Manager) RunTieringSweepOnce

func (m *Manager) RunTieringSweepOnce() error

RunTieringSweepOnce exposes a manual trigger for a single tiering sweep (admin use).

func (*Manager) Start

func (m *Manager) Start()

Start starts the job queue and background tasks

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the job queue and background tasks

type MoveImageJobPayload

type MoveImageJobPayload struct {
	ImageID      uint `json:"image_id"`
	SourcePoolID uint `json:"source_pool_id"`
	TargetPoolID uint `json:"target_pool_id"`
}

MoveImageJobPayload contains the payload for moving a single image and its variants between pools.

func MoveImageJobPayloadFromMap

func MoveImageJobPayloadFromMap(data map[string]interface{}) (*MoveImageJobPayload, error)

func (MoveImageJobPayload) ToMap

func (p MoveImageJobPayload) ToMap() map[string]interface{}

type PoolMoveEnqueueJobPayload

type PoolMoveEnqueueJobPayload struct {
	SourcePoolID uint `json:"source_pool_id"`
	TargetPoolID uint `json:"target_pool_id"`
	CursorID     uint `json:"cursor_id"` // last processed Image.ID; 0 = start
}

PoolMoveEnqueueJobPayload contains the payload for scanning a source pool and enqueuing per-image move jobs.

func PoolMoveEnqueueJobPayloadFromMap

func PoolMoveEnqueueJobPayloadFromMap(data map[string]interface{}) (*PoolMoveEnqueueJobPayload, error)

func (PoolMoveEnqueueJobPayload) ToMap

func (p PoolMoveEnqueueJobPayload) ToMap() map[string]interface{}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue manages background jobs using Redis

func NewQueue

func NewQueue(workers int) *Queue

NewQueue creates a new job queue

func (*Queue) EnqueueDeleteImageJob

func (q *Queue) EnqueueDeleteImageJob(imageID uint, imageUUID string, fromReportID *uint, initiatedBy *uint) (*Job, error)

EnqueueDeleteImageJob enqueues an asynchronous delete job for an image

func (*Queue) EnqueueJob

func (q *Queue) EnqueueJob(jobType JobType, payload map[string]interface{}) (*Job, error)

EnqueueJob adds a new job to the queue

func (*Queue) GetJob

func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID

func (*Queue) GetJobStats

func (q *Queue) GetJobStats(ctx context.Context) (map[JobStatus]int64, error)

GetJobStats returns statistics about job statuses

func (*Queue) GetProcessingSize

func (q *Queue) GetProcessingSize(ctx context.Context) (int64, error)

GetProcessingSize returns the number of jobs being processed

func (*Queue) GetQueueSize

func (q *Queue) GetQueueSize(ctx context.Context) (int64, error)

GetQueueSize returns the number of pending jobs

func (*Queue) Start

func (q *Queue) Start()

Start starts the job queue workers

func (*Queue) Stop

func (q *Queue) Stop()

Stop stops the job queue workers

type ReconcileVariantsJobPayload

type ReconcileVariantsJobPayload struct {
	ImageID      uint   `json:"image_id"`
	ImageUUID    string `json:"image_uuid"`
	TargetPoolID uint   `json:"target_pool_id"`
}

ReconcileVariantsJobPayload contains the payload for moving late-created variants to the image's current pool.

func ReconcileVariantsJobPayloadFromMap

func ReconcileVariantsJobPayloadFromMap(data map[string]interface{}) (*ReconcileVariantsJobPayload, error)

func (ReconcileVariantsJobPayload) ToMap

func (p ReconcileVariantsJobPayload) ToMap() map[string]interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL