ETL Pipeline
Overview
The ETL (Extract, Transform, Load) pipeline is built in Apache NiFi and moves data from the MySQL OLTP database into the Oracle Data Mart. It consists of 8 pipelines, one per target table, run in a fixed order (dimensions before facts). Seven of them follow the same processor chain; the eighth, DIM_MEDAL, only inserts 3 static rows.
For the detailed step-by-step NiFi configuration (which buttons to click, which properties to set), see nifi/NIFI_SETUP.md.
Two Phases Before NiFi
Phase 1 — Seed the OLTP (one-time)
Before NiFi runs, the MySQL database must be populated. This is done by the C# seed script:
```bash
./docker/start.sh            # start MySQL container
dotnet run ./scripts/seed.cs
```
The script reads all 10 CSV files, resolves foreign key relationships (game names, country lookups, organization cross-references), and inserts everything in the correct dependency order.
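The sketch below shows what "correct dependency order" means in practice. It is not the seed script's actual logic. It derives an insert order from a table-dependency map using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The dependency map is an assumption assembled from the JOINs named elsewhere on this page, not the real OLTP schema.

```python
from graphlib import TopologicalSorter

# Hypothetical FK map: each table lists the tables it references.
# The real schema may differ; adjust before relying on the result.
DEPENDENCIES = {
    "game": set(),
    "country": set(),
    "organization": {"country"},
    "tournament": {"game", "organization"},
    "schedule": {"tournament"},
    "medalist": {"tournament", "country"},
    "club_championship_standing": {"organization", "tournament"},
}

def insert_order(deps):
    """Return an order in which every table comes after all tables it references."""
    return list(TopologicalSorter(deps).static_order())

ORDER = insert_order(DEPENDENCIES)

if __name__ == "__main__":
    for table in ORDER:
        print(table)
```

A cycle in the map makes `static_order()` raise `graphlib.CycleError`, which is a quick way to spot a circular FK that would need a two-pass insert.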
Phase 2 — Create the Data Mart schema in Oracle (one-time)
Run sql/datamart_schema.sql against your Oracle lab schema before the first NiFi run. This creates all 8 tables. Oracle SQL Developer or any SQL client works for this.
NiFi Processor Chain
Every extract-based pipeline (all except DIM_MEDAL) follows the same 5-processor pattern:
```
ExecuteSQL          ConvertAvroToJSON      SplitJson
(MySQL source) ──►  (Avro → JSON)     ──►  (array → 1 FlowFile per row)
                                                  │
                                                  ▼
                                          EvaluateJsonPath
                                   (JSON fields → FlowFile attributes)
                                                  │
                                                  ▼
                                               PutSQL
                                           (Oracle target)
```
| Processor | Role |
|---|---|
| ExecuteSQL | Runs the extract SQL on MySQL, produces Avro-encoded records |
| ConvertAvroToJSON | Converts the Avro binary to a JSON array |
| SplitJson | Splits the JSON array into one FlowFile per record |
| EvaluateJsonPath | Reads each field from the JSON record and stores it as a named FlowFile attribute |
| PutSQL | Runs the Oracle INSERT statement, substituting ${attribute} placeholders with the FlowFile attribute values |
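The minimal Python sketch below imitates the chain on rows that have already been extracted. It is not NiFi code:

- A JSON array stands in for the ExecuteSQL + ConvertAvroToJSON output.
- Each array element plays the role of a FlowFile after SplitJson.
- The listed fields become string attributes, as with EvaluateJsonPath.
- `${name}` placeholders are filled in as PutSQL does.

The column names, sample values, and `DIM_GAME` INSERT text are hypothetical.

```python
import json
import re

# Hypothetical extract result, standing in for ExecuteSQL + ConvertAvroToJSON output.
EXTRACTED = json.dumps([
    {"game_id": 1, "game_name": "Chess"},
    {"game_id": 2, "game_name": "Go"},
])

# Hypothetical load statement with ${attribute} placeholders, as PutSQL would see it.
LOAD_SQL = "INSERT INTO DIM_GAME (game_id, game_name) VALUES (${game_id}, '${game_name}')"

def split_json(payload):
    """SplitJson: turn the JSON array into one record per element."""
    return json.loads(payload)

def evaluate_json_path(record, fields):
    """EvaluateJsonPath: copy the listed fields into string-valued attributes."""
    return {field: str(record[field]) for field in fields}

def substitute(sql, attributes):
    """Replace each ${name} placeholder with the attribute of the same name."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes[m.group(1)], sql)

STATEMENTS = [
    substitute(LOAD_SQL, evaluate_json_path(record, ["game_id", "game_name"]))
    for record in split_json(EXTRACTED)
]

if __name__ == "__main__":
    for statement in STATEMENTS:
        print(statement)
```

Substitution is plain text replacement. A value containing a single quote (for example `O'Neil`) would therefore produce invalid SQL, so string attributes need escaping before they reach the INSERT.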
The 8 Pipelines
Pipelines must run in this order: the fact loads look up surrogate keys in the dimension tables, so every dimension has to be loaded before any fact.
Dimensions (run first, in any order among themselves)
| # | Pipeline | Extract source | Rows |
|---|---|---|---|
| 1 | DIM_DATE | MySQL CTE (generates date range) | 48 |
| 2 | DIM_MEDAL | No extract; 3 static rows inserted directly into Oracle | 3 |
| 3 | DIM_GAME | game table | 25 |
| 4 | DIM_COUNTRY | country table | 36+ |
| 5 | DIM_ORGANIZATION | organization JOIN country | 60+ |
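The DIM_DATE extract generates calendar rows with a recursive CTE instead of reading a table. The sketch below runs the same idea in SQLite through Python's `sqlite3` module. Because it uses SQLite's `date()` and `strftime()`, it is a dialect translation, not the real `01_dim_date.sql`, which targets MySQL. The `date_key`/`full_date` column names and the date range are assumptions.

```python
import sqlite3

# SQLite version of a recursive date generator: one row per day from :start to :end inclusive.
DIM_DATE_SQL = """
WITH RECURSIVE dates(d) AS (
    SELECT date(:start)
    UNION ALL
    SELECT date(d, '+1 day') FROM dates WHERE d < date(:end)
)
SELECT CAST(strftime('%Y%m%d', d) AS INTEGER) AS date_key,  -- YYYYMMDD integer key
       d AS full_date
FROM dates
ORDER BY d
"""

def generate_dates(start, end):
    """Return (date_key, full_date) rows for every day between start and end, inclusive."""
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute(DIM_DATE_SQL, {"start": start, "end": end}).fetchall()
    finally:
        conn.close()

if __name__ == "__main__":
    for row in generate_dates("2024-02-27", "2024-03-01"):
        print(row)
```

The recursion stops once `d` reaches the end date, so the range is inclusive on both ends. Leap days come out of the date arithmetic automatically.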
Facts (run after all dimensions)
| # | Pipeline | Extract source | Rows |
|---|---|---|---|
| 6 | FACT_TOURNAMENT | tournament JOIN schedule JOIN organization | 27 |
| 7 | FACT_MEDAL_AWARD | medalist JOIN tournament | 257 |
| 8 | FACT_CLUB_STANDING | club_championship_standing | 24 |
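The ordering rule reduces to a simple check: all five dimensions must finish before the first fact starts, and the dimensions may run in any order among themselves. Below is a minimal Python sketch of that check, assuming the pipelines run one at a time. The pipeline names come from the two tables; `is_valid_run_order` is a hypothetical helper, not part of the NiFi flow.

```python
# Pipeline names from the tables; sequential, one-at-a-time execution is assumed.
DIMENSIONS = ["DIM_DATE", "DIM_MEDAL", "DIM_GAME", "DIM_COUNTRY", "DIM_ORGANIZATION"]
FACTS = ["FACT_TOURNAMENT", "FACT_MEDAL_AWARD", "FACT_CLUB_STANDING"]

def is_valid_run_order(order):
    """True if every pipeline runs exactly once and all dimensions precede all facts."""
    if sorted(order) != sorted(DIMENSIONS + FACTS):
        return False  # a pipeline is missing, duplicated, or unknown
    last_dimension = max(order.index(name) for name in DIMENSIONS)
    first_fact = min(order.index(name) for name in FACTS)
    return last_dimension < first_fact

if __name__ == "__main__":
    print(is_valid_run_order(DIMENSIONS + FACTS))                  # numbered order 1-8
    print(is_valid_run_order(DIMENSIONS[::-1] + FACTS))            # dimensions reordered among themselves
    print(is_valid_run_order(FACTS[:1] + DIMENSIONS + FACTS[1:]))  # a fact runs too early
```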
How Key Resolution Works
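The OLTP stores natural keys (e.g. game_id = 3). The Oracle Data Mart uses surrogate keys generated by GENERATED ALWAYS AS IDENTITY (e.g. game_key = 3). The two match only if dimension rows are inserted in natural-key order with no skipped identity values. A gap (for example, an identity value consumed by a failed insert) or a different insert order makes them diverge, so the fact loads cannot simply copy game_id into game_key.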
The fact load SQL handles this by embedding a sub-SELECT inside each INSERT...SELECT...FROM DUAL:
```sql
INSERT INTO FACT_TOURNAMENT (game_key, ...)
SELECT
    (SELECT game_key FROM DIM_GAME WHERE game_id = ${game_id}),
    ...
FROM DUAL
```
This means the Oracle database itself resolves the surrogate key at insert time, using the natural key that was extracted from MySQL and carried through as a FlowFile attribute. No transformation processor is needed.
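The effect of the sub-SELECT can be reproduced with SQLite through Python's `sqlite3` module. In this sketch:

- `INTEGER PRIMARY KEY AUTOINCREMENT` stands in for Oracle's identity column.
- Bound `?` parameters replace NiFi's `${...}` placeholders.
- SQLite needs no `FROM DUAL`.
- Only `game_key`/`game_id` come from this page; `tournament_id` is a hypothetical column.

Dimension rows are inserted out of natural-key order, so `game_id = 3` receives `game_key = 1`.

```python
import sqlite3

def run_demo():
    """Load DIM_GAME out of order, then insert facts that resolve game_key by game_id."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript("""
            CREATE TABLE DIM_GAME (
                game_key INTEGER PRIMARY KEY AUTOINCREMENT, -- stand-in for GENERATED ALWAYS AS IDENTITY
                game_id  INTEGER NOT NULL UNIQUE            -- natural key carried over from MySQL
            );
            CREATE TABLE FACT_TOURNAMENT (
                tournament_id INTEGER,                      -- hypothetical column
                game_key      INTEGER
            );
        """)
        # Natural keys arrive as 3, 1, 2, so surrogate keys 1, 2, 3 no longer equal game_id.
        for game_id in (3, 1, 2):
            conn.execute("INSERT INTO DIM_GAME (game_id) VALUES (?)", (game_id,))

        load_sql = """
            INSERT INTO FACT_TOURNAMENT (tournament_id, game_key)
            SELECT ?, (SELECT game_key FROM DIM_GAME WHERE game_id = ?)
        """
        conn.execute(load_sql, (10, 3))   # resolves to game_key 1
        conn.execute(load_sql, (11, 99))  # unknown game_id: the scalar sub-SELECT yields NULL
        return conn.execute(
            "SELECT tournament_id, game_key FROM FACT_TOURNAMENT ORDER BY tournament_id"
        ).fetchall()
    finally:
        conn.close()

if __name__ == "__main__":
    print(run_demo())
```

The second insert shows the failure mode. A natural key with no matching dimension row silently becomes NULL rather than raising an error, unless the fact column is declared NOT NULL. This is another reason the dimensions must be fully loaded first.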
SQL File Layout
```
nifi/sql/
├── extract/
│   ├── 01_dim_date.sql               Recursive CTE to generate calendar rows
│   ├── 02_dim_game.sql               Simple SELECT from game
│   ├── 03_dim_country.sql            Simple SELECT from country
│   ├── 04_dim_organization.sql       SELECT with LEFT JOIN to country for country name
│   ├── 05_fact_tournament.sql        JOIN with schedule and org; computes date keys
│   ├── 06_fact_medal_award.sql       JOIN with tournament; computes medal_points
│   └── 07_fact_club_standing.sql
└── load/
    ├── 01_dim_date.sql               INSERT with TO_DATE conversion for Oracle
    ├── 02_dim_medal.sql              Static 3-row INSERT, run once directly
    ├── 03_dim_game.sql
    ├── 04_dim_country.sql
    ├── 05_dim_organization.sql       NULL-safe EL expressions for optional fields
    ├── 06_fact_tournament.sql        Sub-SELECT key lookups via game_id, org_id
    ├── 07_fact_medal_award.sql       Sub-SELECT key lookups; medal_type lookup
    └── 08_fact_club_standing.sql     Sub-SELECT key lookup via org_id
```
Transformations Performed
Most of the "transformation" work happens in the extract SQL rather than in NiFi processors, which keeps the NiFi flow simple.
| Transformation | Where it happens |
|---|---|
| Date → YYYYMMDD integer key | CAST(DATE_FORMAT(..., '%Y%m%d') AS UNSIGNED) in extract SQL |
| Medal → medal_points (Gold=3, Silver=2, Bronze=1) | CASE expression in extract SQL |
| Boolean → 0/1 | CASE WHEN ... = 1 THEN 1 ELSE 0 END in extract SQL |
| Natural key → surrogate key | Sub-SELECT in Oracle load SQL |
| Duration calculation fallback | COALESCE(s.duration_days, DATEDIFF(...) + 1) in extract SQL |
| NULL handling for optional fields | NiFi Expression Language :isEmpty():ifElse(...) in load SQL |
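To check extract output by hand, the transformations in the table can be mirrored in a few lines of Python. This is a hedged sketch, not the actual extract or load SQL. Its assumptions are:

- The function names are hypothetical.
- Returning `None` for an unrecognized medal assumes the CASE expression has no ELSE branch.
- The quote doubling in `sql_literal_or_null` is an extra safeguard that the load SQL may not perform.

The empty check follows NiFi's `isEmpty()`, which also treats whitespace-only values as empty.

```python
from datetime import date

MEDAL_POINTS = {"Gold": 3, "Silver": 2, "Bronze": 1}

def date_key(d):
    """Date -> YYYYMMDD integer, like CAST(DATE_FORMAT(d, '%Y%m%d') AS UNSIGNED)."""
    return int(d.strftime("%Y%m%d"))

def medal_points(medal):
    """Gold=3, Silver=2, Bronze=1; None otherwise (assumes a CASE with no ELSE)."""
    return MEDAL_POINTS.get(medal)

def to_flag(value):
    """CASE WHEN value = 1 THEN 1 ELSE 0 END: NULL and every value other than 1 map to 0."""
    return 1 if value == 1 else 0

def duration_days(stored, start, end):
    """COALESCE(stored, DATEDIFF(end, start) + 1): keep the stored value, else count days inclusively."""
    return stored if stored is not None else (end - start).days + 1

def sql_literal_or_null(attr):
    """Mirror of the :isEmpty():ifElse(...) pattern: empty or whitespace-only -> NULL,
    otherwise a quoted SQL literal (quote doubling is an extra safeguard added here)."""
    if attr is None or attr.strip() == "":
        return "NULL"
    return "'" + attr.replace("'", "''") + "'"

if __name__ == "__main__":
    print(date_key(date(2024, 3, 5)))
    print(duration_days(None, date(2024, 1, 1), date(2024, 1, 3)))
    print(sql_literal_or_null(""), sql_literal_or_null("Berlin"))
```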