Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
WIP 6
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
  • Loading branch information
Karakatiza666 committed Dec 2, 2025
commit 17a3054cdf914a2c3c63cb8ae409eb8ba9b1605e
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
$effect(() => {
untrack(() => pipelineActionCallbacks.add('', 'delete', forgetCurrentTab))
return () => {
pipelineActionCallbacks.remove('', 'start_paused', forgetCurrentTab)
pipelineActionCallbacks.remove('', 'delete', forgetCurrentTab)
}
})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import type {
PipelineAction,
PipelineStatus,
PipelineThumb
} from '$lib/services/pipelineManager'
import type { PipelineAction, PipelineStatus, PipelineThumb } from '$lib/services/pipelineManager'
import { untrack } from 'svelte'
import { usePipelineList } from './usePipelineList.svelte'
import { fsm } from '@githubnext/tiny-svelte-fsm'
import { match, P } from 'ts-pattern'

type Cb = (pipelineName: string) => Promise<void>

type Action = PipelineAction | 'delete'
type Action = PipelineAction | 'delete' | 'rename'

const callbacks: Record<string, Partial<Record<Action, Cb[]>>> = $state({})

Expand Down Expand Up @@ -71,81 +68,65 @@ const parseStateKey = (key: FSMState): { status: string; storageStatus: StorageS
const isStoppedOrError = (status: string) =>
['Stopped', 'SqlError', 'RustError', 'SystemError'].includes(status)

// Check which action matches a transition
const isRunning = (status: string) => ['Running', 'Paused'].includes(status)

/**
* Check if an action matches a transition
*/
const matchActionForTransition = (from: FSMState, to: FSMState): Action[] => {
const fromState = parseStateKey(from)
const toState = parseStateKey(to)

const matchedActions: Action[] = []

// Pipeline status transitions
if (fromState.status !== toState.status) {
// start_paused: Stopped/Error → Paused
if (isStoppedOrError(fromState.status) && toState.status === 'Paused') {
matchedActions.push('start_paused')
}

// start: Stopped/Error → Running
if (isStoppedOrError(fromState.status) && toState.status === 'Running') {
matchedActions.push('start')
}

// resume: Paused → Running
if (fromState.status === 'Paused' && toState.status === 'Running') {
matchedActions.push('resume')
}

// pause: Running → Paused
if (fromState.status === 'Running' && toState.status === 'Paused') {
matchedActions.push('pause')
}

// stop: Running/Paused → Stopped
if (['Running', 'Paused'].includes(fromState.status) && toState.status === 'Stopped') {
matchedActions.push('stop')
}

// kill: Any non-Stopped → Stopped
if (fromState.status !== 'Stopped' && toState.status === 'Stopped') {
matchedActions.push('kill')
}

// standby: Any → Standby
if (toState.status === 'Standby') {
matchedActions.push('standby')
}

// activate: Standby → Running
if (fromState.status === 'Standby' && toState.status === 'Running') {
matchedActions.push('activate')
}

// approve_changes: AwaitingApproval → Any other
if (fromState.status === 'AwaitingApproval' && toState.status !== 'AwaitingApproval') {
matchedActions.push('approve_changes')
}
}
matchedActions.push(
...match([fromState.status, toState.status])
.returnType<Action[]>()
.when(
([from, to]) => from === to,
() => []
)
.with([P.when(isStoppedOrError), 'Paused'], () => ['start_paused'])
.with([P.when(isStoppedOrError), 'Running'], () => ['start'])
.with(['Paused', 'Running'], () => ['resume'])
.with(['Running', 'Paused'], () => ['pause'])
.with([P.when(isRunning), 'Stopped'], () => ['stop'])
.with([P.not('Stopped'), 'Stopped'], () => ['kill'])
.with([P.any, 'Standby'], () => ['standby'])
.with(['Standby', 'Running'], () => ['activate'])
.with(['AwaitingApproval', P.when(isRunning)], () => ['approve_changes'])
.otherwise(() => [])
)

// Storage status transitions
if (fromState.storageStatus !== toState.storageStatus) {
// clear: Any → Cleared
if (fromState.storageStatus !== 'Cleared' && toState.storageStatus === 'Cleared') {
matchedActions.push('clear')
}
}
matchedActions.push(
...match([fromState.storageStatus, toState.storageStatus])
.returnType<Action[]>()
.when(
([from, to]) => from === to,
() => []
)
.with([P.not('Cleared'), 'Cleared'], () => ['clear'])
.otherwise(() => [])
)

return matchedActions
}

// Create FSM for tracking pipeline state transitions
/**
* Create FSM for tracking pipeline state transitions
*/
const createPipelineFSM = (initialState: FSMState, pipelineName: string) => {
// Use wildcard pattern to allow any state to transition to any other state
return fsm<FSMState, FSMEvent>(initialState, {
'*': {
transition: (newState: FSMState) => newState,
_enter: ({ from, to }: { from: FSMState | null; to: FSMState }) => {
// Skip initial state entry
if (from === null) return
if (from === null) {
return
}

// Detect which actions this transition represents
const actions = matchActionForTransition(from, to)
Expand All @@ -171,38 +152,78 @@ const pop = (pipelineName: string, action: Action) => {
return callbacks[pipelineName][action].pop()
}

/**
* Reactive composition for monitoring pipeline state transitions and executing callbacks.
*
* Tracks stable state transitions (Stopped → Running, Running → Paused, etc.) and automatically
* executes registered callbacks when actions complete. Transitional states (Preparing, Provisioning,
* etc.) are ignored.
*
* - Registered callbacks are NOT automatically removed - callers control lifecycle
* - Uses FSM per pipeline to track state transitions and derive actions based on that
* - Detects pipeline deletions (by ID) and executes 'delete' callbacks
* - Handles pipeline renames by migrating callbacks and FSM state, and executes 'rename' callbacks
*
* @param preloaded - Optional preloaded pipeline data
*/
export function usePipelineActionCallbacks(preloaded?: { pipelines: PipelineThumb[] }) {
const pipelineList = usePipelineList(preloaded)

// Track previous pipeline names to detect deletions
let previousPipelineNames = $state<Set<string>>(new Set())
// Track previous pipelines by ID to detect deletions and renames
let previousPipelinesById = $state<Map<string, PipelineThumb>>(new Map())

// Reactive effect to monitor pipeline changes and trigger callbacks
$effect(() => {
const currentPipelines = pipelineList.pipelines
untrack(() => {
const currentPipelineNames = new Set(currentPipelines.map((p) => p.name))
const currentPipelinesById = new Map(currentPipelines.map((p) => [p.id, p]))

// Check for deleted pipelines
const deletedPipelines = Array.from(previousPipelineNames).filter(
(name) => !currentPipelineNames.has(name)
)
// Only process changes if we had pipelines before (not initial load)
// Check for deleted and renamed pipelines
for (const [id, oldPipeline] of previousPipelinesById) {
const currentPipeline = currentPipelinesById.get(id)

if (!currentPipeline) {
// Pipeline deleted (ID no longer exists)
const oldName = oldPipeline.name

// Only process deletions if we had pipelines before (not initial load)
if (previousPipelineNames.size > 0) {
for (const deletedPipelineName of deletedPipelines) {
// Call delete callbacks for specific pipeline
const specificCbs = callbacks[deletedPipelineName]?.['delete'] ?? []
const specificCbs = callbacks[oldName]?.['delete'] ?? []
// Call global delete callbacks
const globalCbs = callbacks['']?.['delete'] ?? []
const allDeleteCbs = [...specificCbs, ...globalCbs]

// Execute callbacks
Promise.allSettled(allDeleteCbs.map((cb) => cb(deletedPipelineName)))
Promise.allSettled(allDeleteCbs.map((cb) => cb(oldName)))

// Clean up callbacks and FSM for deleted pipeline
delete callbacks[deletedPipelineName]
delete pipelineFSMs[deletedPipelineName]
delete callbacks[oldName]
delete pipelineFSMs[oldName]
} else if (oldPipeline.name !== currentPipeline.name) {
// Pipeline renamed (same ID, different name)
const oldName = oldPipeline.name
const newName = currentPipeline.name

// Call rename callbacks for specific pipeline
const specificCbs = callbacks[oldName]?.['rename'] ?? []
// Call global rename callbacks
const globalCbs = callbacks['']?.['rename'] ?? []
const allRenameCbs = [...specificCbs, ...globalCbs]

// Execute callbacks (passing the old name)
Promise.allSettled(allRenameCbs.map((cb) => cb(oldName)))

// Migrate callbacks from old name to new name (excluding rename callbacks)
if (callbacks[oldName]) {
callbacks[newName] = callbacks[oldName]
delete callbacks[oldName]
}

// Migrate FSM from old name to new name
if (pipelineFSMs[oldName]) {
pipelineFSMs[newName] = pipelineFSMs[oldName]
delete pipelineFSMs[oldName]
}
}
}

Expand All @@ -227,12 +248,37 @@ export function usePipelineActionCallbacks(preloaded?: { pipelines: PipelineThum
pipelineFSMs[pipelineName].send('transition', currentStateKey)
}

// Update previous pipeline names
previousPipelineNames = currentPipelineNames
// Update previous pipelines
previousPipelinesById = currentPipelinesById
})
})
}

/**
* Get API for managing pipeline action callbacks.
*
* Provides methods to register, remove, and query callbacks that execute when pipeline
* actions complete (detected via state transitions).
*
* Use empty string `''` as pipelineName for global callbacks (supported for 'delete' and 'rename' actions).
*
* @returns Callback management API with add, remove, pop, popAll, and getAll methods
*
* @example
* ```ts
* const api = getPipelineActionCallbacks()
*
* // Register callback for when pipeline starts
* const callback = async (name) => console.log(`${name} started`)
* api.add('my-pipeline', 'start', callback)
*
* // Global rename callback (receives old pipeline name)
* api.add('', 'rename', async (oldName) => console.log(`Pipeline ${oldName} was renamed`))
*
* // Remove callback when done
* api.remove('my-pipeline', 'start', callback)
* ```
*/
export function getPipelineActionCallbacks() {
return {
add(pipelineName: string, action: Action, callback: Cb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ export const programStatusOf = (status: PipelineStatus) =>
const toPipelineThumb = (
pipeline: Omit<ExtendedPipelineDescr, 'program_code' | 'program_error' | 'udf_rust' | 'udf_toml'>
) => ({
id: pipeline.id,
name: pipeline.name,
description: pipeline.description,
storageStatus: pipeline.storage_status,
Expand Down