Skip to content

Commit 301b664

Browse files
committed
Minor fixes, improve checkpoint feilure reporting, report checkpoint timestamp
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent d4bc9b5 commit 301b664

File tree

5 files changed

+43
-25
lines changed

5 files changed

+43
-25
lines changed

crates/adapters/src/controller.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -500,14 +500,15 @@ impl Command {
500500
}
501501
}
502502

503-
/// Converts epoch milliseconds to a `DateTime<Utc>`, falling back to
504-
/// `Utc::now()` if the value is 0 or invalid.
503+
/// Converts epoch milliseconds to a `DateTime<Utc>`.
504+
///
505+
/// The caller must ensure `epoch_ms > 0`. The atomics that feed this
506+
/// function are always set (via `set_checkpoint_coordination`) *before*
507+
/// the watch channel is updated, so any coordination state that implies
508+
/// a non-zero timestamp will always have one.
505509
fn epoch_ms_to_datetime(epoch_ms: i64) -> DateTime<Utc> {
506-
if epoch_ms > 0 {
507-
DateTime::from_timestamp_millis(epoch_ms).unwrap_or_else(Utc::now)
508-
} else {
509-
Utc::now()
510-
}
510+
DateTime::from_timestamp_millis(epoch_ms)
511+
.expect("epoch_ms should be a valid positive timestamp set by set_checkpoint_coordination")
511512
}
512513

513514
impl Controller {
@@ -969,10 +970,7 @@ impl Controller {
969970
Some(CheckpointCoordination::InProgress) => {
970971
let epoch_ms = self.checkpoint_started_epoch_ms();
971972
let started_at = epoch_ms_to_datetime(epoch_ms);
972-
CheckpointActivity::InProgress {
973-
sequence_number: 0,
974-
started_at,
975-
}
973+
CheckpointActivity::InProgress { started_at }
976974
}
977975
None | Some(CheckpointCoordination::Done) | Some(CheckpointCoordination::Error(_)) => {
978976
CheckpointActivity::Idle

crates/feldera-types/src/checkpoint.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ pub enum CheckpointActivity {
4141

4242
/// A checkpoint is currently being written to storage.
4343
InProgress {
44-
/// Sequence number of the in-flight checkpoint.
45-
sequence_number: u64,
4644
/// When the checkpoint write started (ISO 8601).
4745
started_at: DateTime<Utc>,
4846
},

js-packages/web-console/src/lib/components/pipelines/editor/performance/CheckpointActivityStatus.svelte

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@
4040
: e === 'EnterpriseFeature'
4141
? 'Enterprise feature'
4242
: e
43-
if ('UnsupportedInputEndpoint' in e)
43+
if ('UnsupportedInputEndpoint' in e) {
4444
return `Input "${e.UnsupportedInputEndpoint}" does not support checkpointing`
45+
}
4546
return String(e)
4647
}
4748
@@ -81,20 +82,27 @@
8182
</thead>
8283
<tbody>
8384
{#if isPermanentlyUnavailable}
85+
{@const errors = permanentErrors ?? []}
86+
{@const firstReason = formatPermanentError(errors[0])}
87+
{@const extraCount = errors.length - 1}
8488
<tr>
8589
<td>
8690
<div class="pointer-events-none chip w-full bg-error-50-950 uppercase">
8791
Unavailable
8892
</div>
89-
<Tooltip placement="top">
90-
<ul class="list-disc pl-4">
91-
{#each permanentErrors ?? [] as err}
92-
<li>{formatPermanentError(err)}</li>
93-
{/each}
94-
</ul>
95-
</Tooltip>
9693
</td>
97-
<td class="text-surface-500"> Checkpointing not supported </td>
94+
<td class="">
95+
{firstReason}{#if extraCount > 0} + {extraCount} other{extraCount !== 1 ? 's' : ''}{/if}
96+
{#if extraCount > 0}
97+
<Tooltip placement="top">
98+
<ul class="list-disc pl-4">
99+
{#each errors as err}
100+
<li>{formatPermanentError(err)}</li>
101+
{/each}
102+
</ul>
103+
</Tooltip>
104+
{/if}
105+
</td>
98106
<td></td>
99107
</tr>
100108
{:else if activity.status === 'delayed'}
@@ -107,7 +115,7 @@
107115
<td class="font-dm-mono">
108116
{delayDuration}s
109117
</td>
110-
<td class="text-surface-500">
118+
<td class="">
111119
{activity.reasons.length} reason{activity.reasons.length !== 1 ? 's' : ''} (hover to view)
112120
<Tooltip placement="top">
113121
<ul class="list-disc pl-4">

js-packages/web-console/src/lib/components/pipelines/editor/performance/CheckpointsStatus.svelte

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
11
<script lang="ts">
2+
import Dayjs from 'dayjs'
23
import { slide } from 'svelte/transition'
34
import InlineDropdown from '$lib/components/common/InlineDropdown.svelte'
45
import { humanSize } from '$lib/functions/common/string'
56
import type { CheckpointMetadata } from '$lib/services/manager'
67
8+
/** Extracts the timestamp from a UUID v7 (first 48 bits = unix ms).
9+
* Returns null for non-v7 UUIDs. */
10+
const uuidV7Timestamp = (uuid: string): string | null => {
11+
// UUID v7: version nibble (char at index 14) must be '7'
12+
if (uuid[14] !== '7') return null
13+
const hex = uuid.replace(/-/g, '').slice(0, 12)
14+
const ms = parseInt(hex, 16)
15+
if (!Number.isFinite(ms) || ms <= 0) return null
16+
return Dayjs(ms).format('MMM D, YYYY HH:mm:ss')
17+
}
18+
719
const {
820
checkpoints,
921
onClose,
@@ -36,6 +48,7 @@
3648
{#each checkpoints as checkpoint}
3749
<InlineDropdown>
3850
{#snippet header(isOpen, toggle)}
51+
{@const ts = uuidV7Timestamp(checkpoint.uuid)}
3952
<div
4053
class="flex w-full cursor-pointer items-center gap-2 py-2 pr-2"
4154
onclick={toggle}
@@ -48,7 +61,7 @@
4861
></div>
4962
<div class="flex flex-1 flex-col overflow-hidden">
5063
<div class="overflow-hidden text-nowrap text-ellipsis">
51-
{humanSize(checkpoint.size ?? 0)} · {checkpoint.identifier ?? ''}
64+
{humanSize(checkpoint.size ?? 0)}{#if checkpoint.identifier} · {checkpoint.identifier}{/if}{#if ts} · <span class="text-surface-600-400">{ts}</span>{/if}
5265
</div>
5366
<div class="overflow-hidden text-sm text-nowrap text-ellipsis text-surface-500">
5467
{checkpoint.uuid}

js-packages/web-console/src/lib/services/pipelineManager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,8 @@ export const getPipelineStats = async (pipeline_name: string, options?: FetchOpt
575575
if (SIMULATE_CHECKPOINTS && controllerStatus) {
576576
controllerStatus = {
577577
...controllerStatus,
578-
checkpoint_activity: _mockCheckpoints.getCheckpointActivity(pipeline_name)
578+
checkpoint_activity: _mockCheckpoints.getCheckpointActivity(pipeline_name),
579+
permanent_checkpoint_errors: null
579580
}
580581
}
581582
return {

0 commit comments

Comments
 (0)