diff --git a/docs/nifi-flow.md b/docs/nifi-flow.md new file mode 100644 index 0000000..b72f2da --- /dev/null +++ b/docs/nifi-flow.md @@ -0,0 +1,451 @@ +# NiFi ETL Flow — Hotel Reservations Data Mart + +## Overview + +The flow moves data from **MySQL 8.4 OLTP** (source) into **Oracle Data Mart** (target). +It is organized into **5 Process Groups** that run in sequence, controlled by a top-level scheduler. + +``` +[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)] +``` + +Each PG has a single **Input Port** and **Output Port** so the orchestrator can chain them with connections. + +--- + +## Controller Services (shared by all PGs) + +| Name | Type | Config | +|------|------|--------| +| `MySQL_DBCPService` | DBCPConnectionPool | Driver: `com.mysql.cj.jdbc.Driver`; URL: `jdbc:mysql://127.0.0.1:13306/hotel_reservations`; User: `root`; Pwd: `hotel2025root` | +| `Oracle_DBCPService` | DBCPConnectionPool | Driver: `oracle.jdbc.OracleDriver`; URL: `jdbc:oracle:thin:@:1521:`; User: ``; Pwd: `` | +| `JsonReader` | JsonTreeReader | default settings | +| `JsonWriter` | JsonRecordSetWriter | default settings | +| `AvroReader` | AvroReader | default settings | + +--- + +## PG-1: Load Date Dimension + +**Runs once** (or when extending the date range). Populates `DIM_DATE` for 2020–2030. + +``` +GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL +``` + +### Processors + +**GenerateFlowFile** +- Run Schedule: manual (run once via right-click → Run Once) +- Custom Text: `{}` + +**ExecuteScript** (Groovy) +```groovy +import groovy.json.JsonOutput +import java.time.* + +def rows = [] +def d = LocalDate.of(2020, 1, 1) +def end = LocalDate.of(2030, 12, 31) +while (!d.isAfter(end)) { + def m = d.monthValue + def season = (m >= 6 && m <= 8) ? 'Peak' + : (m >= 3 && m <= 5) ? 'High' + : (m >= 9 && m <= 11) ? 'Autumn' + : 'Winter' + rows << [ + date_key: d.format(java.time.format.DateTimeFormatter.ofPattern('yyyyMMdd')) as int, + full_date: d.toString(), + year: d.year, + quarter: ((m - 1) / 3 + 1) as int, + month: m, + month_name: d.month.toString().capitalize(), + week_number: d.get(java.time.temporal.WeekFields.ISO.weekOfYear()), + day_of_month: d.dayOfMonth, + day_name: d.dayOfWeek.toString().capitalize(), + is_weekend: (d.dayOfWeek.value >= 6) ? 1 : 0, + is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0, + season: season + ] + d = d.plusDays(1) +} +def ff = session.create() +ff = session.write(ff, { out -> out.write(JsonOutput.toJson(rows).bytes) } as OutputStreamCallback) +ff = session.putAttribute(ff, 'mime.type', 'application/json') +session.transfer(ff, REL_SUCCESS) +``` + +**SplitJson** +- JsonPath Expression: `$.*` + +**EvaluateJsonPath** +- Destination: `flowfile-attribute` +- Attributes: `date_key`, `full_date`, `year`, `quarter`, `month`, `month_name`, `week_number`, `day_of_month`, `day_name`, `is_weekend`, `is_business_day`, `season` + +**PutSQL** +- JDBC Connection Pool: `Oracle_DBCPService` +- SQL Statement: +```sql +INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name, + week_number, day_of_month, day_name, is_weekend, is_business_day, season) +VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter}, + ${month}, '${month_name}', ${week_number}, ${day_of_month}, + '${day_name}', ${is_weekend}, ${is_business_day}, '${season}') +``` + +--- + +## PG-2: Static Dimensions (SCD Type 1) + +Loads `DIM_COUNTRY`, `DIM_STAR_RATING`, `DIM_HOTEL_CHAIN` from MySQL. +Uses **MERGE INTO** so the flow is idempotent — re-running it updates changed rows and inserts new ones. + +Each sub-flow follows the same pattern: + +``` +ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE) +``` + +### 2a — DIM_COUNTRY + +**ExecuteSQL** — Connection Pool: `MySQL_DBCPService` +```sql +SELECT country_id, code, name, currency FROM country ORDER BY country_id +``` + +**EvaluateJsonPath** attributes: `country_id`, `code`, `name`, `currency` + +**PutSQL** +```sql +MERGE INTO DIM_COUNTRY tgt +USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id}) +WHEN MATCHED THEN + UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}' +WHEN NOT MATCHED THEN + INSERT (country_id, code, name, currency) + VALUES (${country_id}, '${code}', '${name}', '${currency}') +``` + +### 2b — DIM_STAR_RATING + +**ExecuteSQL** +```sql +SELECT star_rating_id, code, description FROM star_rating ORDER BY code +``` + +**PutSQL** +```sql +MERGE INTO DIM_STAR_RATING tgt +USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id}) +WHEN MATCHED THEN + UPDATE SET tgt.code = ${code}, tgt.description = '${description}' +WHEN NOT MATCHED THEN + INSERT (star_rating_id, code, description) + VALUES (${star_rating_id}, ${code}, '${description}') +``` + +### 2c — DIM_HOTEL_CHAIN + +**ExecuteSQL** +```sql +SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id +``` + +**PutSQL** +```sql +MERGE INTO DIM_HOTEL_CHAIN tgt +USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id}) +WHEN MATCHED THEN + UPDATE SET tgt.code = '${code}', tgt.name = '${name}' +WHEN NOT MATCHED THEN + INSERT (hotel_chain_id, code, name) + VALUES (${hotel_chain_id}, '${code}', '${name}') +``` + +--- + +## PG-3: DIM_HOTEL — SCD Type 2 + +This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by star category **at the time of booking**, not just today's category. + +**Architecture:** NiFi stages raw data into `STG_HOTEL`, then an `ExecuteScript` runs the SCD2 SQL logic in a single Oracle transaction. + +``` +[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL] +``` + +### Step A: Truncate staging + +**GenerateFlowFile** → **PutSQL** +```sql +TRUNCATE TABLE STG_HOTEL +``` + +### Step B: Load staging from MySQL + +**ExecuteSQL** — `MySQL_DBCPService` +```sql +SELECT + h.hotel_id, + hc.code AS chain_code, + c.code AS country_code, + sr.code AS star_code, + h.code, + h.name, + h.city +FROM hotel h +JOIN country c ON c.country_id = h.country_id +JOIN star_rating sr ON sr.star_rating_id = h.star_rating_id +LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id +ORDER BY h.hotel_id +``` + +**ConvertAvroToJSON** → **SplitJson** (`$.*`) + +**EvaluateJsonPath** attributes: `hotel_id`, `chain_code`, `country_code`, `star_code`, `code`, `name`, `city` + +**PutSQL** → `STG_HOTEL` +```sql +INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city) +VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}') +``` + +### Step C: Apply SCD2 logic + +**GenerateFlowFile** (runs after B finishes) → **ExecuteScript** (Groovy) + +The Groovy script opens a JDBC connection and executes two SQL statements in one transaction: + +```groovy +import java.sql.* + +def conn = context.controllerServiceLookup + .getControllerService('Oracle_DBCPService_ID') + .getConnection() +conn.autoCommit = false + +try { + // 1. Expire records whose tracked attributes changed + conn.prepareStatement(""" + UPDATE DIM_HOTEL dh + SET dh.expiry_date = TRUNC(SYSDATE) - 1, + dh.is_current = 0 + WHERE dh.is_current = 1 + AND EXISTS ( + SELECT 1 FROM STG_HOTEL s + WHERE s.hotel_id = dh.source_hotel_id + AND ( + NVL(s.chain_code,'~') != NVL(( + SELECT hc.code FROM DIM_HOTEL_CHAIN hc + WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~') + OR s.star_code != ( + SELECT ds.code FROM DIM_STAR_RATING ds + WHERE ds.star_rating_key = dh.star_rating_key) + OR s.city != dh.city + ) + ) + """).executeUpdate() + + // 2. Insert new version for changed hotels + insert brand-new hotels + conn.prepareStatement(""" + INSERT INTO DIM_HOTEL ( + source_hotel_id, hotel_chain_key, country_key, star_rating_key, + code, name, city, effective_date, expiry_date, is_current) + SELECT + s.hotel_id, + (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code), + (SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code), + (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code), + s.code, s.name, s.city, + TRUNC(SYSDATE), NULL, 1 + FROM STG_HOTEL s + WHERE NOT EXISTS ( + SELECT 1 FROM DIM_HOTEL d + WHERE d.source_hotel_id = s.hotel_id + AND d.is_current = 1 + ) + """).executeUpdate() + + conn.commit() +} catch (Exception e) { + conn.rollback() + throw e +} finally { + conn.close() +} +def ff = session.create() +session.transfer(ff, REL_SUCCESS) +``` + +--- + +## PG-4: DIM_GUEST — SCD Type 1 + +Guest personal data (city, country) can change without any analytical value in tracking the history. Plain MERGE/upsert is correct here. + +``` +ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE) +``` + +**ExecuteSQL** — `MySQL_DBCPService` +```sql +SELECT g.guest_id, c.code AS country_code, g.name, g.city +FROM guest g +LEFT JOIN country c ON c.country_id = g.country_id +ORDER BY g.guest_id +``` + +**EvaluateJsonPath** attributes: `guest_id`, `country_code`, `name`, `city` + +**PutSQL** +```sql +MERGE INTO DIM_GUEST tgt +USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id}) +WHEN MATCHED THEN + UPDATE SET + tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')), + tgt.name = '${name}', + tgt.city = NULLIF('${city}','') +WHEN NOT MATCHED THEN + INSERT (guest_id, country_key, name, city) + VALUES ( + ${guest_id}, + (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')), + '${name}', + NULLIF('${city}','') + ) +``` + +> **Note:** DIM_ROOM is also SCD Type 1 — load it the same way as DIM_GUEST, joining `hotel_room` with `room_type` in MySQL and MERGEing into `DIM_ROOM` (surrogate key lookup via `source_hotel_id + IS_CURRENT=1` from DIM_HOTEL). + +--- + +## PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark) + +The fact table is loaded **incrementally**: only `room_booking` rows with `room_booking_id` greater than the last loaded value are processed. The watermark is stored in `ETL_WATERMARK` in Oracle. + +`source_rb_id` on `FACT_ROOM_BOOKING` has a UNIQUE constraint, so re-running is safe — duplicates are silently skipped. + +``` +[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson] + → [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark] +``` + +### Step A: Read watermark + +**ExecuteSQL** — `Oracle_DBCPService` +```sql +SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING' +``` + +**ConvertAvroToJSON** → **EvaluateJsonPath** +- `watermark` ← `$.last_key` + +### Step B: Load from MySQL + +**ExecuteSQL** — `MySQL_DBCPService` +SQL Statement (use attribute `${watermark}`): +```sql +SELECT + rb.room_booking_id, + rb.room_id, + rb.date_from, + rb.date_to, + rb.nightly_rate, + rb.total_amount, + b.guest_id, + b.status AS booking_status, + DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed +FROM room_booking rb +JOIN booking b ON b.booking_id = rb.booking_id +WHERE rb.room_booking_id > ${watermark} +ORDER BY rb.room_booking_id +LIMIT 50000 +``` + +> Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back. + +### Step C: Split + extract attributes + +**ConvertAvroToJSON** → **SplitJson** (`$.*`) + +**EvaluateJsonPath** attributes: +`room_booking_id`, `room_id`, `guest_id`, `date_from`, `date_to`, `nightly_rate`, `total_amount`, `booking_status`, `nights_stayed` + +### Step D: Insert into fact table + +**PutSQL** — `Oracle_DBCPService` +```sql +INSERT INTO FACT_ROOM_BOOKING ( + source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key, + star_rating_key, checkin_date_key, checkout_date_key, + booking_status, nights_stayed, nightly_rate, total_amount) +SELECT + ${room_booking_id}, + dh.hotel_key, + dh.hotel_chain_key, + dr.room_key, + dg.guest_key, + dg.country_key, + dh.star_rating_key, + TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')), + TO_NUMBER(TO_CHAR(TO_DATE('${date_to}', 'YYYY-MM-DD'), 'YYYYMMDD')), + '${booking_status}', + ${nights_stayed}, + TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''), + TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''') +FROM + DIM_ROOM dr, + DIM_GUEST dg, + DIM_HOTEL dh +WHERE + dr.room_id = ${room_id} + AND dg.guest_id = ${guest_id} + AND dh.hotel_key = dr.hotel_key + -- SCD2 lookup: find hotel version active at check-in date + AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD') + AND (dh.expiry_date IS NULL OR dh.expiry_date > TO_DATE('${date_from}','YYYY-MM-DD')) + -- Idempotent: skip if already loaded + AND NOT EXISTS ( + SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id} + ) +``` + +> The `DH.EFFECTIVE_DATE / EXPIRY_DATE` condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was true **when the guest actually checked in**, not what the hotel looks like today. + +**Ignore Errors** on PutSQL (route `failure` → funnel) — UNIQUE constraint violations on `source_rb_id` are expected and harmless on re-runs. + +### Step E: Update watermark + +After the PutSQL succeeds, update the watermark with the highest `room_booking_id` seen in this batch. + +**UpdateAttribute** +- `max_rb_id` ← `${room_booking_id}` (NiFi Expression Language `max()` across the batch via a custom processor or MergeContent trick) + +> Simplest approach: add a final **ExecuteSQL** that runs after the batch: +```sql +UPDATE ETL_WATERMARK +SET last_key = (SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING), + last_run_ts = SYSTIMESTAMP +WHERE entity_name = 'FACT_ROOM_BOOKING' +``` + +--- + +## Execution Order & Scheduling + +| PG | Trigger | Frequency | +|----|---------|-----------| +| PG-1 (Date Dim) | Manual (run once) | — | +| PG-2 (Static Dims) | Timer — 24h | Daily | +| PG-3 (DIM_HOTEL SCD2) | Timer — 24h | Daily, after PG-2 | +| PG-4 (DIM_GUEST SCD1) | Timer — 24h | Daily, after PG-3 | +| PG-5 (Fact incremental) | Timer — 1h | Hourly | + +Chain PG-2 → PG-3 → PG-4 by connecting each PG's Output Port to the next PG's Input Port via a **success** relationship. + +--- + +## Why SCD Type 2 for DIM_HOTEL? + +A hotel being upgraded from 3★ to 4★ changes its rate tier going forward. If we just overwrite the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel — inflating average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.