1111from queue import Queue
1212
1313from feldera .rest .errors import FelderaAPIError
14- from feldera .enums import PipelineStatus , ProgramStatus , CheckpointStatus
14+ from feldera .enums import (
15+ PipelineStatus ,
16+ ProgramStatus ,
17+ CheckpointStatus ,
18+ TransactionStatus ,
19+ )
1520from feldera .enums import StorageStatus
1621from feldera .rest .pipeline import Pipeline as InnerPipeline
1722from feldera .rest .feldera_client import FelderaClient
@@ -563,8 +568,10 @@ def start_transaction(self) -> int:
563568 """
564569 Start a new transaction.
565570
566- Returns:
567- Transaction ID.
571+ :return: Transaction ID.
572+
573+ :raises FelderaAPIError: If the pipeline fails to start a transaction, e.g., if the pipeline is not running or
574+ there is already an active transaction.
568575 """
569576
570577 return self .client .start_transaction (self .name )
@@ -576,7 +583,7 @@ def commit_transaction(
576583 timeout_s : Optional [float ] = None ,
577584 ):
578585 """
579- Commits the currently active transaction.
586+ Commit the currently active transaction.
580587
581588 :param transaction_id: If provided, the function verifies that the currently active transaction matches this ID.
582589 If the active transaction ID does not match, the function raises an error.
@@ -590,11 +597,36 @@ def commit_transaction(
590597 :raises RuntimeError: If there is currently no transaction in progress.
591598 :raises ValueError: If the provided `transaction_id` does not match the current transaction.
592599 :raises TimeoutError: If the transaction does not commit within the specified timeout (when `wait` is True).
593- :raises FelderaAPIError: If the pipeline fails to start a transaction.
600+ :raises FelderaAPIError: If the pipeline fails to commit a transaction.
594601 """
595602
596603 self .client .commit_transaction (self .name , transaction_id , wait , timeout_s )
597604
605+ def transaction_status (self ) -> TransactionStatus :
606+ """
607+ Get pipeline's transaction handling status.
608+
609+ :return: Current transaction handling status of the pipeline.
610+
611+ :raises FelderaAPIError: If pipeline's status couldn't be read, e.g., because the pipeline is not currently running.
612+ """
613+
614+ return self .stats ().global_metrics .transaction_status
615+
616+ def transaction_id (self ) -> Optional [int ]:
617+ """
618+ Gets the ID of the currently active transaction or None if there is no active transaction.
619+
620+ :return: The ID of the transaction.
621+ """
622+
623+ transaction_id = self .stats ().global_metrics .transaction_id
624+
625+ if transaction_id == 0 :
626+ return None
627+ else :
628+ return transaction_id
629+
598630 def delete (self , clear_storage : bool = False ):
599631 """
600632 Deletes the pipeline.
0 commit comments