Documentation
¶
Index ¶
- Constants
- Variables
- func EnqueueImageProcessing(image *models.Image) error
- func ProcessImageUnified(image *models.Image) error
- type DeleteImageJobPayload
- type ImageProcessingJobPayload
- type Job
- type JobStatus
- type JobType
- type Manager
- type MoveImageJobPayload
- type PoolMoveEnqueueJobPayload
- type Queue
- func (q *Queue) EnqueueDeleteImageJob(imageID uint, imageUUID string, fromReportID *uint, initiatedBy *uint) (*Job, error)
- func (q *Queue) EnqueueJob(jobType JobType, payload map[string]interface{}) (*Job, error)
- func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (q *Queue) GetJobStats(ctx context.Context) (map[JobStatus]int64, error)
- func (q *Queue) GetProcessingSize(ctx context.Context) (int64, error)
- func (q *Queue) GetQueueSize(ctx context.Context) (int64, error)
- func (q *Queue) Start()
- func (q *Queue) Stop()
- type ReconcileVariantsJobPayload
Constants ¶
// Redis key names and default job settings for the queue.
const (
	// Redis key prefixes
	JobKeyPrefix     = "job:"
	JobQueueKey      = "job_queue"
	JobProcessingKey = "job_processing"
	JobRetryKey      = "job_retry"
	JobStatsKey      = "job_stats"

	// Job settings
	DefaultMaxRetries = 3
	JobTTL            = 24 * time.Hour // Jobs expire after 24 hours
)
Variables ¶
// ErrRequeue is a package-level error value whose message is
// "requeue job for another node".
// NOTE(review): presumably returned by a job handler to ask that the job be
// put back on the queue so a different node can pick it up — confirm against
// the worker loop.
var ErrRequeue = fmt.Errorf("requeue job for another node")
Functions ¶
func EnqueueImageProcessing ¶
EnqueueImageProcessing enqueues an image processing job in the unified queue. It replaces the old imageprocessor.ProcessImage function.
func ProcessImageUnified ¶
ProcessImageUnified is the new unified function that replaces imageprocessor.ProcessImage. Use it instead of the old imageprocessor.ProcessImage.
Types ¶
type DeleteImageJobPayload ¶
// DeleteImageJobPayload is the payload for an asynchronous job that deletes
// an image together with its variants/files.
type DeleteImageJobPayload struct {
ImageID uint `json:"image_id"` // Numeric image ID, serialized as "image_id"
ImageUUID string `json:"image_uuid"` // Image UUID, serialized as "image_uuid"
FromReportID *uint `json:"from_report_id,omitempty"` // Optional; left out of the JSON when nil. NOTE(review): presumably the report that triggered the delete — confirm
InitiatedByID *uint `json:"initiated_by_id,omitempty"` // Optional; left out of the JSON when nil. NOTE(review): presumably the ID of the user who started the delete — confirm
}
DeleteImageJobPayload contains payload for deleting an image and its variants/files asynchronously
func DeleteImageJobPayloadFromMap ¶
func DeleteImageJobPayloadFromMap(data map[string]interface{}) (*DeleteImageJobPayload, error)
func (DeleteImageJobPayload) ToMap ¶
func (p DeleteImageJobPayload) ToMap() map[string]interface{}
type ImageProcessingJobPayload ¶
// ImageProcessingJobPayload is the payload carried by image processing jobs.
type ImageProcessingJobPayload struct {
ImageID uint `json:"image_id"` // Numeric image ID, serialized as "image_id"
ImageUUID string `json:"image_uuid"` // Image UUID, serialized as "image_uuid"
FilePath string `json:"file_path"` // Original file path
FileName string `json:"file_name"` // Original file name
FileType string `json:"file_type"` // File extension (.jpg, .png, etc.)
PoolID uint `json:"pool_id"` // Storage pool ID (routing hint)
NodeID string `json:"node_id"` // Optional node ID (routing hint)
}
ImageProcessingJobPayload contains the payload for image processing jobs
func ImageProcessingJobPayloadFromMap ¶
func ImageProcessingJobPayloadFromMap(data map[string]interface{}) (*ImageProcessingJobPayload, error)
ImageProcessingJobPayloadFromMap creates a payload from a map.
func (ImageProcessingJobPayload) ToMap ¶
func (p ImageProcessingJobPayload) ToMap() map[string]interface{}
ToMap converts the payload to a map for storage
type Job ¶
// Job represents a background job, including its type, status, payload,
// timestamps and retry bookkeeping.
type Job struct {
ID string `json:"id"` // Job identifier (string); Queue.GetJob looks a job up by a string jobID
Type JobType `json:"type"` // Kind of job, as a JobType value
Status JobStatus `json:"status"` // Current status, as a JobStatus value
Payload map[string]interface{} `json:"payload"` // Job-specific data as a generic map; the *JobPayload types provide ToMap/...FromMap helpers for this shape
CreatedAt time.Time `json:"created_at"` // Creation timestamp
UpdatedAt time.Time `json:"updated_at"` // Last-update timestamp
ProcessedAt *time.Time `json:"processed_at,omitempty"` // Optional; left out of the JSON when nil. NOTE(review): presumably set by MarkAsProcessing — confirm
CompletedAt *time.Time `json:"completed_at,omitempty"` // Optional; left out of the JSON when nil. NOTE(review): presumably set by MarkAsCompleted — confirm
ErrorMsg string `json:"error_msg,omitempty"` // Error text; left out of the JSON when empty
RetryCount int `json:"retry_count"` // NOTE(review): presumably the number of retries so far, checked by IsRetryable — confirm
MaxRetries int `json:"max_retries"` // NOTE(review): presumably the retry limit, likely defaulting to DefaultMaxRetries — confirm
}
Job represents a background job
func (*Job) IsRetryable ¶
IsRetryable checks if the job can be retried
func (*Job) MarkAsCompleted ¶
func (j *Job) MarkAsCompleted()
MarkAsCompleted updates the job status to completed
func (*Job) MarkAsFailed ¶
MarkAsFailed updates the job status to failed
func (*Job) MarkAsProcessing ¶
func (j *Job) MarkAsProcessing()
MarkAsProcessing updates the job status to processing
func (*Job) MarkAsRetrying ¶
func (j *Job) MarkAsRetrying()
MarkAsRetrying updates the job status to retrying
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the global job queue and background tasks
func GetManager ¶
func GetManager() *Manager
GetManager returns the global job queue manager (singleton)
func (*Manager) RunTieringSweepOnce ¶
RunTieringSweepOnce exposes a manual trigger for a single tiering sweep (admin use).
type MoveImageJobPayload ¶
// MoveImageJobPayload is the payload for moving a single image and its
// variants between storage pools.
type MoveImageJobPayload struct {
ImageID uint `json:"image_id"` // Numeric ID of the image to move, serialized as "image_id"
SourcePoolID uint `json:"source_pool_id"` // Pool the image is moved from
TargetPoolID uint `json:"target_pool_id"` // Pool the image is moved to
}
MoveImageJobPayload contains payload for moving a single image+variants between pools
func MoveImageJobPayloadFromMap ¶
func MoveImageJobPayloadFromMap(data map[string]interface{}) (*MoveImageJobPayload, error)
func (MoveImageJobPayload) ToMap ¶
func (p MoveImageJobPayload) ToMap() map[string]interface{}
type PoolMoveEnqueueJobPayload ¶
// PoolMoveEnqueueJobPayload is the payload for a job that scans a source pool
// and enqueues per-image move jobs.
type PoolMoveEnqueueJobPayload struct {
SourcePoolID uint `json:"source_pool_id"` // Pool being scanned
TargetPoolID uint `json:"target_pool_id"` // NOTE(review): presumably the destination pool passed on to the per-image move jobs — confirm
CursorID uint `json:"cursor_id"` // last processed Image.ID; 0 = start
}
PoolMoveEnqueueJobPayload contains payload for scanning a source pool and enqueuing per-image move jobs
func PoolMoveEnqueueJobPayloadFromMap ¶
func PoolMoveEnqueueJobPayloadFromMap(data map[string]interface{}) (*PoolMoveEnqueueJobPayload, error)
func (PoolMoveEnqueueJobPayload) ToMap ¶
func (p PoolMoveEnqueueJobPayload) ToMap() map[string]interface{}
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue manages background jobs using Redis
func (*Queue) EnqueueDeleteImageJob ¶
func (q *Queue) EnqueueDeleteImageJob(imageID uint, imageUUID string, fromReportID *uint, initiatedBy *uint) (*Job, error)
EnqueueDeleteImageJob enqueues an asynchronous delete job for an image
func (*Queue) EnqueueJob ¶
EnqueueJob adds a new job to the queue
func (*Queue) GetJobStats ¶
GetJobStats returns statistics about job statuses
func (*Queue) GetProcessingSize ¶
GetProcessingSize returns the number of jobs being processed
func (*Queue) GetQueueSize ¶
GetQueueSize returns the number of pending jobs
type ReconcileVariantsJobPayload ¶
// ReconcileVariantsJobPayload is the payload for a job that moves
// late-created variants to the image's current pool.
type ReconcileVariantsJobPayload struct {
ImageID uint `json:"image_id"` // Numeric image ID, serialized as "image_id"
ImageUUID string `json:"image_uuid"` // Image UUID, serialized as "image_uuid"
TargetPoolID uint `json:"target_pool_id"` // NOTE(review): presumably the image's current pool, i.e. where the variants should end up — confirm
}
ReconcileVariantsJobPayload contains payload for moving late-created variants to the image's current pool
func ReconcileVariantsJobPayloadFromMap ¶
func ReconcileVariantsJobPayloadFromMap(data map[string]interface{}) (*ReconcileVariantsJobPayload, error)
func (ReconcileVariantsJobPayload) ToMap ¶
func (p ReconcileVariantsJobPayload) ToMap() map[string]interface{}