# NiFi ETL Flow: Hotel Reservations Data Mart

## Overview

The flow moves data from **MySQL 8.4 OLTP** (source) into **Oracle Data Mart** (target).
It is organized into **5 Process Groups** that run in sequence, controlled by a top-level scheduler.

```
[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]
```

Each PG has a single **Input Port** and **Output Port** so the orchestrator can chain them with connections.

---

## Controller Services (shared by all PGs)

| Name | Type | Config |
|------|------|--------|
| `MySQL_DBCPService` | DBCPConnectionPool | Driver: `com.mysql.cj.jdbc.Driver`; URL: `jdbc:mysql://127.0.0.1:13306/hotel_reservations`; User: `root`; Pwd: `hotel2025root` |
| `Oracle_DBCPService` | DBCPConnectionPool | Driver: `oracle.jdbc.OracleDriver`; URL: `jdbc:oracle:thin:@:1521:`; User: ``; Pwd: `` |
| `JsonReader` | JsonTreeReader | default settings |
| `JsonWriter` | JsonRecordSetWriter | default settings |
| `AvroReader` | AvroReader | default settings |

---

## PG-1: Load Date Dimension

**Runs once** (or when extending the date range). Populates `DIM_DATE` for 2020–2030.

```
GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL
```

### Processors

**GenerateFlowFile**
- Run Schedule: manual (run once via right-click → Run Once)
- Custom Text: `{}`

**ExecuteScript** (Groovy)

```groovy
import groovy.json.JsonOutput
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.WeekFields
import org.apache.nifi.processor.io.OutputStreamCallback

// Consume the trigger FlowFile; if it stayed in the queue the processor
// would fire again and generate the same dates a second time.
def ff = session.get()
if (ff == null) return

def keyFormat = DateTimeFormatter.ofPattern('yyyyMMdd')
def rows = []
def d   = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)

while (!d.isAfter(end)) {
    def m = d.monthValue
    def season = (m >= 6 && m <= 8)  ? 'Peak' :
                 (m >= 3 && m <= 5)  ? 'High' :
                 (m >= 9 && m <= 11) ? 'Autumn' : 'Winter'
    rows << [
        date_key:        d.format(keyFormat) as int,
        full_date:       d.toString(),
        year:            d.year,
        quarter:         (m - 1).intdiv(3) + 1,
        month:           m,
        // Month/DayOfWeek names are upper case (JANUARY); lower-case first so
        // capitalize() yields "January" rather than leaving "JANUARY" as is.
        month_name:      d.month.toString().toLowerCase().capitalize(),
        week_number:     d.get(WeekFields.ISO.weekOfYear()),
        day_of_month:    d.dayOfMonth,
        day_name:        d.dayOfWeek.toString().toLowerCase().capitalize(),
        is_weekend:      (d.dayOfWeek.value >= 6) ? 1 : 0,
        is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,
        season:          season
    ]
    d = d.plusDays(1)
}

// Replace the trigger content with the generated JSON array.
ff = session.write(ff, { out ->
    out.write(JsonOutput.toJson(rows).getBytes('UTF-8'))
} as OutputStreamCallback)
ff = session.putAttribute(ff, 'mime.type', 'application/json')
session.transfer(ff, REL_SUCCESS)
```

**SplitJson**
- JsonPath Expression: `$.*`

**EvaluateJsonPath**
- Destination: `flowfile-attribute`
- Attributes (each mapped to `$.<name>`): `date_key`, `full_date`, `year`, `quarter`, `month`, `month_name`, `week_number`, `day_of_month`, `day_name`, `is_weekend`, `is_business_day`, `season`

**PutSQL**
- JDBC Connection Pool: `Oracle_DBCPService`
- SQL Statement:

```sql
INSERT INTO DIM_DATE (
    date_key, full_date, year, quarter, month, month_name,
    week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (
    ${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter}, ${month}, '${month_name}',
    ${week_number}, ${day_of_month}, '${day_name}', ${is_weekend}, ${is_business_day}, '${season}')
```

---

## PG-2: Static Dimensions (SCD Type 1)

Loads `DIM_COUNTRY`, `DIM_STAR_RATING`, `DIM_HOTEL_CHAIN` from MySQL.
Uses **MERGE INTO** so the flow is idempotent: re-running it updates changed rows and inserts new ones.

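
The idempotence of an upsert can be illustrated without a database. The sketch below is a minimal in-memory model, not part of the flow: `mergeRows` is a hypothetical helper, the map stands in for `DIM_COUNTRY`, and the sample rows are made up for illustration.

```groovy
// Hypothetical helper mimicking MERGE INTO on an in-memory table keyed by
// country_id: update the row when the key exists, insert it otherwise.
def mergeRows(Map<Integer, Map> target, List<Map> sourceRows) {
    sourceRows.each { row ->
        def existing = target[row.country_id]
        if (existing != null) {
            existing.putAll(row)                              // WHEN MATCHED THEN UPDATE
        } else {
            target[row.country_id] = new LinkedHashMap(row)   // WHEN NOT MATCHED THEN INSERT
        }
    }
    return target
}

def dimCountry = [:]   // stand-in for DIM_COUNTRY
def extract = [        // stand-in for the MySQL extract (illustrative rows)
    [country_id: 1, code: 'PL', name: 'Poland',  currency: 'PLN'],
    [country_id: 2, code: 'DE', name: 'Germany', currency: 'EUR']
]

mergeRows(dimCountry, extract)
def afterFirstRun = dimCountry.collectEntries { k, v -> [k, new LinkedHashMap(v)] }

mergeRows(dimCountry, extract)   // second run with the same extract
println(dimCountry.size())
println(dimCountry == afterFirstRun)
```

Running the same extract twice leaves the table unchanged, which is the property the MERGE statements below rely on.
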
Each sub-flow follows the same pattern:

```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```

> **Note:** Attribute values are spliced into the SQL text as-is, so a value containing a single quote breaks the statement. If the source data can contain apostrophes, escape them with Expression Language, e.g. `'${name:replace("'", "''")}'`.

### 2a: DIM_COUNTRY

**ExecuteSQL** (Connection Pool: `MySQL_DBCPService`)

```sql
SELECT country_id, code, name, currency
FROM country
ORDER BY country_id
```

**EvaluateJsonPath** attributes: `country_id`, `code`, `name`, `currency`

**PutSQL**

```sql
MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src
ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (${country_id}, '${code}', '${name}', '${currency}')
```

### 2b: DIM_STAR_RATING

**ExecuteSQL**

```sql
SELECT star_rating_id, code, description
FROM star_rating
ORDER BY code
```

**EvaluateJsonPath** attributes: `star_rating_id`, `code`, `description`

**PutSQL**

```sql
MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src
ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
    INSERT (star_rating_id, code, description)
    VALUES (${star_rating_id}, ${code}, '${description}')
```

### 2c: DIM_HOTEL_CHAIN

**ExecuteSQL**

```sql
SELECT hotel_chain_id, code, name
FROM hotel_chain
ORDER BY hotel_chain_id
```

**EvaluateJsonPath** attributes: `hotel_chain_id`, `code`, `name`

**PutSQL**

```sql
MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src
ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
    INSERT (hotel_chain_id, code, name)
    VALUES (${hotel_chain_id}, '${code}', '${name}')
```

---

## PG-3: DIM_HOTEL (SCD Type 2)

This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can show revenue by the star category that applied **at the time of the stay**, not just by today's category.

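
To make the SCD Type 2 mechanics concrete, here is a minimal in-memory sketch of the "expire the old version, insert a new one" step. It is an illustration under assumptions, not the flow's implementation: `applyScd2`, the sample hotel, and the dates are hypothetical, and only `star_code` and `city` are compared.

```groovy
import java.time.LocalDate

// Hypothetical in-memory DIM_HOTEL: each map is one version of a hotel.
def dimHotel = [
    [hotel_key: 1, source_hotel_id: 10, star_code: 3, city: 'Krakow',
     effective_date: LocalDate.of(2023, 1, 1), expiry_date: null, is_current: 1]
]

// Applies one staged row: expire the current version when a tracked attribute
// changed, then insert a new current version if the hotel has none.
def applyScd2(List<Map> dim, Map staged, LocalDate loadDate) {
    def current = dim.find { it.source_hotel_id == staged.hotel_id && it.is_current == 1 }
    if (current != null && (current.star_code != staged.star_code || current.city != staged.city)) {
        current.expiry_date = loadDate.minusDays(1)   // old version ends the day before
        current.is_current = 0
        current = null
    }
    if (current == null) {
        def nextKey = (dim*.hotel_key.max() ?: 0) + 1  // stand-in for a surrogate key sequence
        dim << [hotel_key: nextKey, source_hotel_id: staged.hotel_id,
                star_code: staged.star_code, city: staged.city,
                effective_date: loadDate, expiry_date: null, is_current: 1]
    }
    return dim
}

// The hotel is upgraded from 3 to 4 stars; the load runs on 2025-06-01.
applyScd2(dimHotel, [hotel_id: 10, star_code: 4, city: 'Krakow'], LocalDate.of(2025, 6, 1))
dimHotel.each { println it }
```

After the call the dimension holds two rows for the same `source_hotel_id`: the expired 3-star version and a new current 4-star version. Applying the same staged row again changes nothing, because the current version already matches.
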
**Architecture:** NiFi stages raw data into `STG_HOTEL`, then an `ExecuteScript` runs the SCD2 SQL logic in a single Oracle transaction.

```
[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]
```

### Step A: Truncate staging

**GenerateFlowFile** → **PutSQL**

```sql
TRUNCATE TABLE STG_HOTEL
```

### Step B: Load staging from MySQL

**ExecuteSQL** (`MySQL_DBCPService`)

```sql
SELECT h.hotel_id,
       hc.code AS chain_code,
       c.code  AS country_code,
       sr.code AS star_code,
       h.code, h.name, h.city
FROM hotel h
JOIN country c           ON c.country_id = h.country_id
JOIN star_rating sr      ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id
```

**ConvertAvroToJSON** → **SplitJson** (`$.*`)

**EvaluateJsonPath** attributes: `hotel_id`, `chain_code`, `country_code`, `star_code`, `code`, `name`, `city`

**PutSQL** → `STG_HOTEL`

```sql
INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code},
        '${code}', '${name}', '${city}')
```

### Step C: Apply SCD2 logic

**GenerateFlowFile** (runs after B finishes) → **ExecuteScript** (Groovy)

The Groovy script opens a JDBC connection and executes two SQL statements in one transaction:

```groovy
import java.sql.Connection

// Consume the trigger FlowFile so the script runs once per trigger.
def ff = session.get()
if (ff == null) return

// getControllerService() expects the service's identifier (UUID), not its display name.
Connection conn = context.controllerServiceLookup
        .getControllerService('Oracle_DBCPService_ID')
        .getConnection()
conn.autoCommit = false
try {
    // 1. Expire records whose tracked attributes (chain, star rating, city) changed
    conn.prepareStatement("""
        UPDATE DIM_HOTEL dh
        SET dh.expiry_date = TRUNC(SYSDATE) - 1,
            dh.is_current  = 0
        WHERE dh.is_current = 1
          AND EXISTS (
            SELECT 1 FROM STG_HOTEL s
            WHERE s.hotel_id = dh.source_hotel_id
              AND (
                NVL(s.chain_code,'~') != NVL((
                    SELECT hc.code FROM DIM_HOTEL_CHAIN hc
                    WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~')
                OR s.star_code != (
                    SELECT ds.code FROM DIM_STAR_RATING ds
                    WHERE ds.star_rating_key = dh.star_rating_key)
                OR s.city != dh.city
              )
          )
    """).executeUpdate()

    // 2. Insert a new version for changed hotels and a first version for brand-new hotels
    //    (both cases have no current row at this point)
    conn.prepareStatement("""
        INSERT INTO DIM_HOTEL (
            source_hotel_id, hotel_chain_key, country_key, star_rating_key,
            code, name, city, effective_date, expiry_date, is_current)
        SELECT
            s.hotel_id,
            (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
            (SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code),
            (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
            s.code, s.name, s.city,
            TRUNC(SYSDATE), NULL, 1
        FROM STG_HOTEL s
        WHERE NOT EXISTS (
            SELECT 1 FROM DIM_HOTEL d
            WHERE d.source_hotel_id = s.hotel_id
              AND d.is_current = 1
        )
    """).executeUpdate()

    conn.commit()
    session.transfer(ff, REL_SUCCESS)
} catch (Exception e) {
    // Undo both statements and route the trigger to failure instead of retrying forever.
    conn.rollback()
    log.error('SCD2 load for DIM_HOTEL failed', e)
    session.transfer(ff, REL_FAILURE)
} finally {
    conn.close()
}
```

---

## PG-4: DIM_GUEST (SCD Type 1)

Guest personal data (city, country) can change, but tracking that history has no analytical value, so a plain MERGE (upsert) is sufficient here.

```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```

**ExecuteSQL** (`MySQL_DBCPService`)

```sql
SELECT g.guest_id,
       c.code AS country_code,
       g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id
```

**EvaluateJsonPath** attributes: `guest_id`, `country_code`, `name`, `city`

**PutSQL**

```sql
MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src
ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
    UPDATE SET
        tgt.country_key = (SELECT country_key FROM DIM_COUNTRY
                           WHERE code = NULLIF('${country_code}','')),
        tgt.name = '${name}',
        tgt.city = NULLIF('${city}','')
WHEN NOT MATCHED THEN
    INSERT (guest_id, country_key, name, city)
    VALUES (
        ${guest_id},
        (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        '${name}',
        NULLIF('${city}','')
    )
```

> **Note:** DIM_ROOM is also SCD Type 1. Load it the same way as DIM_GUEST, joining `hotel_room` with `room_type` in MySQL and MERGEing into `DIM_ROOM` (surrogate key lookup via `source_hotel_id + IS_CURRENT=1` from DIM_HOTEL).

---

## PG-5: FACT_ROOM_BOOKING, Incremental Load (Watermark)

The fact table is loaded **incrementally**: only `room_booking` rows with `room_booking_id` greater than the last loaded value are processed. The watermark is stored in `ETL_WATERMARK` in Oracle.

`source_rb_id` on `FACT_ROOM_BOOKING` has a UNIQUE constraint, so re-running is safe: a row that is already loaded cannot be inserted a second time.

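
The watermark pattern can be sketched as a small loop. This is an in-memory illustration under assumptions, not the NiFi flow: `source` stands in for the `room_booking_id` values in MySQL, `fact` for the loaded `source_rb_id` values, and the batch size of 5 replaces the `LIMIT`.

```groovy
// Hypothetical stand-ins for the source table, the fact table and the watermark.
def source = (1..12).toList()   // room_booking_id values available in MySQL
def fact = [] as LinkedHashSet  // source_rb_id values already in the fact table (unique)
long watermark = 0              // ETL_WATERMARK.last_key
int batchSize = 5               // plays the role of LIMIT
int batches = 0

while (true) {
    // WHERE room_booking_id > watermark ORDER BY room_booking_id LIMIT batchSize
    def batch = source.findAll { it > watermark }.sort().take(batchSize)
    if (batch.isEmpty()) break  // nothing newer than the watermark: done
    fact.addAll(batch)          // the set ignores ids that are already present
    watermark = fact.max()      // advance the watermark to the highest loaded id
    batches++
}

println "batches=${batches}, watermark=${watermark}, rows=${fact.size()}"
```

Each pass only reads ids above the stored watermark, so a later run that finds no new rows exits immediately without touching the fact table.
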
```
[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson] → [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]
```

### Step A: Read watermark

**ExecuteSQL** (`Oracle_DBCPService`)

```sql
SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'
```

**ConvertAvroToJSON** → **EvaluateJsonPath**
- `watermark` ← `$.LAST_KEY` (Oracle returns unquoted column names in upper case, and JsonPath is case-sensitive)

### Step B: Load from MySQL

**ExecuteSQL** (`MySQL_DBCPService`)

SQL Statement (use attribute `${watermark}`):

```sql
SELECT rb.room_booking_id,
       rb.room_id,
       rb.date_from,
       rb.date_to,
       rb.nightly_rate,
       rb.total_amount,
       b.guest_id,
       b.status AS booking_status,
       DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000
```

> Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back.

### Step C: Split + extract attributes

**ConvertAvroToJSON** → **SplitJson** (`$.*`)

**EvaluateJsonPath** attributes: `room_booking_id`, `room_id`, `guest_id`, `date_from`, `date_to`, `nightly_rate`, `total_amount`, `booking_status`, `nights_stayed`

### Step D: Insert into fact table

**PutSQL** (`Oracle_DBCPService`)

```sql
INSERT INTO FACT_ROOM_BOOKING (
    source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key,
    country_key, star_rating_key, checkin_date_key, checkout_date_key,
    booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
    ${room_booking_id},
    dh.hotel_key, dh.hotel_chain_key, dr.room_key, dg.guest_key,
    dg.country_key, dh.star_rating_key,
    TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
    TO_NUMBER(TO_CHAR(TO_DATE('${date_to}',  'YYYY-MM-DD'), 'YYYYMMDD')),
    '${booking_status}',
    ${nights_stayed},
    TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
    TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
FROM DIM_ROOM dr
JOIN DIM_GUEST dg  ON dg.guest_id = ${guest_id}
-- DIM_ROOM points at one version of its hotel; go through the natural key
-- so that every version of that hotel is a candidate for the date lookup.
JOIN DIM_HOTEL cur ON cur.hotel_key = dr.hotel_key
JOIN DIM_HOTEL dh  ON dh.source_hotel_id = cur.source_hotel_id
WHERE dr.room_id = ${room_id}
  -- SCD2 lookup: find hotel version active at check-in date
  -- (expiry_date is the last valid day of a version, hence >=)
  AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
  AND (dh.expiry_date IS NULL OR dh.expiry_date >= TO_DATE('${date_from}','YYYY-MM-DD'))
  -- Idempotent: skip if already loaded
  AND NOT EXISTS (
      SELECT 1 FROM FACT_ROOM_BOOKING f
      WHERE f.source_rb_id = ${room_booking_id}
  )
```

> The `dh.effective_date` / `dh.expiry_date` condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was true **when the guest actually checked in**, not what the hotel looks like today.

**Ignore errors** on PutSQL (route `failure` → funnel): the `NOT EXISTS` guard skips rows that are already loaded, and any UNIQUE constraint violation on `source_rb_id` that still occurs on a re-run is expected and harmless.

### Step E: Update watermark

After the PutSQL succeeds, update the watermark with the highest `room_booking_id` seen in this batch.

**UpdateAttribute**
- `max_rb_id` ← `${room_booking_id}` (Expression Language only sees one FlowFile at a time, so a batch-wide maximum needs a custom processor or a MergeContent step)

> Simplest approach: add a final **PutSQL** that runs after the batch:

```sql
UPDATE ETL_WATERMARK
SET last_key = NVL((SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING), last_key), -- keep the old value while the fact table is empty
    last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'
```

---

## Execution Order & Scheduling

| PG | Trigger | Frequency |
|----|---------|-----------|
| PG-1 (Date Dim) | Manual (run once) | n/a |
| PG-2 (Static Dims) | Timer, 24h | Daily |
| PG-3 (DIM_HOTEL SCD2) | Timer, 24h | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | Timer, 24h | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer, 1h | Hourly |

Chain PG-2 → PG-3 → PG-4 by routing the last processor's **success** relationship to each PG's Output Port and connecting that port to the next PG's Input Port.

---

## Why SCD Type 2 for DIM_HOTEL?

A hotel being upgraded from 3★ to 4★ changes its rate tier going forward.
If we simply overwrote the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel, distorting average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.
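
A toy calculation makes the difference visible. Everything in this sketch is hypothetical (one hotel, three bookings, invented amounts); it compares the average amount per star category when every booking is tagged with today's rating (SCD1) versus the rating valid at check-in (SCD2).

```groovy
import java.time.LocalDate

// Hypothetical versions of one hotel, upgraded from 3 to 4 stars on 2025-06-01.
def hotelVersions = [
    [hotel_key: 1, star: 3, effective_date: LocalDate.of(2023, 1, 1), expiry_date: LocalDate.of(2025, 5, 31)],
    [hotel_key: 2, star: 4, effective_date: LocalDate.of(2025, 6, 1), expiry_date: null]
]
// Hypothetical bookings with invented amounts.
def bookings = [
    [checkin: LocalDate.of(2025, 3, 10), amount: 100.0],
    [checkin: LocalDate.of(2025, 4, 2),  amount: 120.0],
    [checkin: LocalDate.of(2025, 7, 15), amount: 200.0]
]

// SCD2 lookup: the version whose validity interval contains the check-in date.
def versionAt = { LocalDate day ->
    hotelVersions.find { v ->
        !v.effective_date.isAfter(day) && (v.expiry_date == null || !v.expiry_date.isBefore(day))
    }
}

// Average booking amount per star category, given a rule that assigns a star to a booking.
def avgByStar = { Closure starOf ->
    bookings.groupBy(starOf).collectEntries { star, rows ->
        [star, rows*.amount.sum() / rows.size()]
    }
}

def scd1 = avgByStar({ b -> 4 })                          // overwrite: every booking sees today's rating
def scd2 = avgByStar({ b -> versionAt(b.checkin).star })  // history: rating valid at check-in
println "SCD1: ${scd1}"
println "SCD2: ${scd2}"
```

With these made-up numbers, the SCD1 view reports a single 4★ average of 140 and no 3★ revenue at all, while the SCD2 view keeps 110 for the 3★ period and 200 for the 4★ period.
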