IPZ_1/docs/nifi-flow.md
2026-05-17 21:24:37 +02:00

# NiFi ETL Flow — Hotel Reservations Data Mart
## Overview
The flow moves data from **MySQL 8.4 OLTP** (source) into **Oracle Data Mart** (target).
It is organized into **5 Process Groups** that run in sequence, controlled by a top-level scheduler.
```
[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]
```
Each PG has a single **Input Port** and **Output Port** so the orchestrator can chain them with connections.
---
## Controller Services (shared by all PGs)
| Name | Type | Config |
|------|------|--------|
| `MySQL_DBCPService` | DBCPConnectionPool | Driver: `com.mysql.cj.jdbc.Driver`; URL: `jdbc:mysql://127.0.0.1:13306/hotel_reservations`; User: `root`; Pwd: `hotel2025root` |
| `Oracle_DBCPService` | DBCPConnectionPool | Driver: `oracle.jdbc.OracleDriver`; URL: `jdbc:oracle:thin:@<host>:1521:<sid>`; User: `<schema>`; Pwd: `<pwd>` |
| `JsonReader` | JsonTreeReader | default settings |
| `JsonWriter` | JsonRecordSetWriter | default settings |
| `AvroReader` | AvroReader | default settings |
---
## PG-1: Load Date Dimension
**Runs once** (or when extending the date range). Populates `DIM_DATE` for 2020–2030.
```
GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL
```
### Processors
**GenerateFlowFile**
- Run Schedule: manual (run once via right-click → Run Once)
- Custom Text: `{}`
**ExecuteScript** (Groovy)
```groovy
import groovy.json.JsonOutput
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.WeekFields
import org.apache.nifi.processor.io.OutputStreamCallback

// Consume the trigger FlowFile; if it stays queued, the script is re-run for it
def ff = session.get()
if (ff == null) return

def keyFormat = DateTimeFormatter.ofPattern('yyyyMMdd')
def rows = []
def d = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)
while (!d.isAfter(end)) {
    def m = d.monthValue
    def season = (m >= 6 && m <= 8) ? 'Peak'
        : (m >= 3 && m <= 5) ? 'High'
        : (m >= 9 && m <= 11) ? 'Autumn'
        : 'Winter'
    rows << [
        date_key: d.format(keyFormat) as int,
        full_date: d.toString(),
        year: d.year,
        quarter: ((m - 1) / 3 + 1) as int,
        month: m,
        // Enum names are upper-case ('JANUARY'); lower-case first so capitalize() yields 'January'
        month_name: d.month.toString().toLowerCase().capitalize(),
        week_number: d.get(WeekFields.ISO.weekOfYear()),
        day_of_month: d.dayOfMonth,
        day_name: d.dayOfWeek.toString().toLowerCase().capitalize(),
        is_weekend: (d.dayOfWeek.value >= 6) ? 1 : 0,
        is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,
        season: season
    ]
    d = d.plusDays(1)
}

// Write the generated rows as one JSON array into the FlowFile content
ff = session.write(ff, { out ->
    out.write(JsonOutput.toJson(rows).getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
ff = session.putAttribute(ff, 'mime.type', 'application/json')
session.transfer(ff, REL_SUCCESS)
```
**SplitJson**
- JsonPath Expression: `$.*`
**EvaluateJsonPath**
- Destination: `flowfile-attribute`
- Attributes: `date_key`, `full_date`, `year`, `quarter`, `month`, `month_name`, `week_number`, `day_of_month`, `day_name`, `is_weekend`, `is_business_day`, `season`
**PutSQL**
- JDBC Connection Pool: `Oracle_DBCPService`
- SQL Statement:
```sql
INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name,
                      week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter},
        ${month}, '${month_name}', ${week_number}, ${day_of_month},
        '${day_name}', ${is_weekend}, ${is_business_day}, '${season}')
```
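A quick way to confirm that PG-1 loaded the full range is to compare `SELECT COUNT(*) FROM DIM_DATE` with the number of days the script is expected to generate. The following is a minimal standalone sketch (run outside NiFi) that computes that number from the same start and end dates; the variable names are illustrative only.

```groovy
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Same bounds as the ExecuteScript loop above (both ends inclusive)
def start = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)

// DAYS.between excludes the end date, so add 1 for the inclusive range
long expectedRows = ChronoUnit.DAYS.between(start, end) + 1

println "Expected DIM_DATE rows: ${expectedRows}"
```

If the table holds more rows than this, the load has most likely run more than once, because the plain `INSERT` above is not idempotent.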
---
## PG-2: Static Dimensions (SCD Type 1)
Loads `DIM_COUNTRY`, `DIM_STAR_RATING`, `DIM_HOTEL_CHAIN` from MySQL.
Uses **MERGE INTO** so the flow is idempotent — re-running it updates changed rows and inserts new ones.
Each sub-flow follows the same pattern:
```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```
### 2a — DIM_COUNTRY
**ExecuteSQL** — Connection Pool: `MySQL_DBCPService`
```sql
SELECT country_id, code, name, currency FROM country ORDER BY country_id
```
**EvaluateJsonPath** attributes: `country_id`, `code`, `name`, `currency`
**PutSQL**
```sql
MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (${country_id}, '${code}', '${name}', '${currency}')
```
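Interpolating attributes such as `'${name}'` straight into the statement breaks as soon as a value contains an apostrophe. PutSQL can also bind `?` placeholders from `sql.args.N.type` / `sql.args.N.value` FlowFile attributes, where the type is the numeric `java.sql.Types` code. The sketch below shows one possible way to build those attributes; `toSqlArgs` and the sample values are hypothetical, and the `?`-based MERGE is a suggested variant, not the statement used in this flow.

```groovy
import java.sql.Types

// Hypothetical helper: turns an ordered list of [jdbcType, value] pairs into
// the sql.args.N.* attributes that PutSQL reads for '?' placeholders.
// Null values are not handled in this sketch.
Map<String, String> toSqlArgs(List<List> params) {
    def attrs = [:]
    params.eachWithIndex { p, i ->
        def n = i + 1 // PutSQL arguments are 1-based
        attrs["sql.args.${n}.type".toString()] = p[0] as String
        attrs["sql.args.${n}.value".toString()] = p[1] as String
    }
    attrs
}

// Parameterized variant of the DIM_COUNTRY MERGE (assumed, not taken from the flow)
def mergeSql = '''MERGE INTO DIM_COUNTRY tgt
USING (SELECT ? AS country_id, ? AS code, ? AS name, ? AS currency FROM DUAL) src
   ON (tgt.country_id = src.country_id)
WHEN MATCHED THEN
    UPDATE SET tgt.code = src.code, tgt.name = src.name, tgt.currency = src.currency
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (src.country_id, src.code, src.name, src.currency)'''

// Sample record with an apostrophe in the name
def attrs = toSqlArgs([
    [Types.INTEGER, 42],
    [Types.VARCHAR, 'CI'],
    [Types.VARCHAR, "Cote d'Ivoire"],
    [Types.VARCHAR, 'XOF']
])
attrs.each { k, v -> println "${k} = ${v}" }
```

In an ExecuteScript processor the resulting map could be attached with `session.putAllAttributes(flowFile, attrs)` before the FlowFile reaches PutSQL.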
### 2b — DIM_STAR_RATING
**ExecuteSQL**
```sql
SELECT star_rating_id, code, description FROM star_rating ORDER BY code
```
**PutSQL**
```sql
MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
    INSERT (star_rating_id, code, description)
    VALUES (${star_rating_id}, ${code}, '${description}')
```
### 2c — DIM_HOTEL_CHAIN
**ExecuteSQL**
```sql
SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id
```
**PutSQL**
```sql
MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
    INSERT (hotel_chain_id, code, name)
    VALUES (${hotel_chain_id}, '${code}', '${name}')
```
---
## PG-3: DIM_HOTEL — SCD Type 2
This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by star category **at the time of booking**, not just today's category.
**Architecture:** NiFi stages raw data into `STG_HOTEL`, then an `ExecuteScript` runs the SCD2 SQL logic in a single Oracle transaction.
```
[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]
```
### Step A: Truncate staging
**GenerateFlowFile** → **PutSQL**
```sql
TRUNCATE TABLE STG_HOTEL
```
### Step B: Load staging from MySQL
**ExecuteSQL** (Connection Pool: `MySQL_DBCPService`)
```sql
SELECT
    h.hotel_id,
    hc.code AS chain_code,
    c.code  AS country_code,
    sr.code AS star_code,
    h.code,
    h.name,
    h.city
FROM hotel h
JOIN country c          ON c.country_id = h.country_id
JOIN star_rating sr     ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id
```
**ConvertAvroToJSON** → **SplitJson** (`$.*`)
**EvaluateJsonPath** attributes: `hotel_id`, `chain_code`, `country_code`, `star_code`, `code`, `name`, `city`
**PutSQL** → `STG_HOTEL`
```sql
INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}')
```
### Step C: Apply SCD2 logic
**GenerateFlowFile** (runs after B finishes) → **ExecuteScript** (Groovy)
The Groovy script opens a JDBC connection and executes two SQL statements in one transaction:
```groovy
import java.sql.*

// Consume the trigger FlowFile; if it stays queued, the script is re-run for it
def ff = session.get()
if (ff == null) return

// 'Oracle_DBCPService_ID' stands for the controller service's identifier
def conn = context.controllerServiceLookup
    .getControllerService('Oracle_DBCPService_ID')
    .getConnection()
conn.autoCommit = false
try {
    // 1. Expire records whose tracked attributes (chain, star rating, city) changed
    conn.prepareStatement("""
        UPDATE DIM_HOTEL dh
        SET dh.expiry_date = TRUNC(SYSDATE) - 1,
            dh.is_current = 0
        WHERE dh.is_current = 1
          AND EXISTS (
            SELECT 1 FROM STG_HOTEL s
            WHERE s.hotel_id = dh.source_hotel_id
              AND (
                NVL(s.chain_code,'~') != NVL((
                    SELECT hc.code FROM DIM_HOTEL_CHAIN hc
                    WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~')
                OR s.star_code != (
                    SELECT ds.code FROM DIM_STAR_RATING ds
                    WHERE ds.star_rating_key = dh.star_rating_key)
                OR s.city != dh.city
              )
          )
    """).executeUpdate()

    // 2. Insert a new version for changed hotels and a first version for brand-new hotels
    //    (both cases have no current row after step 1)
    conn.prepareStatement("""
        INSERT INTO DIM_HOTEL (
            source_hotel_id, hotel_chain_key, country_key, star_rating_key,
            code, name, city, effective_date, expiry_date, is_current)
        SELECT
            s.hotel_id,
            (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
            (SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code),
            (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
            s.code, s.name, s.city,
            TRUNC(SYSDATE), NULL, 1
        FROM STG_HOTEL s
        WHERE NOT EXISTS (
            SELECT 1 FROM DIM_HOTEL d
            WHERE d.source_hotel_id = s.hotel_id
              AND d.is_current = 1
        )
    """).executeUpdate()

    conn.commit()
} catch (Exception e) {
    conn.rollback()
    throw e
} finally {
    conn.close()
}
session.transfer(ff, REL_SUCCESS)
```
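The two statements above can be hard to follow in SQL alone. Below is a minimal in-memory sketch of the same expire-then-insert logic using plain Groovy lists; the sample hotels, dates, and the `tracked` list are hypothetical, and the maps use codes directly instead of surrogate-key lookups.

```groovy
import java.time.LocalDate

// Stand-in for DIM_HOTEL: one current version of hotel 1 (3 stars)
def dim = [
    [source_hotel_id: 1, star_code: 3, chain_code: 'ACME', city: 'Gdansk',
     effective_date: LocalDate.of(2024, 1, 1), expiry_date: null, is_current: 1]
]
// Stand-in for STG_HOTEL after today's load
def stg = [
    [hotel_id: 1, star_code: 4, chain_code: 'ACME', city: 'Gdansk'], // upgraded 3 -> 4
    [hotel_id: 2, star_code: 5, chain_code: null, city: 'Krakow']    // brand-new hotel
]
def today = LocalDate.of(2026, 5, 17)            // plays the role of TRUNC(SYSDATE)
def tracked = ['star_code', 'chain_code', 'city'] // attributes that trigger a new version

// Step 1: expire current versions whose tracked attributes differ from staging
dim.findAll { it.is_current == 1 }.each { cur ->
    def s = stg.find { it.hotel_id == cur.source_hotel_id }
    if (s && tracked.any { s[it] != cur[it] }) {
        cur.expiry_date = today.minusDays(1)
        cur.is_current = 0
    }
}

// Step 2: insert a current version for every staged hotel that has none
stg.each { s ->
    if (!dim.any { it.source_hotel_id == s.hotel_id && it.is_current == 1 }) {
        dim << [source_hotel_id: s.hotel_id, star_code: s.star_code,
                chain_code: s.chain_code, city: s.city,
                effective_date: today, expiry_date: null, is_current: 1]
    }
}

dim.each { println it }
```

Because step 1 removes the current flag from changed hotels, step 2 needs only one condition ("no current row") to cover both changed and brand-new hotels, which is the same trick the `NOT EXISTS` clause relies on.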
---
## PG-4: DIM_GUEST — SCD Type 1
Guest personal data (city, country) can change, but tracking that history has no analytical value, so a plain MERGE (upsert) is sufficient here.
```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```
**ExecuteSQL** (Connection Pool: `MySQL_DBCPService`)
```sql
SELECT g.guest_id, c.code AS country_code, g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id
```
**EvaluateJsonPath** attributes: `guest_id`, `country_code`, `name`, `city`
**PutSQL**
```sql
MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
    UPDATE SET
        tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        tgt.name = '${name}',
        tgt.city = NULLIF('${city}','')
WHEN NOT MATCHED THEN
    INSERT (guest_id, country_key, name, city)
    VALUES (
        ${guest_id},
        (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        '${name}',
        NULLIF('${city}','')
    )
```
> **Note:** DIM_ROOM is also SCD Type 1 — load it the same way as DIM_GUEST, joining `hotel_room` with `room_type` in MySQL and MERGEing into `DIM_ROOM` (surrogate key lookup via `source_hotel_id + IS_CURRENT=1` from DIM_HOTEL).
---
## PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark)
The fact table is loaded **incrementally**: only `room_booking` rows with `room_booking_id` greater than the last loaded value are processed. The watermark is stored in `ETL_WATERMARK` in Oracle.
`source_rb_id` on `FACT_ROOM_BOOKING` has a UNIQUE constraint, so re-running is safe — duplicates are silently skipped.
```
[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson]
→ [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]
```
### Step A: Read watermark
**ExecuteSQL** (Connection Pool: `Oracle_DBCPService`)
```sql
SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'
```
**ConvertAvroToJSON** → **EvaluateJsonPath**
- `watermark` → `$.last_key`
### Step B: Load from MySQL
**ExecuteSQL** (Connection Pool: `MySQL_DBCPService`)
SQL Statement (use attribute `${watermark}`):
```sql
SELECT
    rb.room_booking_id,
    rb.room_id,
    rb.date_from,
    rb.date_to,
    rb.nightly_rate,
    rb.total_amount,
    b.guest_id,
    b.status AS booking_status,
    DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000
```
> Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back.
### Step C: Split + extract attributes
**ConvertAvroToJSON** → **SplitJson** (`$.*`)
**EvaluateJsonPath** attributes:
`room_booking_id`, `room_id`, `guest_id`, `date_from`, `date_to`, `nightly_rate`, `total_amount`, `booking_status`, `nights_stayed`
### Step D: Insert into fact table
**PutSQL** (Connection Pool: `Oracle_DBCPService`)
```sql
INSERT INTO FACT_ROOM_BOOKING (
    source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key,
    star_rating_key, checkin_date_key, checkout_date_key,
    booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
    ${room_booking_id},
    dh.hotel_key,
    dh.hotel_chain_key,
    dr.room_key,
    dg.guest_key,
    dg.country_key,
    dh.star_rating_key,
    TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
    TO_NUMBER(TO_CHAR(TO_DATE('${date_to}', 'YYYY-MM-DD'), 'YYYYMMDD')),
    '${booking_status}',
    ${nights_stayed},
    TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
    TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
FROM
    DIM_ROOM dr,
    DIM_GUEST dg,
    DIM_HOTEL cur, -- hotel version that DIM_ROOM points at
    DIM_HOTEL dh   -- hotel version valid at check-in
WHERE
    dr.room_id = ${room_id}
    AND dg.guest_id = ${guest_id}
    AND cur.hotel_key = dr.hotel_key
    -- SCD2 lookup: go through the business key, because dr.hotel_key identifies
    -- a single version and may not be the one active at check-in
    AND dh.source_hotel_id = cur.source_hotel_id
    AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
    -- PG-3 sets expiry_date to the last valid day (TRUNC(SYSDATE) - 1), so the bound is inclusive
    AND (dh.expiry_date IS NULL OR dh.expiry_date >= TO_DATE('${date_from}','YYYY-MM-DD'))
    -- Idempotent: skip if already loaded
    AND NOT EXISTS (
        SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id}
    )
```
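The SCD2 lookup in the `WHERE` clause above can be illustrated with a small in-memory sketch. It assumes that `expiry_date` holds the last day a version was valid (PG-3 sets it to the day before the new version starts), so the upper bound is treated as inclusive; the version history and the `versionAt` closure are hypothetical.

```groovy
import java.time.LocalDate

// Hypothetical version history of one hotel: 3 stars until 2026-05-16, 4 stars afterwards
def versions = [
    [hotel_key: 10, star_code: 3,
     effective_date: LocalDate.of(2024, 1, 1), expiry_date: LocalDate.of(2026, 5, 16)],
    [hotel_key: 11, star_code: 4,
     effective_date: LocalDate.of(2026, 5, 17), expiry_date: null]
]

// Returns the version valid on the check-in date, or null if no version covers it
def versionAt = { LocalDate checkin ->
    versions.find { v ->
        !v.effective_date.isAfter(checkin) &&
            (v.expiry_date == null || !v.expiry_date.isBefore(checkin))
    }
}

println versionAt(LocalDate.of(2025, 8, 1)).hotel_key  // stay before the upgrade
println versionAt(LocalDate.of(2026, 5, 16)).hotel_key // last day of the old version
println versionAt(LocalDate.of(2026, 6, 1)).hotel_key  // stay after the upgrade
```

A check-in date earlier than the first `effective_date` matches no version, so such a booking would produce no fact row; this is worth checking when the first dimension load happens later than the oldest bookings.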
> The `DH.EFFECTIVE_DATE / EXPIRY_DATE` condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was true **when the guest actually checked in**, not what the hotel looks like today.
**Ignore Errors** on PutSQL (route `failure` → funnel) — UNIQUE constraint violations on `source_rb_id` are expected and harmless on re-runs.
### Step E: Update watermark
After the PutSQL succeeds, update the watermark with the highest `room_booking_id` seen in this batch.
**UpdateAttribute**
- `max_rb_id` → `${room_booking_id}` (this captures only the current FlowFile's ID; because Expression Language is evaluated per FlowFile, taking the maximum across the batch needs a custom processor or a MergeContent step)
> Simplest approach: add a final **ExecuteSQL** that runs after the batch:
```sql
UPDATE ETL_WATERMARK
SET last_key = (SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING),
    last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'
```
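The watermark loop as a whole (read watermark, fetch a limited batch, insert idempotently, advance the watermark) can be sketched in memory as follows. The row counts, the batch size of 5, and all variable names are made up for illustration; in the flow the batch size is the `LIMIT 50000` of Step B.

```groovy
// Stand-in for MySQL room_booking: 12 rows with IDs 1..12
def source = (1..12).collect { [room_booking_id: it] }
def fact = []        // stand-in for FACT_ROOM_BOOKING
long watermark = 0   // stand-in for ETL_WATERMARK.last_key
int batchSize = 5    // plays the role of LIMIT
int batches = 0

while (true) {
    // WHERE room_booking_id > watermark ORDER BY room_booking_id LIMIT batchSize
    def pending = source.findAll { it.room_booking_id > watermark }
    def batch = pending.sort(false) { it.room_booking_id }.take(batchSize)
    if (batch.isEmpty()) break // no rows came back: the loop is done

    batch.each { row ->
        // Idempotent insert: skip IDs that are already in the fact table
        if (!fact.any { it.source_rb_id == row.room_booking_id }) {
            fact << [source_rb_id: row.room_booking_id]
        }
    }

    // Advance the watermark to the highest key now present in the fact table
    watermark = fact*.source_rb_id.max()
    batches++
}

println "Loaded ${fact.size()} rows in ${batches} batches, watermark = ${watermark}"
```

Note that the watermark only advances when rows actually reach the fact table, so a batch in which every row is rejected (for example, because no dimension row matches) would be fetched again on the next run.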
---
## Execution Order & Scheduling
| PG | Trigger | Frequency |
|----|---------|-----------|
| PG-1 (Date Dim) | Manual (run once) | — |
| PG-2 (Static Dims) | Timer — 24h | Daily |
| PG-3 (DIM_HOTEL SCD2) | Timer — 24h | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | Timer — 24h | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer — 1h | Hourly |
Chain PG-2 → PG-3 → PG-4 by connecting each PG's Output Port to the next PG's Input Port; inside each PG, route only the last processor's **success** relationship to the Output Port.
---
## Why SCD Type 2 for DIM_HOTEL?
A hotel being upgraded from 3★ to 4★ changes its rate tier going forward. If we just overwrite the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel — inflating average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.