The framework — zero knowledge of individual sources
class JobFactory:
    """Build the job object for a pipeline stage.

    The factory holds no per-source knowledge: for the bronze-to-silver stage
    it loads the source's ``Transformer`` class dynamically from the
    ``source_name`` / ``dataset_name`` job arguments.
    """

    @staticmethod
    def get_job(job_type, io_handler, key_value_parser, spark, logger):
        """Return the job matching ``job_type``.

        Args:
            job_type: compared against ``JobType.BRONZE_TO_SILVER.value`` and
                ``JobType.SILVER_TO_GOLD.value``.
            io_handler: passed to the resolved ``Transformer`` and to ``S3ReaderJob``.
            key_value_parser: job-argument lookup; the bronze-to-silver stage
                reads ``source_name`` and ``dataset_name`` from it.
            spark: not used in this method; kept for interface compatibility.
            logger: passed through to ``S3ReaderJob``.

        Returns:
            An ``S3ReaderJob`` configured for the stage.

        Raises:
            ValueError: if ``job_type`` matches no known stage (previously this
                silently returned ``None``).
        """
        if job_type == JobType.BRONZE_TO_SILVER.value:
            source_name = key_value_parser.get_value("source_name")
            dataset_name = key_value_parser.get_value("dataset_name")
            # dynamically imports src.source.{source}.src.transformer.{dataset}_transform
            conf_module = DynamicMod().import_source_trans(source_name, dataset_name)
            transformer = DynamicMod().get_src_class(conf_module, "Transformer")(io_handler)
            return S3ReaderJob(key_value_parser, io_handler, transformer, logger)
        if job_type == JobType.SILVER_TO_GOLD.value:
            # No source-specific transformer at this stage; "" is the placeholder
            # S3ReaderJob has always received here.
            return S3ReaderJob(key_value_parser, io_handler, "", logger)
        # Fail fast instead of handing the caller a None job.
        raise ValueError(f"Unsupported job type: {job_type}")
Each source — plug in by convention, no core changes
# transformation/src/source/cdm/src/transformer/customer_attributes_transform.py
class Transformer:  # name is the contract — DynamicMod finds it
    """Dataset-specific transformations for cdm ``customer_attributes``.

    The framework instantiates this class as ``Transformer(io_handler)``, so the
    constructor must accept that argument.
    """

    def __init__(self, io_handler=None):
        """Keep the framework's IO handler.

        ``io_handler`` defaults to ``None`` so no-argument construction still works.
        """
        self.io_handler = io_handler

    def apply_transformations(self, input_df, source_date):
        """Apply the shared cleanup steps, then stamp ``source_date``.

        Args:
            input_df: DataFrame to transform.
            source_date: date string parsed with pattern ``yyyyMMdd`` (e.g. "20240131").

        Returns:
            The transformed DataFrame, with a ``source_date`` date column added.
        """
        # NOTE(review): CommonTransformer methods are called with this
        # Transformer instance as `self`; this works only while those helpers
        # do not rely on CommonTransformer-specific state — confirm.
        input_df = CommonTransformer.remove_white_spaces_from_col(self, input_df)
        input_df = CommonTransformer.convert_to_lower_case(self, input_df)
        input_df = CommonTransformer.add_audit_column_to_dataframe(self, input_df)
        input_df = input_df.withColumn("source_date", F.to_date(F.lit(source_date), "yyyyMMdd"))
        return input_df
Source type resolved from config at runtime — no bespoke pipelines
def ingest(self):
    """Pick the DB utility for the ``source_type`` job argument and execute it.

    Every utility exposes the same ``execute()`` interface, so only the
    construction step depends on the source type.

    Raises:
        ValueError: if ``source_type`` is not oracle, cassandra, kafka or adobe.
    """
    source_type = self.keyvalue_parser.get_value("source_type")  # from job args
    # NOTE(review): `spark` and `parser` are not bound inside this function as
    # shown (and `...` looks like elided arguments) — presumably they come from
    # an enclosing scope or `self`; confirm.
    if source_type == 'oracle':
        db_util = OracleUtil(spark, parser, ...)
    elif source_type == 'cassandra':
        db_util = CassandraUtil(spark, parser, ...)
    elif source_type == 'kafka':
        db_util = KafkaUtil(spark, parser, ...)
    elif source_type == 'adobe':
        db_util = AdobeUtil(spark, parser, ...)
    else:
        # ValueError subclasses Exception, so existing `except Exception`
        # handlers still catch it.
        raise ValueError(f"Unsupported source type: {source_type}")
    db_util.execute()  # same interface, every source
