Skip to content

Commit edb8a01

Browse files
committed
adapters: sync: always error on network failures
Fixes the behavior where `fail_if_no_checkpoint` would also control if the pipeline fails in case of network failures. Now, the pipeline always fails in case of network failures. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent e8313ec commit edb8a01

File tree

3 files changed

+36
-37
lines changed

3 files changed

+36
-37
lines changed

crates/adapters/src/controller/sync.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,7 @@ pub fn is_pull_necessary(storage: &CircuitStorageConfig) -> Option<&SyncConfig>
128128

129129
#[cfg(feature = "feldera-enterprise")]
130130
pub fn pull_once(storage: &CircuitStorageConfig, sync: &SyncConfig) -> Result<(), ControllerError> {
131-
if let Err(err) = pull_and_gc(storage.backend.clone(), sync, &mut uuid::Uuid::nil()) {
132-
if sync.fail_if_no_checkpoint {
133-
return Err(err);
134-
}
135-
}
131+
pull_and_gc(storage.backend.clone(), sync, &mut uuid::Uuid::nil())?;
136132

137133
Ok(())
138134
}
@@ -200,7 +196,8 @@ where
200196
loop {
201197
match pull_and_gc(storage.backend.clone(), sync, &mut prev) {
202198
Err(err) => {
203-
if sync.fail_if_no_checkpoint {
199+
// On our final attempt to pull the checkpoint after activation, if we fail, we should error out and not activate with a potentially stale or missing checkpoint.
200+
if pull_once_again_after_activation {
204201
return Err(err);
205202
}
206203
}

docs.feldera.com/docs/pipelines/checkpoint-sync.md

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,29 +38,29 @@ Here is a sample configuration:
3838
### `sync` configuration fields
3939

4040
| Field | Type | Default | Description |
41-
|-------------------------|-----------------|-------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
41+
| ----------------------- | --------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
4242
| `endpoint` | `string` | | The S3-compatible object store endpoint (e.g., `http://localhost:9000` for MinIO). |
4343
| `bucket`\* | `string` | | The bucket name and optional prefix to store checkpoints (e.g., `mybucket/checkpoints`). |
4444
| `region` | `string` | `us-east-1` | The region of the bucket. Leave empty for MinIO. If `provider` is AWS, and no region is specified, `us-east-1` is used. |
4545
| `provider`\* | `string` | | The S3 provider identifier. Must match [rclone’s list](https://rclone.org/s3/#providers). Case-sensitive. Use `"Other"` if unsure. |
4646
| `access_key` | `string` | | S3 access key. Not required if using environment-based auth (e.g., IRSA). |
4747
| `secret_key` | `string` | | S3 secret key. Not required if using environment-based auth. |
4848
| `start_from_checkpoint` | `string` | | Checkpoint UUID to resume from, or `latest` to restore from the latest checkpoint. |
49-
| `fail_if_no_checkpoint` | `boolean` | `false` | When `true` the pipeline will fail to initialize if fetching the specified checkpoint fails. <p> When `false`, the pipeline will start from scratch instead. Ignored if `start_from_checkpoint` is not set. </p> |
49+
| `fail_if_no_checkpoint` | `boolean` | `false` | Only applies when `start_from_checkpoint` is set to `latest`. <p> When `true`, the pipeline fails to start if no checkpoint exists in remote storage. When `false`, the pipeline starts from scratch instead. </p> |
5050
| `standby` | `boolean` | `false` | When `true`, the pipeline starts in **standby** mode. <p> To start processing the data the pipeline must be activated (`POST /activate`). </p> <p> If a previously activated pipeline is restarted without clearing storage, it auto-activates. </p> `start_from_checkpoint` must be set to use standby mode. |
5151
| `pull_interval` | `integer(u64)` | `10` | Interval (in seconds) between fetch attempts for the latest checkpoint while standby. |
52-
| `push_interval` | `integer(u64)` | | Interval (in seconds) between [automatic sync](/pipelines/checkpoint-sync#automatic-checkpoint-synchronization) of a local checkpoint to object store measured from the completion of the previous sync attempt. Disabled by default. |
52+
| `push_interval` | `integer(u64)` | | Interval (in seconds) between [automatic sync](/pipelines/checkpoint-sync#automatic-checkpoint-synchronization) of a local checkpoint to object store measured from the completion of the previous sync attempt. Disabled by default. |
5353
| `transfers` | `integer (u8)` | `20` | Number of concurrent file transfers. |
5454
| `checkers` | `integer (u8)` | `20` | Number of parallel checkers for verification. |
5555
| `ignore_checksum` | `boolean` | `false` | Skip checksum verification after transfer and only check the file size. Might improve throughput. |
5656
| `multi_thread_streams` | `integer (u8)` | `10` | Number of streams for multi-threaded downloads. |
5757
| `multi_thread_cutoff` | `string` | `100M` | File size threshold to enable multi-threaded downloads (e.g., `100M`, `1G`). Supported suffixes: `k`, `M`, `G`, `T`. |
5858
| `upload_concurrency` | `integer (u8)` | `10` | Number of concurrent chunks to upload during multipart uploads. |
5959
| `flags` | `array[string]` | | Extra flags to pass to `rclone`.<p> ⚠️ Incorrect or conflicting flags may break behavior. See [rclone flags](https://rclone.org/flags/) and [S3 flags](https://rclone.org/s3/). </p> |
60-
| `retention_min_count` | `integer (u32)` | `10` | The minimum number of checkpoints to retain in object store. No checkpoints will be deleted if the total count is below this threshold. |
61-
| `retention_min_age` | `integer (u32)` | `30` | The minimum age (in days) a checkpoint must reach before it becomes eligible for deletion. All younger checkpoints will be preserved. |
60+
| `retention_min_count` | `integer (u32)` | `10` | The minimum number of checkpoints to retain in object store. No checkpoints will be deleted if the total count is below this threshold. |
61+
| `retention_min_age` | `integer (u32)` | `30` | The minimum age (in days) a checkpoint must reach before it becomes eligible for deletion. All younger checkpoints will be preserved. |
6262

63-
*Fields marked with an asterisk are required.
63+
\*Fields marked with an asterisk are required.
6464

6565
## S3 permissions
6666

@@ -92,8 +92,8 @@ Example policy:
9292
"s3:PutObjectAcl"
9393
],
9494
"Resource": [
95-
"arn:aws:s3:::BUCKET_NAME/*",
96-
"arn:aws:s3:::BUCKET_NAME"
95+
"arn:aws:s3:::BUCKET_NAME/*",
96+
"arn:aws:s3:::BUCKET_NAME"
9797
]
9898
},
9999
{
@@ -137,6 +137,7 @@ than they are created.
137137
### Automatic checkpoint synchronization trigger conditions
138138

139139
An automatic checkpoint synchronization is only triggered when all of the following conditions are met:
140+
140141
- The configured `push_interval` has elapsed.
141142
- No checkpoint sync is currently in progress.
142143
- Checkpoint sync has not been manually requested.
@@ -172,28 +173,27 @@ When Pipeline **A** fails, you can trigger Pipeline **B** to activate and start
172173
the latest checkpoint (Checkpoint 2 in this case).
173174

174175
| Time | Pipeline A (Primary) | Pipeline B (Standby) |
175-
|---------|----------------------|-------------------------------|
176+
| ------- | -------------------- | ----------------------------- |
176177
| Step 1 | **Start** | **Standby Start** |
177-
| Step 2 | *Processing* | *Standby* |
178-
| Step 3 | *Checkpoint 1* | *Standby* |
179-
| Step 4 | *Sync 1 to S3* | *Standby* |
180-
| Step 5 | *Processing* | *Pulls Checkpoint 1* |
181-
| Step 6 | *Checkpoint 2* | *Standby* |
182-
| Step 7 | *Sync 2 to S3* | *Standby* |
183-
| Step 8 | *Processing* | *Pulls Checkpoint 2* |
184-
| Step 9 | *Failed* | *Standby* |
178+
| Step 2 | _Processing_ | _Standby_ |
179+
| Step 3 | _Checkpoint 1_ | _Standby_ |
180+
| Step 4 | _Sync 1 to S3_ | _Standby_ |
181+
| Step 5 | _Processing_ | _Pulls Checkpoint 1_ |
182+
| Step 6 | _Checkpoint 2_ | _Standby_ |
183+
| Step 7 | _Sync 2 to S3_ | _Standby_ |
184+
| Step 8 | _Processing_ | _Pulls Checkpoint 2_ |
185+
| Step 9 | _Failed_ | _Standby_ |
185186
| Step 10 | | **Activate** |
186187
| Step 11 | | **Running From Checkpoint 2** |
187188

188-
189-
190189
## Buckets with server side encryption
191190

192191
If the bucket has server side encryption enabled, set the flag
193192
[--s3-server-side-encryption](https://rclone.org/s3/#s3-server-side-encryption)
194193
in the `flags` field.
195194

196195
Example:
196+
197197
```json
198198
"sync": {
199199
"bucket": "BUCKET_NAME/DIRECTORY_NAME",
@@ -209,7 +209,7 @@ Sync performance may vary based on configuration and environment. In our testing
209209
we observed the following average speeds:
210210

211211
| Storage Type | Avg Upload Speed | Avg Download Speed | Avg Download Speed (Ignore Checksum) |
212-
|--------------|------------------|--------------------|--------------------------------------|
212+
| ------------ | ---------------- | ------------------ | ------------------------------------ |
213213
| GP3 | 650 MiB/s | 650 MiB/s | 850 MiB/s |
214214
| GP2 | 125 MiB/s | 125 MiB/s | 250 MiB/s |
215215
| NVMe | 1.5 GiB/s | 2.2 GiB/s | 2.3 GiB/s |
@@ -231,7 +231,7 @@ curl -X POST http://localhost/v0/pipelines/{PIPELINE_NAME}/checkpoint/sync
231231
This initiates the sync and returns the UUID of the checkpoint being synced:
232232

233233
```json
234-
{"checkpoint_uuid":"019779b4-8760-75f2-bdf0-71b825e63610"}
234+
{ "checkpoint_uuid": "019779b4-8760-75f2-bdf0-71b825e63610" }
235235
```
236236

237237
## Checking sync status
@@ -247,23 +247,23 @@ curl http://localhost/v0/pipelines/{PIPELINE_NAME}/checkpoint/sync_status
247247
**In Progress:**
248248

249249
```json
250-
{"success":null,"failure":null}
250+
{ "success": null, "failure": null }
251251
```
252252

253253
**Success:**
254254

255255
```json
256-
{"success":"019779b4-8760-75f2-bdf0-71b825e63610","failure":null}
256+
{ "success": "019779b4-8760-75f2-bdf0-71b825e63610", "failure": null }
257257
```
258258

259259
**Failure:**
260260

261261
```json
262262
{
263-
"success": null,
264-
"failure": {
265-
"uuid": "019779c1-8317-7a71-bd78-7b971f4a3c43",
266-
"error": "Error pushing checkpoint to object store: ... SignatureDoesNotMatch ..."
267-
}
263+
"success": null,
264+
"failure": {
265+
"uuid": "019779c1-8317-7a71-bd78-7b971f4a3c43",
266+
"error": "Error pushing checkpoint to object store: ... SignatureDoesNotMatch ..."
267+
}
268268
}
269269
```

python/tests/platform/test_checkpoint_sync.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import sys
44
import time
55
from typing import Optional
6-
from uuid import uuid4, UUID
6+
from uuid import UUID, uuid4
77

88
from feldera.enums import FaultToleranceModel, PipelineStatus
99
from feldera.runtime_config import RuntimeConfig, Storage
@@ -281,7 +281,8 @@ def test_autherr_fail(self):
281281
@enterprise_only
282282
@single_host_only
283283
def test_autherr(self):
284-
self.test_checkpoint_sync(auth_err=True, strict=False, expect_empty=True)
284+
with self.assertRaisesRegex(RuntimeError, "SignatureDoesNotMatch|Forbidden"):
285+
self.test_checkpoint_sync(auth_err=True, strict=False)
285286

286287
@enterprise_only
287288
@single_host_only
@@ -292,7 +293,8 @@ def test_nonexistent_checkpoint_fail(self):
292293
@enterprise_only
293294
@single_host_only
294295
def test_nonexistent_checkpoint(self):
295-
self.test_checkpoint_sync(random_uuid=True, from_uuid=True, expect_empty=True)
296+
with self.assertRaisesRegex(RuntimeError, "were not found in source"):
297+
self.test_checkpoint_sync(random_uuid=True, from_uuid=True)
296298

297299
@enterprise_only
298300
@single_host_only

0 commit comments

Comments
 (0)