core-etl#
This library provides essential components for ETL processes, offering reusable interfaces for seamless data extraction, transformation, and loading…
Documentation Contents#
Index:
Features#
Base ETL Framework
Template method pattern for ETL workflow orchestration
Comprehensive lifecycle hooks (pre-processing, execution, post-processing, cleanup)
Built-in error handling with detailed exception logging
Task status tracking (CREATED, EXECUTING, SUCCESS, ERROR)
Timezone support for date/datetime processing (defaults to UTC)
Temporary folder management for local file operations
Extensible resource cleanup mechanisms
File-Based ETL (IBaseEtlFromFile)
Process files from various sources (SFTP, local filesystem, cloud storage)
Iterator-based file processing with error isolation per file
Individual file success/error callbacks for custom handling
Batch file operations with automatic error recovery
Extensible hooks:
get_paths(),process_file(),on_success(),on_error()
Record-Based ETL (IBaseEtlFromRecord)
Process records from APIs, databases, files, message queues, and data streams
Memory-efficient batch processing, batch sizing is the subclass’s responsibility via
retrieve_records()Built-in transformation pipeline:
Field removal (
attrs_to_remove)Field renaming (
name_mapper)Data type casting (
type_mapper)
Pre and post transformation hooks for custom business logic
Extensible methods:
retrieve_records(),process_records(),pre_transformations(),post_transformations()
Async ETL (IAsyncETL)
Concurrent record processing via asyncio producer/consumer pattern
Configurable worker pool size (
max_workers) and queue capacity (max_queue_size)Individual record failures are isolated, failed records are logged and skipped without aborting the pipeline
Extensible methods:
produce_records(),_process_record()Note:
execute()usesasyncio.run()internally; callawait asyncio.to_thread(task.execute)from async contexts
Quick Start#
All base classes are importable directly from the top-level package:
from core_etl import (
IBaseETL,
IBaseEtlFromFile,
IBaseEtlFromRecord,
IAsyncETL,
)
Installation#
Install from PyPI using pip:
pip install core-etl
uv pip install core-etl # Or using UV...
pip install -e ".[dev]" # For development...
Setting Up Environment#
Install required libraries:
pip install --upgrade pip
pip install virtualenv
Create Python virtual environment:
virtualenv --python=python3.12 .venv
Activate the virtual environment:
source .venv/bin/activate
Install packages#
pip install .
pip install -e ".[dev]"
Check tests and coverage#
python manager.py run-tests
python manager.py run-coverage
Contributing#
Contributions are welcome! Please:
Fork the repository
Create a feature branch
Write tests for new functionality
Ensure all tests pass:
python manager.py run-testsRun linting:
pylint core_etlRun security checks:
bandit -r core_etlSubmit a pull request
License#
This project is licensed under the MIT License. See the LICENSE file for details.
Links#
Documentation: https://core-etl.readthedocs.io/en/latest/
Repository: bytecode-solutions/core/core-etl
Changelog: bytecode-solutions/core/core-etl/-/blob/master/CHANGELOG.md
Support#
For questions or support, please open an issue on GitLab or contact the maintainers.