Interfaces

This file defines IBaseETL, an abstract base class for ETL tasks.

Structure & Design:
  • Inherits from ITask and ABC (Abstract Base Class)

  • Follows the Template Method pattern with execute() orchestrating the ETL workflow

  • Well-documented with clear docstrings

Key Features:
  • Timezone support via time_zone parameter (defaults to UTC)

  • Temporary folder management for local file operations

  • Status tracking using TaskStatus enum

  • Error handling with proper exception capture and logging

  • Lifecycle hooks: pre_processing(), _execute(), post_processing(), clean_resources()

Workflow:
  1. Sets status to EXECUTING

  2. Runs pre-processing

  3. Executes main ETL logic (_execute() - abstract method)

  4. Runs post-processing

  5. Handles errors with logging and status updates

  6. Cleans up resources in finally block
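The workflow above is an instance of the Template Method pattern. The sketch below reproduces that orchestration in a self-contained form, so it runs without core_etl. It makes several assumptions for illustration: the class `SketchETL`, the `status` attribute, the `TaskStatus` members, and the rule that any non-integer return counts as invalid. It is not the library's actual code.

```python
import logging
from abc import ABC, abstractmethod
from enum import Enum


class TaskStatus(Enum):
    # Hypothetical members; the real TaskStatus enum may define others.
    CREATED = "CREATED"
    EXECUTING = "EXECUTING"
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"


class SketchETL(ABC):
    """Stand-in for IBaseETL that shows only the execute() template method."""

    def __init__(self) -> None:
        self.status = TaskStatus.CREATED
        self.logger = logging.getLogger(type(self).__name__)

    def execute(self, *args, **kwargs) -> int:
        processed = 0
        self.status = TaskStatus.EXECUTING              # 1. mark as running
        try:
            self.pre_processing()                       # 2. pre-processing hook
            result = self._execute(*args, **kwargs)     # 3. main ETL logic
            # Assumption: any non-int result is an "invalid return" and counts as 0.
            if isinstance(result, int) and not isinstance(result, bool):
                processed = result
            self.post_processing()                      # 4. post-processing hook
            self.status = TaskStatus.SUCCESS
        except Exception:
            self.logger.exception("ETL task failed")    # 5. log and flag the error
            self.status = TaskStatus.ERROR
            processed = 0
        finally:
            self.clean_resources()                      # 6. always runs
        return processed

    # Default hooks do nothing; subclasses override only what they need.
    def pre_processing(self) -> None: ...
    def post_processing(self) -> None: ...
    def clean_resources(self) -> None: ...

    @abstractmethod
    def _execute(self, *args, **kwargs) -> int: ...


class CountingETL(SketchETL):
    """Toy subclass: counts its positional arguments and records cleanup."""

    def __init__(self, fail: bool = False) -> None:
        super().__init__()
        self.fail = fail
        self.cleaned = False

    def _execute(self, *args, **kwargs) -> int:
        if self.fail:
            raise RuntimeError("source unavailable")
        return len(args)

    def clean_resources(self) -> None:
        self.cleaned = True


ok, broken = CountingETL(), CountingETL(fail=True)
print(ok.execute("a", "b"), ok.status)   # 2 TaskStatus.SUCCESS
print(broken.execute(), broken.status)   # 0 TaskStatus.ERROR
```

Subclasses fill in only `_execute()` and, optionally, the hooks. The ordering and error handling live in one place, which is the point of the pattern.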

class core_etl.base.IBaseETL(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)

Bases: ITask, ABC

Base class for an ETL task. A task defines the operations over a service or platform from which you need to extract, transform, and/or store information…

This class defines common (and potentially useful) attributes and methods for future ETL task implementations.

__init__(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs) → None
Parameters:
  • time_zone – The time zone to use in the ETL for date/datetime processing.

  • temp_folder – Folder for tasks that need to store local files temporarily.
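The following sketch shows how a subclass might use the two parameters. It assumes two things the reference above does not confirm: the values are stored on attributes named `time_zone` and `temp_folder`, and a missing `temp_folder` falls back to `tempfile.mkdtemp()`. A fixed-offset `timezone` keeps the example independent of the system's time-zone database.

```python
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Optional


class TimeAwareTask:
    """Hypothetical stand-in showing how time_zone and temp_folder might be used."""

    def __init__(self, time_zone: timezone = timezone.utc, temp_folder: Optional[str] = None) -> None:
        self.time_zone = time_zone
        # Assumption: fall back to a fresh private directory when none is given.
        self.temp_folder = temp_folder or tempfile.mkdtemp(prefix="etl_")

    def now(self) -> datetime:
        # Current time expressed in the task's time zone.
        return datetime.now(tz=self.time_zone)

    def to_local(self, value: datetime) -> datetime:
        # Convert an aware datetime into the task's time zone.
        return value.astimezone(self.time_zone)


task = TimeAwareTask(time_zone=timezone(timedelta(hours=-5)), temp_folder="/tmp/etl-demo")
utc_noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(task.to_local(utc_noon).isoformat())  # 2024-01-01T07:00:00-05:00
```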

execute(*args, **kwargs) → int

Executes the ETL task following the template method pattern.

Orchestrates the complete ETL workflow:
  1. Sets status to EXECUTING.

  2. Runs pre-processing actions.

  3. Executes main ETL logic via _execute().

  4. Runs post-processing actions.

  5. Handles errors with logging and status updates.

  6. Cleans up resources.

Parameters:
  • args – Positional arguments passed to lifecycle methods.

  • kwargs – Keyword arguments passed to lifecycle methods.

Returns:

Number of processed elements (0 if error or invalid return).

pre_processing() → None

Pre-processing actions…

abstractmethod _execute(*args, **kwargs) → int

Generic implementation for all the extract, transform and load processes…

Returns:

The number of elements that were processed.

post_processing() → None

Post-processing actions…

save_logs(processed_elements: int = 0, error_type: str = '', error_message: str = '', error_traceback: List[Dict] | None = None, **kwargs) → None

Saves execution logs/metadata if required…

Parameters:
  • processed_elements – The number of processed elements.

  • error_type – The error type, if any.

  • error_message – The error message, if any.

  • error_traceback – The error traceback, if any.
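The `error_traceback` parameter is typed as `List[Dict]`, but the keys of each dictionary are not documented here. One plausible way to build such a list is with the standard library's `traceback.extract_tb()`. The key names used below (`file`, `line`, `function`, `code`) and the helper `traceback_as_dicts` are assumptions.

```python
import traceback
from typing import Dict, List


def traceback_as_dicts(error: BaseException) -> List[Dict]:
    """Turn an exception's traceback into a list of plain, JSON-friendly dicts."""
    return [
        {"file": frame.filename, "line": frame.lineno, "function": frame.name, "code": frame.line}
        for frame in traceback.extract_tb(error.__traceback__)
    ]


def failing_step() -> None:
    raise ValueError("bad record")


try:
    failing_step()
except ValueError as exc:
    # Values suitable for save_logs(error_type=..., error_message=..., error_traceback=...).
    error_type = type(exc).__name__
    error_message = str(exc)
    error_traceback = traceback_as_dicts(exc)

print(error_type, error_message, error_traceback[-1]["function"])  # ValueError bad record failing_step
```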

clean_resources() → None

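Hook to close or remove any resources the task has acquired. It is called from the finally block of execute(), so it runs whether or not the task succeeded.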
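Here is a minimal override. It assumes the task keeps its scratch directory on a hypothetical `temp_folder` attribute and owns that directory exclusively. Clearing the attribute afterwards makes a second call harmless.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Optional


class ScratchSpaceTask:
    """Hypothetical task that owns a temporary directory for its local files."""

    def __init__(self) -> None:
        self.temp_folder: Optional[str] = tempfile.mkdtemp(prefix="etl_")

    def clean_resources(self) -> None:
        # ignore_errors=True keeps cleanup from raising inside execute()'s
        # finally block and hiding the exception that caused the failure.
        if self.temp_folder:
            shutil.rmtree(self.temp_folder, ignore_errors=True)
            self.temp_folder = None


task = ScratchSpaceTask()
scratch = Path(task.temp_folder)
(scratch / "download.csv").write_text("id\n1\n")
task.clean_resources()
print(scratch.exists())  # False
```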

_impls: Dict[str, Type[Self]] = {}

Base class for an ETL task that needs to do something with a file retrieved from a source, such as copying the file from SFTP to S3…

Structure & Design:
  • Extends IBaseETL for file-based ETL operations

  • Implements the abstract _execute() method with file processing logic

  • Uses iterator pattern for handling multiple files

Key Features:

  1. File Processing Pipeline:

  • Iterates through file paths from get_paths()

  • Processes each file individually with error isolation

  • Tracks processed file count

  • Handles success/error callbacks per file

  2. Abstract Interface:

  • get_paths() must be implemented by subclasses

  • process_file() must be implemented by subclasses

  • get_paths() returns an iterator of file paths to process

  • Supports last_processed parameter for incremental processing

  3. Lifecycle Hooks:

  • process_file(): Core file processing logic (required implementation)

  • on_success(): Success callback for cleanup/archiving

  • on_error(): Error callback for error handling

Error Handling:

  • Individual file errors don’t stop the entire batch

  • Errors are logged with file path context

  • Failed files are tracked via on_error() callback

class core_etl.file_based.IBaseEtlFromFile(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)

Bases: IBaseETL, ABC

Base class for an ETL task that needs to do something with a file retrieved from a source, such as copying the file from SFTP to S3…

_execute(*args, **kwargs) → int

Executes file processing workflow for all files from the source.

Workflow:
  1. Retrieves file paths via the get_paths() iterator

  2. For each file path:

  • Logs processing start

  • Calls process_file() with the path

  • Calls on_success() callback if processing succeeds

  • Calls on_error() callback if processing fails

  • Increments processed count only on success

  3. Returns total number of successfully processed files

An error in one file does not stop the processing of the remaining files.

Returns:

Number of files successfully processed
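The per-file loop described above can be sketched as follows. This is a self-contained stand-in rather than the library's implementation. `SketchFileETL`, `DemoFileETL`, and the log messages are hypothetical. This sketch also treats an exception raised by `on_success()` as a processing failure, which is a design choice the reference does not specify.

```python
import logging
from abc import ABC, abstractmethod
from typing import Iterator, List

logger = logging.getLogger("file_etl_sketch")


class SketchFileETL(ABC):
    """Stand-in for IBaseEtlFromFile._execute(); not the library's code."""

    def _execute(self, *args, **kwargs) -> int:
        processed = 0
        for path in self.get_paths():
            logger.info("Processing file: %s", path)
            try:
                self.process_file(path)
                self.on_success(path)
                processed += 1  # counted only when the whole step succeeded
            except Exception:
                # Error isolation: log with the path, notify, and continue.
                logger.exception("Error processing file: %s", path)
                self.on_error(path)
        return processed

    @abstractmethod
    def get_paths(self) -> Iterator[str]: ...

    @abstractmethod
    def process_file(self, path: str) -> None: ...

    def on_success(self, path: str) -> None: ...

    def on_error(self, path: str) -> None: ...


class DemoFileETL(SketchFileETL):
    def __init__(self) -> None:
        self.failed: List[str] = []

    def get_paths(self) -> Iterator[str]:
        yield from ["a.csv", "broken.csv", "c.csv"]

    def process_file(self, path: str) -> None:
        if "broken" in path:
            raise ValueError(f"cannot parse {path}")

    def on_error(self, path: str) -> None:
        self.failed.append(path)


demo = DemoFileETL()
print(demo._execute(), demo.failed)  # 2 ['broken.csv']
```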

abstractmethod get_paths() → Iterator[str]

Retrieves file paths from the source and returns an iterator. This method must be implemented by subclasses to define how files are discovered and listed from the source system (e.g., SFTP, local filesystem, cloud storage, etc.).

Returns:

Iterator yielding file paths as strings
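For a local filesystem source, `get_paths()` can be a generator over `pathlib.Path.glob()`. The class `LocalCsvSource` and its `source_dir` and `pattern` settings are hypothetical configuration, not part of the documented interface. Sorting the matches gives a deterministic processing order.

```python
import tempfile
from pathlib import Path
from typing import Iterator


class LocalCsvSource:
    """Hypothetical source that lists CSV files in a local directory."""

    def __init__(self, source_dir: str, pattern: str = "*.csv") -> None:
        self.source_dir = Path(source_dir)
        self.pattern = pattern

    def get_paths(self) -> Iterator[str]:
        # Yield matching regular files as strings, in a stable order.
        for path in sorted(self.source_dir.glob(self.pattern)):
            if path.is_file():
                yield str(path)


with tempfile.TemporaryDirectory() as folder:
    for name in ("b.csv", "a.csv", "notes.txt"):
        Path(folder, name).write_text("x\n")
    names = [Path(p).name for p in LocalCsvSource(folder).get_paths()]

print(names)  # ['a.csv', 'b.csv']
```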

abstractmethod process_file(path: str)

Processes a single file from the given path.

This method must be implemented by subclasses to define the core file processing logic. Common implementations include:

  • Copying/moving files between systems (SFTP → S3).

  • Transforming file formats (CSV → JSON).

  • Validating and processing file contents.

  • Uploading files to cloud storage.

Parameters:

path – File path to process.

Raises:

Implementations should raise an exception on failure so that _execute() calls on_error() for that file instead of on_success().
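The following sketch illustrates the "CSV → JSON" case listed above by converting one CSV file into a JSON array beside it. It is shown as a plain function named `convert_csv_to_json` (hypothetical) for brevity. Writing the output to the same path with a `.json` suffix is an assumption. Errors are deliberately left uncaught so they reach the caller.

```python
import csv
import json
import tempfile
from pathlib import Path


def convert_csv_to_json(path: str) -> None:
    """Hypothetical process_file() body: write <name>.json next to <name>.csv."""
    source = Path(path)
    with source.open(newline="", encoding="utf-8") as handle:
        rows = list(csv.DictReader(handle))
    # Any I/O or parsing error propagates, so the caller can run on_error().
    source.with_suffix(".json").write_text(json.dumps(rows, indent=2), encoding="utf-8")


with tempfile.TemporaryDirectory() as folder:
    csv_path = Path(folder, "users.csv")
    csv_path.write_text("id,name\n1,Ada\n2,Linus\n", encoding="utf-8")
    convert_csv_to_json(str(csv_path))
    output = json.loads(csv_path.with_suffix(".json").read_text(encoding="utf-8"))

print(output)  # [{'id': '1', 'name': 'Ada'}, {'id': '2', 'name': 'Linus'}]
```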

on_success(path: str)

Called after a file is successfully processed. Override this method to implement post-processing actions such as:

  • Archiving processed files

  • Moving files to a “completed” directory

  • Updating processing logs or databases

  • Sending notifications

Parameters:

path – Path of the successfully processed file.

on_error(path: str)

Called when an error occurs during file processing. Override this method to implement error handling actions such as:

  • Moving failed files to an “error” directory

  • Logging detailed error information

  • Sending error notifications

  • Quarantining problematic files

Parameters:

path – Path of the file that failed processing.
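A common pairing of the two callbacks moves each file into a `completed` or `error` directory beside it. The directory names, the helper `move_into`, and the mix-in `ArchivingCallbacks` are hypothetical. The helper uses `shutil.move`, which also works across filesystems.

```python
import shutil
import tempfile
from pathlib import Path


def move_into(path: str, folder_name: str) -> Path:
    """Move `path` into a sibling sub-directory, creating it if needed."""
    source = Path(path)
    destination_dir = source.parent / folder_name
    destination_dir.mkdir(exist_ok=True)
    return Path(shutil.move(str(source), str(destination_dir / source.name)))


class ArchivingCallbacks:
    """Hypothetical mix-in providing on_success() / on_error() overrides."""

    def on_success(self, path: str) -> None:
        move_into(path, "completed")

    def on_error(self, path: str) -> None:
        move_into(path, "error")


with tempfile.TemporaryDirectory() as folder:
    good, bad = Path(folder, "good.csv"), Path(folder, "bad.csv")
    good.write_text("ok\n")
    bad.write_text("??\n")
    callbacks = ArchivingCallbacks()
    callbacks.on_success(str(good))
    callbacks.on_error(str(bad))
    layout = sorted(p.relative_to(folder).as_posix() for p in Path(folder).rglob("*.csv"))

print(layout)  # ['completed/good.csv', 'error/bad.csv']
```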

_impls: Dict[str, Type[Self]] = {}

Base class for record-based ETL operations that process data records/rows from various sources like APIs, databases, files, SQS queues, etc.

Structure & Design:

  • Extends IBaseETL for record-based data processing.

  • Implements configurable transformation pipeline.

  • Processes data in memory-efficient batches.

  • Provides built-in transformations (rename, remove, type cast).

Key Features:

  1. Batch Processing Pipeline:

  • Retrieves records in configurable batches.

  • Applies transformation sequence per record.

  • Processes complete batches for efficiency.

  • Tracks total processed record count.

  2. Built-in Transformations:

  • Field removal via attrs_to_remove.

  • Field renaming via name_mapper.

  • Data type casting via type_mapper.

  • Configurable batch size for memory management.

  3. Transformation Hooks:

  • pre_transformations(): Custom logic before built-in transforms.

  • post_transformations(): Custom logic after built-in transforms.

  • Extensible pipeline for specific business logic.

  4. Abstract Interface:

  • retrieve_records(): Must implement data source integration.

  • process_records(): Must implement data destination handling.

Error Handling:

  • Individual record errors don’t stop batch processing.

  • Batch-level processing with configurable sizes.

  • Memory-efficient for large datasets.

class core_etl.record_based.IBaseEtlFromRecord(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs)

Bases: IBaseETL, ABC

Base class for an ETL task that needs to run ETL processes over data (records, rows) retrieved from different sources such as files, SFTP servers, SQS queues, APIs, or other data sources…

__init__(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs) → None

Initialize record-based ETL with transformation configuration.

Parameters:
  • name_mapper – Dictionary mapping old field names to new field names. Example: {"old_name": "new_name", "id": "user_id"}.

  • type_mapper – Dictionary defining data type conversions for fields. Example: {"age": "int", "price": "float", "active": "bool"}.

  • attrs_to_remove – List of field names to remove from each record. Example: ["temp_field", "internal_id", "debug_info"].

  • kwargs – Additional keyword arguments passed to parent IBaseETL.

_execute(*args, **kwargs) → int

Executes record processing workflow with configurable transformations.

Workflow:
  1. Retrieves records in batches via retrieve_records()

  2. For each batch:

    • Logs batch processing start

    • For each record in batch:
        a. Applies pre_transformations() (custom logic).
        b. Removes unwanted attributes (attrs_to_remove).
        c. Renames fields (name_mapper).
        d. Converts data types (type_mapper).
        e. Applies post_transformations() (custom logic).

    • Processes complete transformed batch via process_records().

  3. Returns total number of records processed across all batches.

Individual record errors do not stop batch processing. A failing record is logged and skipped; remaining records and batches continue.

Returns:

Total number of records processed across all batches.
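The batch loop can be sketched as below. This is a stand-in, not the library's code. The built-in transformations are reduced to attribute removal to keep the example short. A failing record is logged and dropped from its batch, which is one reading of "logged and skipped". The count covers only records handed to `process_records()`. `SketchRecordETL`, `DemoRecordETL`, and the demo data are hypothetical.

```python
import logging
from abc import ABC, abstractmethod
from typing import Dict, Iterator, List, Optional

logger = logging.getLogger("record_etl_sketch")


class SketchRecordETL(ABC):
    """Stand-in for IBaseEtlFromRecord._execute(); only attrs_to_remove is built in."""

    def __init__(self, attrs_to_remove: Optional[List[str]] = None) -> None:
        self.attrs_to_remove = attrs_to_remove or []

    def _execute(self, *args, **kwargs) -> int:
        total = 0
        for number, batch in enumerate(self.retrieve_records(), start=1):
            logger.info("Processing batch %d with %d records", number, len(batch))
            transformed: List[Dict] = []
            for record in batch:
                try:
                    self.pre_transformations(record)
                    for attr in self.attrs_to_remove:  # stands in for remove/rename/cast
                        record.pop(attr, None)
                    self.post_transformations(record)
                    transformed.append(record)
                except Exception:
                    logger.exception("Skipping record: %s", record)
            if transformed:  # design choice: skip the load call for empty batches
                self.process_records(transformed)
                total += len(transformed)
        return total

    @abstractmethod
    def retrieve_records(self) -> Iterator[List[Dict]]: ...

    @abstractmethod
    def process_records(self, records: List[Dict]) -> None: ...

    def pre_transformations(self, record: Dict) -> None: ...

    def post_transformations(self, record: Dict) -> None: ...


class DemoRecordETL(SketchRecordETL):
    def __init__(self) -> None:
        super().__init__(attrs_to_remove=["tmp"])
        self.loaded: List[List[Dict]] = []

    def retrieve_records(self) -> Iterator[List[Dict]]:
        yield [{"id": 1, "tmp": "x"}, {"id": None}]
        yield [{"id": 3}]

    def pre_transformations(self, record: Dict) -> None:
        if record.get("id") is None:
            raise ValueError("missing id")

    def process_records(self, records: List[Dict]) -> None:
        self.loaded.append(records)


etl = DemoRecordETL()
print(etl._execute(), etl.loaded)  # 2 [[{'id': 1}], [{'id': 3}]]
```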

abstractmethod retrieve_records() → Iterator[List[Dict]]

Retrieves records from data sources in batches.

This method must be implemented by subclasses to define how records are fetched from the source system. Returns batches of records to enable memory-efficient processing and prevent resource exhaustion.

Common implementations include:

  • Database queries with pagination.

  • API calls with page-based retrieval.

  • File reading with chunk processing.

  • Message queue consumption.

  • Stream processing.

Returns:

Iterator yielding batches (lists) of records as dictionaries. Batch sizing is the subclass’s responsibility.
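Because batch sizing is left to the subclass, a small helper that groups any record iterator into fixed-size lists is often enough. `batched_records` and `batch_size` are hypothetical names. On Python 3.12+, `itertools.batched` does the same job but yields tuples rather than lists.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def batched_records(records: Iterable[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Group a (possibly lazy) stream of records into lists of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    iterator = iter(records)
    # islice pulls at most batch_size items; an empty list means the stream is done.
    while batch := list(islice(iterator, batch_size)):
        yield batch


rows = ({"id": i} for i in range(5))
print([len(batch) for batch in batched_records(rows, 2)])  # [2, 2, 1]
```

Inside a subclass, `retrieve_records()` could then `yield from batched_records(rows, 500)` over whatever row source it reads, keeping at most one batch in memory.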

pre_transformations(record: Dict) → None

Apply custom transformations before built-in transformations.

Override this method to implement custom business logic that should be applied before the standard transformations (remove, rename, cast). This allows for data preparation, validation, or enrichment.

Common use cases:

  • Data validation and cleanup.

  • Field normalization (e.g., date formatting).

  • Data enrichment from external sources.

  • Business-specific calculations.

  • Record filtering or marking.

Parameters:

record – Record dictionary to transform (modified in-place).
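As an example of field normalization, an override might trim string values and rewrite a day-first date into ISO 8601 in place. The field name `created_at` and the input format `%d/%m/%Y` are assumptions. The override is shown as a plain function for brevity; in a subclass it would be the `pre_transformations(self, record)` method.

```python
from datetime import datetime
from typing import Dict


def pre_transformations(record: Dict) -> None:
    """Trim string values, then normalize a hypothetical DD/MM/YYYY 'created_at' to ISO 8601."""
    for key, value in list(record.items()):
        if isinstance(value, str):
            record[key] = value.strip()
    raw = record.get("created_at")
    if isinstance(raw, str) and raw:
        # strptime raises ValueError on malformed dates, so bad records surface as errors.
        record["created_at"] = datetime.strptime(raw, "%d/%m/%Y").date().isoformat()


row = {"created_at": " 05/03/2024 ", "name": "  Ada "}
pre_transformations(row)
print(row)  # {'created_at': '2024-03-05', 'name': 'Ada'}
```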

post_transformations(record: Dict) → None

Apply custom transformations after built-in transformations.

Override this method to implement custom business logic that should be applied after the standard transformations (remove, rename, cast). This allows for final data processing before the load phase.

Common use cases:

  • Final data validation.

  • Derived field calculations.

  • Data formatting for destination system.

  • Business rule application.

  • Data quality checks.

Parameters:

record – Record dictionary to transform (modified in-place).
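The next example combines a derived-field calculation with a final quality check. The field names `price`, `quantity`, and `total` are hypothetical. `Decimal` is used so that the monetary result avoids binary floating-point rounding.

```python
from decimal import Decimal
from typing import Dict


def post_transformations(record: Dict) -> None:
    """Validate a hypothetical 'quantity' field and derive 'total' from 'price'."""
    quantity = record.get("quantity")
    if not isinstance(quantity, int) or quantity < 0:
        # Raising marks the record as failed; _execute() logs and skips it.
        raise ValueError(f"invalid quantity: {quantity!r}")
    # str() first so the float 9.99 becomes Decimal("9.99"), not its binary approximation.
    record["total"] = Decimal(str(record["price"])) * quantity


row = {"price": 9.99, "quantity": 3}
post_transformations(row)
print(row["total"])  # 29.97
```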

abstractmethod process_records(records: List[Dict])

Processes a batch of transformed records.

This method must be implemented by subclasses to define what happens to records after all transformations have been applied. This is the “Load” phase of the ETL process.

Common implementations include:

  • Storing records in databases (SQL, NoSQL).

  • Writing to files (CSV, JSON, Parquet).

  • Sending to message queues (SQS, Kafka).

  • Uploading to cloud storage (S3, Azure Blob).

  • Streaming to real-time systems (Kinesis, Pub/Sub).

  • Calling external APIs for data ingestion.

Parameters:

records – List of transformed record dictionaries ready for processing.

Raises:

Should raise exceptions on processing failures to trigger error handling.
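This is a minimal load step that appends each batch to a JSON Lines file. The class `JsonLinesSink` and its `output_path` setting are hypothetical. A real destination (database, queue, object store) would replace the file write. Any exception raised while writing propagates to the caller, as the Raises note above requires.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


class JsonLinesSink:
    """Hypothetical process_records() implementation writing JSON Lines."""

    def __init__(self, output_path: str) -> None:
        self.output_path = Path(output_path)

    def process_records(self, records: List[Dict]) -> None:
        # Append one JSON document per line; default=str serializes dates/decimals.
        with self.output_path.open("a", encoding="utf-8") as handle:
            for record in records:
                handle.write(json.dumps(record, default=str) + "\n")


with tempfile.TemporaryDirectory() as folder:
    sink = JsonLinesSink(str(Path(folder, "out.jsonl")))
    sink.process_records([{"id": 1}, {"id": 2}])
    sink.process_records([{"id": 3}])
    lines = sink.output_path.read_text(encoding="utf-8").splitlines()

print(lines)  # ['{"id": 1}', '{"id": 2}', '{"id": 3}']
```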

_impls: Dict[str, Type[Self]] = {}