Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Bad WIP 2
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
  • Loading branch information
Karakatiza666 committed Dec 2, 2025
commit 7e633af424b2ca12ac744ec4e244afae9a160855
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,10 @@ example = "1.0"`
{changes}
onCancel={() => (contextDrawer.content = null)}
onApprove={async () => {
const { waitFor } = await pipelineAction.postPipelineAction(
await pipelineAction.postPipelineAction(
pipeline.current.name,
'approve_changes'
)
await waitFor()
}}
>
{#snippet titleEnd()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ groups related actions into multi-action dropdowns when multiple options are ava
import { slide } from 'svelte/transition'
import { useIsMobile } from '$lib/compositions/layout/useIsMobile.svelte'
import { usePipelineAction } from '$lib/compositions/usePipelineAction.svelte'
import { usePipelineActionCallbacks } from '$lib/compositions/pipelines/usePipelineActionCallbacks.svelte'
import type { WritablePipeline } from '$lib/compositions/useWritablePipeline.svelte'

let {
Expand Down Expand Up @@ -328,24 +327,13 @@ groups related actions into multi-action dropdowns when multiple options are ava
const importantBtnColor = 'preset-filled-primary-500'

const { postPipelineAction } = usePipelineAction()
const pipelineActionCallbacks = usePipelineActionCallbacks()

const performStartAction = async (action: PipelineAction) => {
const callbacks =
action === 'start'
? {
onPausedReady: async (pipelineName: string) => {
const cbs = pipelineActionCallbacks.getAll(pipelineName, 'start_paused')
await Promise.allSettled(cbs.map((x) => x(pipelineName)))
}
}
: undefined

const { waitFor } = await postPipelineAction(pipeline.current.name, action, callbacks)
waitFor().then(
() => {},
toastError
)
try {
await postPipelineAction(pipeline.current.name, action)
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
}

// Static multi-action dropdown configurations
Expand Down Expand Up @@ -411,9 +399,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
label: 'Pause',
description: 'Pause the running pipeline',
onclick: async () => {
const pipelineName = pipeline.current.name
const { waitFor } = await postPipelineAction(pipelineName, 'pause')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipeline.current.name, 'pause')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
},
disabled: () => false,
standaloneButton: _pause
Expand All @@ -422,9 +412,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
label: 'Start in Standby',
description: 'Put the pipeline in standby mode',
onclick: async () => {
const pipelineName = pipeline.current.name
const { waitFor } = await postPipelineAction(pipelineName, 'standby')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipeline.current.name, 'standby')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
},
disabled: () => false,
standaloneButton: _standby
Expand All @@ -433,9 +425,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
label: 'Activate',
description: 'Activate the pipeline to start data ingress and processing',
onclick: async () => {
const pipelineName = pipeline.current.name
const { waitFor } = await postPipelineAction(pipelineName, 'activate')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipeline.current.name, 'activate')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
},
disabled: () => false,
standaloneButton: _activate
Expand All @@ -449,8 +443,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
'Clear',
(name) => `Clear ${name} pipeline storage?`,
async (pipelineName: string) => {
const { waitFor } = await postPipelineAction(pipelineName, 'clear')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipelineName, 'clear')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
},
'This will delete all checkpoints.'
)(pipeline.current.name)}
Expand All @@ -477,8 +474,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
'Force stop',
(name) => `Force stop ${name} pipeline?`,
async (pipelineName: string) => {
const { waitFor } = await postPipelineAction(pipelineName, 'kill')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipelineName, 'kill')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
},
'The pipeline will stop processing inputs without making a checkpoint, leaving only a previous one, if any.'
)(pipeline.current.name)}
Expand All @@ -492,8 +492,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
'Stop',
(name) => `Stop ${name} pipeline?`,
async (pipelineName: string) => {
const { waitFor } = await postPipelineAction(pipelineName, 'stop')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipelineName, 'stop')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
},
'The pipeline will stop processing inputs and make a checkpoint of its state.'
)(pipeline.current.name)}
Expand Down Expand Up @@ -738,9 +741,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
<button
class="hidden sm:flex {buttonClass} {longClass} {basicBtnColor}"
onclick={async () => {
const pipelineName = pipeline.current.name
const { waitFor } = await postPipelineAction(pipelineName, 'pause')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipeline.current.name, 'pause')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
}}
>
<span class="fd fd-pause {iconClass}"></span>
Expand All @@ -750,9 +755,11 @@ groups related actions into multi-action dropdowns when multiple options are ava
<button
class="flex sm:hidden {buttonClass} {shortClass} {basicBtnColor} {iconClass}"
onclick={async () => {
const pipelineName = pipeline.current.name
const { waitFor } = await postPipelineAction(pipelineName, 'pause')
waitFor().then(() => {}, toastError)
try {
await postPipelineAction(pipeline.current.name, 'pause')
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
}}
>
<span class="fd fd-pause {iconClass}"></span>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,27 @@
import { usePipelineManager } from '$lib/compositions/usePipelineManager.svelte'
import { usePremiumFeatures } from '$lib/compositions/usePremiumFeatures.svelte'
import { usePipelineAction } from '$lib/compositions/usePipelineAction.svelte'
import { usePipelineActionCallbacks } from '$lib/compositions/pipelines/usePipelineActionCallbacks.svelte'
import type { PipelineAction as Action } from '$lib/services/pipelineManager'

let {
pipelines,
selectedPipelines = $bindable()
}: { pipelines: PipelineThumb[]; selectedPipelines: string[] } = $props()
const { updatePipelines, updatePipeline } = useUpdatePipelineList()
const pipelineActionCallbacks = usePipelineActionCallbacks()
const sortedSelectedPipelines = $derived([...selectedPipelines].sort())

// Helper to wait for a pipeline action to complete
const waitForAction = (pipelineName: string, action: Action): Promise<void> => {
return new Promise((resolve, reject) => {
const callback = async () => {
pipelineActionCallbacks.remove(pipelineName, action, callback)
resolve()
}
pipelineActionCallbacks.add(pipelineName, action, callback)
})
}
const availableActions = [
'start' as const,
'resume' as const,
Expand Down Expand Up @@ -120,23 +135,25 @@
const { postPipelineAction } = usePipelineAction()
let deletePipelines = () => {
selected.forEach(async (pipeline) => {
if (!isPipelineCodeEditable(pipeline.status)) {
const { waitFor } = await postPipelineAction(
pipeline.name,
isPremium.value ? 'stop' : 'kill'
)
updatePipeline(pipeline.name, (p) => ({
...p,
status: isPremium.value ? 'Stopping' : 'Stopping'
}))
await waitFor().catch(toastError)
}
if (pipeline.storageStatus !== 'Cleared') {
const { waitFor } = await postPipelineAction(pipeline.name, 'clear')
updatePipeline(pipeline.name, (p) => ({ ...p, storageStatus: 'Clearing' }))
await waitFor().catch(toastError)
try {
// Step 1: Stop the pipeline if it's running
if (!isPipelineCodeEditable(pipeline.status)) {
const stopAction = isPremium.value ? 'stop' : 'kill'
await postPipelineAction(pipeline.name, stopAction)
await waitForAction(pipeline.name, stopAction)
}

// Step 2: Clear storage if in use
if (pipeline.storageStatus !== 'Cleared') {
await postPipelineAction(pipeline.name, 'clear')
await waitForAction(pipeline.name, 'clear')
}

// Step 3: Delete the pipeline
await api.deletePipeline(pipeline.name)
} catch (error) {
toastError(error instanceof Error ? error : new Error(String(error)))
}
return api.deletePipeline(pipeline.name)
})
selectedPipelines = []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@ type Cb = (pipelineName: string) => Promise<void>

type Action = PipelineAction | 'delete'

type PendingAction = {
pipelineName: string
action: Action
isDesiredState: (pipeline: PipelineThumb) => boolean
}

const callbacks: Record<string, Partial<Record<Action, Cb[]>>> = $state({})
const pendingActions: PendingAction[] = $state([])

// Map of action types to their desired state predicates
const isDesiredState: Record<Action, (pipeline: PipelineThumb) => boolean> = {
start: (p) => p.status === 'Running',
resume: (p) => p.status === 'Running',
pause: (p) => p.status === 'Paused',
start_paused: (p) => p.status === 'Paused',
stop: (p) => p.status === 'Stopped',
kill: (p) => p.status === 'Stopped',
clear: (p) => p.storageStatus === 'Cleared',
standby: (p) => p.status === 'Standby',
activate: (p) => p.status === 'Running',
approve_changes: (p) => p.status !== 'AwaitingApproval',
delete: () => false // Never check for desired state on delete - handled separately
}

const pop = (pipelineName: string, action: Action) => {
callbacks[pipelineName] ??= {}
Expand Down Expand Up @@ -48,48 +56,35 @@ export function usePipelineActionCallbacks(preloaded?: { pipelines: PipelineThum
// Execute callbacks
Promise.allSettled(allDeleteCbs.map(cb => cb(deletedPipelineName)))

// Remove pending actions for deleted pipeline
const toRemove = pendingActions.filter(pa => pa.pipelineName === deletedPipelineName)
toRemove.forEach(pa => {
const idx = pendingActions.indexOf(pa)
if (idx !== -1) {
pendingActions.splice(idx, 1)
}
})
// Clean up callbacks for deleted pipeline
delete callbacks[deletedPipelineName]
}
}

// Check pending actions for state matches
const toRemove: PendingAction[] = []
for (const pendingAction of pendingActions) {
const pipeline = currentPipelines.find(p => p.name === pendingAction.pipelineName)
// Check all registered callbacks to see if their desired states are reached
for (const pipelineName in callbacks) {
if (pipelineName === '') continue // Skip global callbacks (only used for delete)

if (!pipeline) {
// Pipeline not found, skip for now (might be deleted, handled above)
continue
}
const pipeline = currentPipelines.find(p => p.name === pipelineName)
if (!pipeline) continue // Pipeline not found

// Check if desired state is reached
if (pendingAction.isDesiredState(pipeline)) {
// Get callbacks for this action
const cbs = callbacks[pendingAction.pipelineName]?.[pendingAction.action] ?? []
for (const action in callbacks[pipelineName]) {
const actionKey = action as Action
if (actionKey === 'delete') continue // Delete handled above

// Execute callbacks
Promise.allSettled(cbs.map(cb => cb(pendingAction.pipelineName)))
const cbs = callbacks[pipelineName][actionKey] ?? []
if (cbs.length === 0) continue

// Mark for removal
toRemove.push(pendingAction)
// Check if desired state is reached
if (isDesiredState[actionKey](pipeline)) {
// Execute all callbacks for this action
// Note: Callbacks are NOT automatically removed - callers must remove them explicitly
// callbacks[pipelineName][actionKey] = []
Promise.allSettled(cbs.map(cb => cb(pipelineName)))
}
}
}

// Remove completed pending actions
toRemove.forEach(pa => {
const idx = pendingActions.indexOf(pa)
if (idx !== -1) {
pendingActions.splice(idx, 1)
}
})

// Update previous pipeline names
previousPipelineNames = currentPipelineNames
})
Expand All @@ -109,17 +104,6 @@ export function usePipelineActionCallbacks(preloaded?: { pipelines: PipelineThum
}
callbacks[pipelineName][action].splice(idx, 1)
},
registerPendingAction(
pipelineName: string,
action: Action,
isDesiredState: (pipeline: PipelineThumb) => boolean
) {
pendingActions.push({
pipelineName,
action,
isDesiredState
})
},
pop,
popIterator: (pipelineName: string, action: Action) => ({
[Symbol.iterator](): Iterator<Cb> {
Expand Down
Loading