Documentation
¶
Overview ¶
Package file provides file-based persistence implementation for workflows and triggers.
Package file provides file-based input coordination persistence.
Index ¶
- func NewPersistence(root string) persistence.Persistence
- type ExecutionContextRepository
- func (ecr *ExecutionContextRepository) GetExecutionContext(ctx context.Context, executionID string) (*models.ExecutionContext, error)
- func (ecr *ExecutionContextRepository) GetExecutionsByStatus(ctx context.Context, status models.ExecutionStatus) ([]*models.ExecutionContext, error)
- func (ecr *ExecutionContextRepository) GetExecutionsByWorkflow(ctx context.Context, workflowID string) ([]*models.ExecutionContext, error)
- func (ecr *ExecutionContextRepository) SaveExecutionContext(ctx context.Context, execCtx *models.ExecutionContext) error
- func (ecr *ExecutionContextRepository) UpdateExecutionContext(ctx context.Context, execCtx *models.ExecutionContext) error
- type FileInputCoordinationRepository
- func (r *FileInputCoordinationRepository) CleanupExpiredStates(ctx context.Context, maxAge time.Duration) error
- func (r *FileInputCoordinationRepository) DeleteInputState(ctx context.Context, nodeExecutionID string) error
- func (r *FileInputCoordinationRepository) FindPendingNodeExecution(ctx context.Context, nodeID, executionID string) (*models.NodeInputState, error)
- func (r *FileInputCoordinationRepository) LoadInputState(ctx context.Context, nodeExecutionID string) (*models.NodeInputState, error)
- func (r *FileInputCoordinationRepository) SaveInputState(ctx context.Context, state *models.NodeInputState) error
- type Persistence
- func (fp *Persistence) Close(_ context.Context) error
- func (fp *Persistence) ConnectionRepository() persistence.ConnectionRepository
- func (fp *Persistence) ExecutionContextRepository() persistence.ExecutionContextRepository
- func (fp *Persistence) HealthCheck(_ context.Context) error
- func (fp *Persistence) InputCoordinationRepository() persistence.InputCoordinationRepository
- func (fp *Persistence) NodeRepository() persistence.NodeRepository
- func (fp *Persistence) WorkflowRepository() persistence.WorkflowRepository
- type WorkflowRepository
- func (wr *WorkflowRepository) CreateDraftFromPublished(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
- func (wr *WorkflowRepository) Delete(_ context.Context, id string) error
- func (wr *WorkflowRepository) GetAll(ctx context.Context) ([]*models.Workflow, error)
- func (wr *WorkflowRepository) GetByID(_ context.Context, workflowID string) (*models.Workflow, error)
- func (wr *WorkflowRepository) GetCurrentWorkflow(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
- func (wr *WorkflowRepository) GetDraftWorkflow(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
- func (wr *WorkflowRepository) GetPublishedWorkflow(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
- func (wr *WorkflowRepository) GetWorkflowVersions(ctx context.Context, workflowGroupID string) ([]*models.Workflow, error)
- func (wr *WorkflowRepository) PublishWorkflow(ctx context.Context, workflowID string) error
- func (wr *WorkflowRepository) Save(_ context.Context, workflow *models.Workflow) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewPersistence ¶
func NewPersistence(root string) persistence.Persistence
NewPersistence creates a new instance of Persistence with the specified root directory.
Types ¶
type ExecutionContextRepository ¶
type ExecutionContextRepository struct {
// contains filtered or unexported fields
}
ExecutionContextRepository handles execution context-related file operations.
func NewExecutionContextRepository ¶
func NewExecutionContextRepository(root string) *ExecutionContextRepository
NewExecutionContextRepository creates a new execution context repository.
func (*ExecutionContextRepository) GetExecutionContext ¶
func (ecr *ExecutionContextRepository) GetExecutionContext(ctx context.Context, executionID string) (*models.ExecutionContext, error)
GetExecutionContext retrieves an execution context by its ID from the file system.
func (*ExecutionContextRepository) GetExecutionsByStatus ¶
func (ecr *ExecutionContextRepository) GetExecutionsByStatus(ctx context.Context, status models.ExecutionStatus) ([]*models.ExecutionContext, error)
GetExecutionsByStatus retrieves all execution contexts with a specific status.
func (*ExecutionContextRepository) GetExecutionsByWorkflow ¶
func (ecr *ExecutionContextRepository) GetExecutionsByWorkflow(ctx context.Context, workflowID string) ([]*models.ExecutionContext, error)
GetExecutionsByWorkflow retrieves all execution contexts for a specific workflow.
func (*ExecutionContextRepository) SaveExecutionContext ¶
func (ecr *ExecutionContextRepository) SaveExecutionContext(ctx context.Context, execCtx *models.ExecutionContext) error
SaveExecutionContext saves an execution context to the file system.
func (*ExecutionContextRepository) UpdateExecutionContext ¶
func (ecr *ExecutionContextRepository) UpdateExecutionContext(ctx context.Context, execCtx *models.ExecutionContext) error
UpdateExecutionContext updates an existing execution context in the file system.
type FileInputCoordinationRepository ¶
type FileInputCoordinationRepository struct {
// contains filtered or unexported fields
}
FileInputCoordinationRepository implements input coordination persistence using JSON files.
func NewFileInputCoordinationRepository ¶
func NewFileInputCoordinationRepository(baseDir string) *FileInputCoordinationRepository
NewFileInputCoordinationRepository creates a new file-based input coordination repository.
func (*FileInputCoordinationRepository) CleanupExpiredStates ¶
func (r *FileInputCoordinationRepository) CleanupExpiredStates(ctx context.Context, maxAge time.Duration) error
CleanupExpiredStates removes old input state files.
func (*FileInputCoordinationRepository) DeleteInputState ¶
func (r *FileInputCoordinationRepository) DeleteInputState(ctx context.Context, nodeExecutionID string) error
DeleteInputState removes input state file.
func (*FileInputCoordinationRepository) FindPendingNodeExecution ¶
func (r *FileInputCoordinationRepository) FindPendingNodeExecution(ctx context.Context, nodeID, executionID string) (*models.NodeInputState, error)
FindPendingNodeExecution finds the first pending execution for a node (for FIFO loop handling).
func (*FileInputCoordinationRepository) LoadInputState ¶
func (r *FileInputCoordinationRepository) LoadInputState(ctx context.Context, nodeExecutionID string) (*models.NodeInputState, error)
LoadInputState loads input state from a JSON file.
func (*FileInputCoordinationRepository) SaveInputState ¶
func (r *FileInputCoordinationRepository) SaveInputState(ctx context.Context, state *models.NodeInputState) error
SaveInputState persists the input state to a JSON file.
type Persistence ¶
type Persistence struct {
// contains filtered or unexported fields
}
Persistence implements the persistence.Persistence interface using the file system.
func (*Persistence) Close ¶
func (fp *Persistence) Close(_ context.Context) error
Close performs any necessary cleanup. For file-based persistence, there is nothing to clean up.
func (*Persistence) ConnectionRepository ¶
func (fp *Persistence) ConnectionRepository() persistence.ConnectionRepository
func (*Persistence) ExecutionContextRepository ¶
func (fp *Persistence) ExecutionContextRepository() persistence.ExecutionContextRepository
func (*Persistence) HealthCheck ¶
func (fp *Persistence) HealthCheck(_ context.Context) error
HealthCheck checks if the file persistence layer is healthy by verifying the root directory exists.
func (*Persistence) InputCoordinationRepository ¶
func (fp *Persistence) InputCoordinationRepository() persistence.InputCoordinationRepository
func (*Persistence) NodeRepository ¶
func (fp *Persistence) NodeRepository() persistence.NodeRepository
func (*Persistence) WorkflowRepository ¶
func (fp *Persistence) WorkflowRepository() persistence.WorkflowRepository
WorkflowRepository returns the workflow repository implementation for file persistence.
type WorkflowRepository ¶
type WorkflowRepository struct {
// contains filtered or unexported fields
}
WorkflowRepository handles workflow-related file operations.
func NewWorkflowRepository ¶
func NewWorkflowRepository(root string) *WorkflowRepository
NewWorkflowRepository creates a new workflow repository.
func (*WorkflowRepository) CreateDraftFromPublished ¶
func (wr *WorkflowRepository) CreateDraftFromPublished(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
CreateDraftFromPublished creates a draft copy from published version.
func (*WorkflowRepository) Delete ¶
func (wr *WorkflowRepository) Delete(_ context.Context, id string) error
Delete removes a workflow by its ID.
func (*WorkflowRepository) GetByID ¶
func (wr *WorkflowRepository) GetByID(_ context.Context, workflowID string) (*models.Workflow, error)
GetByID retrieves a workflow by its ID from the file system.
func (*WorkflowRepository) GetCurrentWorkflow ¶
func (wr *WorkflowRepository) GetCurrentWorkflow(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
GetCurrentWorkflow returns the current version (published if exists, otherwise draft).
func (*WorkflowRepository) GetDraftWorkflow ¶
func (wr *WorkflowRepository) GetDraftWorkflow(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
GetDraftWorkflow returns the draft version of a workflow group.
func (*WorkflowRepository) GetPublishedWorkflow ¶
func (wr *WorkflowRepository) GetPublishedWorkflow(ctx context.Context, workflowGroupID string) (*models.Workflow, error)
GetPublishedWorkflow returns the published version of a workflow group.
func (*WorkflowRepository) GetWorkflowVersions ¶
func (wr *WorkflowRepository) GetWorkflowVersions(ctx context.Context, workflowGroupID string) ([]*models.Workflow, error)
GetWorkflowVersions returns all versions of a workflow by group ID.
func (*WorkflowRepository) PublishWorkflow ¶
func (wr *WorkflowRepository) PublishWorkflow(ctx context.Context, workflowID string) error
PublishWorkflow handles the publish operation.