Interfaces
IBaseETL
- class core_etl.base.IBaseETL(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)
Bases: ITask, ABC
Base class for an ETL task. A task defines the operations performed on a service or platform from which you need to extract, transform, and/or store information…
This class defines some common (potentially useful) attributes and methods for implementations of ETL tasks.
- __init__(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs) → None
- Parameters:
time_zone – The time zone to use in the ETL for date/datetime processing.
temp_folder – Path to a folder for tasks that need to store local files temporarily.
- execute(*args, **kwargs) → int
Executes the ETL task following the template method pattern.
- Orchestrates the complete ETL workflow:
1. Sets the status to EXECUTING.
2. Runs pre-processing actions.
3. Executes the main ETL logic via _execute().
4. Runs post-processing actions.
5. Handles errors with logging and status updates.
6. Cleans up resources.
- Parameters:
args – Positional arguments passed to lifecycle methods.
kwargs – Keyword arguments passed to lifecycle methods.
- Returns:
Number of processed elements (0 if error or invalid return).
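As a rough illustration of the template method pattern described above, the following self-contained sketch mimics the execute() lifecycle without importing core_etl. The hook names pre_processing(), post_processing() and clean_resources(), and every Status value except EXECUTING, are hypothetical; the real class may name and order these steps differently.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import List


class Status(Enum):
    # Hypothetical values: the docs only mention EXECUTING.
    PENDING = "pending"
    EXECUTING = "executing"
    SUCCESS = "success"
    ERROR = "error"


class SketchETL(ABC):
    """Hypothetical stand-in mimicking the documented execute() lifecycle."""

    def __init__(self) -> None:
        self.status = Status.PENDING
        self.calls: List[str] = []

    def execute(self, *args, **kwargs) -> int:
        self.status = Status.EXECUTING
        processed = 0
        try:
            self.pre_processing(*args, **kwargs)
            result = self._execute(*args, **kwargs)
            # A non-int result counts as an invalid return and yields 0.
            processed = result if isinstance(result, int) else 0
            self.post_processing(*args, **kwargs)
            self.status = Status.SUCCESS
        except Exception as error:
            # Record the failure (a real task would log it) instead of re-raising.
            self.calls.append(f"error: {error}")
            self.status = Status.ERROR
            processed = 0
        finally:
            # Cleanup runs whether or not an error occurred.
            self.clean_resources()
        return processed

    # Hypothetical hook names for the pre/post-processing and cleanup steps.
    def pre_processing(self, *args, **kwargs) -> None:
        self.calls.append("pre")

    def post_processing(self, *args, **kwargs) -> None:
        self.calls.append("post")

    def clean_resources(self) -> None:
        self.calls.append("cleanup")

    @abstractmethod
    def _execute(self, *args, **kwargs) -> int:
        """Subclasses put the actual extract/transform/load logic here."""


class CountItems(SketchETL):
    def _execute(self, items, **kwargs) -> int:
        return len(items)


class Broken(SketchETL):
    def _execute(self, *args, **kwargs) -> int:
        raise ValueError("boom")


ok_task = CountItems()
ok_count = ok_task.execute(["a", "b", "c"])  # 3
failing_task = Broken()
failed_count = failing_task.execute()  # 0, status ERROR
```

The point of the pattern is that subclasses only implement _execute(); status handling, error capture and cleanup stay in the base class.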
- abstractmethod _execute(*args, **kwargs) → int
Generic implementation for all the extract, transform and load processes…
- Returns:
The number of elements that were processed.
- save_logs(processed_elements: int = 0, error_type: str = '', error_message: str = '', error_traceback: List[Dict] | None = None, **kwargs) → None
Saves execution logs/metadata, if required…
- Parameters:
processed_elements – The number of processed elements.
error_type – The error type, if any.
error_message – The error message, if any.
error_traceback – The error traceback, if any.
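The docs type error_traceback as List[Dict] but do not describe the dictionary layout. The following sketch shows one way to collect the save_logs() keyword arguments from a caught exception using the standard traceback module; the keys filename, line, function and code are assumptions, not a documented format.

```python
import traceback
from typing import Dict, List


def traceback_as_dicts(error: BaseException) -> List[Dict]:
    """Convert an exception's traceback into plain dicts (hypothetical keys)."""
    frames = traceback.extract_tb(error.__traceback__)
    return [
        {"filename": f.filename, "line": f.lineno, "function": f.name, "code": f.line}
        for f in frames
    ]


def fail() -> None:
    raise KeyError("missing_field")


try:
    fail()
except KeyError as exc:
    # These kwargs could then be passed on, e.g. task.save_logs(**log_kwargs).
    log_kwargs = {
        "processed_elements": 0,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "error_traceback": traceback_as_dicts(exc),
    }
```

The innermost frame (here fail()) is the last element of the list, matching the order of a printed traceback.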
IBaseEtlFromFile
- class core_etl.file_based.IBaseEtlFromFile(time_zone: timezone = datetime.timezone.utc, temp_folder: str | None = None, **kwargs)
Base class for an ETL task that needs to do something with a file retrieved from a source, such as copying the file from SFTP to S3…
- _execute(*args, **kwargs) → int
Executes file processing workflow for all files from the source.
Workflow:
1. Retrieves file paths via the get_paths() iterator.
2. For each file path:
   - Logs the start of processing.
   - Calls process_file() with the path.
   - Calls the on_success() callback if processing succeeds.
   - Calls the on_error() callback if processing fails.
   - Increments the processed count only on success.
3. Returns the total number of successfully processed files.
Individual file errors do not stop the batch processing.
- Returns:
Number of files successfully processed
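The per-file loop described above can be sketched as a plain function. This is a self-contained illustration, not core_etl's code: the callbacks stand in for the process_file(), on_success() and on_error() methods, and the choice to treat an on_success() failure as a file error is an assumption.

```python
import logging
from typing import Callable, Iterable, List

logger = logging.getLogger("file_etl_sketch")


def run_file_batch(
    paths: Iterable[str],
    process_file: Callable[[str], None],
    on_success: Callable[[str], None],
    on_error: Callable[[str], None],
) -> int:
    """Process every path; a failure on one file never stops the batch."""
    processed = 0
    for path in paths:
        logger.info("Processing file: %s", path)
        try:
            process_file(path)
            on_success(path)
            processed += 1  # Only successful files are counted.
        except Exception:
            logger.exception("Failed to process %s", path)
            on_error(path)
    return processed


succeeded: List[str] = []
failed: List[str] = []


def fake_process(path: str) -> None:
    # Hypothetical rule: files ending in ".bad" cannot be processed.
    if path.endswith(".bad"):
        raise ValueError(f"cannot parse {path}")


count = run_file_batch(["a.csv", "b.bad", "c.csv"], fake_process, succeeded.append, failed.append)
```

Here count is 2: the failing "b.bad" is routed to on_error() while the remaining files are still processed.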
- abstractmethod get_paths() → Iterator[str]
Retrieves file paths from the source and returns an iterator. This method must be implemented by subclasses to define how files are discovered and listed from the source system (e.g., SFTP, local filesystem, cloud storage).
- Returns:
Iterator yielding file paths as strings
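A minimal local-filesystem version of get_paths() might use pathlib.Path.glob. The folder and pattern parameters are hypothetical: the documented method takes no arguments, so a real subclass would more likely keep them as instance attributes.

```python
import tempfile
from pathlib import Path
from typing import Iterator


def get_paths(folder: str, pattern: str = "*.csv") -> Iterator[str]:
    """Yield matching file paths under `folder` one at a time, in sorted order."""
    # Sorting makes the order deterministic, at the cost of listing the folder up front.
    for path in sorted(Path(folder).glob(pattern)):
        if path.is_file():
            yield str(path)


with tempfile.TemporaryDirectory() as tmp:
    for name in ("b.csv", "a.csv", "notes.txt"):
        (Path(tmp) / name).touch()
    found = [Path(p).name for p in get_paths(tmp)]
```

found is ["a.csv", "b.csv"]; the .txt file does not match the pattern.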
- abstractmethod process_file(path: str)
Processes a single file from the given path.
This method must be implemented by subclasses to define the core file processing logic. Common implementations include:
Copying/moving files between systems (SFTP → S3).
Transforming file formats (CSV → JSON).
Validating and processing file contents.
Uploading files to cloud storage.
- Parameters:
path – File path to process.
- Raises:
Should raise an exception when processing fails, so that on_error() is called for the file.
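As an example of the "CSV → JSON" case listed above, here is a sketch of process_file() written as a standalone function. Writing the JSON next to the source file is an assumption made for illustration; errors such as FileNotFoundError are deliberately left to propagate.

```python
import csv
import json
import tempfile
from pathlib import Path


def process_file(path: str) -> None:
    """Convert a CSV file into a JSON file with the same stem (hypothetical layout)."""
    source = Path(path)
    # Any I/O or parsing error propagates, so the caller can route the file to on_error().
    with source.open(newline="", encoding="utf-8") as handle:
        rows = list(csv.DictReader(handle))
    target = source.with_suffix(".json")
    target.write_text(json.dumps(rows, indent=2), encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "users.csv"
    csv_path.write_text("id,name\n1,Ada\n2,Linus\n", encoding="utf-8")
    process_file(str(csv_path))
    converted = json.loads(csv_path.with_suffix(".json").read_text(encoding="utf-8"))
```

csv.DictReader keeps every value as a string, so converted holds {"id": "1", ...}; type conversion would be a separate step.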
- on_success(path: str)
Called after a file is successfully processed. Override this method to implement post-processing actions such as:
Archiving processed files
Moving files to a “completed” directory
Updating processing logs or databases
Sending notifications
- Parameters:
path – Path of the successfully processed file.
- on_error(path: str)
Called when an error occurs during file processing. Override this method to implement error handling actions such as:
Moving failed files to an “error” directory
Logging detailed error information
Sending error notifications
Quarantining problematic files
- Parameters:
path – Path of the file that failed processing.
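The on_success() and on_error() hooks often just move a file to a "completed" or "error" directory. The sketch below shows that with shutil.move; FileRouter and the folder names are hypothetical, and in an IBaseEtlFromFile subclass the two methods would be overrides on the task itself.

```python
import shutil
import tempfile
from pathlib import Path


class FileRouter:
    """Hypothetical helper: moves files into `completed/` or `error/` folders."""

    def __init__(self, base_dir: str) -> None:
        self.completed_dir = Path(base_dir) / "completed"
        self.error_dir = Path(base_dir) / "error"

    def _move(self, path: str, target_dir: Path) -> Path:
        target_dir.mkdir(parents=True, exist_ok=True)
        destination = target_dir / Path(path).name
        # shutil.move falls back to copy + delete across file systems.
        return Path(shutil.move(path, destination))

    def on_success(self, path: str) -> None:
        self._move(path, self.completed_dir)

    def on_error(self, path: str) -> None:
        self._move(path, self.error_dir)


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.csv"
    bad = Path(tmp) / "bad.csv"
    good.touch()
    bad.touch()
    router = FileRouter(tmp)
    router.on_success(str(good))
    router.on_error(str(bad))
    layout = sorted(p.relative_to(tmp).as_posix() for p in Path(tmp).rglob("*.csv"))
```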
IBaseEtlFromRecord
- class core_etl.record_based.IBaseEtlFromRecord(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs)
Base class for an ETL task that needs to run ETL processes over data (records, rows) retrieved from different sources, such as files, SFTP servers, SQS queues, APIs, or other data sources…
- __init__(name_mapper: Dict[str, str] | None = None, type_mapper: Dict[str, str] | None = None, attrs_to_remove: List[str] | None = None, **kwargs) → None
Initialize record-based ETL with transformation configuration.
- Parameters:
name_mapper – Dictionary mapping old field names to new field names. Example: {"old_name": "new_name", "id": "user_id"}.
type_mapper – Dictionary defining data type conversions for fields. Example: {"age": "int", "price": "float", "active": "bool"}.
attrs_to_remove – List of field names to remove from each record. Example: ["temp_field", "internal_id", "debug_info"].
kwargs – Additional keyword arguments passed to parent IBaseETL.
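To make the three mappers concrete, here is a sketch of the built-in remove → rename → cast steps applied to one record in place. The CONVERTERS table and _to_bool() are assumptions (the docs do not say how type names such as "bool" are converted), and because casting runs after renaming, type_mapper keys here refer to the renamed fields.

```python
from typing import Any, Callable, Dict, List, Optional


def _to_bool(value: Any) -> bool:
    # bool("false") is True in Python, so common string forms are parsed explicitly.
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y"}
    return bool(value)


# Hypothetical mapping from type_mapper names to converter functions.
CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "int": int,
    "float": float,
    "str": str,
    "bool": _to_bool,
}


def apply_builtin_transformations(
    record: Dict[str, Any],
    name_mapper: Optional[Dict[str, str]] = None,
    type_mapper: Optional[Dict[str, str]] = None,
    attrs_to_remove: Optional[List[str]] = None,
) -> None:
    """Remove, rename, then cast fields of `record` in place."""
    for attr in attrs_to_remove or []:
        record.pop(attr, None)
    for old_name, new_name in (name_mapper or {}).items():
        if old_name in record:
            record[new_name] = record.pop(old_name)
    for field, type_name in (type_mapper or {}).items():
        if field in record and record[field] is not None:
            record[field] = CONVERTERS[type_name](record[field])


record = {"id": "7", "age": "42", "active": "false", "temp_field": "x"}
apply_builtin_transformations(
    record,
    name_mapper={"id": "user_id"},
    type_mapper={"user_id": "int", "age": "int", "active": "bool"},
    attrs_to_remove=["temp_field"],
)
```

After the call, record is {"user_id": 7, "age": 42, "active": False}.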
- _execute(*args, **kwargs) → int
Executes record processing workflow with configurable transformations.
- Workflow:
1. Retrieves records in batches via retrieve_records().
2. For each batch:
   - Logs the start of batch processing.
   - For each record in the batch:
     a. Applies pre_transformations() (custom logic).
     b. Removes unwanted attributes (attrs_to_remove).
     c. Renames fields (name_mapper).
     d. Converts data types (type_mapper).
     e. Applies post_transformations() (custom logic).
   - Processes the complete transformed batch via process_records().
3. Returns the total number of records processed across all batches.
Individual record errors do not stop batch processing. A failing record is logged and skipped; remaining records and batches continue.
- Returns:
Total number of records processed across all batches.
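The batch loop with per-record error isolation can be sketched as follows. This is an illustration under assumptions, not the library's implementation: transform stands in for the whole pre → remove → rename → cast → post chain, only records that reach process_records() are counted, and empty batches are skipped.

```python
import logging
from typing import Callable, Dict, Iterable, List

logger = logging.getLogger("record_etl_sketch")


def run_record_batches(
    batches: Iterable[List[Dict]],
    transform: Callable[[Dict], None],
    process_records: Callable[[List[Dict]], None],
) -> int:
    total = 0
    for number, batch in enumerate(batches, start=1):
        logger.info("Processing batch %d (%d records)", number, len(batch))
        transformed: List[Dict] = []
        for record in batch:
            try:
                transform(record)  # Stand-in for pre -> remove -> rename -> cast -> post.
            except Exception:
                # A failing record is logged and skipped; the batch continues.
                logger.exception("Skipping record %r", record)
                continue
            transformed.append(record)
        if transformed:  # Assumption: empty batches are not loaded.
            process_records(transformed)
            total += len(transformed)
    return total


loaded: List[Dict] = []


def to_int_amount(record: Dict) -> None:
    record["amount"] = int(record["amount"])


count = run_record_batches(
    [[{"amount": "1"}, {"amount": "oops"}], [{"amount": "3"}]],
    to_int_amount,
    loaded.extend,
)
```

The record with "oops" fails int() and is skipped, so count is 2.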
- abstractmethod retrieve_records() → Iterator[List[Dict]]
Retrieves records from data sources in batches.
This method must be implemented by subclasses to define how records are fetched from the source system. Returns batches of records to enable memory-efficient processing and prevent resource exhaustion.
Common implementations include:
Database queries with pagination.
API calls with page-based retrieval.
File reading with chunk processing.
Message queue consumption.
Stream processing.
- Returns:
Iterator yielding batches (lists) of records as dictionaries. Batch sizing is the subclass’s responsibility.
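Since batch sizing is left to the subclass, one common approach is to slice any record iterator into fixed-size lists with itertools.islice. The source and batch_size parameters are hypothetical; the documented method takes no arguments, so a subclass would read them from its own attributes.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def retrieve_records(source: Iterable[Dict], batch_size: int = 500) -> Iterator[List[Dict]]:
    """Yield lists of at most `batch_size` records without materializing the source."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    iterator = iter(source)
    while batch := list(islice(iterator, batch_size)):
        yield batch


rows = ({"id": i} for i in range(5))
sizes = [len(batch) for batch in retrieve_records(rows, batch_size=2)]
```

sizes is [2, 2, 1]: only one batch is held in memory at a time.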
- pre_transformations(record: Dict) → None
Apply custom transformations before built-in transformations.
Override this method to implement custom business logic that should be applied before the standard transformations (remove, rename, cast). This allows for data preparation, validation, or enrichment.
Common use cases:
Data validation and cleanup.
Field normalization (e.g., date formatting).
Data enrichment from external sources.
Business-specific calculations.
Record filtering or marking.
- Parameters:
record – Record dictionary to transform (modified in-place).
- post_transformations(record: Dict) → None
Apply custom transformations after built-in transformations.
Override this method to implement custom business logic that should be applied after the standard transformations (remove, rename, cast). This allows for final data processing before the load phase.
Common use cases:
Final data validation.
Derived field calculations.
Data formatting for destination system.
Business rule application.
Data quality checks.
- Parameters:
record – Record dictionary to transform (modified in-place).
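A sketch of the two hooks, shown as plain functions (in a subclass they would be methods taking self and record). The field names signup, price, quantity and total, and the day/month/year input format, are hypothetical. Given the per-record error handling described for _execute(), raising in a hook would presumably cause that record to be logged and skipped.

```python
from datetime import datetime
from typing import Any, Dict


def pre_transformations(record: Dict[str, Any]) -> None:
    """Normalize raw input before the built-in remove/rename/cast steps."""
    # Trim whitespace from every string value (only values change, not keys).
    for key, value in record.items():
        if isinstance(value, str):
            record[key] = value.strip()
    # Convert a day/month/year date into ISO 8601 (hypothetical "signup" field).
    if record.get("signup"):
        record["signup"] = datetime.strptime(record["signup"], "%d/%m/%Y").date().isoformat()


def post_transformations(record: Dict[str, Any]) -> None:
    """Derive fields and run a final check after casting."""
    record["total"] = round(record["price"] * record["quantity"], 2)
    if record["total"] < 0:
        raise ValueError(f"negative total in record {record!r}")


record = {"name": "  Ada ", "signup": "05/03/2024", "price": 2.5, "quantity": 4}
pre_transformations(record)
# ... the built-in remove/rename/cast steps would run here ...
post_transformations(record)
```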
- abstractmethod process_records(records: List[Dict])
Processes a batch of transformed records.
This method must be implemented by subclasses to define what happens to records after all transformations have been applied. This is the “Load” phase of the ETL process.
Common implementations include:
Storing records in databases (SQL, NoSQL).
Writing to files (CSV, JSON, Parquet).
Sending to message queues (SQS, Kafka).
Uploading to cloud storage (S3, Azure Blob).
Streaming to real-time systems (Kinesis, Pub/Sub).
Calling external APIs for data ingestion.
- Parameters:
records – List of transformed record dictionaries ready for processing.
- Raises:
Should raise exceptions on processing failures to trigger error handling.
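As one example of the "writing to files" case, the following sketch appends each batch to a JSON Lines file. The output_path parameter is hypothetical (the documented method takes only records), and default=str is a choice to keep values such as dates serializable.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def process_records(records: List[Dict], output_path: str) -> None:
    """Append a batch of records to a JSON Lines file (one JSON object per line)."""
    # I/O errors are not caught, so the caller's error handling still runs.
    with Path(output_path).open("a", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record, default=str) + "\n")


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "users.jsonl"
    process_records([{"id": 1}, {"id": 2}], str(out))
    process_records([{"id": 3}], str(out))
    lines = out.read_text(encoding="utf-8").splitlines()
```

Appending per batch keeps memory use bounded by the batch size rather than the full dataset.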
IAsyncETL
- class core_etl.async_based.IAsyncETL(max_queue_size: int = 1000, max_workers: int = 10, **kwargs)
Base class for an ETL task that needs to process elements asynchronously.
Note:
execute() calls asyncio.run() internally and cannot be invoked from within a running event loop. In async contexts, use: await asyncio.to_thread(task.execute)
- __init__(max_queue_size: int = 1000, max_workers: int = 10, **kwargs) → None
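- Parameters:
max_queue_size – Maximum size of the queue that holds the produced records.
max_workers – Maximum number of consumer workers that process records.
kwargs – Additional keyword arguments passed to the parent class (e.g., time_zone, temp_folder).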
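The note above can be illustrated with a small stand-in: a synchronous execute() that calls asyncio.run(), invoked from inside a coroutine through asyncio.to_thread (Python 3.9+). DemoAsyncTask is hypothetical and only mimics that one documented property of IAsyncETL.

```python
import asyncio


class DemoAsyncTask:
    """Hypothetical stand-in whose synchronous execute() calls asyncio.run()."""

    def execute(self) -> int:
        return asyncio.run(self._run())

    async def _run(self) -> int:
        await asyncio.sleep(0)
        return 3


async def caller() -> int:
    task = DemoAsyncTask()
    # Calling task.execute() directly here would fail, because this coroutine
    # already runs inside an event loop. to_thread runs it in a worker thread
    # that has no running loop, so asyncio.run() inside execute() is allowed.
    return await asyncio.to_thread(task.execute)


processed = asyncio.run(caller())
```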
- _execute(*args, **kwargs) → int
Generic implementation for all the extract, transform and load processes…
- Returns:
The number of elements that were processed.
- abstractmethod async produce_records()
Must be implemented in subclasses. It must populate the asyncio.Queue with the records that will be processed by the consumers via the _process_record function. Ensure that await queue.put(record) is called.
- async _consume_record(worker_id: str) → None
Pulls records from the queue and dispatches them to _process_record.
- abstractmethod async _process_record(record: Any) → None
Must be implemented by child classes to process a single record pulled from the queue.
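The producer/consumer flow described for IAsyncETL can be sketched with a bounded asyncio.Queue and a fixed pool of worker tasks. This is a self-contained stand-in, not core_etl's code: the shutdown strategy (queue.join() followed by cancelling the workers), the processed counter and the SquareNumbers example are assumptions.

```python
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Any, List

logger = logging.getLogger("async_etl_sketch")


class AsyncETLSketch(ABC):
    """Hypothetical stand-in for the IAsyncETL producer/consumer layout."""

    def __init__(self, max_queue_size: int = 1000, max_workers: int = 10) -> None:
        self.max_queue_size = max_queue_size
        self.max_workers = max_workers
        self.processed = 0

    def execute(self) -> int:
        # Synchronous entry point: must not be called from a running event loop.
        return asyncio.run(self._run())

    async def _run(self) -> int:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        workers = [
            asyncio.create_task(self._consume_record(f"worker-{i}"))
            for i in range(self.max_workers)
        ]
        await self.produce_records()  # put() waits while the queue is full.
        await self.queue.join()  # Wait until every record is marked task_done().
        for worker in workers:
            worker.cancel()  # Workers are idle in queue.get() at this point.
        await asyncio.gather(*workers, return_exceptions=True)
        return self.processed

    async def _consume_record(self, worker_id: str) -> None:
        while True:
            record = await self.queue.get()
            try:
                await self._process_record(record)
                self.processed += 1
            except Exception:
                logger.exception("%s failed to process %r", worker_id, record)
            finally:
                self.queue.task_done()

    @abstractmethod
    async def produce_records(self) -> None: ...

    @abstractmethod
    async def _process_record(self, record: Any) -> None: ...


class SquareNumbers(AsyncETLSketch):
    def __init__(self, numbers: List[int], **kwargs) -> None:
        super().__init__(**kwargs)
        self.numbers = numbers
        self.results: List[int] = []

    async def produce_records(self) -> None:
        for number in self.numbers:
            await self.queue.put(number)

    async def _process_record(self, record: Any) -> None:
        await asyncio.sleep(0)
        self.results.append(record * record)


task = SquareNumbers(list(range(5)), max_queue_size=2, max_workers=3)
count = task.execute()
```

A small max_queue_size applies backpressure: the producer pauses on put() until a consumer frees a slot. Results may arrive in any order, so compare them sorted.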