Skip to content

Commit 669d3fb

Browse files
Leonid Ryzhykryzhyk
authored andcommitted
[adapters+py] Improve transaction status reporting.
* Expose fewer transaction states via the API by merging TransactionRequested into TransactionInProgress and CommitRequested into CommitInProgress. * Add Python API to retrieve transaction status. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent f15cf25 commit 669d3fb

File tree

6 files changed

+75
-15
lines changed

6 files changed

+75
-15
lines changed

crates/adapters/src/controller/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4215,7 +4215,7 @@ impl ControllerInner {
42154215
self.status
42164216
.global_metrics
42174217
.transaction_status
4218-
.store(TransactionStatus::None, Ordering::Release);
4218+
.store(TransactionStatus::NoTransaction, Ordering::Release);
42194219
}
42204220
(TransactionState::Started(tid), None) => {
42214221
assert_ne!(tid, 0);
@@ -4227,7 +4227,7 @@ impl ControllerInner {
42274227
self.status
42284228
.global_metrics
42294229
.transaction_status
4230-
.store(TransactionStatus::CommitRequested, Ordering::Release);
4230+
.store(TransactionStatus::CommitInProgress, Ordering::Release);
42314231
}
42324232
(TransactionState::Committing(tid), None) => {
42334233
assert_ne!(tid, 0);
@@ -4251,7 +4251,7 @@ impl ControllerInner {
42514251
self.status
42524252
.global_metrics
42534253
.transaction_status
4254-
.store(TransactionStatus::TransactionStarting, Ordering::Release);
4254+
.store(TransactionStatus::TransactionInProgress, Ordering::Release);
42554255
}
42564256
(TransactionState::Started(tid), Some(tid2)) => {
42574257
assert_ne!(tid, 0);

crates/adapters/src/controller/stats.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,8 @@ impl CompletionToken {
126126
#[repr(u8)]
127127
pub enum TransactionStatus {
128128
#[default]
129-
None,
130-
TransactionStarting,
129+
NoTransaction,
131130
TransactionInProgress,
132-
CommitRequested,
133131
CommitInProgress,
134132
}
135133

@@ -273,7 +271,7 @@ impl GlobalControllerMetrics {
273271
state: Atomic::new(PipelineState::Paused),
274272
bootstrap_in_progress: AtomicBool::new(false),
275273
transaction_id: Atomic::new(0),
276-
transaction_status: Atomic::new(TransactionStatus::None),
274+
transaction_status: Atomic::new(TransactionStatus::NoTransaction),
277275
rss_bytes: AtomicU64::new(0),
278276
cpu_msecs: AtomicU64::new(0),
279277
uptime_msecs: AtomicU64::new(0),
@@ -676,7 +674,7 @@ impl ControllerStatus {
676674
self.global_metrics
677675
.transaction_status
678676
.load(Ordering::Acquire)
679-
!= TransactionStatus::None
677+
!= TransactionStatus::NoTransaction
680678
}
681679

682680
pub fn request_step(&self, circuit_thread_unparker: &Unparker) {

python/feldera/enums.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,33 @@ def __eq__(self, other):
170170
return self.value == other.value
171171

172172

173+
class TransactionStatus(Enum):
174+
"""
175+
Represents the transaction handling status of a pipeline.
176+
"""
177+
178+
NoTransaction = 1
179+
"""There is currently no active transaction."""
180+
181+
TransactionInProgress = 2
182+
"""There is an active transaction in progress."""
183+
184+
CommitInProgress = 3
185+
"""A commit is currently in progress."""
186+
187+
@staticmethod
188+
def from_str(value):
189+
for member in TransactionStatus:
190+
if member.name.lower() == value.lower():
191+
return member
192+
raise ValueError(
193+
f"Unknown value '{value}' for enum {TransactionStatus.__name__}"
194+
)
195+
196+
def __eq__(self, other):
197+
return self.value == other.value
198+
199+
173200
class ProgramStatus(Enum):
174201
Pending = 1
175202
CompilingSql = 2

python/feldera/pipeline.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from queue import Queue
1212

1313
from feldera.rest.errors import FelderaAPIError
14-
from feldera.enums import PipelineStatus, ProgramStatus, CheckpointStatus
14+
from feldera.enums import (
15+
PipelineStatus,
16+
ProgramStatus,
17+
CheckpointStatus,
18+
TransactionStatus,
19+
)
1520
from feldera.enums import StorageStatus
1621
from feldera.rest.pipeline import Pipeline as InnerPipeline
1722
from feldera.rest.feldera_client import FelderaClient
@@ -563,8 +568,10 @@ def start_transaction(self) -> int:
563568
"""
564569
Start a new transaction.
565570
566-
Returns:
567-
Transaction ID.
571+
:return: Transaction ID.
572+
573+
:raises FelderaAPIError: If the pipeline fails to start a transaction, e.g., if the pipeline is not running or
574+
there is already an active transaction.
568575
"""
569576

570577
return self.client.start_transaction(self.name)
@@ -576,7 +583,7 @@ def commit_transaction(
576583
timeout_s: Optional[float] = None,
577584
):
578585
"""
579-
Commits the currently active transaction.
586+
Commit the currently active transaction.
580587
581588
:param transaction_id: If provided, the function verifies that the currently active transaction matches this ID.
582589
If the active transaction ID does not match, the function raises an error.
@@ -590,11 +597,36 @@ def commit_transaction(
590597
:raises RuntimeError: If there is currently no transaction in progress.
591598
:raises ValueError: If the provided `transaction_id` does not match the current transaction.
592599
:raises TimeoutError: If the transaction does not commit within the specified timeout (when `wait` is True).
593-
:raises FelderaAPIError: If the pipeline fails to start a transaction.
600+
:raises FelderaAPIError: If the pipeline fails to commit a transaction.
594601
"""
595602

596603
self.client.commit_transaction(self.name, transaction_id, wait, timeout_s)
597604

605+
def transaction_status(self) -> TransactionStatus:
606+
"""
607+
Get pipeline's transaction handling status.
608+
609+
:return: Current transaction handling status of the pipeline.
610+
611+
:raises FelderaAPIError: If pipeline's status couldn't be read, e.g., because the pipeline is not currently running.
612+
"""
613+
614+
return self.stats().global_metrics.transaction_status
615+
616+
def transaction_id(self) -> Optional[int]:
617+
"""
618+
Gets the ID of the currently active transaction or None if there is no active transaction.
619+
620+
:return: The ID of the transaction.
621+
"""
622+
623+
transaction_id = self.stats().global_metrics.transaction_id
624+
625+
if transaction_id == 0:
626+
return None
627+
else:
628+
return transaction_id
629+
598630
def delete(self, clear_storage: bool = False):
599631
"""
600632
Deletes the pipeline.

python/feldera/rest/feldera_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ def commit_transaction(
565565
:raises RuntimeError: If there is currently no transaction in progress.
566566
:raises ValueError: If the provided `transaction_id` does not match the current transaction.
567567
:raises TimeoutError: If the transaction does not commit within the specified timeout (when `wait` is True).
568-
:raises FelderaAPIError: If the pipeline fails to start a transaction.
568+
:raises FelderaAPIError: If the pipeline fails to commit a transaction.
569569
"""
570570

571571
# TODO: implement this without using /stats when we have a better pipeline status reporting API.

python/feldera/stats.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Mapping, Any, Optional, List
2-
from feldera.enums import PipelineStatus
2+
from feldera.enums import PipelineStatus, TransactionStatus
33
from datetime import datetime
44
import uuid
55

@@ -55,6 +55,8 @@ def __init__(self):
5555
self.total_processed_records: Optional[int] = None
5656
self.total_completed_records: Optional[int] = None
5757
self.pipeline_complete: Optional[bool] = None
58+
self.transaction_status: Optional[TransactionStatus] = None
59+
self.transaction_id: Optional[int] = None
5860

5961
@classmethod
6062
def from_dict(cls, d: Mapping[str, Any]):
@@ -63,6 +65,7 @@ def from_dict(cls, d: Mapping[str, Any]):
6365
metrics.state = PipelineStatus.from_str(d["state"])
6466
metrics.incarnation_uuid = uuid.UUID(d["incarnation_uuid"])
6567
metrics.start_time = datetime.fromtimestamp(d["start_time"])
68+
metrics.transaction_status = TransactionStatus.from_str(d["transaction_status"])
6669
return metrics
6770

6871

0 commit comments

Comments
 (0)