IPZ_1/docs/05_etl_pipeline.md
2026-05-17 17:17:04 +02:00

ETL Pipeline

Overview

The ETL (Extract, Transform, Load) pipeline is built in Apache NiFi and moves data from the MySQL OLTP database into the Oracle Data Mart. It consists of 8 pipelines, one per target table, run in sequence. All of them except DIM_MEDAL, which has no extract step, follow the same processor chain.

For the detailed step-by-step NiFi configuration (which buttons to click, which properties to set), see nifi/NIFI_SETUP.md.


Two Phases Before NiFi

Phase 1 — Seed the OLTP (one-time)

Before NiFi runs, the MySQL database must be populated. This is done by the C# seed script:

./docker/start.sh          # start MySQL container
dotnet run ./scripts/seed.cs

The script reads all 10 CSV files, resolves foreign key relationships (game names, country lookups, organization cross-references), and inserts everything in the correct dependency order.

Phase 2 — Create the Data Mart schema in Oracle (one-time)

Run sql/datamart_schema.sql against your Oracle lab schema before the first NiFi run. This creates all 8 tables. Oracle SQL Developer or any SQL client works for this.


NiFi Processor Chain

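Every pipeline that extracts from MySQL follows the same 5-processor pattern (DIM_MEDAL is the exception: its 3 static rows are inserted directly into Oracle):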

ExecuteSQL          ConvertAvroToJSON     SplitJson
(MySQL source)  ──►  (Avro → JSON)   ──►  (array → 1 FlowFile per row)
                                               │
                                               ▼
                                       EvaluateJsonPath
                                       (JSON fields → FlowFile attributes)
                                               │
                                               ▼
                                           PutSQL
                                        (Oracle target)

| Processor | Role |
| --- | --- |
| ExecuteSQL | Runs the extract SQL on MySQL, produces Avro-encoded records |
| ConvertAvroToJSON | Converts the Avro binary to a JSON array |
| SplitJson | Splits the JSON array into one FlowFile per record |
| EvaluateJsonPath | Reads each field from the JSON record and stores it as a named FlowFile attribute |
| PutSQL | Runs the Oracle INSERT statement, substituting ${attribute} placeholders with the FlowFile attribute values |
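
The last three steps of the chain can be pictured with a small Python model. This is only a sketch of the data flow, not NiFi code: the function names, the `game_name` column, and the sample records are hypothetical, and the `${...}` substitution is modelled as plain text replacement.

```python
import json
import re


def split_json(payload: str) -> list:
    """SplitJson analogue: one dict per record of the JSON array."""
    return list(json.loads(payload))


def evaluate_json_path(record: dict) -> dict:
    """EvaluateJsonPath analogue: copy each top-level field into a string attribute."""
    return {name: "" if value is None else str(value) for name, value in record.items()}


def render_sql(template: str, attributes: dict) -> str:
    """PutSQL analogue: replace every ${name} placeholder with the attribute value."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes[m.group(1)], template)


# Hypothetical sample data standing in for the ConvertAvroToJSON output.
payload = '[{"game_id": 3, "game_name": "Chess"}, {"game_id": 4, "game_name": "Go"}]'
template = "INSERT INTO DIM_GAME (game_id, game_name) VALUES (${game_id}, '${game_name}')"

statements = [render_sql(template, evaluate_json_path(r)) for r in split_json(payload)]
for statement in statements:
    print(statement)
```

Because the substitution in this model is textual, quoting string values is left to the load SQL template, as the `'${game_name}'` placeholder shows.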

The 8 Pipelines

Pipelines must run in this order: the fact loads look up their keys in the dimension tables, so every dimension has to be fully loaded before any fact pipeline starts.

Dimensions (run first, in any order among themselves)

| # | Pipeline | Extract source | Rows |
| --- | --- | --- | --- |
| 1 | DIM_DATE | MySQL CTE (generates date range) | 48 |
| 2 | DIM_MEDAL | No extract; 3 static rows inserted directly to Oracle | 3 |
| 3 | DIM_GAME | game table | 25 |
| 4 | DIM_COUNTRY | country table | 36+ |
| 5 | DIM_ORGANIZATION | organization JOIN country | 60+ |

Facts (run after all dimensions)

| # | Pipeline | Extract source | Rows |
| --- | --- | --- | --- |
| 6 | FACT_TOURNAMENT | tournament JOIN schedule JOIN organization | 27 |
| 7 | FACT_MEDAL_AWARD | medalist JOIN tournament | 257 |
| 8 | FACT_CLUB_STANDING | club_championship_standing | 24 |
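
The ordering rule can be expressed as a dependency graph. The sketch below uses Python's standard `graphlib` module and assumes, as stated above, that every fact depends on every dimension; it illustrates the constraint and is not part of the NiFi flow.

```python
from graphlib import TopologicalSorter

DIMENSIONS = ["DIM_DATE", "DIM_MEDAL", "DIM_GAME", "DIM_COUNTRY", "DIM_ORGANIZATION"]
FACTS = ["FACT_TOURNAMENT", "FACT_MEDAL_AWARD", "FACT_CLUB_STANDING"]

# Map each pipeline to the pipelines that must finish before it starts.
# Dimensions have no prerequisites; each fact waits for all dimensions.
graph = {dim: set() for dim in DIMENSIONS}
graph.update({fact: set(DIMENSIONS) for fact in FACTS})

# static_order() yields prerequisites first, so all dimensions precede all facts.
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)
```

Any order this produces is valid: dimensions may be shuffled among themselves, but none of them ever follows a fact.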

How Key Resolution Works

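The OLTP stores natural keys (e.g. game_id = 3). The Oracle Data Mart uses surrogate keys generated by GENERATED ALWAYS AS IDENTITY (e.g. game_key = 3). The two values may coincide, as in this example, but that is not guaranteed: they diverge as soon as the identity sequence has a gap or the rows are loaded in a different order.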

The fact load SQL handles this by embedding a sub-SELECT inside each INSERT...SELECT...FROM DUAL:

INSERT INTO FACT_TOURNAMENT (game_key, ...)
SELECT
    (SELECT game_key FROM DIM_GAME WHERE game_id = ${game_id}),
    ...
FROM DUAL

This means the Oracle database itself resolves the surrogate key at insert time, using the natural key that was extracted from MySQL and carried through as a FlowFile attribute. No transformation processor is needed.
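
The lookup pattern can be tried out locally with Python's built-in `sqlite3` module. This is a sketch under stated assumptions rather than the project's load SQL: SQLite stands in for Oracle (so there is no `FROM DUAL`, and `AUTOINCREMENT` replaces the identity column), bind parameters replace the `${game_id}` substitution, and the `tournament_id` column is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_game (
        game_key INTEGER PRIMARY KEY AUTOINCREMENT,  -- surrogate key
        game_id  INTEGER NOT NULL UNIQUE             -- natural key from the OLTP
    );
    CREATE TABLE fact_tournament (game_key INTEGER, tournament_id INTEGER);
""")

# Load natural keys in the order 3, 1, 2 so that game_id 3 receives game_key 1.
conn.executemany("INSERT INTO dim_game (game_id) VALUES (?)", [(3,), (1,), (2,)])


def load_fact(game_id, tournament_id):
    """Insert one fact row, resolving the surrogate key inside the database."""
    conn.execute(
        """
        INSERT INTO fact_tournament (game_key, tournament_id)
        SELECT (SELECT game_key FROM dim_game WHERE game_id = ?), ?
        """,
        (game_id, tournament_id),
    )


load_fact(3, 100)   # resolves to the surrogate key assigned to game_id 3
load_fact(99, 101)  # no matching dimension row: the scalar subquery yields NULL

rows = conn.execute(
    "SELECT game_key, tournament_id FROM fact_tournament ORDER BY tournament_id"
).fetchall()
print(rows)
```

The second call shows the main caveat of this design: a natural key with no dimension row does not raise an error by itself, it produces a NULL key, so a NOT NULL constraint on the fact column is what would surface a missing dimension load.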


SQL File Layout

nifi/sql/
├── extract/
│   ├── 01_dim_date.sql          Recursive CTE to generate calendar rows
│   ├── 02_dim_game.sql          Simple SELECT from game
│   ├── 03_dim_country.sql       Simple SELECT from country
│   ├── 04_dim_organization.sql  SELECT with LEFT JOIN to country for country name
│   ├── 05_fact_tournament.sql   JOIN with schedule and org; computes date keys
│   ├── 06_fact_medal_award.sql  JOIN with tournament; computes medal_points
│   └── 07_fact_club_standing.sql
└── load/
    ├── 01_dim_date.sql           INSERT with TO_DATE conversion for Oracle
    ├── 02_dim_medal.sql          Static 3-row INSERT, run once directly
    ├── 03_dim_game.sql
    ├── 04_dim_country.sql
    ├── 05_dim_organization.sql   NULL-safe EL expressions for optional fields
    ├── 06_fact_tournament.sql    Sub-SELECT key lookups via game_id, org_id
    ├── 07_fact_medal_award.sql   Sub-SELECT key lookups; medal_type lookup
    └── 08_fact_club_standing.sql Sub-SELECT key lookup via org_id
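
The recursive-CTE approach noted for extract/01_dim_date.sql can be sketched with Python's `sqlite3` module. The date range, the daily grain, and the column names here are assumptions chosen for illustration; the real extract runs on MySQL and may generate a different range or grain.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Start from the first date and keep adding one day until the end date is reached.
calendar = conn.execute(
    """
    WITH RECURSIVE cal(d) AS (
        SELECT date(:start)
        UNION ALL
        SELECT date(d, '+1 day') FROM cal WHERE d < :end
    )
    SELECT CAST(strftime('%Y%m%d', d) AS INTEGER) AS date_key, d
    FROM cal
    ORDER BY d
    """,
    {"start": "2024-01-01", "end": "2024-01-10"},  # hypothetical range
).fetchall()

for date_key, day in calendar:
    print(date_key, day)
```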

Transformations Performed

Most of the "transformation" work happens in the extract SQL rather than in NiFi processors, which keeps the NiFi flow simple.

| Transformation | Where it happens |
| --- | --- |
| Date → YYYYMMDD integer key | CAST(DATE_FORMAT(..., '%Y%m%d') AS UNSIGNED) in extract SQL |
| Medal → medal_points (Gold=3, Silver=2, Bronze=1) | CASE expression in extract SQL |
| Boolean → 0/1 | CASE WHEN ... = 1 THEN 1 ELSE 0 END in extract SQL |
| Natural key → surrogate key | Sub-SELECT in Oracle load SQL |
| Duration calculation fallback | COALESCE(s.duration_days, DATEDIFF(...) + 1) in extract SQL |
| NULL handling for optional fields | NiFi Expression Language :isEmpty():ifElse(...) in load SQL |
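
For readers who want to check expected values by hand, three of these transformations are mirrored below in Python. The function names are hypothetical, and the duration fallback assumes the elided DATEDIFF arguments are the end and start dates, in that order.

```python
from datetime import date
from typing import Optional

# Same mapping as the CASE expression in the extract SQL.
MEDAL_POINTS = {"Gold": 3, "Silver": 2, "Bronze": 1}


def date_key(d: date) -> int:
    """Mirror of CAST(DATE_FORMAT(d, '%Y%m%d') AS UNSIGNED)."""
    return int(d.strftime("%Y%m%d"))


def duration_days(stored: Optional[int], start: date, end: date) -> int:
    """Mirror of COALESCE(duration_days, DATEDIFF(end, start) + 1).

    The + 1 makes the range inclusive, so a one-day event lasts 1 day, not 0.
    """
    return stored if stored is not None else (end - start).days + 1


print(date_key(date(2026, 5, 17)))
print(duration_days(None, date(2026, 5, 1), date(2026, 5, 3)))
print(MEDAL_POINTS["Silver"])
```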