IPZ_1/docs/nifi-flow.md
2026-05-17 21:24:37 +02:00


NiFi ETL Flow — Hotel Reservations Data Mart

Overview

The flow moves data from MySQL 8.4 OLTP (source) into Oracle Data Mart (target).
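It is organized into five Process Groups (PGs). The chain below shows their dependency order. PG-2, PG-3 and PG-4 run back-to-back once a day, PG-1 runs once, and PG-5 runs hourly on its own timer (see Execution Order & Scheduling).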

[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]

Each PG has a single Input Port and Output Port so the orchestrator can chain them with connections.


Controller Services (shared by all PGs)

| Name | Type | Config |
|---|---|---|
| MySQL_DBCPService | DBCPConnectionPool | Driver: com.mysql.cj.jdbc.Driver; URL: jdbc:mysql://127.0.0.1:13306/hotel_reservations; User: root; Pwd: hotel2025root |
| Oracle_DBCPService | DBCPConnectionPool | Driver: oracle.jdbc.OracleDriver; URL: jdbc:oracle:thin:@<host>:1521:<sid>; User: <schema>; Pwd: <pwd> |
| JsonReader | JsonTreeReader | default settings |
| JsonWriter | JsonRecordSetWriter | default settings |
| AvroReader | AvroReader | default settings |

PG-1: Load Date Dimension

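Runs once. Populates DIM_DATE for 2020 to 2030 (2020-01-01 through 2030-12-31). PutSQL issues plain INSERTs, so before extending the range later, move the script's start date past the last loaded day. Otherwise the new rows collide with existing date_key values.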

GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL

Processors

GenerateFlowFile

  • Run Schedule: leave the processor stopped and trigger it manually with right-click → Run Once
  • Custom Text: {}

ExecuteScript (Groovy)

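import groovy.json.JsonOutput
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.format.TextStyle
import java.time.temporal.IsoFields
import org.apache.nifi.processor.io.OutputStreamCallback

// Consume the trigger FlowFile from GenerateFlowFile. If it stays queued,
// the script runs again on every scheduling tick.
def ff = session.get()
if (!ff) return

def keyFormat = DateTimeFormatter.ofPattern('yyyyMMdd')
def rows = []
def d = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)
while (!d.isAfter(end)) {
    def m = d.monthValue
    def season = (m >= 6 && m <= 8) ? 'Peak'
               : (m >= 3 && m <= 5) ? 'High'
               : (m >= 9 && m <= 11) ? 'Autumn'
               : 'Winter'
    rows << [
        date_key:        d.format(keyFormat) as int,
        full_date:       d.toString(),                        // yyyy-MM-dd
        year:            d.year,
        quarter:         (m - 1).intdiv(3) + 1,
        month:           m,
        // getDisplayName gives "January"; capitalize() would leave "JANUARY"
        month_name:      d.month.getDisplayName(TextStyle.FULL, Locale.ENGLISH),
        // ISO 8601 week (1..53); WeekFields.ISO.weekOfYear() returns 0 for early-January days
        week_number:     d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR),
        day_of_month:    d.dayOfMonth,
        day_name:        d.dayOfWeek.getDisplayName(TextStyle.FULL, Locale.ENGLISH),
        is_weekend:      (d.dayOfWeek.value >= 6) ? 1 : 0,
        is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,
        season:          season
    ]
    d = d.plusDays(1)
}
ff = session.write(ff, { out -> out.write(JsonOutput.toJson(rows).bytes) } as OutputStreamCallback)
ff = session.putAttribute(ff, 'mime.type', 'application/json')
session.transfer(ff, REL_SUCCESS)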
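You can check the calendar rules outside NiFi. Below is a hypothetical stand-alone Java port of the per-day logic. The class DateDimSketch is illustrative and not part of the flow. It assumes ISO 8601 week numbers (IsoFields.WEEK_OF_WEEK_BASED_YEAR) and English month and day names are the intended formats.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.TextStyle;
import java.time.temporal.ChronoUnit;
import java.time.temporal.IsoFields;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class DateDimSketch {
    static final DateTimeFormatter KEY_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Builds one DIM_DATE row as column name -> value, using the Groovy script's keys.
    static Map<String, Object> row(LocalDate d) {
        int m = d.getMonthValue();
        String season = (m >= 6 && m <= 8) ? "Peak"
                : (m >= 3 && m <= 5) ? "High"
                : (m >= 9 && m <= 11) ? "Autumn"
                : "Winter";
        boolean weekend = d.getDayOfWeek() == DayOfWeek.SATURDAY
                || d.getDayOfWeek() == DayOfWeek.SUNDAY;

        Map<String, Object> r = new LinkedHashMap<>();
        r.put("date_key", Integer.parseInt(d.format(KEY_FORMAT)));
        r.put("full_date", d.toString());
        r.put("year", d.getYear());
        r.put("quarter", (m - 1) / 3 + 1);                          // int division
        r.put("month", m);
        r.put("month_name", d.getMonth().getDisplayName(TextStyle.FULL, Locale.ENGLISH));
        r.put("week_number", d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR)); // ISO week 1..53
        r.put("day_of_month", d.getDayOfMonth());
        r.put("day_name", d.getDayOfWeek().getDisplayName(TextStyle.FULL, Locale.ENGLISH));
        r.put("is_weekend", weekend ? 1 : 0);
        r.put("is_business_day", weekend ? 0 : 1);
        r.put("season", season);
        return r;
    }

    // Number of rows the load produces for an inclusive date range.
    static long dayCount(LocalDate start, LocalDate end) {
        return ChronoUnit.DAYS.between(start, end) + 1;
    }

    public static void main(String[] args) {
        System.out.println(row(LocalDate.of(2021, 1, 1)));
        System.out.println(dayCount(LocalDate.of(2020, 1, 1), LocalDate.of(2030, 12, 31)));
    }
}
```

2021-01-01 is a Friday, and ISO assigns it to the last week of 2020, so its week_number is 53. The 2020-2030 range holds 4,018 days.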

SplitJson

  • JsonPath Expression: $.*

EvaluateJsonPath

  • Destination: flowfile-attribute
  • Attributes: date_key, full_date, year, quarter, month, month_name, week_number, day_of_month, day_name, is_weekend, is_business_day, season

PutSQL

  • JDBC Connection Pool: Oracle_DBCPService
  • SQL Statement:
INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name,
    week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter},
    ${month}, '${month_name}', ${week_number}, ${day_of_month},
    '${day_name}', ${is_weekend}, ${is_business_day}, '${season}')

PG-2: Static Dimensions (SCD Type 1)

Loads DIM_COUNTRY, DIM_STAR_RATING, DIM_HOTEL_CHAIN from MySQL.
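Uses MERGE INTO so the flow is idempotent. Re-running it updates changed rows and inserts new ones. Rows deleted in MySQL are not removed from the target.

Text attributes are spliced into the SQL as '${attr}' literals. A value containing an apostrophe (a name like O'Brien, for example) therefore ends the literal early, and the statement fails. Escape such attributes with ${attr:replace("'", "''")}, or use PutSQL's ? placeholders with sql.args.N.type / sql.args.N.value attributes.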

Each sub-flow follows the same pattern:

ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
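The idempotency follows from MERGE's two branches. A keyed map is a hypothetical in-memory model of it; the class Scd1UpsertSketch and its Country record are illustrative. A matched key is overwritten and an unmatched key is inserted, so applying the same source rows twice leaves the target unchanged. Like the MERGE statements here, it never deletes target rows that have disappeared from the source.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Scd1UpsertSketch {
    // One row: country_id is the key, the rest are SCD1 (overwrite) attributes.
    record Country(int countryId, String code, String name, String currency) {}

    // MERGE semantics: WHEN MATCHED -> overwrite the row, WHEN NOT MATCHED -> insert it.
    static void merge(Map<Integer, Country> target, List<Country> source) {
        for (Country row : source) {
            target.put(row.countryId(), row);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Country> dim = new HashMap<>();
        List<Country> extract = List.of(
                new Country(1, "PL", "Poland", "PLN"),
                new Country(2, "DE", "Germany", "EUR"));
        merge(dim, extract);
        merge(dim, extract);              // re-run: same result, no duplicates
        System.out.println(dim.size());   // 2
    }
}
```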

2a — DIM_COUNTRY

ExecuteSQL — Connection Pool: MySQL_DBCPService

SELECT country_id, code, name, currency FROM country ORDER BY country_id

EvaluateJsonPath attributes: country_id, code, name, currency

PutSQL

MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (${country_id}, '${code}', '${name}', '${currency}')
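Oracle escapes a single quote inside a string literal by doubling it. A value such as Cote d'Ivoire must therefore reach the MERGE as 'Cote d''Ivoire'. Below is a minimal Java sketch of that substitution for the 2a statement; the class and method names are hypothetical. literal() applies the same doubling as an Expression Language replace("'", "''") call. mergeCountry() assembles the text the way PutSQL receives it after attribute substitution.

```java
class SqlLiteralSketch {
    // Oracle string literal: wrap in single quotes and double every embedded quote.
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds the 2a MERGE text after attribute substitution, with escaped literals.
    static String mergeCountry(int countryId, String code, String name, String currency) {
        String c = literal(code), n = literal(name), cur = literal(currency);
        return "MERGE INTO DIM_COUNTRY tgt\n"
                + "USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = " + countryId + ")\n"
                + "WHEN MATCHED THEN\n"
                + "    UPDATE SET tgt.code = " + c + ", tgt.name = " + n + ", tgt.currency = " + cur + "\n"
                + "WHEN NOT MATCHED THEN\n"
                + "    INSERT (country_id, code, name, currency)\n"
                + "    VALUES (" + countryId + ", " + c + ", " + n + ", " + cur + ")";
    }

    public static void main(String[] args) {
        System.out.println(mergeCountry(7, "CI", "Cote d'Ivoire", "XOF"));
    }
}
```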

2b — DIM_STAR_RATING

ExecuteSQL

SELECT star_rating_id, code, description FROM star_rating ORDER BY code

PutSQL

MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
    INSERT (star_rating_id, code, description)
    VALUES (${star_rating_id}, ${code}, '${description}')

2c — DIM_HOTEL_CHAIN

ExecuteSQL

SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id

PutSQL

MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
    INSERT (hotel_chain_id, code, name)
    VALUES (${hotel_chain_id}, '${code}', '${name}')

PG-3: DIM_HOTEL — SCD Type 2

This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by star category at the time of booking, not just today's category.

Architecture: NiFi stages raw data into STG_HOTEL, then an ExecuteScript runs the SCD2 SQL logic in a single Oracle transaction.

[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]

Step A: Truncate staging

GenerateFlowFile → PutSQL

TRUNCATE TABLE STG_HOTEL

Step B: Load staging from MySQL

ExecuteSQL (MySQL_DBCPService)

SELECT
    h.hotel_id,
    hc.code   AS chain_code,
    c.code    AS country_code,
    sr.code   AS star_code,
    h.code,
    h.name,
    h.city
FROM hotel h
JOIN country c      ON c.country_id    = h.country_id
JOIN star_rating sr ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id

ConvertAvroToJSON → SplitJson ($.*)

EvaluateJsonPath attributes: hotel_id, chain_code, country_code, star_code, code, name, city

PutSQL → STG_HOTEL

INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}')

Step C: Apply SCD2 logic

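GenerateFlowFile → ExecuteScript (Groovy). Schedule the GenerateFlowFile so that it fires only after every staging INSERT from Step B has been committed. Otherwise the SCD2 logic runs against a partially loaded STG_HOTEL.

The Groovy script borrows a connection from Oracle_DBCPService and executes two SQL statements in one transaction: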

import java.sql.*
import org.apache.nifi.controller.ControllerService

// Consume the trigger FlowFile. If it stays queued, the script runs
// again on every scheduling tick.
def flowFile = session.get()
if (!flowFile) return

// ControllerServiceLookup works with service identifiers (UUIDs), not names,
// so resolve the identifier of Oracle_DBCPService by name first
def lookup = context.controllerServiceLookup
def serviceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    lookup.getControllerServiceName(it) == 'Oracle_DBCPService'
}
def conn = lookup.getControllerService(serviceId).getConnection()
conn.autoCommit = false

try {
    // 1. Expire records whose tracked attributes changed
    conn.prepareStatement("""
        UPDATE DIM_HOTEL dh
        SET    dh.expiry_date = TRUNC(SYSDATE) - 1,
               dh.is_current  = 0
        WHERE  dh.is_current = 1
        AND EXISTS (
            SELECT 1 FROM STG_HOTEL s
            WHERE  s.hotel_id = dh.source_hotel_id
            AND (
                NVL(s.chain_code,'~') != NVL((
                    SELECT hc.code FROM DIM_HOTEL_CHAIN hc
                    WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~')
                OR s.star_code != (
                    SELECT ds.code FROM DIM_STAR_RATING ds
                    WHERE ds.star_rating_key = dh.star_rating_key)
                OR NVL(s.city,'~') != NVL(dh.city,'~')
            )
        )
    """).executeUpdate()

    // 2. Insert new version for changed hotels + insert brand-new hotels
    conn.prepareStatement("""
        INSERT INTO DIM_HOTEL (
            source_hotel_id, hotel_chain_key, country_key, star_rating_key,
            code, name, city, effective_date, expiry_date, is_current)
        SELECT
            s.hotel_id,
            (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
            (SELECT dc.country_key     FROM DIM_COUNTRY     dc WHERE dc.code = s.country_code),
            (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
            s.code, s.name, s.city,
            -- A hotel's first version is open-ended into the past, so bookings
            -- older than the first ETL run still find a version in PG-5
            CASE WHEN EXISTS (SELECT 1 FROM DIM_HOTEL d0 WHERE d0.source_hotel_id = s.hotel_id)
                 THEN TRUNC(SYSDATE)
                 ELSE DATE '1900-01-01'
            END,
            NULL, 1
        FROM STG_HOTEL s
        WHERE NOT EXISTS (
            SELECT 1 FROM DIM_HOTEL d
            WHERE  d.source_hotel_id = s.hotel_id
            AND    d.is_current = 1
        )
    """).executeUpdate()

    conn.commit()
} catch (Exception e) {
    conn.rollback()
    throw e
} finally {
    conn.close()
}
session.transfer(flowFile, REL_SUCCESS)
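You can model the two statements in memory to see how versions accumulate. This Java sketch is hypothetical, with illustrative class and field names. It tracks the same attributes as the UPDATE (chain code, star code and city) and compares them null-safely. One design choice is labeled in the code: a hotel's first version starts at 1900-01-01 instead of the load date. That way, bookings older than the first ETL run can still find a version.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class Scd2Sketch {
    // Assumption: open-ended start date for a hotel's first version.
    static final LocalDate FIRST_VERSION_START = LocalDate.of(1900, 1, 1);

    record StagedHotel(int hotelId, String chainCode, int starCode, String city) {}

    static final class HotelVersion {
        final int sourceHotelId;
        final String chainCode;   // null when the hotel has no chain
        final int starCode;
        final String city;
        final LocalDate effectiveDate;
        LocalDate expiryDate;     // null while the version is current
        boolean isCurrent = true;

        HotelVersion(StagedHotel s, LocalDate effectiveDate) {
            this.sourceHotelId = s.hotelId();
            this.chainCode = s.chainCode();
            this.starCode = s.starCode();
            this.city = s.city();
            this.effectiveDate = effectiveDate;
        }
    }

    // True when a tracked attribute differs; Objects.equals treats two nulls as equal.
    static boolean changed(HotelVersion v, StagedHotel s) {
        return !Objects.equals(v.chainCode, s.chainCode())
                || v.starCode != s.starCode()
                || !Objects.equals(v.city, s.city());
    }

    static void apply(List<HotelVersion> dim, List<StagedHotel> staging, LocalDate today) {
        // 1. Expire current versions whose tracked attributes changed.
        for (StagedHotel s : staging) {
            for (HotelVersion v : dim) {
                if (v.isCurrent && v.sourceHotelId == s.hotelId() && changed(v, s)) {
                    v.expiryDate = today.minusDays(1);   // last valid day of the old version
                    v.isCurrent = false;
                }
            }
        }
        // 2. Insert a current version for every staged hotel that has none.
        for (StagedHotel s : staging) {
            boolean known = dim.stream().anyMatch(v -> v.sourceHotelId == s.hotelId());
            boolean hasCurrent = dim.stream()
                    .anyMatch(v -> v.isCurrent && v.sourceHotelId == s.hotelId());
            if (!hasCurrent) {
                dim.add(new HotelVersion(s, known ? today : FIRST_VERSION_START));
            }
        }
    }

    public static void main(String[] args) {
        List<HotelVersion> dim = new ArrayList<>();
        apply(dim, List.of(new StagedHotel(7, "HX", 3, "Krakow")), LocalDate.of(2025, 1, 1));
        apply(dim, List.of(new StagedHotel(7, "HX", 4, "Krakow")), LocalDate.of(2025, 3, 10));
        for (HotelVersion v : dim) {
            System.out.println(v.starCode + " " + v.effectiveDate + " .. " + v.expiryDate
                    + " current=" + v.isCurrent);
        }
    }
}
```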

PG-4: DIM_GUEST — SCD Type 1

Guest personal data (city, country) can change without any analytical value in tracking the history. Plain MERGE/upsert is correct here.

ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)

ExecuteSQL (MySQL_DBCPService)

SELECT g.guest_id, c.code AS country_code, g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id

EvaluateJsonPath attributes: guest_id, country_code, name, city

PutSQL

MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
    UPDATE SET
        tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        tgt.name        = '${name}',
        tgt.city        = NULLIF('${city}','')
WHEN NOT MATCHED THEN
    INSERT (guest_id, country_key, name, city)
    VALUES (
        ${guest_id},
        (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        '${name}',
        NULLIF('${city}','')
    )

Note: DIM_ROOM is also SCD Type 1 — load it the same way as DIM_GUEST, joining hotel_room with room_type in MySQL and MERGEing into DIM_ROOM (surrogate key lookup via source_hotel_id + IS_CURRENT=1 from DIM_HOTEL).


PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark)

The fact table is loaded incrementally: only room_booking rows with room_booking_id greater than the last loaded value are processed. The watermark is stored in ETL_WATERMARK in Oracle.

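Re-running a batch is safe. The INSERT in Step D skips room_booking_id values that are already loaded (NOT EXISTS). The UNIQUE constraint on FACT_ROOM_BOOKING.source_rb_id is the backstop: a duplicate that slips past the check fails with ORA-00001 instead of creating a second row.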

[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson]
    → [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]

Step A: Read watermark

ExecuteSQL (Oracle_DBCPService)

SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'

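ConvertAvroToJSON → EvaluateJsonPath (Destination: flowfile-attribute)

  • watermark: $.LAST_KEY (Oracle returns unquoted column names in upper case, so $.last_key would not match)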

Step B: Load from MySQL

ExecuteSQL (MySQL_DBCPService)
SQL select query (uses the ${watermark} attribute set in Step A):

SELECT
    rb.room_booking_id,
    rb.room_id,
    rb.date_from,
    rb.date_to,
    rb.nightly_rate,
    rb.total_amount,
    b.guest_id,
    b.status AS booking_status,
    DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000

LIMIT caps each batch at 50,000 rows. To catch up on a backlog, keep triggering PG-5 from a Timer-driven GenerateFlowFile. Each run advances the watermark, and the backlog is cleared once a run returns no rows.
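You can sketch the watermark loop independently of NiFi. In this hypothetical Java model (the class WatermarkSketch is illustrative), fetchBatch stands in for the MySQL query above: it filters on id > watermark, sorts, and applies LIMIT. drain repeats the query, moves the watermark to the highest id of each batch, and stops when a batch comes back empty. Step E reads MAX(source_rb_id) back from the fact table; this model instead takes the maximum from the batch itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class WatermarkSketch {
    // Stand-in for: SELECT ... WHERE room_booking_id > watermark ORDER BY room_booking_id LIMIT limit
    static List<Long> fetchBatch(List<Long> sourceIds, long watermark, int limit) {
        return sourceIds.stream()
                .filter(id -> id > watermark)
                .sorted()
                .limit(limit)
                .collect(Collectors.toList());
    }

    // Re-runs the batch until one comes back empty, advancing the watermark each time.
    static List<Long> drain(List<Long> sourceIds, long startWatermark, int limit) {
        List<Long> loaded = new ArrayList<>();
        long watermark = startWatermark;
        while (true) {
            List<Long> batch = fetchBatch(sourceIds, watermark, limit);
            if (batch.isEmpty()) {
                return loaded;                        // nothing newer than the watermark
            }
            loaded.addAll(batch);                     // stands in for the fact-table PutSQL
            watermark = batch.get(batch.size() - 1);  // batch is sorted: last id is the max
        }
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of(5L, 1L, 3L, 2L, 4L), 0L, 2)); // [1, 2, 3, 4, 5]
    }
}
```

With ids 1 to 5 and a limit of 2, drain loads three non-empty batches ([1, 2], [3, 4], [5]) and stops on the fourth, empty one.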

Step C: Split + extract attributes

ConvertAvroToJSON → SplitJson ($.*)

EvaluateJsonPath attributes: room_booking_id, room_id, guest_id, date_from, date_to, nightly_rate, total_amount, booking_status, nights_stayed

Step D: Insert into fact table

PutSQL (Oracle_DBCPService)

INSERT INTO FACT_ROOM_BOOKING (
    source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key,
    star_rating_key, checkin_date_key, checkout_date_key,
    booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
    ${room_booking_id},
    dh.hotel_key,
    dh.hotel_chain_key,
    dr.room_key,
    dg.guest_key,
    dg.country_key,
    dh.star_rating_key,
    TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
    TO_NUMBER(TO_CHAR(TO_DATE('${date_to}',  'YYYY-MM-DD'), 'YYYYMMDD')),
    '${booking_status}',
    ${nights_stayed},
    TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
    TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
FROM
    DIM_ROOM   dr,
    DIM_GUEST  dg,
    DIM_HOTEL  dh
WHERE
    dr.room_id          = ${room_id}
    AND dg.guest_id     = ${guest_id}
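    -- DIM_ROOM points at the DIM_HOTEL version that was current when the room
    -- was loaded; follow it to the durable source_hotel_id so every version
    -- of that hotel is a candidate
    AND dh.source_hotel_id = (SELECT h0.source_hotel_id FROM DIM_HOTEL h0
                              WHERE h0.hotel_key = dr.hotel_key)
    -- SCD2 lookup: find hotel version active at check-in date
    -- (expiry_date holds the last valid day, so the upper bound is inclusive)
    AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
    AND (dh.expiry_date IS NULL OR dh.expiry_date >= TO_DATE('${date_from}','YYYY-MM-DD'))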
    -- Idempotent: skip if already loaded
    AND NOT EXISTS (
        SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id}
    )

The DH.EFFECTIVE_DATE / EXPIRY_DATE condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was true when the guest actually checked in, not what the hotel looks like today.
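Step C ends an old version on TRUNC(SYSDATE) - 1, its last valid day, and starts the new one on TRUNC(SYSDATE). A version's window therefore includes both of its end dates. With an exclusive upper bound, a check-in on the expiry date itself would match neither version. The hypothetical Java sketch below (record and method names are illustrative) picks the version for a check-in date using that inclusive window, keyed on the durable source_hotel_id.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Optional;

class PointInTimeSketch {
    record HotelVersion(long hotelKey, int sourceHotelId, int starCode,
                        LocalDate effectiveDate, LocalDate expiryDate) {}

    // effective_date <= checkIn AND (expiry_date IS NULL OR expiry_date >= checkIn)
    static Optional<HotelVersion> versionAt(List<HotelVersion> dim, int sourceHotelId,
                                            LocalDate checkIn) {
        return dim.stream()
                .filter(v -> v.sourceHotelId() == sourceHotelId)
                .filter(v -> !v.effectiveDate().isAfter(checkIn))
                .filter(v -> v.expiryDate() == null || !v.expiryDate().isBefore(checkIn))
                .findFirst();
    }

    public static void main(String[] args) {
        List<HotelVersion> dim = List.of(
                new HotelVersion(1, 7, 3, LocalDate.of(1900, 1, 1), LocalDate.of(2025, 3, 9)),
                new HotelVersion(2, 7, 4, LocalDate.of(2025, 3, 10), null));
        System.out.println(versionAt(dim, 7, LocalDate.of(2025, 3, 9))
                .map(HotelVersion::starCode).orElse(-1));   // 3
        System.out.println(versionAt(dim, 7, LocalDate.of(2025, 3, 10))
                .map(HotelVersion::starCode).orElse(-1));   // 4
    }
}
```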

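Route PutSQL's failure relationship to a funnel instead of stopping the flow. UNIQUE constraint violations (ORA-00001) on source_rb_id are expected and harmless on re-runs, but any other error lands in the same queue, so inspect that queue rather than auto-terminating it. A booking whose room, guest or matching hotel version is missing from the dimensions does not fail at all. One example is a guest created after the last daily PG-4 run: the INSERT ... SELECT simply inserts zero rows, and once the watermark moves past that room_booking_id the row is not retried.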

Step E: Update watermark

After the PutSQL succeeds, update the watermark with the highest room_booking_id seen in this batch.

UpdateAttribute

  • max_rb_id = ${room_booking_id}. Expression Language evaluates one FlowFile at a time and has no aggregate max(), so taking the maximum across the batch needs a custom processor or a MergeContent-based workaround.

A simpler approach is to run a final PutSQL (Oracle_DBCPService) after the batch:

UPDATE ETL_WATERMARK
SET last_key    = NVL((SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING), last_key),
    last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'

Execution Order & Scheduling

| PG | Trigger | Frequency |
|---|---|---|
| PG-1 (Date Dim) | Manual (Run Once) | Once |
| PG-2 (Static Dims) | Timer (24h) | Daily |
| PG-3 (DIM_HOTEL SCD2) | PG-2 Output Port | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | PG-3 Output Port | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer (1h) | Hourly |

Chain PG-2 → PG-3 → PG-4 by connecting each PG's Output Port to the next PG's Input Port via a success relationship.


Why SCD Type 2 for DIM_HOTEL?

A hotel being upgraded from 3★ to 4★ changes its rate tier going forward. If we just overwrite the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel — inflating average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.