Pattern 1: Factory + dynamic module loader — transformation/src/core/jobs/common/job_factory.py
The framework — zero knowledge of individual sources
class JobFactory:
    """Build the job object that corresponds to a requested job type.

    The factory holds no source-specific logic: for bronze-to-silver jobs
    the transformer is resolved at runtime from the ``source_name`` and
    ``dataset_name`` job arguments.
    """

    @staticmethod
    def get_job(job_type, io_handler, key_value_parser, spark, logger):
        """Return the job instance for ``job_type``.

        Args:
            job_type: Value compared against the ``JobType`` enum members'
                ``.value``.
            io_handler: Passed to the job and, for bronze-to-silver, to the
                dynamically loaded ``Transformer``.
            key_value_parser: Job-argument accessor; ``get_value`` is used
                to read ``source_name`` and ``dataset_name``.
            spark: Accepted for interface compatibility; not used by the
                branches shown here.
            logger: Passed through to the job.

        Returns:
            An ``S3ReaderJob`` configured for the requested job type.

        Raises:
            ValueError: If ``job_type`` matches no supported job type.
                Previously this case silently returned ``None``.
        """
        if job_type == JobType.BRONZE_TO_SILVER.value:
            source_name = key_value_parser.get_value("source_name")
            dataset_name = key_value_parser.get_value("dataset_name")

            # dynamically imports src.source.{source}.src.transformer.{dataset}_transform
            conf_module = DynamicMod().import_source_trans(source_name, dataset_name)
            # "Transformer" is the class name every source module must expose.
            transformer_cls = DynamicMod().get_src_class(conf_module, "Transformer")
            transformer = transformer_cls(io_handler)

            return S3ReaderJob(key_value_parser, io_handler, transformer, logger)

        if job_type == JobType.SILVER_TO_GOLD.value:
            # No per-source transformer here; the empty string is the
            # placeholder the original code passes to S3ReaderJob.
            return S3ReaderJob(key_value_parser, io_handler, "", logger)

        # Fail loudly instead of returning None, so a misconfigured job type
        # is reported here rather than as an AttributeError further along.
        raise ValueError(f"Unsupported job type: {job_type}")
Each source — plug in by convention, no core changes
# transformation/src/source/cdm/src/transformer/customer_attributes_transform.py

class Transformer:           # name is the contract — DynamicMod finds it
    """Per-dataset transformer discovered by class name at runtime.

    The job factory instantiates this class with a single ``io_handler``
    argument, so the constructor must accept one.
    """

    def __init__(self, io_handler=None):
        """Store the I/O handler supplied by the job factory.

        Args:
            io_handler: Handler passed in by the caller; optional so that
                existing no-argument construction keeps working.
        """
        self.io_handler = io_handler

    def apply_transformations(self, input_df, source_date):
        """Apply the shared clean-up steps and stamp the source date.

        Args:
            input_df: DataFrame to transform.
            source_date: Date string parsed with the ``yyyyMMdd`` pattern.

        Returns:
            The transformed DataFrame with an added ``source_date`` column.
        """
        # CommonTransformer methods are called unbound with this instance
        # as ``self``; the steps run in the order listed.
        input_df = CommonTransformer.remove_white_spaces_from_col(self, input_df)
        input_df = CommonTransformer.convert_to_lower_case(self, input_df)
        input_df = CommonTransformer.add_audit_column_to_dataframe(self, input_df)
        input_df = input_df.withColumn("source_date", F.to_date(F.lit(source_date), "yyyyMMdd"))
        return input_df
Pattern 2: Config-driven pipelines (one framework, every source type) — ingestion/src/core/ingestion_driver.py
Source type resolved from config at runtime — no bespoke pipelines
def ingest(self):
    """Run ingestion for the source type named in the job arguments.

    Reads ``source_type`` from ``self.keyvalue_parser``, builds the matching
    ``*Util`` object and calls its ``execute()`` method.

    Raises:
        Exception: If ``source_type`` is not one of ``oracle``,
            ``cassandra``, ``kafka`` or ``adobe``.
    """
    source_type = self.keyvalue_parser.get_value("source_type")  # from job args

    # NOTE(review): ``spark`` and ``parser`` are not defined in the visible
    # code and the trailing ``...`` stands for elided arguments — confirm the
    # real constructor signatures before relying on this snippet.
    if   source_type == 'oracle'  : db_util = OracleUtil   (spark, parser, ...)
    elif source_type == 'cassandra': db_util = CassandraUtil(spark, parser, ...)
    elif source_type == 'kafka'   : db_util = KafkaUtil    (spark, parser, ...)
    elif source_type == 'adobe'   : db_util = AdobeUtil    (spark, parser, ...)
    else: raise Exception(f"Unsupported source type: {source_type}")

    db_util.execute()     # same interface, every source
