# NiFi ETL Flow — Hotel Reservations Data Mart

## Overview

The flow moves data from **MySQL 8.4 OLTP** (source) into **Oracle Data Mart** (target).
It is organized into **5 Process Groups** that run in sequence, controlled by a top-level scheduler.

```
[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]
```

Each PG has a single **Input Port** and **Output Port** so the orchestrator can chain them with connections.

---

## Controller Services (shared by all PGs)

| Name | Type | Config |
|------|------|--------|
| `MySQL_DBCPService` | DBCPConnectionPool | Driver: `com.mysql.cj.jdbc.Driver`; URL: `jdbc:mysql://127.0.0.1:13306/hotel_reservations`; User: `root`; Pwd: `hotel2025root` |
| `Oracle_DBCPService` | DBCPConnectionPool | Driver: `oracle.jdbc.OracleDriver`; URL: `jdbc:oracle:thin:@<host>:1521:<sid>`; User: `<schema>`; Pwd: `<pwd>` |
| `JsonReader` | JsonTreeReader | default settings |
| `JsonWriter` | JsonRecordSetWriter | default settings |
| `AvroReader` | AvroReader | default settings |

---

## PG-1: Load Date Dimension

**Runs once** (or when extending the date range). Populates `DIM_DATE` for 2020–2030.

```
GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL
```

### Processors

**GenerateFlowFile**
- Run Schedule: manual (run once via right-click → Run Once)
- Custom Text: `{}`

**ExecuteScript** (Groovy)
```groovy
import groovy.json.JsonOutput
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.format.TextStyle
import java.time.temporal.WeekFields
import org.apache.nifi.processor.io.OutputStreamCallback

// Consume the trigger FlowFile from GenerateFlowFile; otherwise it stays queued
def ff = session.get()
if (!ff) return

def keyFormat = DateTimeFormatter.ofPattern('yyyyMMdd')
def rows = []
def d = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)
while (!d.isAfter(end)) {
    def m = d.monthValue
    def season = (m >= 6 && m <= 8) ? 'Peak'
               : (m >= 3 && m <= 5) ? 'High'
               : (m >= 9 && m <= 11) ? 'Autumn'
               : 'Winter'
    rows << [
        date_key:        d.format(keyFormat) as int,
        full_date:       d.toString(),
        year:            d.year,
        quarter:         ((m - 1) / 3 + 1) as int,
        month:           m,
        // getDisplayName yields "January"; toString().capitalize() would leave "JANUARY"
        month_name:      d.month.getDisplayName(TextStyle.FULL, Locale.ENGLISH),
        // weekOfYear() numbers the days before the first ISO week as week 0
        week_number:     d.get(WeekFields.ISO.weekOfYear()),
        day_of_month:    d.dayOfMonth,
        day_name:        d.dayOfWeek.getDisplayName(TextStyle.FULL, Locale.ENGLISH),
        is_weekend:      (d.dayOfWeek.value >= 6) ? 1 : 0,
        // Monday to Friday only; public holidays are not considered
        is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,
        season:          season
    ]
    d = d.plusDays(1)
}

// Replace the trigger's content with all rows as a single JSON array
ff = session.write(ff, { out -> out.write(JsonOutput.toJson(rows).getBytes('UTF-8')) } as OutputStreamCallback)
ff = session.putAttribute(ff, 'mime.type', 'application/json')
session.transfer(ff, REL_SUCCESS)
```

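The per-day derivations can be checked outside NiFi before wiring up the processor. The following is a minimal sketch in plain Groovy, assuming the same attribute names as the script above; the helper name `dateRow` is hypothetical and exists only for this illustration.

```groovy
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.format.TextStyle
import java.time.temporal.WeekFields

// Hypothetical helper: derives the DIM_DATE attributes for a single day.
Map dateRow(LocalDate d) {
    int m = d.monthValue
    String season = (m >= 6 && m <= 8) ? 'Peak' :
                    (m >= 3 && m <= 5) ? 'High' :
                    (m >= 9 && m <= 11) ? 'Autumn' : 'Winter'
    return [
        date_key       : d.format(DateTimeFormatter.BASIC_ISO_DATE).toInteger(),
        full_date      : d.toString(),
        year           : d.year,
        quarter        : (m - 1).intdiv(3) + 1,
        month          : m,
        month_name     : d.month.getDisplayName(TextStyle.FULL, Locale.ENGLISH),
        week_number    : d.get(WeekFields.ISO.weekOfYear()),
        day_of_month   : d.dayOfMonth,
        day_name       : d.dayOfWeek.getDisplayName(TextStyle.FULL, Locale.ENGLISH),
        is_weekend     : d.dayOfWeek.value >= 6 ? 1 : 0,
        is_business_day: d.dayOfWeek.value <= 5 ? 1 : 0,
        season         : season
    ]
}

// 6 July 2024 is a Saturday in the third quarter
def row = dateRow(LocalDate.of(2024, 7, 6))
assert row.date_key == 20240706
assert row.month_name == 'July' && row.day_name == 'Saturday'
assert row.quarter == 3 && row.is_weekend == 1 && row.season == 'Peak'
```
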
**SplitJson**
- JsonPath Expression: `$.*`

**EvaluateJsonPath**
- Destination: `flowfile-attribute`
- Attributes: `date_key`, `full_date`, `year`, `quarter`, `month`, `month_name`, `week_number`, `day_of_month`, `day_name`, `is_weekend`, `is_business_day`, `season`

**PutSQL**
- JDBC Connection Pool: `Oracle_DBCPService`
- SQL Statement:
```sql
INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name,
                      week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter},
        ${month}, '${month_name}', ${week_number}, ${day_of_month},
        '${day_name}', ${is_weekend}, ${is_business_day}, '${season}')
```

---

## PG-2: Static Dimensions (SCD Type 1)

Loads `DIM_COUNTRY`, `DIM_STAR_RATING`, `DIM_HOTEL_CHAIN` from MySQL.
Uses **MERGE INTO** so the flow is idempotent: re-running it updates changed rows and inserts new ones.

Each sub-flow follows the same pattern:

```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```

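The PutSQL statements in this group splice attribute values straight into the SQL text, so a value containing a single quote (for example `Côte d'Ivoire`) would produce an invalid statement. PutSQL can instead bind `?` placeholders from `sql.args.N.type` and `sql.args.N.value` attributes. The sketch below shows one way to build those attributes in Groovy; the helper `sqlArgs` and the reshaped MERGE text are illustrative assumptions, not part of the flow as documented, and NULL handling is left out.

```groovy
import java.sql.Types

// Assumed variant of the DIM_COUNTRY MERGE: values arrive as bind parameters
// through the USING subquery instead of being interpolated into the text.
String mergeCountry = '''
MERGE INTO DIM_COUNTRY tgt
USING (SELECT ? AS country_id, ? AS code, ? AS name, ? AS currency FROM DUAL) src
ON (tgt.country_id = src.country_id)
WHEN MATCHED THEN
    UPDATE SET tgt.code = src.code, tgt.name = src.name, tgt.currency = src.currency
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (src.country_id, src.code, src.name, src.currency)
'''

// Hypothetical helper: turns an ordered list of [jdbcType, value] pairs into
// the sql.args.N.type / sql.args.N.value attributes that PutSQL reads.
Map<String, String> sqlArgs(List<List> params) {
    Map<String, String> attrs = [:]
    params.eachWithIndex { p, i ->
        int n = i + 1                                   // arguments are 1-based
        attrs["sql.args.${n}.type".toString()]  = String.valueOf(p[0])
        attrs["sql.args.${n}.value".toString()] = p[1].toString()
    }
    return attrs
}

def attrs = sqlArgs([
    [Types.INTEGER, 42],
    [Types.VARCHAR, 'CI'],
    [Types.VARCHAR, "Côte d'Ivoire"],   // the quote needs no escaping when bound
    [Types.VARCHAR, 'XOF'],
])
assert attrs['sql.args.3.value'] == "Côte d'Ivoire"
```

Inside an ExecuteScript processor the resulting map could be attached with `session.putAllAttributes(ff, attrs)` before the FlowFile reaches PutSQL.
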
### 2a — DIM_COUNTRY

**ExecuteSQL** — Connection Pool: `MySQL_DBCPService`
```sql
SELECT country_id, code, name, currency FROM country ORDER BY country_id
```

**EvaluateJsonPath** attributes: `country_id`, `code`, `name`, `currency`

**PutSQL**
```sql
MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (${country_id}, '${code}', '${name}', '${currency}')
```

### 2b — DIM_STAR_RATING

**ExecuteSQL**
```sql
SELECT star_rating_id, code, description FROM star_rating ORDER BY code
```

**EvaluateJsonPath** attributes: `star_rating_id`, `code`, `description`

**PutSQL**
```sql
MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
    INSERT (star_rating_id, code, description)
    VALUES (${star_rating_id}, ${code}, '${description}')
```

### 2c — DIM_HOTEL_CHAIN

**ExecuteSQL**
```sql
SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id
```

**EvaluateJsonPath** attributes: `hotel_chain_id`, `code`, `name`

**PutSQL**
```sql
MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
    INSERT (hotel_chain_id, code, name)
    VALUES (${hotel_chain_id}, '${code}', '${name}')
```

---

## PG-3: DIM_HOTEL — SCD Type 2

This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by star category **at the time of booking**, not just today's category.

**Architecture:** NiFi stages raw data into `STG_HOTEL`, then an `ExecuteScript` runs the SCD2 SQL logic in a single Oracle transaction.

```
[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]
```

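The expire-then-insert logic that Step C applies can be sketched in memory to see how versions accumulate. This is a simplified illustration in plain Groovy, assuming lists of maps as stand-ins for `DIM_HOTEL` and `STG_HOTEL`; the function name `applyScd2` and the sample hotel are hypothetical, and codes are compared directly instead of through surrogate keys.

```groovy
import java.time.LocalDate

// Hypothetical in-memory version of the two SCD2 statements.
List<Map> applyScd2(List<Map> dim, List<Map> stg, LocalDate today) {
    // 1. Expire current rows whose tracked attributes differ from staging
    stg.each { s ->
        def cur = dim.find { it.source_hotel_id == s.hotel_id && it.is_current == 1 }
        if (cur && (cur.chain_code != s.chain_code ||
                    cur.star_code != s.star_code ||
                    cur.city != s.city)) {
            cur.expiry_date = today.minusDays(1)   // last day the old version was valid
            cur.is_current = 0
        }
    }
    // 2. Insert a current row for every staged hotel that now lacks one
    //    (covers both changed hotels and brand-new hotels)
    stg.each { s ->
        if (!dim.any { it.source_hotel_id == s.hotel_id && it.is_current == 1 }) {
            dim << [source_hotel_id: s.hotel_id, chain_code: s.chain_code,
                    star_code: s.star_code, city: s.city,
                    effective_date: today, expiry_date: null, is_current: 1]
        }
    }
    return dim
}

def dim = []
applyScd2(dim, [[hotel_id: 1, chain_code: 'ACME', star_code: 3, city: 'Oslo']], LocalDate.of(2024, 1, 10))
applyScd2(dim, [[hotel_id: 1, chain_code: 'ACME', star_code: 4, city: 'Oslo']], LocalDate.of(2024, 6, 1))
applyScd2(dim, [[hotel_id: 1, chain_code: 'ACME', star_code: 4, city: 'Oslo']], LocalDate.of(2024, 6, 2))

assert dim.size() == 2                                   // the unchanged third run adds nothing
assert dim[0].expiry_date == LocalDate.of(2024, 5, 31) && dim[0].is_current == 0
assert dim[1].effective_date == LocalDate.of(2024, 6, 1) && dim[1].is_current == 1
```
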
### Step A: Truncate staging

**GenerateFlowFile** → **PutSQL**
```sql
TRUNCATE TABLE STG_HOTEL
```

### Step B: Load staging from MySQL

**ExecuteSQL** — `MySQL_DBCPService`
```sql
SELECT
    h.hotel_id,
    hc.code  AS chain_code,
    c.code   AS country_code,
    sr.code  AS star_code,
    h.code,
    h.name,
    h.city
FROM hotel h
JOIN country c       ON c.country_id      = h.country_id
JOIN star_rating sr  ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id
```

**ConvertAvroToJSON** → **SplitJson** (`$.*`)

**EvaluateJsonPath** attributes: `hotel_id`, `chain_code`, `country_code`, `star_code`, `code`, `name`, `city`

**PutSQL** → `STG_HOTEL`
```sql
INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}')
```

### Step C: Apply SCD2 logic

**GenerateFlowFile** (runs after B finishes) → **ExecuteScript** (Groovy)

The Groovy script obtains a JDBC connection from the Oracle pool and executes two SQL statements in one transaction:

```groovy
import java.sql.Connection

// Consume the trigger FlowFile from GenerateFlowFile; otherwise it stays queued
def ff = session.get()
if (!ff) return

// 'Oracle_DBCPService_ID' is a placeholder for the controller service's identifier
Connection conn = context.controllerServiceLookup
        .getControllerService('Oracle_DBCPService_ID')
        .getConnection()
conn.autoCommit = false

try {
    // 1. Expire records whose tracked attributes changed
    conn.prepareStatement("""
        UPDATE DIM_HOTEL dh
        SET dh.expiry_date = TRUNC(SYSDATE) - 1,
            dh.is_current  = 0
        WHERE dh.is_current = 1
          AND EXISTS (
            SELECT 1 FROM STG_HOTEL s
            WHERE s.hotel_id = dh.source_hotel_id
              AND (
                NVL(s.chain_code,'~') != NVL((
                    SELECT hc.code FROM DIM_HOTEL_CHAIN hc
                    WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~')
                OR s.star_code != (
                    SELECT ds.code FROM DIM_STAR_RATING ds
                    WHERE ds.star_rating_key = dh.star_rating_key)
                OR s.city != dh.city
              )
          )
    """).executeUpdate()

    // 2. Insert new version for changed hotels + insert brand-new hotels.
    //    Changed hotels qualify because step 1 left them without a current row.
    conn.prepareStatement("""
        INSERT INTO DIM_HOTEL (
            source_hotel_id, hotel_chain_key, country_key, star_rating_key,
            code, name, city, effective_date, expiry_date, is_current)
        SELECT
            s.hotel_id,
            (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
            (SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code),
            (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
            s.code, s.name, s.city,
            TRUNC(SYSDATE), NULL, 1
        FROM STG_HOTEL s
        WHERE NOT EXISTS (
            SELECT 1 FROM DIM_HOTEL d
            WHERE d.source_hotel_id = s.hotel_id
              AND d.is_current = 1
        )
    """).executeUpdate()

    conn.commit()
} catch (Exception e) {
    conn.rollback()
    throw e
} finally {
    conn.close()
}

// Pass the trigger FlowFile on so downstream groups can be chained
session.transfer(ff, REL_SUCCESS)
```

---

## PG-4: DIM_GUEST — SCD Type 1

Guest personal data (city, country) can change, but tracking that history has no analytical value, so a plain MERGE (upsert) is sufficient here.

```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```

**ExecuteSQL** — `MySQL_DBCPService`
```sql
SELECT g.guest_id, c.code AS country_code, g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id
```

**EvaluateJsonPath** attributes: `guest_id`, `country_code`, `name`, `city`

**PutSQL**
```sql
MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
    UPDATE SET
        tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        tgt.name        = '${name}',
        tgt.city        = NULLIF('${city}','')
WHEN NOT MATCHED THEN
    INSERT (guest_id, country_key, name, city)
    VALUES (
        ${guest_id},
        (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        '${name}',
        NULLIF('${city}','')
    )
```

> **Note:** DIM_ROOM is also SCD Type 1. Load it the same way as DIM_GUEST: join `hotel_room` with `room_type` in MySQL and MERGE into `DIM_ROOM`, resolving the hotel surrogate key from `DIM_HOTEL` by `source_hotel_id` with `is_current = 1`.

---

## PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark)

The fact table is loaded **incrementally**: only `room_booking` rows with `room_booking_id` greater than the last loaded value are processed. The watermark is stored in `ETL_WATERMARK` in Oracle.

`source_rb_id` on `FACT_ROOM_BOOKING` has a UNIQUE constraint, and the insert in Step D carries a `NOT EXISTS` guard, so re-running is safe: rows that are already loaded are skipped by the guard, and the constraint rejects any duplicate that slips past it.

```
[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson]
    → [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]
```

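The interplay of watermark, batch limit, and duplicate protection can be sketched without any database. The following plain-Groovy illustration assumes a list of ids as a stand-in for MySQL `room_booking`, a set for the loaded `source_rb_id` values, and a map for `ETL_WATERMARK`; the function name `loadBatch` is hypothetical.

```groovy
// Hypothetical in-memory model of one PG-5 run. Returns the number of rows inserted.
int loadBatch(List<Integer> source, Set<Integer> fact, Map state, int limit) {
    // WHERE room_booking_id > watermark ORDER BY room_booking_id LIMIT n
    def batch = source.findAll { it > state.last_key }.sort().take(limit)
    int inserted = 0
    batch.each { id ->
        if (fact.add(id)) inserted++          // Set.add returns false for a duplicate
    }
    // Mirrors SET last_key = (SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING)
    if (fact) state.last_key = fact.max()
    return inserted
}

def source = (1..7).toList()
def fact = new HashSet<Integer>()
def state = [last_key: 0]

// Loop until a run inserts nothing: batches of 3, 3 and 1 rows
while (loadBatch(source, fact, state, 3) > 0) { }
assert fact.size() == 7 && state.last_key == 7

// Re-running from a stale watermark inserts nothing and restores the watermark
state.last_key = 0
assert loadBatch(source, fact, state, 3) == 0
assert state.last_key == 7
```
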
### Step A: Read watermark

**ExecuteSQL** — `Oracle_DBCPService`
```sql
SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'
```

**ConvertAvroToJSON** → **EvaluateJsonPath**
- `watermark` ← `$.LAST_KEY` (Oracle returns unquoted column names in upper case, so the JSON field is `LAST_KEY` unless the query aliases it with a quoted lower-case name)

### Step B: Load from MySQL

**ExecuteSQL** — `MySQL_DBCPService`

SQL Statement (use attribute `${watermark}`):
```sql
SELECT
    rb.room_booking_id,
    rb.room_id,
    rb.date_from,
    rb.date_to,
    rb.nightly_rate,
    rb.total_amount,
    b.guest_id,
    b.status AS booking_status,
    DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000
```

> Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back.

### Step C: Split + extract attributes

**ConvertAvroToJSON** → **SplitJson** (`$.*`)

**EvaluateJsonPath** attributes:
`room_booking_id`, `room_id`, `guest_id`, `date_from`, `date_to`, `nightly_rate`, `total_amount`, `booking_status`, `nights_stayed`

### Step D: Insert into fact table

**PutSQL** — `Oracle_DBCPService`
```sql
INSERT INTO FACT_ROOM_BOOKING (
    source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key,
    star_rating_key, checkin_date_key, checkout_date_key,
    booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
    ${room_booking_id},
    dh.hotel_key,
    dh.hotel_chain_key,
    dr.room_key,
    dg.guest_key,
    dg.country_key,
    dh.star_rating_key,
    TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
    TO_NUMBER(TO_CHAR(TO_DATE('${date_to}',  'YYYY-MM-DD'), 'YYYYMMDD')),
    '${booking_status}',
    ${nights_stayed},
    TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
    TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
FROM
    DIM_ROOM  dr,
    DIM_GUEST dg,
    DIM_HOTEL dcur,  -- hotel version that DIM_ROOM points to (current at room load time)
    DIM_HOTEL dh     -- hotel version active at check-in
WHERE
    dr.room_id = ${room_id}
    AND dg.guest_id = ${guest_id}
    AND dcur.hotel_key = dr.hotel_key
    -- Match on the source id so that earlier versions of the same hotel are candidates
    AND dh.source_hotel_id = dcur.source_hotel_id
    -- SCD2 lookup: find hotel version active at check-in date
    AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
    -- expiry_date is the last valid day (PG-3 sets it to TRUNC(SYSDATE) - 1), so compare inclusively
    AND (dh.expiry_date IS NULL OR dh.expiry_date >= TO_DATE('${date_from}','YYYY-MM-DD'))
    -- Idempotent: skip if already loaded
    AND NOT EXISTS (
        SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id}
    )
```

> The `dh.effective_date` / `dh.expiry_date` condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was valid **when the guest actually checked in**, not what the hotel looks like today. A booking whose check-in date is covered by no hotel version (for example, one dated before the hotel's first `effective_date`) matches no row and is not inserted.

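The point-in-time lookup is easiest to reason about at the version boundary. Below is a minimal plain-Groovy sketch, assuming `expiry_date` marks the last day a version was valid (PG-3 sets it to `TRUNC(SYSDATE) - 1`); the helper `versionAt` and the sample keys are hypothetical.

```groovy
import java.time.LocalDate

// Hypothetical helper: returns the hotel version whose validity window covers
// the check-in date, or null when no version covers it.
Map versionAt(List<Map> versions, int sourceHotelId, LocalDate checkIn) {
    return versions.find { v ->
        v.source_hotel_id == sourceHotelId &&
        !v.effective_date.isAfter(checkIn) &&                       // effective_date <= check-in
        (v.expiry_date == null || !v.expiry_date.isBefore(checkIn)) // open-ended or expiry_date >= check-in
    }
}

def versions = [
    [hotel_key: 10, source_hotel_id: 1, star_code: 3,
     effective_date: LocalDate.of(2024, 1, 10), expiry_date: LocalDate.of(2024, 5, 31)],
    [hotel_key: 11, source_hotel_id: 1, star_code: 4,
     effective_date: LocalDate.of(2024, 6, 1), expiry_date: null],
]

// The last day of the old version still resolves to the old version
assert versionAt(versions, 1, LocalDate.of(2024, 5, 31)).hotel_key == 10
// The first day of the new version resolves to the new one
assert versionAt(versions, 1, LocalDate.of(2024, 6, 1)).hotel_key == 11
// A check-in before the first effective_date finds no version at all
assert versionAt(versions, 1, LocalDate.of(2023, 12, 31)) == null
```

With a strict `expiry_date > check-in` comparison, the 31 May check-in would match neither version, which is why the inclusive comparison matters when the expiry date is stored as the last valid day.
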
**Ignore Errors** on PutSQL (route `failure` → funnel): UNIQUE constraint violations on `source_rb_id` are expected and harmless on re-runs.

### Step E: Update watermark

After the PutSQL succeeds, update the watermark with the highest `room_booking_id` seen in this batch.

**UpdateAttribute**
- `max_rb_id` ← `${room_booking_id}`. Expression Language evaluates one FlowFile at a time, so taking the maximum across the batch needs a custom processor or a MergeContent step.

> Simplest approach: add a final **ExecuteSQL** that runs after the batch:
```sql
UPDATE ETL_WATERMARK
SET last_key = (SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING),
    last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'
```

---

## Execution Order & Scheduling

| PG | Trigger | Frequency |
|----|---------|-----------|
| PG-1 (Date Dim) | Manual (run once) | — |
| PG-2 (Static Dims) | Timer — 24h | Daily |
| PG-3 (DIM_HOTEL SCD2) | Timer — 24h | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | Timer — 24h | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer — 1h | Hourly |

Chain PG-2 → PG-3 → PG-4 by connecting each PG's Output Port to the next PG's Input Port. Inside each PG, feed the Output Port from the **success** relationship of the last processor, so the next group starts only after the previous one has finished.

---

## Why SCD Type 2 for DIM_HOTEL?

A hotel being upgraded from 3★ to 4★ changes its rate tier going forward. If we just overwrite the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel, distorting average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was valid at check-in.

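A small worked example makes the distortion concrete. The figures below are invented for illustration only: three bookings for one hotel, two made while it was 3★ and one after the upgrade to 4★. The helper `avgByStar` is hypothetical.

```groovy
// Hypothetical bookings for one hotel that was upgraded from 3 to 4 stars
def bookings = [
    [amount: 100, starAtCheckIn: 3],
    [amount: 120, starAtCheckIn: 3],
    [amount: 200, starAtCheckIn: 4],
]
int currentStar = 4

// Average revenue per star category; `starOf` decides which category
// each booking is attributed to.
Map avgByStar(List<Map> rows, Closure starOf) {
    return rows.groupBy(starOf).collectEntries { star, rs ->
        [(star): rs*.amount.sum() / rs.size()]
    }
}

// SCD1: the dimension was overwritten, so every booking reports today's rating
def scd1 = avgByStar(bookings) { currentStar }
// SCD2: each booking keeps the rating that was valid at check-in
def scd2 = avgByStar(bookings) { it.starAtCheckIn }

assert scd1[4] == 140 && !scd1.containsKey(3)   // the 3-star history vanishes into the 4-star bucket
assert scd2[3] == 110 && scd2[4] == 200         // each category keeps its own average
```
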