core-etl#

This library provides essential components for ETL processes, offering reusable interfaces for seamless data extraction, transformation, and loading…


Features#

Base ETL Framework

  • Template method pattern for ETL workflow orchestration

  • Comprehensive lifecycle hooks (pre-processing, execution, post-processing, cleanup)

  • Built-in error handling with detailed exception logging

  • Task status tracking (CREATED, EXECUTING, SUCCESS, ERROR)

  • Timezone support for date/datetime processing (defaults to UTC)

  • Temporary folder management for local file operations

  • Extensible resource cleanup mechanisms
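
The template method pattern and lifecycle hooks listed above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the library's actual implementation: the class `SketchETL`, the `TaskStatus` enum, and the hook names `pre_processing()`, `_execute()`, `post_processing()`, and `clean_resources()` are assumptions made for the example. Only the four status values come from the feature list.

```python
import enum
import logging
from abc import ABC, abstractmethod


class TaskStatus(enum.Enum):
    # Status names are taken from the feature list; the enum itself is hypothetical.
    CREATED = "CREATED"
    EXECUTING = "EXECUTING"
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"


class SketchETL(ABC):
    """Hypothetical stand-in for a base ETL class; not the core-etl API."""

    def __init__(self) -> None:
        self.status = TaskStatus.CREATED
        self.logger = logging.getLogger(type(self).__name__)

    def execute(self) -> None:
        # Template method: the order of the steps is fixed here,
        # while subclasses supply the steps themselves.
        self.status = TaskStatus.EXECUTING
        try:
            self.pre_processing()
            self._execute()
            self.post_processing()
            self.status = TaskStatus.SUCCESS
        except Exception:
            self.status = TaskStatus.ERROR
            self.logger.exception("ETL task failed")
        finally:
            # Cleanup runs whether the task succeeded or failed.
            self.clean_resources()

    def pre_processing(self) -> None:
        """Optional hook; does nothing by default."""

    @abstractmethod
    def _execute(self) -> None:
        """Mandatory step implemented by each concrete task."""

    def post_processing(self) -> None:
        """Optional hook; does nothing by default."""

    def clean_resources(self) -> None:
        """Optional hook; does nothing by default."""


class CountingTask(SketchETL):
    """Example task that records which hooks ran, in order."""

    def __init__(self, fail: bool = False) -> None:
        super().__init__()
        self.fail = fail
        self.calls = []

    def pre_processing(self) -> None:
        self.calls.append("pre_processing")

    def _execute(self) -> None:
        self.calls.append("_execute")
        if self.fail:
            raise RuntimeError("simulated failure")

    def post_processing(self) -> None:
        self.calls.append("post_processing")

    def clean_resources(self) -> None:
        self.calls.append("clean_resources")
```

In this sketch a failing `_execute()` skips `post_processing()` but still reaches `clean_resources()`, and the task ends in the `ERROR` status instead of raising to the caller. Whether the real base class swallows or re-raises the exception is not stated in this document.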

File-Based ETL (IBaseEtlFromFile)

  • Process files from various sources (SFTP, local filesystem, cloud storage)

  • Iterator-based file processing with error isolation per file

  • Individual file success/error callbacks for custom handling

  • Batch file operations with automatic error recovery

  • Extensible hooks: get_paths(), process_file(), on_success(), on_error()
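
Per-file error isolation can be pictured with the sketch below. It is a standalone illustration under stated assumptions, not code from the library: the class `SketchFileETL`, its `run()` method, and the hook signatures are hypothetical, and only the hook names `get_paths()`, `process_file()`, `on_success()`, and `on_error()` come from the list above.

```python
from typing import Iterable, Iterator, List, Tuple


class SketchFileETL:
    """Hypothetical illustration; the real hook signatures may differ."""

    def __init__(self, paths: Iterable[str]) -> None:
        self._paths = list(paths)
        self.succeeded: List[str] = []
        self.failed: List[Tuple[str, str]] = []

    def get_paths(self) -> Iterator[str]:
        # A real task might list an SFTP folder or a cloud bucket here.
        yield from self._paths

    def process_file(self, path: str) -> None:
        # Stand-in for real parsing: files ending in ".bad" fail.
        if path.endswith(".bad"):
            raise ValueError(f"cannot parse {path}")

    def on_success(self, path: str) -> None:
        self.succeeded.append(path)

    def on_error(self, path: str, error: Exception) -> None:
        self.failed.append((path, str(error)))

    def run(self) -> None:
        # Each file gets its own try/except, so one bad file
        # does not stop the remaining files from being processed.
        for path in self.get_paths():
            try:
                self.process_file(path)
            except Exception as error:
                self.on_error(path, error)
            else:
                self.on_success(path)
```

Running this sketch over `["a.csv", "b.bad", "c.csv"]` processes all three paths and routes only `b.bad` to `on_error()`.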

Record-Based ETL (IBaseEtlFromRecord)

  • Process records from APIs, databases, files, message queues, and data streams

  • Memory-efficient batch processing; batch sizing is the subclass’s responsibility via retrieve_records()

  • Built-in transformation pipeline:

    • Field removal (attrs_to_remove)

    • Field renaming (name_mapper)

    • Data type casting (type_mapper)

  • Pre and post transformation hooks for custom business logic

  • Extensible methods: retrieve_records(), process_records(), pre_transformations(), post_transformations()
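
The three built-in transformations can be illustrated with a small standalone function. This is a sketch under assumptions, not the library's code: the function `transform_record()` is hypothetical, the order (remove, then rename, then cast) is assumed, and so is the choice that `type_mapper` is keyed by the renamed field. Only the parameter names `attrs_to_remove`, `name_mapper`, and `type_mapper` come from the list above.

```python
from typing import Any, Callable, Dict, Iterable, Optional


def transform_record(
    record: Dict[str, Any],
    attrs_to_remove: Iterable[str] = (),
    name_mapper: Optional[Dict[str, str]] = None,
    type_mapper: Optional[Dict[str, Callable[[Any], Any]]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: drop fields, rename fields, then cast values."""
    to_remove = set(attrs_to_remove)
    name_mapper = name_mapper or {}
    type_mapper = type_mapper or {}

    result: Dict[str, Any] = {}
    for key, value in record.items():
        if key in to_remove:
            continue  # field removal
        new_key = name_mapper.get(key, key)  # field renaming
        caster = type_mapper.get(new_key)  # assumed: keyed by the new name
        result[new_key] = caster(value) if caster is not None else value
    return result
```

For example, `transform_record({"id": "7", "tmp": 1, "amt": "3.5"}, attrs_to_remove=["tmp"], name_mapper={"amt": "amount"}, type_mapper={"id": int, "amount": float})` returns `{"id": 7, "amount": 3.5}`.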

Async ETL (IAsyncETL)

  • Concurrent record processing via asyncio producer/consumer pattern

  • Configurable worker pool size (max_workers) and queue capacity (max_queue_size)

  • Individual record failures are isolated: failed records are logged and skipped without aborting the pipeline

  • Extensible methods: produce_records(), _process_record()

  • Note: execute() uses asyncio.run() internally, which cannot be called while another event loop is running in the same thread; from async contexts, call await asyncio.to_thread(task.execute) instead
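
The producer/consumer pattern and the note about asyncio.run() can be sketched with the standard library alone. This is an illustrative sketch, not the library's implementation: `run_pipeline()`, `double()`, `execute()`, and `main()` are hypothetical names, and only `max_workers` and `max_queue_size` are taken from the list above.

```python
import asyncio
import logging

logger = logging.getLogger("async_etl_sketch")


async def run_pipeline(records, process_record, max_workers=4, max_queue_size=10):
    """Hypothetical helper: bounded queue feeding a fixed pool of workers."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
    results = []

    async def worker() -> None:
        while True:
            record = await queue.get()
            try:
                results.append(await process_record(record))
            except Exception:
                # A failed record is logged and skipped; the worker keeps going.
                logger.exception("Skipping failed record %r", record)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(max_workers)]
    for record in records:
        # put() waits while the queue is full, which bounds memory use.
        await queue.put(record)
    await queue.join()  # wait until every queued record has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


async def double(record: int) -> int:
    """Example per-record step; record 3 simulates a bad input."""
    if record == 3:
        raise ValueError("simulated bad record")
    return record * 2


def execute(records):
    """Synchronous entry point that starts its own event loop."""
    return asyncio.run(run_pipeline(records, double, max_workers=2, max_queue_size=2))


async def main():
    # Already inside an event loop, so run the synchronous entry point
    # in a worker thread instead of calling it directly.
    return await asyncio.to_thread(execute, range(6))
```

Calling `execute(range(6))` directly inside `main()` would raise a RuntimeError, because asyncio.run() refuses to start while an event loop is already running in the same thread. Moving the call to a thread with asyncio.to_thread() (available from Python 3.9) avoids that.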

Quick Start#

All base classes are importable directly from the top-level package:

from core_etl import (
    IBaseETL,
    IBaseEtlFromFile,
    IBaseEtlFromRecord,
    IAsyncETL,
)

Installation#

Install from PyPI using pip:

pip install core-etl
uv pip install core-etl     # or with uv
pip install -e ".[dev]"     # for development, run from a local clone of the repository

Setting Up Environment#

  1. Install the required libraries:

pip install --upgrade pip
pip install virtualenv

  2. Create a Python virtual environment:

virtualenv --python=python3.12 .venv

  3. Activate the virtual environment:

source .venv/bin/activate

Install packages#

pip install .
pip install -e ".[dev]"

Check tests and coverage#

python manager.py run-tests
python manager.py run-coverage

Contributing#

Contributions are welcome! Please:

  1. Fork the repository

  2. Create a feature branch

  3. Write tests for new functionality

  4. Ensure all tests pass: python manager.py run-tests

  5. Run linting: pylint core_etl

  6. Run security checks: bandit -r core_etl

  7. Submit a pull request

License#

This project is licensed under the MIT License. See the LICENSE file for details.

Support#

For questions or support, please open an issue on GitLab or contact the maintainers.

Authors#