Interfaces#

IBaseETL#

class core_etl.base.IBaseETL(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)[source]#

Bases: ITask, ABC

Base class for an ETL task. A task defines the operations over a service or a platform from where you need to extract, transform and/or store information…

This class defines some commons (maybe useful) attributes/methods for future implementations of the ETL tasks.

__init__(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs) None[source]#
Parameters:
  • time_zone – The time zone to use in the ETL for date/datetime processing.

  • temp_folder – For task that need to store local files temporarily.

execute(*args, **kwargs) int[source]#

Executes the ETL task following the template method pattern.

Orchestrates the complete ETL workflow:
  1. Sets status to EXECUTING.

  2. Runs pre-processing actions.

  3. Executes main ETL logic via _execute().

  4. Runs post-processing actions.

  5. Handles errors with logging and status updates.

  6. Cleans up resources.

Parameters:
  • args – Positional arguments passed to lifecycle methods.

  • kwargs – Keyword arguments passed to lifecycle methods.

Returns:

Number of processed elements (0 if error or invalid return).

pre_processing() None[source]#

Pre-processing actions…

abstractmethod _execute(*args, **kwargs) int[source]#

Generic implementation for all the extract, transform and load processes…

Returns:

It returns the number of elements were processed.

post_processing() None[source]#

Post-processing actions…

save_logs(processed_elements: int = 0, error_type: str = '', error_message: str = '', error_traceback: List[Dict] | None = None, **kwargs) None[source]#

Save executions logs/metadata if required…

Parameters:
  • processed_elements – The number of processed elements.

  • error_type – The error type if exists.

  • error_message – The error message if exists.

  • error_traceback – The error traceback if exists.

clean_resources() None[source]#

In case you need to close/remove some resources

_abc_impl = <_abc._abc_data object>#
_impls: Dict[str, Type[Self]] = {}#

IBaseEtlFromFile#

class core_etl.file_based.IBaseEtlFromFile(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)[source]#

Bases: IBaseETL, ABC

Base class for an ETL task that need to do something with a file retrieved from a source. Like copy the file from sFTP to S3…

_execute(*args, **kwargs) int[source]#

Executes file processing workflow for all files from the source.

Workflow: 1. Retrieves file paths via get_paths() iterator

  1. For each file path:

  • Logs processing start

  • Calls process_file() with the path

  • Calls on_success() callback if processing succeeds

  • Calls on_error() callback if processing fails

  • Increments processed count only on success

  1. Returns total number of successfully processed files

Individual file errors do not stop the batch processing.

Returns:

Number of files successfully processed

abstractmethod get_paths() Iterator[str][source]#

Retrieves file paths from the source and returns an iterator. This method must be implemented by subclasses to define how files are discovered and listed from the source system (e.g., SFTP, local filesystem, cloud storage, etc.).

Returns:

Iterator yielding file paths as strings

abstractmethod process_file(path: str)[source]#

Processes a single file from the given path.

This method must be implemented by subclasses to define the core file processing logic. Common implementations include:

  • Copying/moving files between systems (SFTP → S3).

  • Transforming file formats (CSV → JSON).

  • Validating and processing file contents.

  • Uploading files to cloud storage.

Parameters:

path – File path to process.

Raises:

Should raise exceptions on processing failures.

on_success(path: str)[source]#

Called after a file is successfully processed. Override this method to implement post-processing actions such as:

  • Archiving processed files

  • Moving files to a “completed” directory

  • Updating processing logs or databases

  • Sending notifications

Parameters:

path – Path of the successfully processed file.

on_error(path: str)[source]#

Called when an error occurs during file processing. Override this method to implement error handling actions such as:

  • Moving failed files to an “error” directory

  • Logging detailed error information

  • Sending error notifications

  • Quarantining problematic files

Parameters:

path – Path of the file that failed processing.

_abc_impl = <_abc._abc_data object>#
_impls: Dict[str, Type[Self]] = {}#

IBaseEtlFromRecord#

class core_etl.record_based.IBaseEtlFromRecord(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs)[source]#

Bases: IBaseETL, ABC

Base class for an ETL task that need to do ETLs processes over data (records, rows) retrieved from different sources like: file, sFTP server, SQS queues, APIs or another data source…

__init__(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs) None[source]#

Initialize record-based ETL with transformation configuration.

Parameters:
  • name_mapper – Dictionary mapping old field names to new field names. Example: {“old_name”: “new_name”, “id”: “user_id”}.

  • type_mapper – Dictionary defining data type conversions for fields. Example: {“age”: “int”, “price”: “float”, “active”: “bool”}

  • attrs_to_remove – List of field names to remove from each record. Example: [“temp_field”, “internal_id”, “debug_info”].

  • kwargs – Additional keyword arguments passed to parent IBaseETL.

