Compare commits: e64288694b...718407d709 (3 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 718407d709 | |
| | 348a074a3a | |
| | 6c70628616 | |

docs/nifi-flow.md (normal file, 451 lines)

@@ -0,0 +1,451 @@

# NiFi ETL Flow — Hotel Reservations Data Mart

## Overview

The flow moves data from **MySQL 8.4 OLTP** (source) into the **Oracle Data Mart** (target).
It is organized into **5 Process Groups** that run in sequence, controlled by a top-level scheduler.

```
[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]
```

Each PG has a single **Input Port** and **Output Port** so the orchestrator can chain them with connections.

---

## Controller Services (shared by all PGs)

| Name | Type | Config |
|------|------|--------|
| `MySQL_DBCPService` | DBCPConnectionPool | Driver: `com.mysql.cj.jdbc.Driver`; URL: `jdbc:mysql://127.0.0.1:13306/hotel_reservations`; User: `root`; Pwd: `hotel2025root` |
| `Oracle_DBCPService` | DBCPConnectionPool | Driver: `oracle.jdbc.OracleDriver`; URL: `jdbc:oracle:thin:@<host>:1521:<sid>`; User: `<schema>`; Pwd: `<pwd>` |
| `JsonReader` | JsonTreeReader | default settings |
| `JsonWriter` | JsonRecordSetWriter | default settings |
| `AvroReader` | AvroReader | default settings |

---

## PG-1: Load Date Dimension

**Runs once** (or when extending the date range). Populates `DIM_DATE` for 2020–2030.

```
GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL
```

### Processors

**GenerateFlowFile**
- Run Schedule: manual (run once via right-click → Run Once)
- Custom Text: `{}`

**ExecuteScript** (Groovy)
```groovy
import groovy.json.JsonOutput
import java.time.*
import java.time.format.DateTimeFormatter
import java.time.temporal.WeekFields
import org.apache.nifi.processor.io.OutputStreamCallback

// Consume the trigger FlowFile from GenerateFlowFile; if it is never taken
// from the queue, the script keeps being triggered by the same FlowFile.
def ff = session.get()
if (ff == null) return

def keyFormat = DateTimeFormatter.ofPattern('yyyyMMdd')
def rows = []
def d   = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)
while (!d.isAfter(end)) {
    def m = d.monthValue
    def season = (m >= 6 && m <= 8)  ? 'Peak'
               : (m >= 3 && m <= 5)  ? 'High'
               : (m >= 9 && m <= 11) ? 'Autumn'
               :                       'Winter'
    rows << [
        date_key:        d.format(keyFormat) as int,
        full_date:       d.toString(),
        year:            d.year,
        quarter:         ((m - 1) / 3 + 1) as int,
        month:           m,
        // Month and DayOfWeek print in upper case (JANUARY), so lower-case first
        month_name:      d.month.toString().toLowerCase().capitalize(),
        week_number:     d.get(WeekFields.ISO.weekOfYear()),
        day_of_month:    d.dayOfMonth,
        day_name:        d.dayOfWeek.toString().toLowerCase().capitalize(),
        is_weekend:      (d.dayOfWeek.value >= 6) ? 1 : 0,
        is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,
        season:          season
    ]
    d = d.plusDays(1)
}
// Write the generated rows as one JSON array into the consumed FlowFile
ff = session.write(ff, { out -> out.write(JsonOutput.toJson(rows).getBytes('UTF-8')) } as OutputStreamCallback)
ff = session.putAttribute(ff, 'mime.type', 'application/json')
session.transfer(ff, REL_SUCCESS)
```
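
To sanity-check what PG-1 should produce before wiring it into NiFi, the same calendar logic can be sketched outside the flow. The Python below is only an illustration, not part of the flow: the helper names `season_for` and `date_rows` are hypothetical, and it covers a subset of the columns (it leaves out `week_number`, whose Java definition is not reproduced here).

```python
from datetime import date, timedelta


def season_for(month: int) -> str:
    """Same month-to-season mapping as the Groovy script."""
    if 6 <= month <= 8:
        return "Peak"
    if 3 <= month <= 5:
        return "High"
    if 9 <= month <= 11:
        return "Autumn"
    return "Winter"


def date_rows(start: date, end: date):
    """Yield one dict per calendar day, inclusive of both endpoints."""
    d = start
    while d <= end:
        yield {
            "date_key": int(d.strftime("%Y%m%d")),   # e.g. 20200101
            "full_date": d.isoformat(),
            "year": d.year,
            "quarter": (d.month - 1) // 3 + 1,
            "month": d.month,
            "day_of_month": d.day,
            "is_weekend": 1 if d.isoweekday() >= 6 else 0,
            "is_business_day": 1 if d.isoweekday() <= 5 else 0,
            "season": season_for(d.month),
        }
        d += timedelta(days=1)


rows = list(date_rows(date(2020, 1, 1), date(2030, 12, 31)))
print(len(rows), rows[0]["date_key"], rows[-1]["date_key"])
```

The range 2020–2030 contains three leap years, so `DIM_DATE` should end up with 4018 rows after PG-1 completes.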

**SplitJson**
- JsonPath Expression: `$.*`

**EvaluateJsonPath**
- Destination: `flowfile-attribute`
- Attributes (one dynamic property per attribute, e.g. `date_key` ← `$.date_key`): `date_key`, `full_date`, `year`, `quarter`, `month`, `month_name`, `week_number`, `day_of_month`, `day_name`, `is_weekend`, `is_business_day`, `season`

**PutSQL**
- JDBC Connection Pool: `Oracle_DBCPService`
- SQL Statement:
```sql
INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name,
                      week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter},
        ${month}, '${month_name}', ${week_number}, ${day_of_month},
        '${day_name}', ${is_weekend}, ${is_business_day}, '${season}')
```

---

## PG-2: Static Dimensions (SCD Type 1)

Loads `DIM_COUNTRY`, `DIM_STAR_RATING`, `DIM_HOTEL_CHAIN` from MySQL.
Uses **MERGE INTO** so the flow is idempotent: re-running it updates changed rows and inserts new ones.
String attributes are interpolated directly into the SQL text, so a value containing a single quote breaks the statement unless it is escaped first, for example with `${name:replace("'", "''")}`.

Each sub-flow follows the same pattern:

```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```

### 2a — DIM_COUNTRY

**ExecuteSQL** — Connection Pool: `MySQL_DBCPService`
```sql
SELECT country_id, code, name, currency FROM country ORDER BY country_id
```

**EvaluateJsonPath** attributes: `country_id`, `code`, `name`, `currency`

**PutSQL**
```sql
MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
    INSERT (country_id, code, name, currency)
    VALUES (${country_id}, '${code}', '${name}', '${currency}')
```

### 2b — DIM_STAR_RATING

**ExecuteSQL**
```sql
SELECT star_rating_id, code, description FROM star_rating ORDER BY code
```

**PutSQL**
```sql
MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
    INSERT (star_rating_id, code, description)
    VALUES (${star_rating_id}, ${code}, '${description}')
```

### 2c — DIM_HOTEL_CHAIN

**ExecuteSQL**
```sql
SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id
```

**PutSQL**
```sql
MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
    UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
    INSERT (hotel_chain_id, code, name)
    VALUES (${hotel_chain_id}, '${code}', '${name}')
```
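
The idempotency that the MERGE statements rely on can be demonstrated in isolation. The sketch below is an assumption-laden stand-in, not the flow itself: it uses Python's built-in `sqlite3` and SQLite's `INSERT ... ON CONFLICT DO UPDATE` in place of Oracle `MERGE`, with a simplified `dim_country` table and a hypothetical `load` helper. It also uses bound parameters, which sidesteps quoting problems entirely.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_country (
        country_key INTEGER PRIMARY KEY AUTOINCREMENT,  -- surrogate key
        country_id  INTEGER NOT NULL UNIQUE,            -- natural key from the source
        code        TEXT NOT NULL,
        name        TEXT NOT NULL,
        currency    TEXT NOT NULL
    )
""")

# SQLite upsert used here as a stand-in for Oracle MERGE (SCD Type 1 overwrite).
UPSERT = """
    INSERT INTO dim_country (country_id, code, name, currency)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (country_id) DO UPDATE SET
        code = excluded.code,
        name = excluded.name,
        currency = excluded.currency
"""


def load(rows):
    """Upsert a batch of source rows in one transaction."""
    with conn:
        conn.executemany(UPSERT, rows)


load([(1, "PL", "Poland", "PLN"), (2, "DE", "Germany", "EUR")])
load([(1, "PL", "Poland", "PLN"), (2, "DE", "Germany", "EUR")])      # re-run: nothing changes
load([(2, "DE", "Deutschland", "EUR"), (3, "FR", "France", "EUR")])  # one update, one insert

result = conn.execute(
    "SELECT country_id, name FROM dim_country ORDER BY country_id"
).fetchall()
print(result)
```

Re-running the same batch leaves the table unchanged, and a changed name simply overwrites the old one, which is the SCD Type 1 behaviour PG-2 aims for.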

---

## PG-3: DIM_HOTEL — SCD Type 2

This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by star category **at the time of check-in**, not just today's category.

**Architecture:** NiFi stages raw data into `STG_HOTEL`, then an `ExecuteScript` runs the SCD2 SQL logic in a single Oracle transaction.

```
[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]
```

### Step A: Truncate staging

**GenerateFlowFile** → **PutSQL**
```sql
TRUNCATE TABLE STG_HOTEL
```

### Step B: Load staging from MySQL

**ExecuteSQL** — `MySQL_DBCPService`
```sql
SELECT
    h.hotel_id,
    hc.code AS chain_code,
    c.code AS country_code,
    sr.code AS star_code,
    h.code,
    h.name,
    h.city
FROM hotel h
JOIN country c ON c.country_id = h.country_id
JOIN star_rating sr ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id
```

**ConvertAvroToJSON** → **SplitJson** (`$.*`)

**EvaluateJsonPath** attributes: `hotel_id`, `chain_code`, `country_code`, `star_code`, `code`, `name`, `city`

**PutSQL** → `STG_HOTEL`
```sql
INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}')
```

### Step C: Apply SCD2 logic

**GenerateFlowFile** (runs after B finishes) → **ExecuteScript** (Groovy)

The Groovy script opens a JDBC connection and executes two SQL statements in one transaction:

```groovy
import java.sql.*

// Consume the trigger FlowFile so it does not stay queued and re-trigger the script
def ff = session.get()
if (ff == null) return

// 'Oracle_DBCPService_ID' stands for the identifier (UUID) of Oracle_DBCPService
def conn = context.controllerServiceLookup
                  .getControllerService('Oracle_DBCPService_ID')
                  .getConnection()
conn.autoCommit = false

try {
    // 1. Expire records whose tracked attributes changed
    conn.prepareStatement("""
        UPDATE DIM_HOTEL dh
        SET dh.expiry_date = TRUNC(SYSDATE) - 1,
            dh.is_current = 0
        WHERE dh.is_current = 1
          AND EXISTS (
            SELECT 1 FROM STG_HOTEL s
            WHERE s.hotel_id = dh.source_hotel_id
              AND (
                NVL(s.chain_code,'~') != NVL((
                    SELECT hc.code FROM DIM_HOTEL_CHAIN hc
                    WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~')
                OR s.star_code != (
                    SELECT ds.code FROM DIM_STAR_RATING ds
                    WHERE ds.star_rating_key = dh.star_rating_key)
                OR s.city != dh.city
              )
          )
    """).executeUpdate()

    // 2. Insert new version for changed hotels + insert brand-new hotels
    conn.prepareStatement("""
        INSERT INTO DIM_HOTEL (
            source_hotel_id, hotel_chain_key, country_key, star_rating_key,
            code, name, city, effective_date, expiry_date, is_current)
        SELECT
            s.hotel_id,
            (SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
            (SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code),
            (SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
            s.code, s.name, s.city,
            TRUNC(SYSDATE), NULL, 1
        FROM STG_HOTEL s
        WHERE NOT EXISTS (
            SELECT 1 FROM DIM_HOTEL d
            WHERE d.source_hotel_id = s.hotel_id
              AND d.is_current = 1
        )
    """).executeUpdate()

    conn.commit()
} catch (Exception e) {
    conn.rollback()
    throw e
} finally {
    conn.close()
}
session.transfer(ff, REL_SUCCESS)
```
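
The two-statement pattern (expire, then insert) is easier to follow on a toy table. The sketch below is a simplified illustration under stated assumptions, not the Oracle script: it uses Python's `sqlite3`, stores `star_code` directly on the dimension instead of resolving surrogate keys, keeps dates as ISO strings, and takes the run date as a parameter instead of `SYSDATE`. The function name `apply_scd2` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE stg_hotel (hotel_id INTEGER, star_code INTEGER, city TEXT);
    CREATE TABLE dim_hotel (
        hotel_key       INTEGER PRIMARY KEY AUTOINCREMENT,
        source_hotel_id INTEGER NOT NULL,
        star_code       INTEGER NOT NULL,
        city            TEXT NOT NULL,
        effective_date  TEXT NOT NULL,
        expiry_date     TEXT,
        is_current      INTEGER NOT NULL DEFAULT 1
    );
""")


def apply_scd2(staged, run_date):
    """Reload staging, then expire changed versions and insert new ones atomically."""
    with conn:  # commits on success, rolls back if any statement raises
        conn.execute("DELETE FROM stg_hotel")
        conn.executemany("INSERT INTO stg_hotel VALUES (?, ?, ?)", staged)
        # 1. Expire current rows whose tracked attributes changed.
        conn.execute("""
            UPDATE dim_hotel
               SET expiry_date = date(?, '-1 day'), is_current = 0
             WHERE is_current = 1
               AND EXISTS (
                   SELECT 1 FROM stg_hotel s
                    WHERE s.hotel_id = dim_hotel.source_hotel_id
                      AND (s.star_code != dim_hotel.star_code OR s.city != dim_hotel.city))
        """, (run_date,))
        # 2. Insert a version for every staged hotel that now lacks a current row
        #    (covers both changed hotels and brand-new ones).
        conn.execute("""
            INSERT INTO dim_hotel (source_hotel_id, star_code, city,
                                   effective_date, expiry_date, is_current)
            SELECT s.hotel_id, s.star_code, s.city, ?, NULL, 1
              FROM stg_hotel s
             WHERE NOT EXISTS (
                   SELECT 1 FROM dim_hotel d
                    WHERE d.source_hotel_id = s.hotel_id AND d.is_current = 1)
        """, (run_date,))


apply_scd2([(1, 3, "Krakow"), (2, 4, "Gdansk")], "2025-01-01")
apply_scd2([(1, 4, "Krakow"), (2, 4, "Gdansk")], "2025-06-01")  # hotel 1 upgraded to 4 stars

history = conn.execute("""
    SELECT source_hotel_id, star_code, effective_date, expiry_date, is_current
      FROM dim_hotel ORDER BY source_hotel_id, effective_date
""").fetchall()
for row in history:
    print(row)
```

Because step 2 only looks for hotels without a current row, it does not need to know which hotels were expired in step 1; that is why both statements must run in the same transaction.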

---

## PG-4: DIM_GUEST — SCD Type 1

Guest personal data (city, country) can change, but tracking that history has no analytical value, so a plain MERGE/upsert is sufficient here.

```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```

**ExecuteSQL** — `MySQL_DBCPService`
```sql
SELECT g.guest_id, c.code AS country_code, g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id
```

**EvaluateJsonPath** attributes: `guest_id`, `country_code`, `name`, `city`

**PutSQL**
```sql
MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
    UPDATE SET
        tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        tgt.name = '${name}',
        tgt.city = NULLIF('${city}','')
WHEN NOT MATCHED THEN
    INSERT (guest_id, country_key, name, city)
    VALUES (
        ${guest_id},
        (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
        '${name}',
        NULLIF('${city}','')
    )
```

> **Note:** DIM_ROOM is also SCD Type 1. Load it the same way as DIM_GUEST, joining `hotel_room` with `room_type` in MySQL and MERGEing into `DIM_ROOM` (surrogate key lookup via `source_hotel_id + IS_CURRENT=1` from DIM_HOTEL).

---

## PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark)

The fact table is loaded **incrementally**: only `room_booking` rows with `room_booking_id` greater than the last loaded value are processed. The watermark is stored in `ETL_WATERMARK` in Oracle.

`source_rb_id` on `FACT_ROOM_BOOKING` has a UNIQUE constraint, so re-running is safe: a row that is already loaded cannot be inserted a second time.

```
[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson]
    → [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]
```

### Step A: Read watermark

**ExecuteSQL** — `Oracle_DBCPService`
```sql
SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'
```

**ConvertAvroToJSON** → **EvaluateJsonPath**
- `watermark` ← `$.LAST_KEY` (Oracle reports unquoted column names in upper case)

### Step B: Load from MySQL

**ExecuteSQL** — `MySQL_DBCPService`
SQL Statement (use attribute `${watermark}`):
```sql
SELECT
    rb.room_booking_id,
    rb.room_id,
    rb.date_from,
    rb.date_to,
    rb.nightly_rate,
    rb.total_amount,
    b.guest_id,
    b.status AS booking_status,
    DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000
```

> Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back.
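
The batch loop described above can be sketched in a few lines. This is a minimal illustration with plain Python collections standing in for MySQL and Oracle; `fetch_batch` and `run_incremental` are hypothetical names, and rows are reduced to their `room_booking_id`.

```python
def fetch_batch(source_ids, watermark, limit):
    """Mimics: WHERE room_booking_id > watermark ORDER BY room_booking_id LIMIT limit."""
    return sorted(i for i in source_ids if i > watermark)[:limit]


def run_incremental(source_ids, loaded, watermark, limit):
    """Pull batches until the source returns nothing newer than the watermark."""
    batches = 0
    while True:
        batch = fetch_batch(source_ids, watermark, limit)
        if not batch:
            break
        loaded.update(batch)      # the set plays the role of the UNIQUE constraint
        watermark = max(loaded)   # same idea as SELECT MAX(source_rb_id)
        batches += 1
    return watermark, batches


source = list(range(1, 12))       # booking ids 1..11
loaded = set()

first = run_incremental(source, loaded, 0, 5)           # three batches: 5 + 5 + 1 rows
second = run_incremental(source, loaded, first[0], 5)   # nothing new
source += [12, 13]
third = run_incremental(source, loaded, second[0], 5)   # picks up only the new ids
print(first, second, third)
```

The second run returns immediately with zero batches, which is the behaviour that makes an hourly schedule cheap when no new bookings exist.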

### Step C: Split + extract attributes

**ConvertAvroToJSON** → **SplitJson** (`$.*`)

**EvaluateJsonPath** attributes:
`room_booking_id`, `room_id`, `guest_id`, `date_from`, `date_to`, `nightly_rate`, `total_amount`, `booking_status`, `nights_stayed`

### Step D: Insert into fact table

**PutSQL** — `Oracle_DBCPService`
```sql
INSERT INTO FACT_ROOM_BOOKING (
    source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key,
    star_rating_key, checkin_date_key, checkout_date_key,
    booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
    ${room_booking_id},
    dh.hotel_key,
    dh.hotel_chain_key,
    dr.room_key,
    dg.guest_key,
    dg.country_key,
    dh.star_rating_key,
    TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
    TO_NUMBER(TO_CHAR(TO_DATE('${date_to}', 'YYYY-MM-DD'), 'YYYYMMDD')),
    '${booking_status}',
    ${nights_stayed},
    TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
    TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
FROM
    DIM_ROOM dr,
    DIM_GUEST dg,
    DIM_HOTEL dh
WHERE
    dr.room_id = ${room_id}
    AND dg.guest_id = ${guest_id}
    AND dh.hotel_key = dr.hotel_key
    -- SCD2 lookup: find hotel version active at check-in date
    -- (expiry_date is the last day a version is valid, so the bound is inclusive)
    AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
    AND (dh.expiry_date IS NULL OR dh.expiry_date >= TO_DATE('${date_from}','YYYY-MM-DD'))
    -- Idempotent: skip if already loaded
    AND NOT EXISTS (
        SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id}
    )
```

> The `dh.effective_date` / `dh.expiry_date` condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was in effect **when the guest actually checked in**, not what the hotel looks like today. PG-3 sets `expiry_date` to the day before the next version takes effect, so the comparison must include `expiry_date` itself; a strict `>` would leave that day matching no version. A booking whose check-in date precedes the hotel's first `effective_date` matches no version either and is not inserted.

**Ignore Errors** on PutSQL (route `failure` → funnel): UNIQUE constraint violations on `source_rb_id` are expected and harmless on re-runs.
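
The point-in-time lookup in the `WHERE` clause can be checked on its own. The sketch below is an illustration only: `HotelVersion` and `version_at` are hypothetical names, and it assumes `expiry_date` is the last day a version is valid (PG-3 sets it to the day before the successor's `effective_date`), so the upper bound is inclusive.

```python
from datetime import date
from typing import NamedTuple, Optional, Sequence


class HotelVersion(NamedTuple):
    hotel_key: int
    source_hotel_id: int
    star_code: int
    effective_date: date
    expiry_date: Optional[date]   # None marks the current version


def version_at(versions: Sequence[HotelVersion],
               source_hotel_id: int,
               checkin: date) -> Optional[HotelVersion]:
    """Return the version valid on the check-in date, or None if there is none."""
    for v in versions:
        if (v.source_hotel_id == source_hotel_id
                and v.effective_date <= checkin
                and (v.expiry_date is None or v.expiry_date >= checkin)):
            return v
    return None


versions = [
    HotelVersion(10, 1, 3, date(2025, 1, 1), date(2025, 5, 31)),  # 3-star period
    HotelVersion(11, 1, 4, date(2025, 6, 1), None),               # upgraded, current
]

print(version_at(versions, 1, date(2025, 3, 15)))   # the 3-star version
print(version_at(versions, 1, date(2025, 6, 1)))    # the 4-star version
print(version_at(versions, 1, date(2024, 12, 31)))  # no version: predates first effective_date
```

The last call shows the case to watch for in the fact load: a check-in before the first `effective_date` finds no version, so the `INSERT ... SELECT` would insert nothing for that booking.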

### Step E: Update watermark

After the PutSQL succeeds, update the watermark with the highest `room_booking_id` seen in this batch.

**UpdateAttribute**
- `max_rb_id` ← `${room_booking_id}` (NiFi Expression Language has no aggregate across FlowFiles, so taking the batch maximum needs a custom processor or a MergeContent trick)

> Simplest approach: add a final **PutSQL** that runs after the batch:
```sql
UPDATE ETL_WATERMARK
SET last_key = (SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING),
    last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'
```

---

## Execution Order & Scheduling

| PG | Trigger | Frequency |
|----|---------|-----------|
| PG-1 (Date Dim) | Manual (run once) | — |
| PG-2 (Static Dims) | Timer — 24h | Daily |
| PG-3 (DIM_HOTEL SCD2) | Timer — 24h | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | Timer — 24h | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer — 1h | Hourly |

Chain PG-2 → PG-3 → PG-4 by connecting each PG's Output Port to the next PG's Input Port via a **success** relationship.

---

## Why SCD Type 2 for DIM_HOTEL?

A hotel being upgraded from 3★ to 4★ changes its rate tier going forward. If we just overwrite the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel, distorting average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.
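
A small worked example makes the distortion concrete. The numbers below are invented for illustration, and `avg_revenue_by_star` is a hypothetical helper: two bookings were made while the hotel had 3 stars and one after the upgrade to 4 stars.

```python
from collections import defaultdict


def avg_revenue_by_star(bookings, star_of):
    """Average total_amount per star category, with star_of choosing the category."""
    totals, counts = defaultdict(float), defaultdict(int)
    for b in bookings:
        star = star_of(b)
        totals[star] += b["total_amount"]
        counts[star] += 1
    return {star: totals[star] / counts[star] for star in totals}


# Invented sample data: star_at_checkin is what an SCD2 lookup would return.
bookings = [
    {"star_at_checkin": 3, "total_amount": 100.0},
    {"star_at_checkin": 3, "total_amount": 120.0},
    {"star_at_checkin": 4, "total_amount": 200.0},
]

# SCD1: the dimension was overwritten, so every booking now reports 4 stars.
scd1 = avg_revenue_by_star(bookings, lambda b: 4)
# SCD2: each booking keeps the category that applied at check-in.
scd2 = avg_revenue_by_star(bookings, lambda b: b["star_at_checkin"])

print(scd1)
print(scd2)
```

Under SCD1 the 3★ category disappears from the report and the 4★ average becomes a blend of both periods; under SCD2 the two categories stay separate.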

@@ -19,6 +19,7 @@ var rng = new Random(SEED);
await using var conn = new MySqlConnection(DSN);
await conn.OpenAsync();
Console.WriteLine("Connected.");
await new MySqlCommand("SET foreign_key_checks=0, unique_checks=0", conn).ExecuteNonQueryAsync();

async Task Exec(string sql)
{
@@ -480,9 +481,8 @@ while (bookingsDone < BOOKING_COUNT)
        bookingRows.Add($"({guestId},{hotelId},{D(checkin)},{D(checkout)},{S(status)},{DT(created)})");
    }

    // Insert bookings and get the first inserted ID
    long firstId = await ExecScalar("SELECT AUTO_INCREMENT FROM information_schema.tables WHERE table_schema='hotel_reservations' AND table_name='booking'");
    await Exec($"INSERT INTO booking (guest_id, hotel_id, date_from, date_to, status, created_at) VALUES {string.Join(',', bookingRows)}");
    long firstId = await ExecScalar("SELECT LAST_INSERT_ID()");

    // Re-derive checkin/nights from the same rng sequence is impossible after the fact,
    // so re-parse from inserted rows to build room_bookings

@@ -1,9 +1,39 @@
-- =============================================================================
-- HOTEL RESERVATIONS — DATA MART (STAR SCHEMA)
-- Target: Oracle (university lab schema)
-- Based on A.24 Revenue Data Mart — Dimensional Modelling by Example
-- =============================================================================

-- -----------------------------------------------------------------------------
-- ETL CONTROL TABLE
-- Tracks incremental load watermarks per entity.
-- -----------------------------------------------------------------------------

CREATE TABLE ETL_WATERMARK (
    entity_name VARCHAR2(50) NOT NULL,
    last_key NUMBER(20,0) DEFAULT 0 NOT NULL,
    last_run_ts TIMESTAMP DEFAULT SYSTIMESTAMP,
    CONSTRAINT pk_etl_wm PRIMARY KEY (entity_name)
);

INSERT INTO ETL_WATERMARK (entity_name, last_key) VALUES ('FACT_ROOM_BOOKING', 0);
COMMIT;

-- -----------------------------------------------------------------------------
-- STAGING TABLES
-- NiFi loads raw MySQL data here first; SCD logic runs in pure SQL after.
-- Truncated at the start of each ETL run.
-- -----------------------------------------------------------------------------

CREATE TABLE STG_HOTEL (
    hotel_id NUMBER(10,0) NOT NULL,
    chain_code VARCHAR2(10),
    country_code CHAR(2) NOT NULL,
    star_code NUMBER(1,0) NOT NULL,
    code VARCHAR2(20) NOT NULL,
    name VARCHAR2(150) NOT NULL,
    city VARCHAR2(100) NOT NULL
);

-- -----------------------------------------------------------------------------
-- DIMENSION TABLES
-- -----------------------------------------------------------------------------
@@ -21,12 +51,13 @@ CREATE TABLE DIM_DATE (
    day_name VARCHAR2(10) NOT NULL,
    is_weekend NUMBER(1,0) NOT NULL,
    is_business_day NUMBER(1,0) NOT NULL,
    season VARCHAR2(10) NOT NULL, -- Peak / High / Low / Off
    season VARCHAR2(10) NOT NULL,
    CONSTRAINT pk_dim_date PRIMARY KEY (date_key),
    CONSTRAINT ck_dim_date_wknd CHECK (is_weekend IN (0,1)),
    CONSTRAINT ck_dim_date_bday CHECK (is_business_day IN (0,1))
);

-- SCD Type 1 — country attributes are stable; just overwrite if anything changes
CREATE TABLE DIM_COUNTRY (
    country_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    country_id NUMBER(10,0) NOT NULL,
@@ -37,6 +68,7 @@ CREATE TABLE DIM_COUNTRY (
    CONSTRAINT uq_dim_cntry_id UNIQUE (country_id)
);

-- SCD Type 1 — star rating lookup, never changes
CREATE TABLE DIM_STAR_RATING (
    star_rating_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    star_rating_id NUMBER(10,0) NOT NULL,
@@ -46,6 +78,7 @@ CREATE TABLE DIM_STAR_RATING (
    CONSTRAINT uq_dim_star_id UNIQUE (star_rating_id)
);

-- SCD Type 1 — chain name/code rarely changes; overwrite
CREATE TABLE DIM_HOTEL_CHAIN (
    hotel_chain_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    hotel_chain_id NUMBER(10,0) NOT NULL,
@@ -55,22 +88,31 @@ CREATE TABLE DIM_HOTEL_CHAIN (
    CONSTRAINT uq_dim_chain_id UNIQUE (hotel_chain_id)
);

-- SCD Type 2 — hotels can change star rating or chain affiliation over time.
-- source_hotel_id is the natural key from MySQL; hotel_key is the surrogate.
-- One hotel can have multiple rows; IS_CURRENT=1 row is the active version.
-- FACT_ROOM_BOOKING links to the hotel version current at check-in date.
CREATE TABLE DIM_HOTEL (
    hotel_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    hotel_id NUMBER(10,0) NOT NULL,
    source_hotel_id NUMBER(10,0) NOT NULL,
    hotel_chain_key NUMBER(10,0),
    country_key NUMBER(10,0) NOT NULL,
    star_rating_key NUMBER(10,0) NOT NULL,
    code VARCHAR2(20) NOT NULL,
    name VARCHAR2(150) NOT NULL,
    city VARCHAR2(100) NOT NULL,
    -- SCD2 versioning
    effective_date DATE NOT NULL,
    expiry_date DATE,
    is_current NUMBER(1,0) DEFAULT 1 NOT NULL,
    CONSTRAINT pk_dim_hotel PRIMARY KEY (hotel_key),
    CONSTRAINT uq_dim_hotel_id UNIQUE (hotel_id),
    CONSTRAINT ck_dh_current CHECK (is_current IN (0,1)),
    CONSTRAINT fk_dh_chain FOREIGN KEY (hotel_chain_key) REFERENCES DIM_HOTEL_CHAIN (hotel_chain_key),
    CONSTRAINT fk_dh_country FOREIGN KEY (country_key) REFERENCES DIM_COUNTRY (country_key),
    CONSTRAINT fk_dh_star FOREIGN KEY (star_rating_key) REFERENCES DIM_STAR_RATING (star_rating_key)
);

-- SCD Type 1 — room type/floor rarely changes; upsert is sufficient
CREATE TABLE DIM_ROOM (
    room_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    room_id NUMBER(10,0) NOT NULL,
@@ -87,6 +129,7 @@ CREATE TABLE DIM_ROOM (
    CONSTRAINT ck_dim_room_smk CHECK (smoking_yn IN (0,1))
);

-- SCD Type 1 — guest contact details are overwritten if they change
CREATE TABLE DIM_GUEST (
    guest_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    guest_id NUMBER(10,0) NOT NULL,
@@ -102,11 +145,13 @@ CREATE TABLE DIM_GUEST (
-- FACT TABLE
-- -----------------------------------------------------------------------------

-- Grain: one row per room_booking
-- Revenue measures: nightly_rate, total_amount, nights_stayed
-- Grain: one row per room_booking.
-- source_rb_id: natural key from MySQL — used for idempotent incremental loads.
-- hotel_key: points to the DIM_HOTEL version active at check-in (SCD2 lookup).
CREATE TABLE FACT_ROOM_BOOKING (
    fact_id NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
    -- foreign keys
    source_rb_id NUMBER(10,0) NOT NULL,
    -- dimension FKs
    hotel_key NUMBER(10,0) NOT NULL,
    hotel_chain_key NUMBER(10,0),
    room_key NUMBER(10,0) NOT NULL,
@@ -115,13 +160,14 @@ CREATE TABLE FACT_ROOM_BOOKING (
    star_rating_key NUMBER(10,0) NOT NULL,
    checkin_date_key NUMBER(8,0) NOT NULL,
    checkout_date_key NUMBER(8,0) NOT NULL,
    -- degenerate dimensions
    -- degenerate dimension
    booking_status VARCHAR2(20) NOT NULL,
    -- measures
    nights_stayed NUMBER(4,0) NOT NULL,
    nightly_rate NUMBER(10,2) NOT NULL,
    total_amount NUMBER(12,2) NOT NULL,
    CONSTRAINT pk_fact_rb PRIMARY KEY (fact_id),
    CONSTRAINT uq_fact_rb_src UNIQUE (source_rb_id),
    CONSTRAINT fk_frb_hotel FOREIGN KEY (hotel_key) REFERENCES DIM_HOTEL (hotel_key),
    CONSTRAINT fk_frb_chain FOREIGN KEY (hotel_chain_key) REFERENCES DIM_HOTEL_CHAIN (hotel_chain_key),
    CONSTRAINT fk_frb_room FOREIGN KEY (room_key) REFERENCES DIM_ROOM (room_key),