Source code for datacube.index.abstract._transactions

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2025 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
from abc import ABC, abstractmethod
from threading import Lock
from typing import Any

from typing_extensions import override

from datacube.index.exceptions import TransactionException
from datacube.utils.generic import thread_local_cache


[docs] class AbstractTransaction(ABC): """ Abstract base class for a Transaction Manager. All index implementations should extend this base class. Thread-local storage and locks ensures one active transaction per index per thread. """ def __init__(self, index_id: str) -> None: self._connection: Any = None self._tls_id = f"txn-{index_id}" self._obj_lock = Lock() self._controlling_trans = None # Main Transaction API
[docs] def begin(self) -> None: """ Start a new transaction. Raises an error if a transaction is already active for this thread. Calls implementation-specific _new_connection() method and manages thread local storage and locks. """ with self._obj_lock: if self._connection is not None: raise ValueError( "Cannot start a new transaction as one is already active" ) self._tls_stash()
[docs] def commit(self) -> None: """ Commit the transaction. Raises an error if transaction is not active. Calls implementation-specific _commit() method, and manages thread local storage and locks. """ with self._obj_lock: if self._connection is None: raise ValueError("Cannot commit inactive transaction") self._commit() self._release_connection() self._connection = None self._tls_purge()
[docs] def rollback(self) -> None: """ Rollback the transaction. Raises an error if transaction is not active. Calls implementation-specific _rollback() method, and manages thread local storage and locks. """ with self._obj_lock: if self._connection is None: raise ValueError("Cannot rollback inactive transaction") self._rollback() self._release_connection() self._connection = None self._tls_purge()
@property def active(self) -> bool: """ :return: True if the transaction is active. """ return self._connection is not None # Manage thread-local storage def _tls_stash(self) -> None: """ Check TLS is empty, create a new connection and stash it. :return: """ stored_val = thread_local_cache(self._tls_id) if stored_val is not None: # stored_val is outermost transaction in a stack of nested transaction. self._controlling_trans = stored_val self._connection = stored_val._connection else: self._connection = self._new_connection() thread_local_cache(self._tls_id, purge=True) thread_local_cache(self._tls_id, self) def _tls_purge(self) -> None: thread_local_cache(self._tls_id, purge=True) # Commit/Rollback exceptions for Context Manager usage patterns def commit_exception(self, errmsg: str) -> TransactionException: return TransactionException(errmsg, commit=True) def rollback_exception(self, errmsg: str) -> TransactionException: return TransactionException(errmsg, commit=False) # Context Manager Interface def __enter__(self) -> "AbstractTransaction": self.begin() return self def __exit__(self, exc_type, exc_value, traceback) -> bool: if not self.active: # User has already manually committed or rolled back. return True if exc_type is not None and issubclass(exc_type, TransactionException): # User raised a TransactionException, if self._controlling_trans: # Nested transaction - reraise TransactionException return False # Commit or rollback as per exception if exc_value.commit: self.commit() else: self.rollback() # Tell runtime exception is caught and handled. return True elif exc_value is not None: # Any other exception - reraise. Rollback if outermost transaction if not self._controlling_trans: self.rollback() # Instruct runtime to rethrow exception return False else: # Exited without exception. Commit if outermost transaction if not self._controlling_trans: self.commit() return True # Internal abstract methods for implementation-specific functionality @abstractmethod def _new_connection(self) -> Any: """ :return: a new index driver object representing a database connection or equivalent against which transactions will be executed. """ @abstractmethod def _commit(self) -> None: """ Commit the transaction. """ @abstractmethod def _rollback(self) -> None: """ Rollback the transaction. """ @abstractmethod def _release_connection(self) -> None: """ Release the connection object stored in self._connection """
[docs] class UnhandledTransaction(AbstractTransaction): # Minimal implementation for index drivers with no transaction handling. @override def _new_connection(self) -> Any: return True @override def _commit(self) -> None: pass @override def _rollback(self) -> None: pass @override def _release_connection(self) -> None: pass