Kafka stream config — env-specific, no hardcoding
# ingestion/src/core/util/streaming/configs/dev/order_header.conf

# Settings for the input side of the stream.
input {
    schema_registry_id    = "100248"
    # presumably the offset the consumer starts from — confirm against the reader
    read_start_position   = "earliest"
    # NOTE(review): the only camelCase key in this file; the others are snake_case
    groupId               = "order-header-dev-group"
}
# Settings for how each batch is processed and written.
processing {
    file_output_format    = "parquet"
    batch_interval        = "60 seconds"
    max_records_per_batch = 500
    # prod/ qa/ uat/ each have their own copy of this file, so no
    # environment-specific value is hardcoded in code
}
Pattern 3: Structured ETL logging (every job, fully traceable) — logger/src/common/log.py
Every log line carries full job context — no guessing in prod
class ETLLog:
    """Carry the job context that is rendered into each ETL log line."""

    def __init__(self, job_run_id, stage, dataset_name,
                       source_name, pipeline_id, domain_name, app_id):
        """Store every piece of job context on the instance.

        Args:
            job_run_id: Identifier of the run this log belongs to.
            stage: Pipeline stage name.
            dataset_name: Dataset being processed.
            source_name: Source system the dataset comes from.
            pipeline_id: Identifier shared across pipeline steps.
            domain_name: Business domain of the dataset.
            app_id: Spark application id.
        """
        self.job_run_id   = job_run_id    # ties every log to a DAG run
        # stage, dataset_name and source_name were previously accepted but
        # never stored, so create_log() raised AttributeError on first use.
        self.stage        = stage
        self.dataset_name = dataset_name
        self.source_name  = source_name
        self.pipeline_id  = pipeline_id   # links ingestion → transform → quality
        self.domain_name  = domain_name   # customer / master / crm …
        self.app_id       = app_id         # Spark app id for EMR correlation

    def create_log(self):
        """Return the job context as a single brace-wrapped string.

        NOTE(review): ``job_run_id`` is stored but is not part of the
        rendered string — confirm whether that is intentional.
        """
        return (
            f"{{app_id: {self.app_id}, domain: {self.domain_name}"
            f", pipeline: {self.pipeline_id}, source: {self.source_name}"
            f", stage: {self.stage}, dataset: {self.dataset_name}}}"
        )
Pattern 4: Testing (co-located, Spark-aware, S3 mocked locally) — transformation/tests/helper/spark_test_base.py
Shared base class — every Spark test inherits a real local session
class SparkTestBase(unittest.TestCase):
    """Base test case that shares one Spark session across a test class.

    NOTE(review): ``get_spark_session`` and ``get_etl_logger`` are not
    defined in the visible code — presumably provided elsewhere on this
    class; confirm.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared Spark session and logger once per test class."""
        cls.spark = cls.get_spark_session()    # local[*] Spark, real Parquet/Hudi jars
        cls.logger = cls.get_etl_logger().logger

    @classmethod
    def tearDownClass(cls):
        """Stop the shared Spark session after the class's tests finish."""
        cls.spark.stop()

    def assert_df_equals(self, expected, result):
        """Assert that two DataFrames have the same schema and rows.

        Rows are compared position by position, so row order matters.

        Args:
            expected: DataFrame holding the expected rows.
            result: DataFrame produced by the code under test.

        Raises:
            AssertionError: If the schemas differ, or any position holds a
                different row in the two DataFrames (including a row present
                on one side only).
        """
        self.assertEqual(expected.schema, result.schema)

        def keyed_by_position(df):
            # zipWithIndex yields (row, index); join operates on the first
            # element, so swap to (index, row) to pair rows by position.
            return df.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))

        # A full outer join keeps positions that exist on one side only (the
        # missing side is None), so a row-count mismatch is reported too.
        unequal = (
            keyed_by_position(expected)
            .fullOuterJoin(keyed_by_position(result))
            .filter(lambda kv: kv[1][0] != kv[1][1])
        )
        self.assertEqual([], unequal.take(10))   # row-level diff on failure

    # S3 mocked with moto — run `moto_server s3 -p 5000` before the suite
    # pytest tests/ -m "not integtest"  →  unit only, no AWS credentials needed