NiFi ETL Flow — Hotel Reservations Data Mart
Overview
The flow moves data from the MySQL 8.4 OLTP database (source) into the Oracle data mart (target).
It is organized into five Process Groups (PGs) that a top-level scheduler runs in dependency order (see Execution Order & Scheduling).
[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]
Each PG has a single Input Port and Output Port so the orchestrator can chain them with connections.
Controller Services (shared by all PGs)
| Name | Type | Config |
|---|---|---|
| MySQL_DBCPService | DBCPConnectionPool | Driver: com.mysql.cj.jdbc.Driver; URL: jdbc:mysql://127.0.0.1:13306/hotel_reservations; User: root; Pwd: hotel2025root |
| Oracle_DBCPService | DBCPConnectionPool | Driver: oracle.jdbc.OracleDriver; URL: jdbc:oracle:thin:@<host>:1521:<sid>; User: <schema>; Pwd: <pwd> |
| JsonReader | JsonTreeReader | default settings |
| JsonWriter | JsonRecordSetWriter | default settings |
| AvroReader | AvroReader | default settings |
PG-1: Load Date Dimension
Runs once (or when extending the date range). Populates DIM_DATE for 2020–2030.
GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL
Processors
GenerateFlowFile
- Run Schedule: manual (run once via right-click → Run Once)
- Custom Text:
{}
ExecuteScript (Groovy)
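import groovy.json.JsonOutput
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.format.TextStyle
import java.time.temporal.IsoFields
import org.apache.nifi.processor.io.OutputStreamCallback

// Consume the trigger FlowFile from GenerateFlowFile; if it stayed queued,
// ExecuteScript would keep firing and regenerate the dates on every run
def trigger = session.get()
if (!trigger) return

def rows = []
def d = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)
while (!d.isAfter(end)) {
    def m = d.monthValue
    def season = (m >= 6 && m <= 8) ? 'Peak' :
                 (m >= 3 && m <= 5) ? 'High' :
                 (m >= 9 && m <= 11) ? 'Autumn' : 'Winter'
    rows << [
        date_key: Integer.parseInt(d.format(DateTimeFormatter.ofPattern('yyyyMMdd'))), // e.g. 20200101
        full_date: d.toString(),                                                   // yyyy-MM-dd
        year: d.year,
        quarter: (m - 1).intdiv(3) + 1,
        month: m,
        month_name: d.month.getDisplayName(TextStyle.FULL, Locale.ENGLISH),         // "January", not "JANUARY"
        week_number: d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR),                      // ISO-8601 week, 1..53
        day_of_month: d.dayOfMonth,
        day_name: d.dayOfWeek.getDisplayName(TextStyle.FULL, Locale.ENGLISH),       // "Monday", not "MONDAY"
        is_weekend: (d.dayOfWeek.value >= 6) ? 1 : 0,                               // Saturday = 6, Sunday = 7
        is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,                          // public holidays not considered
        season: season
    ]
    d = d.plusDays(1)
}

// Replace the trigger's content with a JSON array holding one object per day
trigger = session.write(trigger, { out -> out.write(JsonOutput.toJson(rows).getBytes('UTF-8')) } as OutputStreamCallback)
trigger = session.putAttribute(trigger, 'mime.type', 'application/json')
session.transfer(trigger, REL_SUCCESS)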
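To sanity-check the per-day attributes outside NiFi, here is a minimal Python sketch that builds the same row dictionary for one day. It is illustrative only (NiFi does not run it). It assumes English month and day names and ISO-8601 week numbers as returned by Python's date.isocalendar(); the Java counterpart is IsoFields.WEEK_OF_WEEK_BASED_YEAR, whereas WeekFields.ISO.weekOfYear() differs in early January (it returns 0 for 2021-01-01, which is ISO week 53 of 2020).

```python
from datetime import date, timedelta

# English names, independent of the machine's locale
MONTH_NAMES = ["January", "February", "March", "April", "May", "June", "July",
               "August", "September", "October", "November", "December"]
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def season_for(month: int) -> str:
    """Month-to-season mapping used by the Groovy script."""
    if 6 <= month <= 8:
        return "Peak"
    if 3 <= month <= 5:
        return "High"
    if 9 <= month <= 11:
        return "Autumn"
    return "Winter"


def date_dim_row(d: date) -> dict:
    """Build one DIM_DATE row for a single calendar day."""
    _iso_year, iso_week, iso_weekday = d.isocalendar()  # ISO-8601: Monday = 1 ... Sunday = 7
    return {
        "date_key": int(d.strftime("%Y%m%d")),
        "full_date": d.isoformat(),
        "year": d.year,
        "quarter": (d.month - 1) // 3 + 1,
        "month": d.month,
        "month_name": MONTH_NAMES[d.month - 1],
        "week_number": iso_week,
        "day_of_month": d.day,
        "day_name": DAY_NAMES[iso_weekday - 1],
        "is_weekend": 1 if iso_weekday >= 6 else 0,
        "is_business_day": 1 if iso_weekday <= 5 else 0,
        "season": season_for(d.month),
    }


def date_dim_rows(start: date, end: date) -> list:
    """All rows from start to end, both inclusive."""
    return [date_dim_row(start + timedelta(days=i)) for i in range((end - start).days + 1)]
```

Running date_dim_rows for 2020-01-01 to 2030-12-31 yields 4,018 rows (eleven years, three of them leap years), which is the row count PG-1 should leave in DIM_DATE.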
SplitJson
- JsonPath Expression:
$.*
EvaluateJsonPath
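- Destination: flowfile-attribute
- Attributes: one dynamic property per field, named after the field and set to the JsonPath $.<field> (e.g. date_key = $.date_key):
date_key,full_date,year,quarter,month,month_name,week_number,day_of_month,day_name,is_weekend,is_business_day,season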
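What SplitJson with $.* followed by EvaluateJsonPath does to the data can be modelled in a few lines of Python. This is a simplified model, not NiFi's implementation; the function names are hypothetical, and treating a missing path like a JSON null (empty-string attribute) is an assumption about the default settings.

```python
import json


def split_json_star(document: str) -> list:
    """Simplified model of SplitJson with JsonPath $.* (one output per selected value)."""
    parsed = json.loads(document)
    if isinstance(parsed, list):
        return parsed                  # array: one split per element (one row each)
    if isinstance(parsed, dict):
        return list(parsed.values())   # object: $.* selects the member values
    raise ValueError("$.* needs a JSON array or object")


def evaluate_json_path(element: dict, names: list) -> dict:
    """Simplified model of EvaluateJsonPath (Destination = flowfile-attribute),
    with one dynamic property per name, each using the path $.<name>."""
    attributes = {}
    for name in names:
        value = element.get(name)
        if value is None:
            attributes[name] = ""                 # missing or JSON null -> empty string
        elif isinstance(value, str):
            attributes[name] = value
        else:
            attributes[name] = json.dumps(value)  # numbers/booleans as JSON text
    return attributes
```

Applied to a bare object such as {"country_id": 1, "code": "AT"}, the same $.* split yields [1, "AT"], one value per field rather than one row. That is what a one-row query result looks like when ConvertAvroToJSON's Wrap Single Record property is left at its default of false.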
PutSQL
- JDBC Connection Pool: Oracle_DBCPService
- SQL Statement:
INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name,
week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter},
${month}, '${month_name}', ${week_number}, ${day_of_month},
'${day_name}', ${is_weekend}, ${is_business_day}, '${season}')
PG-2: Static Dimensions (SCD Type 1)
Loads DIM_COUNTRY, DIM_STAR_RATING, DIM_HOTEL_CHAIN from MySQL.
Uses MERGE INTO so the flow is idempotent — re-running it updates changed rows and inserts new ones.
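Each sub-flow follows the same pattern:
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
Wherever ConvertAvroToJSON feeds SplitJson (here and in PG-3 to PG-5), set its Wrap Single Record property to true; otherwise a one-row result arrives as a bare JSON object, and SplitJson with $.* splits it into individual field values instead of rows.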
2a — DIM_COUNTRY
ExecuteSQL — Connection Pool: MySQL_DBCPService
SELECT country_id, code, name, currency FROM country ORDER BY country_id
EvaluateJsonPath attributes: country_id, code, name, currency
PutSQL
MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
INSERT (country_id, code, name, currency)
VALUES (${country_id}, '${code}', '${name}', '${currency}')
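One way to see the idempotency claim in action is to replay the same batch twice against a local database. The sketch below uses Python's sqlite3 as a stand-in for Oracle. SQLite has no MERGE, so INSERT ... ON CONFLICT DO UPDATE takes its place, and the dim_country layout with a country_key surrogate key is an assumption.

```python
import sqlite3

# SQLite has no MERGE; INSERT ... ON CONFLICT DO UPDATE (SQLite 3.24+) gives the same
# insert-or-update behaviour for one row keyed on the natural key country_id.
UPSERT_SQL = """
INSERT INTO dim_country (country_id, code, name, currency)
VALUES (:country_id, :code, :name, :currency)
ON CONFLICT (country_id) DO UPDATE SET
    code = excluded.code,
    name = excluded.name,
    currency = excluded.currency
"""


def make_mart() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # country_key is a hypothetical surrogate key; country_id is the MySQL natural key
    conn.execute("""
        CREATE TABLE dim_country (
            country_key INTEGER PRIMARY KEY,
            country_id  INTEGER NOT NULL UNIQUE,
            code TEXT, name TEXT, currency TEXT)
    """)
    return conn


def load_countries(conn: sqlite3.Connection, rows: list) -> None:
    with conn:  # commit the whole batch, or roll it back on error
        conn.executemany(UPSERT_SQL, rows)
```

Loading the same two countries twice leaves two rows; loading a renamed country updates it in place and keeps its surrogate key, which is what lets fact rows keep pointing at it.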
2b — DIM_STAR_RATING
ExecuteSQL
SELECT star_rating_id, code, description FROM star_rating ORDER BY code
PutSQL
MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
INSERT (star_rating_id, code, description)
VALUES (${star_rating_id}, ${code}, '${description}')
2c — DIM_HOTEL_CHAIN
ExecuteSQL
SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id
PutSQL
MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
INSERT (hotel_chain_id, code, name)
VALUES (${hotel_chain_id}, '${code}', '${name}')
PG-3: DIM_HOTEL — SCD Type 2
This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by the star category that applied at check-in, not just today's category.
Architecture: NiFi stages raw data into STG_HOTEL, then an ExecuteScript runs the SCD2 SQL logic in a single Oracle transaction.
[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]
Step A: Truncate staging
GenerateFlowFile → PutSQL
TRUNCATE TABLE STG_HOTEL
Step B: Load staging from MySQL
ExecuteSQL — MySQL_DBCPService
SELECT
h.hotel_id,
hc.code AS chain_code,
c.code AS country_code,
sr.code AS star_code,
h.code,
h.name,
h.city
FROM hotel h
JOIN country c ON c.country_id = h.country_id
JOIN star_rating sr ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id
ConvertAvroToJSON → SplitJson ($.*)
EvaluateJsonPath attributes: hotel_id, chain_code, country_code, star_code, code, name, city
PutSQL → STG_HOTEL
INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}')
Step C: Apply SCD2 logic
GenerateFlowFile (runs after B finishes) → ExecuteScript (Groovy)
The Groovy script opens a JDBC connection and executes two SQL statements in one transaction:
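import java.sql.*

// Consume the FlowFile that triggered this step; left in the queue, it would re-run the script endlessly
def trigger = session.get()
if (!trigger) return

// getControllerService() takes the controller service's UUID (see the Controller Services list),
// not its name: replace 'Oracle_DBCPService_ID' with the UUID of Oracle_DBCPService
def conn = context.controllerServiceLookup
        .getControllerService('Oracle_DBCPService_ID')
        .getConnection()
conn.autoCommit = false

// Execute one DML statement and always close its PreparedStatement
def runUpdate = { String sql ->
    def ps = conn.prepareStatement(sql)
    try {
        ps.executeUpdate()
    } finally {
        ps.close()
    }
}

try {
    // 1. Expire current versions whose tracked attributes (chain, star rating, city) changed;
    //    expiry_date is the last day on which the old version was valid
    runUpdate("""
        UPDATE DIM_HOTEL dh
        SET dh.expiry_date = TRUNC(SYSDATE) - 1,
            dh.is_current  = 0
        WHERE dh.is_current = 1
          AND EXISTS (
            SELECT 1 FROM STG_HOTEL s
            WHERE s.hotel_id = dh.source_hotel_id
              AND (
                   NVL(s.chain_code, '~') != NVL((
                       SELECT hc.code FROM DIM_HOTEL_CHAIN hc
                       WHERE hc.hotel_chain_key = dh.hotel_chain_key), '~')
                OR s.star_code != (
                       SELECT ds.code FROM DIM_STAR_RATING ds
                       WHERE ds.star_rating_key = dh.star_rating_key)
                OR NVL(s.city, '~') != NVL(dh.city, '~')
              )
          )
    """)
    // 2. Insert a new current version for changed hotels and a first version for brand-new hotels.
    //    A brand-new hotel starts at DATE '1900-01-01' so bookings checked in before its first
    //    load still match a version in PG-5; a changed hotel's new version starts today
    runUpdate("""
        INSERT INTO DIM_HOTEL (
            source_hotel_id, hotel_chain_key, country_key, star_rating_key,
            code, name, city, effective_date, expiry_date, is_current)
        SELECT
            s.hotel_id,
            (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
            (SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code),
            (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
            s.code, s.name, s.city,
            CASE WHEN EXISTS (SELECT 1 FROM DIM_HOTEL d0 WHERE d0.source_hotel_id = s.hotel_id)
                 THEN TRUNC(SYSDATE)
                 ELSE DATE '1900-01-01'
            END,
            NULL, 1
        FROM STG_HOTEL s
        WHERE NOT EXISTS (
            SELECT 1 FROM DIM_HOTEL d
            WHERE d.source_hotel_id = s.hotel_id
              AND d.is_current = 1
        )
    """)
    conn.commit()
} catch (Exception e) {
    conn.rollback()
    throw e
} finally {
    conn.close()
}

// Forward the trigger so downstream components run only after the commit
session.transfer(trigger, REL_SUCCESS)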
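The two statements can be exercised end to end in a few lines. The Python sketch below replays them against SQLite (a stand-in for Oracle). It assumes a simplified DIM_HOTEL that stores chain and star codes directly instead of surrogate keys, and a run_day parameter in place of TRUNC(SYSDATE). It also gives a brand-new hotel's first version an open-ended start date (the hypothetical floor '1900-01-01') so that bookings checked in before the first load can still be matched to a version.

```python
import sqlite3
from datetime import date, timedelta

# Simplified schema (assumption): codes are stored directly instead of surrogate-key lookups
SCHEMA = """
CREATE TABLE stg_hotel (hotel_id INTEGER, chain_code TEXT, star_code INTEGER, city TEXT);
CREATE TABLE dim_hotel (
    hotel_key INTEGER PRIMARY KEY, source_hotel_id INTEGER, chain_code TEXT,
    star_code INTEGER, city TEXT, effective_date TEXT, expiry_date TEXT, is_current INTEGER);
"""

# 1. Expire current versions whose tracked attributes changed (expiry_date = last valid day)
EXPIRE_SQL = """
UPDATE dim_hotel SET expiry_date = :prev_day, is_current = 0
WHERE is_current = 1 AND EXISTS (
    SELECT 1 FROM stg_hotel s
    WHERE s.hotel_id = dim_hotel.source_hotel_id
      AND (IFNULL(s.chain_code, '~') <> IFNULL(dim_hotel.chain_code, '~')
           OR s.star_code <> dim_hotel.star_code
           OR IFNULL(s.city, '~') <> IFNULL(dim_hotel.city, '~')))
"""

# 2. Insert a new current version for every staged hotel that no longer has one
INSERT_SQL = """
INSERT INTO dim_hotel (source_hotel_id, chain_code, star_code, city,
                       effective_date, expiry_date, is_current)
SELECT s.hotel_id, s.chain_code, s.star_code, s.city,
       CASE WHEN EXISTS (SELECT 1 FROM dim_hotel d0 WHERE d0.source_hotel_id = s.hotel_id)
            THEN :run_day          -- changed hotel: new version starts on run_day
            ELSE '1900-01-01' END, -- brand-new hotel: open-ended start (assumption)
       NULL, 1
FROM stg_hotel s
WHERE NOT EXISTS (SELECT 1 FROM dim_hotel d
                  WHERE d.source_hotel_id = s.hotel_id AND d.is_current = 1)
"""


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def stage(conn: sqlite3.Connection, rows: list) -> None:
    """Steps A + B: empty the staging table and load the current source snapshot."""
    with conn:
        conn.execute("DELETE FROM stg_hotel")
        conn.executemany("INSERT INTO stg_hotel VALUES (?, ?, ?, ?)", rows)


def apply_scd2(conn: sqlite3.Connection, run_day: date) -> None:
    """Step C: run both statements in one transaction (commit on success, rollback on error)."""
    params = {"run_day": run_day.isoformat(),
              "prev_day": (run_day - timedelta(days=1)).isoformat()}
    with conn:
        conn.execute(EXPIRE_SQL, params)
        conn.execute(INSERT_SQL, params)
```

Staging a 3★ to 4★ change for one hotel and running the step on 2025-03-01 leaves two versions for it: the old one expiring on 2025-02-28 and a new current one starting on 2025-03-01. An unchanged hotel keeps its single version, and re-running on the same day adds nothing.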
PG-4: DIM_GUEST — SCD Type 1
Guest personal data (city, country) can change, but tracking its history has no analytical value, so a plain MERGE (upsert) is sufficient here.
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
ExecuteSQL — MySQL_DBCPService
SELECT g.guest_id, c.code AS country_code, g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id
EvaluateJsonPath attributes: guest_id, country_code, name, city
PutSQL
MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
UPDATE SET
tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
tgt.name = '${name}',
tgt.city = NULLIF('${city}','')
WHEN NOT MATCHED THEN
INSERT (guest_id, country_key, name, city)
VALUES (
${guest_id},
(SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
'${name}',
NULLIF('${city}','')
)
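Every PutSQL statement in this flow pastes attribute values straight into string literals, so a value containing a single quote (a guest named O'Brien, or a city such as L'Aquila) ends the literal early and the statement fails. One fix is to double embedded quotes before substitution with the Expression Language replace function, e.g. ${name:replace("'", "''")}; another is PutSQL's ? placeholders fed by sql.args.N.type / sql.args.N.value attributes. The Python sketch below (sqlite3 standing in for Oracle; helper names are hypothetical) shows the failure and both remedies.

```python
import sqlite3


def sql_string_literal(value) -> str:
    """Render a value as a SQL string literal by doubling embedded single quotes.
    None becomes NULL. Illustrative only; bind parameters avoid the problem entirely."""
    if value is None:
        return "NULL"
    return "'" + str(value).replace("'", "''") + "'"


def naive_guest_insert(guest_id: int, name: str) -> str:
    # What plain '${name}' substitution produces: the value is pasted in unescaped
    return f"INSERT INTO dim_guest (guest_id, name) VALUES ({guest_id}, '{name}')"


def escaped_guest_insert(guest_id: int, name: str) -> str:
    # Quote doubling, the same effect as ${name:replace("'", "''")} before substitution
    return f"INSERT INTO dim_guest (guest_id, name) VALUES ({guest_id}, {sql_string_literal(name)})"
```

With name = "O'Brien", the naive statement contains 'O'Brien' and is rejected as a syntax error, while the escaped statement and a bound parameter both store the name unchanged.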
Note: DIM_ROOM is also SCD Type 1. Load it the same way as DIM_GUEST: join hotel_room with room_type in MySQL and MERGE into DIM_ROOM, looking up the surrogate hotel_key via source_hotel_id + IS_CURRENT = 1 in DIM_HOTEL.
PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark)
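The fact table is loaded incrementally: only room_booking rows whose room_booking_id is greater than the last loaded value are processed. The watermark is stored in ETL_WATERMARK in Oracle; seed it once with a row (entity_name = 'FACT_ROOM_BOOKING', last_key = 0) before the first run, because Step B cannot build its query without a watermark value.
source_rb_id on FACT_ROOM_BOOKING has a UNIQUE constraint, and the Step D insert skips any source_rb_id that is already loaded, so re-running a batch is safe: rows already in the fact table are skipped silently, and the constraint is a backstop against duplicates.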
[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson]
→ [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]
Step A: Read watermark
ExecuteSQL — Oracle_DBCPService
SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'
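ConvertAvroToJSON → EvaluateJsonPath (Destination: flowfile-attribute)
watermark = $.LAST_KEY (Oracle reports unquoted column names in upper case, and JsonPath keys are case-sensitive)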
Step B: Load from MySQL
ExecuteSQL — MySQL_DBCPService
SQL select query (uses the ${watermark} attribute set in Step A):
SELECT
rb.room_booking_id,
rb.room_id,
rb.date_from,
rb.date_to,
rb.nightly_rate,
rb.total_amount,
b.guest_id,
b.status AS booking_status,
DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000
Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back.
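The loop just described (read the watermark, pull at most LIMIT rows above it, load them, advance the watermark, repeat until a batch is empty) can be sketched in Python with two sqlite3 in-memory databases standing in for MySQL and Oracle. Table layouts are trimmed to two columns and are assumptions. Unlike the MAX(source_rb_id) update in Step E, this sketch advances the watermark to the largest id read in the batch, inside the same transaction as the insert.

```python
import sqlite3


def make_source() -> sqlite3.Connection:
    """Stand-in for MySQL: five room_booking rows with ids 1..5."""
    src = sqlite3.connect(":memory:")
    src.execute("CREATE TABLE room_booking (room_booking_id INTEGER PRIMARY KEY, total_amount REAL)")
    src.executemany("INSERT INTO room_booking VALUES (?, ?)", [(i, 100.0 * i) for i in range(1, 6)])
    src.commit()
    return src


def make_target() -> sqlite3.Connection:
    """Stand-in for Oracle: fact table plus a seeded watermark row."""
    dst = sqlite3.connect(":memory:")
    dst.execute("CREATE TABLE fact_room_booking (source_rb_id INTEGER UNIQUE, total_amount REAL)")
    dst.execute("CREATE TABLE etl_watermark (entity_name TEXT PRIMARY KEY, last_key INTEGER)")
    dst.execute("INSERT INTO etl_watermark VALUES ('FACT_ROOM_BOOKING', 0)")
    dst.commit()
    return dst


def run_incremental(src, dst, batch_size: int) -> int:
    """Load batches of rows above the watermark until a batch comes back empty.
    Returns how many non-empty batches were loaded."""
    batches = 0
    while True:
        (watermark,) = dst.execute(
            "SELECT last_key FROM etl_watermark WHERE entity_name = 'FACT_ROOM_BOOKING'"
        ).fetchone()
        rows = src.execute(
            "SELECT room_booking_id, total_amount FROM room_booking"
            " WHERE room_booking_id > ? ORDER BY room_booking_id LIMIT ?",
            (watermark, batch_size),
        ).fetchall()
        if not rows:
            return batches
        with dst:  # insert the batch and advance the watermark atomically
            # OR IGNORE plays the role of the NOT EXISTS guard / UNIQUE constraint
            dst.executemany("INSERT OR IGNORE INTO fact_room_booking VALUES (?, ?)", rows)
            dst.execute(
                "UPDATE etl_watermark SET last_key = ? WHERE entity_name = 'FACT_ROOM_BOOKING'",
                (rows[-1][0],),  # rows are ordered, so the last id is the batch maximum
            )
        batches += 1
```

With five source rows and batch_size=2 the loop loads three batches (2 + 2 + 1) and stops; a second call finds nothing above the watermark and loads none.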
Step C: Split + extract attributes
ConvertAvroToJSON → SplitJson ($.*)
EvaluateJsonPath attributes:
room_booking_id, room_id, guest_id, date_from, date_to, nightly_rate, total_amount, booking_status, nights_stayed
Step D: Insert into fact table
PutSQL — Oracle_DBCPService
INSERT INTO FACT_ROOM_BOOKING (
source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key,
star_rating_key, checkin_date_key, checkout_date_key,
booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
${room_booking_id},
dh.hotel_key,
dh.hotel_chain_key,
dr.room_key,
dg.guest_key,
dg.country_key,
dh.star_rating_key,
TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
TO_NUMBER(TO_CHAR(TO_DATE('${date_to}', 'YYYY-MM-DD'), 'YYYYMMDD')),
'${booking_status}',
${nights_stayed},
TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
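FROM
DIM_ROOM dr,
DIM_HOTEL dh_room,
DIM_GUEST dg,
DIM_HOTEL dh
WHERE
dr.room_id = ${room_id}
AND dg.guest_id = ${guest_id}
-- DIM_ROOM stores a single hotel_key; use it only to find the hotel's source_hotel_id
AND dh_room.hotel_key = dr.hotel_key
-- SCD2 lookup: among all versions of that hotel, find the one active at check-in date
AND dh.source_hotel_id = dh_room.source_hotel_id
AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
-- expiry_date is the last valid day (PG-3 sets it to the day before the next version starts), hence >=
AND (dh.expiry_date IS NULL OR dh.expiry_date >= TO_DATE('${date_from}','YYYY-MM-DD'))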
-- Idempotent: skip if already loaded
AND NOT EXISTS (
SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id}
)
The DH.EFFECTIVE_DATE / EXPIRY_DATE condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was true when the guest actually checked in, not what the hotel looks like today.
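This lookup is easy to get off by one. PG-3 sets expiry_date to the day before the new version's effective_date, so expiry_date is the last day a version is valid and the upper bound must be inclusive (expiry_date >= check-in date). With a strict >, a check-in that falls exactly on expiry_date matches no version, and the INSERT ... SELECT silently inserts nothing. A small Python sketch of the rule, with hypothetical keys and dates:

```python
from datetime import date
from typing import NamedTuple, Optional, Sequence


class HotelVersion(NamedTuple):
    hotel_key: int
    effective_date: date
    expiry_date: Optional[date]  # last valid day; None means "still current"


def version_at(versions: Sequence[HotelVersion], check_in: date,
               inclusive_end: bool = True) -> Optional[int]:
    """Return the hotel_key of the version valid on check_in, or None if none matches."""
    for v in versions:
        if v.effective_date > check_in:
            continue  # version starts after the check-in
        if v.expiry_date is None:
            return v.hotel_key  # open-ended current version
        if v.expiry_date > check_in or (inclusive_end and v.expiry_date == check_in):
            return v.hotel_key
    return None
```

With version 10 valid until 2025-02-28 and version 11 from 2025-03-01, a check-in on 2025-02-28 resolves to 10 and one on 2025-03-01 to 11; switching to an exclusive end leaves 2025-02-28 unmatched.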
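Ignore errors on PutSQL by routing its failure relationship to a funnel: a UNIQUE constraint violation (ORA-00001) on source_rb_id is harmless on a re-run, because that row is already loaded. Other failures routed there are real errors, so consider logging them (for example with LogAttribute) rather than discarding them.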
Step E: Update watermark
After the PutSQL succeeds, update the watermark with the highest room_booking_id seen in this batch.
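NiFi Expression Language evaluates one FlowFile at a time, so it cannot compute the maximum room_booking_id across the split FlowFiles by itself; that would take a custom processor or a MergeContent step.
Simplest approach: add a final PutSQL that runs after the batch:
UPDATE ETL_WATERMARK
SET last_key = NVL((SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING), last_key),
last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'
Caveat: the Step D insert silently adds nothing when a dimension lookup finds no row, for example a guest created after the last daily PG-4 run. Once a later booking from the same batch is loaded, MAX(source_rb_id) moves past the missing booking and it is never retried; if no row of a batch loads, the watermark does not move and the loop re-reads the same batch. Running PG-4 (and the DIM_ROOM load) before each PG-5 batch avoids this.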
Execution Order & Scheduling
| PG | Trigger | Frequency |
|---|---|---|
| PG-1 (Date Dim) | Manual (run once) | — |
| PG-2 (Static Dims) | Timer — 24h | Daily |
| PG-3 (DIM_HOTEL SCD2) | PG-2 Output Port | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | PG-3 Output Port | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer — 1h | Hourly |
Chain PG-2 → PG-3 → PG-4 by routing each PG's final success relationship to its Output Port and connecting that Output Port to the next PG's Input Port.
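The chaining rule amounts to a dependency graph: a PG should run only after every PG whose tables it reads. A minimal Python sketch using the standard library's graphlib (Python 3.9+) derives a valid order and rejects cycles. The dependency map is an assumption inferred from the lookups each PG performs (for example, the DIM_ROOM load in PG-4 reads DIM_HOTEL, and PG-5 is assumed to need DIM_DATE).

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# Hypothetical dependency map: each PG lists the PGs whose tables it reads
PG_DEPENDENCIES = {
    "PG-1 Date Dim": set(),
    "PG-2 Static Dims": set(),
    "PG-3 DIM_HOTEL SCD2": {"PG-2 Static Dims"},                         # chain, country, star lookups
    "PG-4 DIM_GUEST SCD1": {"PG-2 Static Dims", "PG-3 DIM_HOTEL SCD2"},  # country; DIM_ROOM reads DIM_HOTEL
    "PG-5 Fact": {"PG-1 Date Dim", "PG-3 DIM_HOTEL SCD2", "PG-4 DIM_GUEST SCD1"},
}


def run_order(dependencies: dict) -> list:
    """Return one valid execution order; raises CycleError if the map contains a cycle."""
    return list(TopologicalSorter(dependencies).static_order())
```

Every order it returns places PG-2 before PG-3, PG-3 before PG-4, and PG-5 last, matching the chain above.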
Why SCD Type 2 for DIM_HOTEL?
A hotel upgraded from 3★ to 4★ moves into a higher rate tier from then on. If we simply overwrote the dimension (SCD1), all of its historical bookings would appear to have been made in a 4★ hotel: its cheaper 3★-era revenue would be counted under 4★ and removed from 3★, distorting the revenue-per-star-category figures in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.
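A worked example with made-up numbers shows the direction of the distortion. Three hypothetical hotels: A is always 4★, C is always 3★, and B is upgraded from 3★ to 4★ partway through. The sketch averages revenue per booking by star rating, once using the rating at check-in (SCD2) and once using today's rating (SCD1).

```python
from collections import defaultdict

# Hypothetical bookings: (hotel, stars at check-in, stars today, revenue per booking)
BOOKINGS = (
    [("A", 4, 4, 150.0)] * 100    # always 4-star
    + [("B", 3, 4, 80.0)] * 100   # hotel B before its 3-star -> 4-star upgrade
    + [("B", 4, 4, 120.0)] * 50   # hotel B after the upgrade
    + [("C", 3, 3, 90.0)] * 100   # always 3-star
)


def avg_revenue_by_star(bookings, star_at_check_in: bool) -> dict:
    """Average revenue per booking, grouped by star rating.
    star_at_check_in=True mimics SCD2; False mimics SCD1 (today's rating only)."""
    totals, counts = defaultdict(float), defaultdict(int)
    for _hotel, stars_then, stars_now, revenue in bookings:
        stars = stars_then if star_at_check_in else stars_now
        totals[stars] += revenue
        counts[stars] += 1
    return {stars: totals[stars] / counts[stars] for stars in totals}


print("SCD2:", avg_revenue_by_star(BOOKINGS, star_at_check_in=True))
print("SCD1:", avg_revenue_by_star(BOOKINGS, star_at_check_in=False))
```

SCD2 reports 85 per booking for 3★ and 140 for 4★. Under SCD1 the 4★ average drops to 116, because B's 100 cheaper pre-upgrade bookings are counted as 4★, and the 3★ average rises to 90, because those bookings disappear from it.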