Interfaces
This file defines IBaseETL, an abstract base class for ETL tasks.
- Structure & Design:
Inherits from ITask and ABC (Abstract Base Class)
Follows the Template Method pattern with execute() orchestrating the ETL workflow
Well-documented with clear docstrings
- Key Features:
Timezone support via time_zone parameter (defaults to UTC)
Temporary folder management for local file operations
Status tracking using TaskStatus enum
Error handling with proper exception capture and logging
Lifecycle hooks: pre_processing(), _execute(), post_processing(), clean_resources()
- Workflow:
Sets status to EXECUTING
Runs pre-processing
Executes main ETL logic (_execute() - abstract method)
Runs post-processing
Handles errors with logging and status updates
Cleans up resources in finally block
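The workflow above can be sketched as a template method. This is a minimal illustration, not the library's source: `SketchETL`, its string statuses, the `calls` list and the two subclasses are hypothetical stand-ins. Only the hook names and their ordering come from the description above.

```python
import logging
from abc import ABC, abstractmethod

logger = logging.getLogger("etl_sketch")


class SketchETL(ABC):
    """Hypothetical stand-in for IBaseETL; not the library's implementation."""

    def __init__(self) -> None:
        # Assumed status names; the real class uses the TaskStatus enum.
        self.status = "CREATED"
        self.calls = []  # records which hooks ran, for illustration only

    def pre_processing(self, *args, **kwargs) -> None:
        self.calls.append("pre_processing")

    @abstractmethod
    def _execute(self, *args, **kwargs) -> int:
        """Main ETL logic; returns the number of processed elements."""

    def post_processing(self, *args, **kwargs) -> None:
        self.calls.append("post_processing")

    def clean_resources(self) -> None:
        self.calls.append("clean_resources")

    def execute(self, *args, **kwargs) -> int:
        processed = 0
        self.status = "EXECUTING"
        try:
            self.pre_processing(*args, **kwargs)
            result = self._execute(*args, **kwargs)
            # An invalid (non-integer) return value is treated as 0.
            processed = result if isinstance(result, int) else 0
            self.post_processing(*args, **kwargs)
            self.status = "SUCCESS"
        except Exception:
            logger.exception("ETL task failed")
            self.status = "ERROR"
            processed = 0
        finally:
            # Runs whether or not an error occurred.
            self.clean_resources()
        return processed


class CountingETL(SketchETL):
    def _execute(self, *args, **kwargs) -> int:
        return 3


class FailingETL(SketchETL):
    def _execute(self, *args, **kwargs) -> int:
        raise RuntimeError("boom")
```

In this sketch a subclass only supplies `_execute()`; the ordering of hooks, the error capture and the cleanup stay in the base class, which is the point of the pattern.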
- class core_etl.base.IBaseETL(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)[source]#
Bases: ITask, ABC
Base class for an ETL task. A task defines the operations over a service or a platform from which you need to extract, transform and/or store information…
This class defines some common (and possibly useful) attributes and methods for future implementations of ETL tasks.
- __init__(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs) None[source]#
- Parameters:
time_zone – The time zone to use in the ETL for date/datetime processing.
temp_folder – Folder for tasks that need to store local files temporarily.
- execute(*args, **kwargs) int[source]#
Executes the ETL task following the template method pattern.
- Orchestrates the complete ETL workflow:
Sets status to EXECUTING.
Runs pre-processing actions.
Executes main ETL logic via _execute().
Runs post-processing actions.
Handles errors with logging and status updates.
Cleans up resources.
- Parameters:
args – Positional arguments passed to lifecycle methods.
kwargs – Keyword arguments passed to lifecycle methods.
- Returns:
Number of processed elements (0 if error or invalid return).
- abstractmethod _execute(*args, **kwargs) int[source]#
Generic implementation for all the extract, transform and load processes…
- Returns:
The number of elements that were processed.
- save_logs(processed_elements: int = 0, error_type: str = '', error_message: str = '', error_traceback: List[Dict] | None = None, **kwargs) None[source]#
Save execution logs/metadata if required…
- Parameters:
processed_elements – The number of processed elements.
error_type – The error type, if any.
error_message – The error message, if any.
error_traceback – The error traceback, if any.
Base class for an ETL task that needs to do something with a file retrieved from a source, such as copying a file from SFTP to S3…
- Structure & Design:
Extends IBaseETL for file-based ETL operations
Implements the abstract _execute() method with file processing logic
Uses iterator pattern for handling multiple files
Key Features:
File Processing Pipeline:
Iterates through file paths from get_paths()
Processes each file individually with error isolation
Tracks processed file count
Handles success/error callbacks per file
Abstract Interface:
get_paths() must be implemented by subclasses; it returns an iterator of file paths to process
process_file() must be implemented by subclasses
Supports last_processed parameter for incremental processing
Lifecycle Hooks:
process_file(): Core file processing logic (required implementation)
on_success(): Success callback for cleanup/archiving
on_error(): Error callback for error handling
Error Handling:
Individual file errors don’t stop the entire batch
Errors are logged with file path context
Failed files are tracked via on_error() callback
- class core_etl.file_based.IBaseEtlFromFile(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)[source]#
Base class for an ETL task that needs to do something with a file retrieved from a source, such as copying a file from SFTP to S3…
- _execute(*args, **kwargs) int[source]#
Executes file processing workflow for all files from the source.
Workflow:
Retrieves file paths via the get_paths() iterator
For each file path:
Logs processing start
Calls process_file() with the path
Calls on_success() callback if processing succeeds
Calls on_error() callback if processing fails
Increments processed count only on success
Returns total number of successfully processed files
Individual file errors do not stop the batch processing.
- Returns:
Number of files successfully processed
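The per-file loop described above can be sketched as a standalone function. This is an assumption-laden illustration rather than the library's code: `run_file_batch` and its callable parameters are hypothetical, and calling `on_success()` outside the `try` block is a design choice made here, not something the documentation specifies.

```python
import logging
from typing import Callable, Iterable

logger = logging.getLogger("file_etl_sketch")


def run_file_batch(
    paths: Iterable[str],
    process_file: Callable[[str], None],
    on_success: Callable[[str], None],
    on_error: Callable[[str], None],
) -> int:
    """Process each path in isolation and return the number of successes."""
    processed = 0
    for path in paths:
        logger.info("Processing file: %s", path)
        try:
            process_file(path)
        except Exception:
            # One failing file is logged and reported, then the loop moves on.
            logger.exception("Error processing file: %s", path)
            on_error(path)
            continue
        on_success(path)
        processed += 1  # counted only when processing succeeded
    return processed
```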
- abstractmethod get_paths() Iterator[str][source]#
Retrieves file paths from the source and returns an iterator. This method must be implemented by subclasses to define how files are discovered and listed from the source system (e.g., SFTP, local filesystem, cloud storage, etc.).
- Returns:
Iterator yielding file paths as strings
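As an illustration of a get_paths() implementation for a local filesystem source, the generator below yields matching file paths one at a time. The function name `iter_local_paths` and the `folder` and `pattern` parameters are hypothetical; a subclass would typically wrap this kind of logic inside its own get_paths().

```python
from pathlib import Path
from typing import Iterator


def iter_local_paths(folder: str, pattern: str = "*.csv") -> Iterator[str]:
    """Yield paths of files in `folder` that match `pattern`, in sorted order."""
    # Sorting gives a deterministic processing order across runs.
    for entry in sorted(Path(folder).glob(pattern)):
        if entry.is_file():
            yield str(entry)
```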
- abstractmethod process_file(path: str)[source]#
Processes a single file from the given path.
This method must be implemented by subclasses to define the core file processing logic. Common implementations include:
Copying/moving files between systems (SFTP → S3).
Transforming file formats (CSV → JSON).
Validating and processing file contents.
Uploading files to cloud storage.
- Parameters:
path – File path to process.
- Raises:
Should raise exceptions on processing failures.
- on_success(path: str)[source]#
Called after a file is successfully processed. Override this method to implement post-processing actions such as:
Archiving processed files
Moving files to a “completed” directory
Updating processing logs or databases
Sending notifications
- Parameters:
path – Path of the successfully processed file.
- on_error(path: str)[source]#
Called when an error occurs during file processing. Override this method to implement error handling actions such as:
Moving failed files to an “error” directory
Logging detailed error information
Sending error notifications
Quarantining problematic files
- Parameters:
path – Path of the file that failed processing.
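For local files, both callbacks often reduce to moving the file somewhere. The helper below is a small sketch under that assumption; `move_to_folder` is a hypothetical name, and remote sources such as SFTP or S3 would need their own client calls instead of `shutil`.

```python
import shutil
from pathlib import Path


def move_to_folder(path: str, folder: str) -> str:
    """Move a file into `folder` (created if missing) and return its new path."""
    target_dir = Path(folder)
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / Path(path).name
    shutil.move(path, str(target))
    return str(target)
```

An overriding on_success() could then call `move_to_folder(path, "completed")`, and on_error() could call `move_to_folder(path, "error")`; both folder names are placeholders.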
Base class for record-based ETL operations that process data records/rows from various sources like APIs, databases, files, SQS queues, etc.
Structure & Design:
Extends IBaseETL for record-based data processing.
Implements configurable transformation pipeline.
Processes data in memory-efficient batches.
Provides built-in transformations (rename, remove, type cast).
Key Features:
Batch Processing Pipeline:
Retrieves records in configurable batches.
Applies transformation sequence per record.
Processes complete batches for efficiency.
Tracks total processed record count.
Built-in Transformations:
Field removal via attrs_to_remove.
Field renaming via name_mapper.
Data type casting via type_mapper.
Configurable batch size for memory management.
Transformation Hooks:
pre_transformations(): Custom logic before built-in transforms.
post_transformations(): Custom logic after built-in transforms.
Extensible pipeline for specific business logic.
Abstract Interface:
retrieve_records(): Must implement data source integration.
process_records(): Must implement data destination handling.
Error Handling:
Individual record errors don’t stop batch processing.
Batch-level processing with configurable sizes.
Memory-efficient for large datasets.
- class core_etl.record_based.IBaseEtlFromRecord(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs)[source]#
Base class for an ETL task that needs to run ETL processes over data (records, rows) retrieved from different sources such as files, SFTP servers, SQS queues, APIs or other data sources…
- __init__(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs) None[source]#
Initialize record-based ETL with transformation configuration.
- Parameters:
name_mapper – Dictionary mapping old field names to new field names. Example: {"old_name": "new_name", "id": "user_id"}.
type_mapper – Dictionary defining data type conversions for fields. Example: {"age": "int", "price": "float", "active": "bool"}.
attrs_to_remove – List of field names to remove from each record. Example: ["temp_field", "internal_id", "debug_info"].
kwargs – Additional keyword arguments passed to parent IBaseETL.
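To make the three configuration parameters concrete, here is a rough sketch of how they might act on a single record. It is not the library's implementation: `apply_builtin_transforms` and the `CASTERS` table are hypothetical, the set of supported type names is assumed, and type_mapper is assumed to refer to field names as they are after renaming.

```python
from typing import Any, Callable, Dict, List, Optional

# Assumed mapping from type names to casting callables; the library's actual
# supported names and casting rules may differ. Note that "bool" here uses
# Python truthiness, so the string "false" would become True.
CASTERS: Dict[str, Callable[[Any], Any]] = {
    "int": int,
    "float": float,
    "str": str,
    "bool": bool,
}


def apply_builtin_transforms(
    record: Dict[str, Any],
    name_mapper: Optional[Dict[str, str]] = None,
    type_mapper: Optional[Dict[str, str]] = None,
    attrs_to_remove: Optional[List[str]] = None,
) -> None:
    """Remove, rename, then cast fields of `record` in place."""
    for attr in attrs_to_remove or []:
        record.pop(attr, None)  # missing fields are ignored
    for old, new in (name_mapper or {}).items():
        if old in record:
            record[new] = record.pop(old)
    for field, type_name in (type_mapper or {}).items():
        if field in record:
            record[field] = CASTERS[type_name](record[field])
```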
- _execute(*args, **kwargs) int[source]#
Executes record processing workflow with configurable transformations.
- Workflow:
Retrieves records in batches via retrieve_records()
For each batch:
Logs batch processing start
For each record in batch:
a. Applies pre_transformations() (custom logic).
b. Removes unwanted attributes (attrs_to_remove).
c. Renames fields (name_mapper).
d. Converts data types (type_mapper).
e. Applies post_transformations() (custom logic).
Processes complete transformed batch via process_records().
Returns total number of records processed across all batches.
Individual record errors do not stop batch processing. A failing record is logged and skipped; remaining records and batches continue.
- Returns:
Total number of records processed across all batches.
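The batch loop with per-record error isolation might look roughly like the following. The names `run_record_batches` and `transform` are hypothetical, and two behaviours are assumptions of this sketch rather than documented facts: skipped records are excluded from the returned count, and `transform` stands in for the whole pre/built-in/post sequence.

```python
import logging
from typing import Callable, Dict, Iterable, List

logger = logging.getLogger("record_etl_sketch")


def run_record_batches(
    batches: Iterable[List[Dict]],
    transform: Callable[[Dict], None],
    process_records: Callable[[List[Dict]], None],
) -> int:
    """Transform each record in place, load each batch, return the total."""
    total = 0
    for number, batch in enumerate(batches, start=1):
        logger.info("Processing batch %d (%d records)", number, len(batch))
        transformed = []
        for record in batch:
            try:
                transform(record)
            except Exception:
                # A failing record is logged and skipped; the batch continues.
                logger.exception("Skipping record that failed transformation")
                continue
            transformed.append(record)
        process_records(transformed)
        total += len(transformed)
    return total
```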
- abstractmethod retrieve_records() Iterator[List[Dict]][source]#
Retrieves records from data sources in batches.
This method must be implemented by subclasses to define how records are fetched from the source system. Returns batches of records to enable memory-efficient processing and prevent resource exhaustion.
Common implementations include:
Database queries with pagination.
API calls with page-based retrieval.
File reading with chunk processing.
Message queue consumption.
Stream processing.
- Returns:
Iterator yielding batches (lists) of records as dictionaries. Batch sizing is the subclass’s responsibility.
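Since batch sizing is left to the subclass, one common approach is to chunk any iterable of records with `itertools.islice`. The helper name `in_batches` and the default `batch_size` of 500 are illustrative assumptions.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def in_batches(records: Iterable[Dict], batch_size: int = 500) -> Iterator[List[Dict]]:
    """Yield lists of at most `batch_size` records without loading everything."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # source exhausted
        yield batch
```

A retrieve_records() implementation could return `in_batches(rows)` where `rows` is a cursor, a paginated API generator or a file reader, so only one batch is held in memory at a time.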
- pre_transformations(record: Dict) None[source]#
Apply custom transformations before built-in transformations.
Override this method to implement custom business logic that should be applied before the standard transformations (remove, rename, cast). This allows for data preparation, validation, or enrichment.
Common use cases:
Data validation and cleanup.
Field normalization (e.g., date formatting).
Data enrichment from external sources.
Business-specific calculations.
Record filtering or marking.
- Parameters:
record – Record dictionary to transform (modified in-place).
- post_transformations(record: Dict) None[source]#
Apply custom transformations after built-in transformations.
Override this method to implement custom business logic that should be applied after the standard transformations (remove, rename, cast). This allows for final data processing before the load phase.
Common use cases:
Final data validation.
Derived field calculations.
Data formatting for destination system.
Business rule application.
Data quality checks.
- Parameters:
record – Record dictionary to transform (modified in-place).
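A short sketch of what the two hooks might contain, written as plain functions for brevity (in a subclass they would be methods taking `self`). The field names `created`, `price`, `quantity` and `total`, and the input date format, are hypothetical.

```python
from datetime import datetime
from typing import Dict


def pre_transformations(record: Dict) -> None:
    """Normalize a day/month/year date string to ISO 8601, in place."""
    raw = record.get("created")
    if raw:
        record["created"] = datetime.strptime(raw, "%d/%m/%Y").date().isoformat()


def post_transformations(record: Dict) -> None:
    """Add a derived field once the built-in casts have produced numbers."""
    record["total"] = record["price"] * record["quantity"]
```

Both functions return None and mutate the record they receive, matching the in-place contract described above.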
- abstractmethod process_records(records: List[Dict])[source]#
Processes a batch of transformed records.
This method must be implemented by subclasses to define what happens to records after all transformations have been applied. This is the “Load” phase of the ETL process.
Common implementations include:
Storing records in databases (SQL, NoSQL).
Writing to files (CSV, JSON, Parquet).
Sending to message queues (SQS, Kafka).
Uploading to cloud storage (S3, Azure Blob).
Streaming to real-time systems (Kinesis, Pub/Sub).
Calling external APIs for data ingestion.
- Parameters:
records – List of transformed record dictionaries ready for processing.
- Raises:
Should raise exceptions on processing failures to trigger error handling.
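As one possible load step, the sketch below appends a batch to a JSON Lines file. `append_json_lines` is a hypothetical helper, not part of the library; I/O errors are deliberately left to propagate, in line with the note that failures should raise.

```python
import json
from typing import Dict, List


def append_json_lines(records: List[Dict], path: str) -> int:
    """Append each record as one JSON document per line; return the count."""
    with open(path, "a", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")
    return len(records)
```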