Kafka stream config — env-specific, no hardcoding
# ingestion/src/core/util/streaming/configs/dev/order_header.conf
# Dev-environment stream settings for order_header.
# The prod/, qa/ and uat/ directories each hold their own copy of this file,
# so no environment-specific values are hardcoded in code.
input {
# Schema registry id used for this stream (presumably the registry's schema id — confirm)
schema_registry_id = "100248"
# Start position when reading; "earliest" — presumably maps to the Kafka offset-reset setting, confirm in the reader
read_start_position = "earliest"
# Consumer group id; the value is dev-specific, matching this dev/ copy
groupId = "order-header-dev-group"
}
processing {
# Output file format written by the stream
file_output_format = "parquet"
# Interval between micro-batches (assumed to be the streaming trigger interval — confirm)
batch_interval = "60 seconds"
# Per-batch record cap (assumed to be enforced by the reader — confirm)
max_records_per_batch = 500
}
Every log line carries full job context — no guessing in prod
class ETLLog:
    """Job context rendered into every log line of an ETL run."""

    def __init__(self, job_run_id, stage, dataset_name,
                 source_name, pipeline_id, domain_name, app_id):
        self.job_run_id = job_run_id      # ties every log to a DAG run
        # stage / dataset_name / source_name are read by create_log and were
        # previously never assigned, which made create_log raise AttributeError.
        self.stage = stage
        self.dataset_name = dataset_name
        self.source_name = source_name
        self.pipeline_id = pipeline_id    # links ingestion → transform → quality
        self.domain_name = domain_name    # customer / master / crm …
        self.app_id = app_id              # Spark app id for EMR correlation

    def create_log(self):
        """Return the job context as one ``{key: value, ...}`` string.

        Keys and values are not quoted, so the output is not JSON.
        NOTE(review): job_run_id is stored but not included in this string.
        """
        return (
            f"{{app_id: {self.app_id}, domain: {self.domain_name}"
            f", pipeline: {self.pipeline_id}, source: {self.source_name}"
            f", stage: {self.stage}, dataset: {self.dataset_name}}}"
        )
Shared base class — every Spark test inherits a real local session
class SparkTestBase(unittest.TestCase):
    """Base class for Spark tests: one local SparkSession per test class.

    Subclasses get ``cls.spark``, ``cls.logger`` and ``assert_df_equals`` for
    schema plus positional row-by-row DataFrame comparison.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.spark = cls.get_spark_session()  # local[*] Spark, real Parquet/Hudi jars
        cls.logger = cls.get_etl_logger().logger

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
        super().tearDownClass()

    def assert_df_equals(self, expected, result, max_diffs=10):
        """Assert two DataFrames have equal schemas and equal rows in the same order.

        Args:
            expected: DataFrame with the expected content.
            result: DataFrame under test.
            max_diffs: maximum number of differing positions shown on failure.

        Raises:
            AssertionError: on a schema mismatch, or when any position holds a
                different row or exists in only one of the DataFrames.
        """
        self.assertEqual(expected.schema, result.schema)
        # zipWithIndex yields (row, index). Swap so the join key is the row's
        # position; joining the raw pairs would key on row content and skip
        # every row that differs.
        expected_by_pos = expected.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
        result_by_pos = result.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
        # Full outer join: a row present on only one side pairs with None and
        # is reported, so length mismatches fail too.
        unequal = expected_by_pos.fullOuterJoin(result_by_pos).filter(
            lambda kv: kv[1][0] != kv[1][1])
        # Lowest positions first keeps the failure message deterministic.
        self.assertEqual([], unequal.takeOrdered(max_diffs, key=lambda kv: kv[0]))  # row-level diff on failure
# S3 mocked with moto — run `moto_server s3 -p 5000` before the suite
# pytest tests/ -m "not integtest" → unit only, no AWS credentials needed