_execute(*args, **kwargs) int[source]#

Executes record processing workflow with configurable transformations.

Workflow:
  1. Retrieves records in batches via retrieve_records()

  2. For each batch:

    • Logs batch processing start

    • For each record in batch: a. Applies pre_transformations() (custom logic). b. Removes unwanted attributes (attrs_to_remove). c. Renames fields (name_mapper). d. Converts data types (type_mapper). e. Applies post_transformations() (custom logic).

    • Processes complete transformed batch via process_records().

  3. Returns total number of records processed across all batches.

Individual record errors do not stop batch processing. A failing record is logged and skipped; remaining records and batches continue.

Returns:

Total number of records processed across all batches.

abstractmethod retrieve_records() Iterator[List[Dict]][source]#

Retrieves records from data sources in batches.

This method must be implemented by subclasses to define how records are fetched from the source system. Returns batches of records to enable memory-efficient processing and prevent resource exhaustion.

Common implementations include:

  • Database queries with pagination.

  • API calls with page-based retrieval.

  • File reading with chunk processing.

  • Message queue consumption.

  • Stream processing.

Returns:

Iterator yielding batches (lists) of records as dictionaries. Batch sizing is the subclass’s responsibility.

pre_transformations(record: Dict) None[source]#

Apply custom transformations before built-in transformations.

Override this method to implement custom business logic that should be applied before the standard transformations (remove, rename, cast). This allows for data preparation, validation, or enrichment.

Common use cases:

  • Data validation and cleanup.

  • Field normalization (e.g., date formatting).

  • Data enrichment from external sources.

  • Business-specific calculations.

  • Record filtering or marking.

Parameters:

record – Record dictionary to transform (modified in-place).

post_transformations(record: Dict) None[source]#

Apply custom transformations after built-in transformations.

Override this method to implement custom business logic that should be applied after the standard transformations (remove, rename, cast). This allows for final data processing before the load phase.

Common use cases:

  • Final data validation.

  • Derived field calculations.

  • Data formatting for destination system.

  • Business rule application.

  • Data quality checks.

Parameters:

record – Record dictionary to transform (modified in-place).

abstractmethod process_records(records: List[Dict])[source]#

Processes a batch of transformed records.

This method must be implemented by subclasses to define what happens to records after all transformations have been applied. This is the “Load” phase of the ETL process.

Common implementations include:

  • Storing records in databases (SQL, NoSQL).

  • Writing to files (CSV, JSON, Parquet).

  • Sending to message queues (SQS, Kafka).

  • Uploading to cloud storage (S3, Azure Blob).

  • Streaming to real-time systems (Kinesis, Pub/Sub).

  • Calling external APIs for data ingestion.

Parameters:

records – List of transformed record dictionaries ready for processing.

Raises:

Should raise exceptions on processing failures to trigger error handling.

_abc_impl = <_abc._abc_data object>#
_impls: Dict[str, Type[Self]] = {}#

IAsyncETL#

class core_etl.async_based.IAsyncETL(max_queue_size: int = 1000, max_workers: int = 10, **kwargs)[source]#

Bases: IBaseETL, ABC

Base class for an ETL task that need to process elements in an asynchronous manner.

Note: execute() calls asyncio.run() internally and cannot be invoked from within a running event loop. In async contexts use: await asyncio.to_thread(task.execute)

__init__(max_queue_size: int = 1000, max_workers: int = 10, **kwargs) None[source]#
Parameters:
  • time_zone – The time zone to use in the ETL for date/datetime processing.

  • temp_folder – For task that need to store local files temporarily.

pre_processing() None[source]#

Pre-processing actions…

_execute(*args, **kwargs) int[source]#

Generic implementation for all the extract, transform and load processes…

Returns:

It returns the number of elements were processed.

async _execute_async(*_args, **_kwargs) int[source]#
abstractmethod async produce_records()[source]#

Must be implemented in subclasses. It must populate the asyncio.Queue with the records will be processed via consumers in _process_record function. Ensure: await queue.put(record) is called.

async _consume_record(worker_id: str) None[source]#

Pulls records from the queue and dispatches them to _process_record.

abstractmethod async _process_record(record: Any) None[source]#

Must be implemented by child classes

async _stop()[source]#

Send stop signal to all consumers/workers.

_abc_impl = <_abc._abc_data object>#
_impls: Dict[str, Type[Self]] = {}#