Compare commits

...

3 Commits

Author SHA1 Message Date
718407d709 nifi flow 2026-05-17 21:24:37 +02:00
348a074a3a datamart updated 2026-05-17 21:23:55 +02:00
6c70628616 generate script fix 2026-05-17 21:21:43 +02:00
3 changed files with 513 additions and 16 deletions

451
docs/nifi-flow.md Normal file
View File

@@ -0,0 +1,451 @@
# NiFi ETL Flow — Hotel Reservations Data Mart
## Overview
The flow moves data from **MySQL 8.4 OLTP** (source) into **Oracle Data Mart** (target).
It is organized into **5 Process Groups** that run in sequence, controlled by a top-level scheduler.
```
[PG-1: Date Dim] → [PG-2: Static Dims] → [PG-3: SCD2 Hotel Dim] → [PG-4: SCD1 Guest] → [PG-5: Fact (incremental)]
```
Each PG has a single **Input Port** and **Output Port** so the orchestrator can chain them with connections.
---
## Controller Services (shared by all PGs)
| Name | Type | Config |
|------|------|--------|
| `MySQL_DBCPService` | DBCPConnectionPool | Driver: `com.mysql.cj.jdbc.Driver`; URL: `jdbc:mysql://127.0.0.1:13306/hotel_reservations`; User: `root`; Pwd: `hotel2025root` |
| `Oracle_DBCPService` | DBCPConnectionPool | Driver: `oracle.jdbc.OracleDriver`; URL: `jdbc:oracle:thin:@<host>:1521:<sid>`; User: `<schema>`; Pwd: `<pwd>` |
| `JsonReader` | JsonTreeReader | default settings |
| `JsonWriter` | JsonRecordSetWriter | default settings |
| `AvroReader` | AvroReader | default settings |
---
## PG-1: Load Date Dimension
**Runs once** (or when extending the date range). Populates `DIM_DATE` for 20202030.
```
GenerateFlowFile → ExecuteScript → SplitJson → EvaluateJsonPath → PutSQL
```
### Processors
**GenerateFlowFile**
- Run Schedule: manual (run once via right-click → Run Once)
- Custom Text: `{}`
**ExecuteScript** (Groovy)
```groovy
import groovy.json.JsonOutput
import java.time.*
def rows = []
def d = LocalDate.of(2020, 1, 1)
def end = LocalDate.of(2030, 12, 31)
while (!d.isAfter(end)) {
def m = d.monthValue
def season = (m >= 6 && m <= 8) ? 'Peak'
: (m >= 3 && m <= 5) ? 'High'
: (m >= 9 && m <= 11) ? 'Autumn'
: 'Winter'
rows << [
date_key: d.format(java.time.format.DateTimeFormatter.ofPattern('yyyyMMdd')) as int,
full_date: d.toString(),
year: d.year,
quarter: ((m - 1) / 3 + 1) as int,
month: m,
month_name: d.month.toString().capitalize(),
week_number: d.get(java.time.temporal.WeekFields.ISO.weekOfYear()),
day_of_month: d.dayOfMonth,
day_name: d.dayOfWeek.toString().capitalize(),
is_weekend: (d.dayOfWeek.value >= 6) ? 1 : 0,
is_business_day: (d.dayOfWeek.value <= 5) ? 1 : 0,
season: season
]
d = d.plusDays(1)
}
def ff = session.create()
ff = session.write(ff, { out -> out.write(JsonOutput.toJson(rows).bytes) } as OutputStreamCallback)
ff = session.putAttribute(ff, 'mime.type', 'application/json')
session.transfer(ff, REL_SUCCESS)
```
**SplitJson**
- JsonPath Expression: `$.*`
**EvaluateJsonPath**
- Destination: `flowfile-attribute`
- Attributes: `date_key`, `full_date`, `year`, `quarter`, `month`, `month_name`, `week_number`, `day_of_month`, `day_name`, `is_weekend`, `is_business_day`, `season`
**PutSQL**
- JDBC Connection Pool: `Oracle_DBCPService`
- SQL Statement:
```sql
INSERT INTO DIM_DATE (date_key, full_date, year, quarter, month, month_name,
week_number, day_of_month, day_name, is_weekend, is_business_day, season)
VALUES (${date_key}, TO_DATE('${full_date}','YYYY-MM-DD'), ${year}, ${quarter},
${month}, '${month_name}', ${week_number}, ${day_of_month},
'${day_name}', ${is_weekend}, ${is_business_day}, '${season}')
```
---
## PG-2: Static Dimensions (SCD Type 1)
Loads `DIM_COUNTRY`, `DIM_STAR_RATING`, `DIM_HOTEL_CHAIN` from MySQL.
Uses **MERGE INTO** so the flow is idempotent — re-running it updates changed rows and inserts new ones.
Each sub-flow follows the same pattern:
```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```
### 2a — DIM_COUNTRY
**ExecuteSQL** — Connection Pool: `MySQL_DBCPService`
```sql
SELECT country_id, code, name, currency FROM country ORDER BY country_id
```
**EvaluateJsonPath** attributes: `country_id`, `code`, `name`, `currency`
**PutSQL**
```sql
MERGE INTO DIM_COUNTRY tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.country_id = ${country_id})
WHEN MATCHED THEN
UPDATE SET tgt.code = '${code}', tgt.name = '${name}', tgt.currency = '${currency}'
WHEN NOT MATCHED THEN
INSERT (country_id, code, name, currency)
VALUES (${country_id}, '${code}', '${name}', '${currency}')
```
### 2b — DIM_STAR_RATING
**ExecuteSQL**
```sql
SELECT star_rating_id, code, description FROM star_rating ORDER BY code
```
**PutSQL**
```sql
MERGE INTO DIM_STAR_RATING tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.star_rating_id = ${star_rating_id})
WHEN MATCHED THEN
UPDATE SET tgt.code = ${code}, tgt.description = '${description}'
WHEN NOT MATCHED THEN
INSERT (star_rating_id, code, description)
VALUES (${star_rating_id}, ${code}, '${description}')
```
### 2c — DIM_HOTEL_CHAIN
**ExecuteSQL**
```sql
SELECT hotel_chain_id, code, name FROM hotel_chain ORDER BY hotel_chain_id
```
**PutSQL**
```sql
MERGE INTO DIM_HOTEL_CHAIN tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.hotel_chain_id = ${hotel_chain_id})
WHEN MATCHED THEN
UPDATE SET tgt.code = '${code}', tgt.name = '${name}'
WHEN NOT MATCHED THEN
INSERT (hotel_chain_id, code, name)
VALUES (${hotel_chain_id}, '${code}', '${name}')
```
---
## PG-3: DIM_HOTEL — SCD Type 2
This is the analytically significant dimension. Hotels change star rating and chain affiliation over time (renovations, rebrandings). SCD Type 2 preserves history so reports can accurately show revenue by star category **at the time of booking**, not just today's category.
**Architecture:** NiFi stages raw data into `STG_HOTEL`, then an `ExecuteScript` runs the SCD2 SQL logic in a single Oracle transaction.
```
[Truncate STG] → [Load STG from MySQL] → [Apply SCD2 SQL]
```
### Step A: Truncate staging
**GenerateFlowFile****PutSQL**
```sql
TRUNCATE TABLE STG_HOTEL
```
### Step B: Load staging from MySQL
**ExecuteSQL**`MySQL_DBCPService`
```sql
SELECT
h.hotel_id,
hc.code AS chain_code,
c.code AS country_code,
sr.code AS star_code,
h.code,
h.name,
h.city
FROM hotel h
JOIN country c ON c.country_id = h.country_id
JOIN star_rating sr ON sr.star_rating_id = h.star_rating_id
LEFT JOIN hotel_chain hc ON hc.hotel_chain_id = h.hotel_chain_id
ORDER BY h.hotel_id
```
**ConvertAvroToJSON****SplitJson** (`$.*`)
**EvaluateJsonPath** attributes: `hotel_id`, `chain_code`, `country_code`, `star_code`, `code`, `name`, `city`
**PutSQL**`STG_HOTEL`
```sql
INSERT INTO STG_HOTEL (hotel_id, chain_code, country_code, star_code, code, name, city)
VALUES (${hotel_id}, NULLIF('${chain_code}',''), '${country_code}', ${star_code}, '${code}', '${name}', '${city}')
```
### Step C: Apply SCD2 logic
**GenerateFlowFile** (runs after B finishes) → **ExecuteScript** (Groovy)
The Groovy script opens a JDBC connection and executes two SQL statements in one transaction:
```groovy
import java.sql.*
def conn = context.controllerServiceLookup
.getControllerService('Oracle_DBCPService_ID')
.getConnection()
conn.autoCommit = false
try {
// 1. Expire records whose tracked attributes changed
conn.prepareStatement("""
UPDATE DIM_HOTEL dh
SET dh.expiry_date = TRUNC(SYSDATE) - 1,
dh.is_current = 0
WHERE dh.is_current = 1
AND EXISTS (
SELECT 1 FROM STG_HOTEL s
WHERE s.hotel_id = dh.source_hotel_id
AND (
NVL(s.chain_code,'~') != NVL((
SELECT hc.code FROM DIM_HOTEL_CHAIN hc
WHERE hc.hotel_chain_key = dh.hotel_chain_key),'~')
OR s.star_code != (
SELECT ds.code FROM DIM_STAR_RATING ds
WHERE ds.star_rating_key = dh.star_rating_key)
OR s.city != dh.city
)
)
""").executeUpdate()
// 2. Insert new version for changed hotels + insert brand-new hotels
conn.prepareStatement("""
INSERT INTO DIM_HOTEL (
source_hotel_id, hotel_chain_key, country_key, star_rating_key,
code, name, city, effective_date, expiry_date, is_current)
SELECT
s.hotel_id,
(SELECT hc.hotel_chain_key FROM DIM_HOTEL_CHAIN hc WHERE hc.code = s.chain_code),
(SELECT dc.country_key FROM DIM_COUNTRY dc WHERE dc.code = s.country_code),
(SELECT ds.star_rating_key FROM DIM_STAR_RATING ds WHERE ds.code = s.star_code),
s.code, s.name, s.city,
TRUNC(SYSDATE), NULL, 1
FROM STG_HOTEL s
WHERE NOT EXISTS (
SELECT 1 FROM DIM_HOTEL d
WHERE d.source_hotel_id = s.hotel_id
AND d.is_current = 1
)
""").executeUpdate()
conn.commit()
} catch (Exception e) {
conn.rollback()
throw e
} finally {
conn.close()
}
def ff = session.create()
session.transfer(ff, REL_SUCCESS)
```
---
## PG-4: DIM_GUEST — SCD Type 1
Guest personal data (city, country) can change without any analytical value in tracking the history. Plain MERGE/upsert is correct here.
```
ExecuteSQL(MySQL) → ConvertAvroToJSON → SplitJson → EvaluateJsonPath → PutSQL(MERGE)
```
**ExecuteSQL**`MySQL_DBCPService`
```sql
SELECT g.guest_id, c.code AS country_code, g.name, g.city
FROM guest g
LEFT JOIN country c ON c.country_id = g.country_id
ORDER BY g.guest_id
```
**EvaluateJsonPath** attributes: `guest_id`, `country_code`, `name`, `city`
**PutSQL**
```sql
MERGE INTO DIM_GUEST tgt
USING (SELECT 1 FROM DUAL) src ON (tgt.guest_id = ${guest_id})
WHEN MATCHED THEN
UPDATE SET
tgt.country_key = (SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
tgt.name = '${name}',
tgt.city = NULLIF('${city}','')
WHEN NOT MATCHED THEN
INSERT (guest_id, country_key, name, city)
VALUES (
${guest_id},
(SELECT country_key FROM DIM_COUNTRY WHERE code = NULLIF('${country_code}','')),
'${name}',
NULLIF('${city}','')
)
```
> **Note:** DIM_ROOM is also SCD Type 1 — load it the same way as DIM_GUEST, joining `hotel_room` with `room_type` in MySQL and MERGEing into `DIM_ROOM` (surrogate key lookup via `source_hotel_id + IS_CURRENT=1` from DIM_HOTEL).
---
## PG-5: FACT_ROOM_BOOKING — Incremental Load (Watermark)
The fact table is loaded **incrementally**: only `room_booking` rows with `room_booking_id` greater than the last loaded value are processed. The watermark is stored in `ETL_WATERMARK` in Oracle.
`source_rb_id` on `FACT_ROOM_BOOKING` has a UNIQUE constraint, so re-running is safe — duplicates are silently skipped.
```
[Read Watermark] → [ExecuteSQL MySQL] → [ConvertAvroToJSON] → [SplitJson]
→ [EvaluateJsonPath] → [PutSQL FACT] → [Update Watermark]
```
### Step A: Read watermark
**ExecuteSQL**`Oracle_DBCPService`
```sql
SELECT last_key FROM ETL_WATERMARK WHERE entity_name = 'FACT_ROOM_BOOKING'
```
**ConvertAvroToJSON****EvaluateJsonPath**
- `watermark``$.last_key`
### Step B: Load from MySQL
**ExecuteSQL**`MySQL_DBCPService`
SQL Statement (use attribute `${watermark}`):
```sql
SELECT
rb.room_booking_id,
rb.room_id,
rb.date_from,
rb.date_to,
rb.nightly_rate,
rb.total_amount,
b.guest_id,
b.status AS booking_status,
DATEDIFF(rb.date_to, rb.date_from) AS nights_stayed
FROM room_booking rb
JOIN booking b ON b.booking_id = rb.booking_id
WHERE rb.room_booking_id > ${watermark}
ORDER BY rb.room_booking_id
LIMIT 50000
```
> Set LIMIT to control batch size. Run PG-5 in a loop (using a Timer-driven GenerateFlowFile) until no rows come back.
### Step C: Split + extract attributes
**ConvertAvroToJSON****SplitJson** (`$.*`)
**EvaluateJsonPath** attributes:
`room_booking_id`, `room_id`, `guest_id`, `date_from`, `date_to`, `nightly_rate`, `total_amount`, `booking_status`, `nights_stayed`
### Step D: Insert into fact table
**PutSQL**`Oracle_DBCPService`
```sql
INSERT INTO FACT_ROOM_BOOKING (
source_rb_id, hotel_key, hotel_chain_key, room_key, guest_key, country_key,
star_rating_key, checkin_date_key, checkout_date_key,
booking_status, nights_stayed, nightly_rate, total_amount)
SELECT
${room_booking_id},
dh.hotel_key,
dh.hotel_chain_key,
dr.room_key,
dg.guest_key,
dg.country_key,
dh.star_rating_key,
TO_NUMBER(TO_CHAR(TO_DATE('${date_from}','YYYY-MM-DD'), 'YYYYMMDD')),
TO_NUMBER(TO_CHAR(TO_DATE('${date_to}', 'YYYY-MM-DD'), 'YYYYMMDD')),
'${booking_status}',
${nights_stayed},
TO_NUMBER('${nightly_rate}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,'''),
TO_NUMBER('${total_amount}', '9999990D99', 'NLS_NUMERIC_CHARACTERS=''.,''')
FROM
DIM_ROOM dr,
DIM_GUEST dg,
DIM_HOTEL dh
WHERE
dr.room_id = ${room_id}
AND dg.guest_id = ${guest_id}
AND dh.hotel_key = dr.hotel_key
-- SCD2 lookup: find hotel version active at check-in date
AND dh.effective_date <= TO_DATE('${date_from}','YYYY-MM-DD')
AND (dh.expiry_date IS NULL OR dh.expiry_date > TO_DATE('${date_from}','YYYY-MM-DD'))
-- Idempotent: skip if already loaded
AND NOT EXISTS (
SELECT 1 FROM FACT_ROOM_BOOKING f WHERE f.source_rb_id = ${room_booking_id}
)
```
> The `DH.EFFECTIVE_DATE / EXPIRY_DATE` condition is the payoff of SCD Type 2: the fact row always references the hotel dimension version that was true **when the guest actually checked in**, not what the hotel looks like today.
**Ignore Errors** on PutSQL (route `failure` → funnel) — UNIQUE constraint violations on `source_rb_id` are expected and harmless on re-runs.
### Step E: Update watermark
After the PutSQL succeeds, update the watermark with the highest `room_booking_id` seen in this batch.
**UpdateAttribute**
- `max_rb_id``${room_booking_id}` (NiFi Expression Language `max()` across the batch via a custom processor or MergeContent trick)
> Simplest approach: add a final **ExecuteSQL** that runs after the batch:
```sql
UPDATE ETL_WATERMARK
SET last_key = (SELECT MAX(source_rb_id) FROM FACT_ROOM_BOOKING),
last_run_ts = SYSTIMESTAMP
WHERE entity_name = 'FACT_ROOM_BOOKING'
```
---
## Execution Order & Scheduling
| PG | Trigger | Frequency |
|----|---------|-----------|
| PG-1 (Date Dim) | Manual (run once) | — |
| PG-2 (Static Dims) | Timer — 24h | Daily |
| PG-3 (DIM_HOTEL SCD2) | Timer — 24h | Daily, after PG-2 |
| PG-4 (DIM_GUEST SCD1) | Timer — 24h | Daily, after PG-3 |
| PG-5 (Fact incremental) | Timer — 1h | Hourly |
Chain PG-2 → PG-3 → PG-4 by connecting each PG's Output Port to the next PG's Input Port via a **success** relationship.
---
## Why SCD Type 2 for DIM_HOTEL?
A hotel being upgraded from 3★ to 4★ changes its rate tier going forward. If we just overwrite the dimension (SCD1), all historical bookings would suddenly appear to have been made in a 4★ hotel — inflating average revenue per star category in reports. SCD2 preserves the correct picture: every fact row points to the exact hotel version that was true at check-in.

View File

@@ -19,6 +19,7 @@ var rng = new Random(SEED);
await using var conn = new MySqlConnection(DSN);
await conn.OpenAsync();
Console.WriteLine("Connected.");
await new MySqlCommand("SET foreign_key_checks=0, unique_checks=0", conn).ExecuteNonQueryAsync();
async Task Exec(string sql)
{
@@ -480,9 +481,8 @@ while (bookingsDone < BOOKING_COUNT)
bookingRows.Add($"({guestId},{hotelId},{D(checkin)},{D(checkout)},{S(status)},{DT(created)})");
}
// Insert bookings and get the first inserted ID
long firstId = await ExecScalar("SELECT AUTO_INCREMENT FROM information_schema.tables WHERE table_schema='hotel_reservations' AND table_name='booking'");
await Exec($"INSERT INTO booking (guest_id, hotel_id, date_from, date_to, status, created_at) VALUES {string.Join(',', bookingRows)}");
long firstId = await ExecScalar("SELECT LAST_INSERT_ID()");
// Re-derive checkin/nights from the same rng sequence is impossible after the fact,
// so re-parse from inserted rows to build room_bookings

View File

@@ -1,9 +1,39 @@
-- =============================================================================
-- HOTEL RESERVATIONS — DATA MART (STAR SCHEMA)
-- Target: Oracle (university lab schema)
-- Based on A.24 Revenue Data Mart — Dimensional Modelling by Example
-- =============================================================================
-- -----------------------------------------------------------------------------
-- ETL CONTROL TABLE
-- Tracks incremental load watermarks per entity.
-- -----------------------------------------------------------------------------
CREATE TABLE ETL_WATERMARK (
entity_name VARCHAR2(50) NOT NULL,
last_key NUMBER(20,0) DEFAULT 0 NOT NULL,
last_run_ts TIMESTAMP DEFAULT SYSTIMESTAMP,
CONSTRAINT pk_etl_wm PRIMARY KEY (entity_name)
);
INSERT INTO ETL_WATERMARK (entity_name, last_key) VALUES ('FACT_ROOM_BOOKING', 0);
COMMIT;
-- -----------------------------------------------------------------------------
-- STAGING TABLES
-- NiFi loads raw MySQL data here first; SCD logic runs in pure SQL after.
-- Truncated at the start of each ETL run.
-- -----------------------------------------------------------------------------
CREATE TABLE STG_HOTEL (
hotel_id NUMBER(10,0) NOT NULL,
chain_code VARCHAR2(10),
country_code CHAR(2) NOT NULL,
star_code NUMBER(1,0) NOT NULL,
code VARCHAR2(20) NOT NULL,
name VARCHAR2(150) NOT NULL,
city VARCHAR2(100) NOT NULL
);
-- -----------------------------------------------------------------------------
-- DIMENSION TABLES
-- -----------------------------------------------------------------------------
@@ -21,12 +51,13 @@ CREATE TABLE DIM_DATE (
day_name VARCHAR2(10) NOT NULL,
is_weekend NUMBER(1,0) NOT NULL,
is_business_day NUMBER(1,0) NOT NULL,
season VARCHAR2(10) NOT NULL, -- Peak / High / Low / Off
season VARCHAR2(10) NOT NULL,
CONSTRAINT pk_dim_date PRIMARY KEY (date_key),
CONSTRAINT ck_dim_date_wknd CHECK (is_weekend IN (0,1)),
CONSTRAINT ck_dim_date_bday CHECK (is_business_day IN (0,1))
);
-- SCD Type 1 — country attributes are stable; just overwrite if anything changes
CREATE TABLE DIM_COUNTRY (
country_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
country_id NUMBER(10,0) NOT NULL,
@@ -37,6 +68,7 @@ CREATE TABLE DIM_COUNTRY (
CONSTRAINT uq_dim_cntry_id UNIQUE (country_id)
);
-- SCD Type 1 — star rating lookup, never changes
CREATE TABLE DIM_STAR_RATING (
star_rating_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
star_rating_id NUMBER(10,0) NOT NULL,
@@ -46,6 +78,7 @@ CREATE TABLE DIM_STAR_RATING (
CONSTRAINT uq_dim_star_id UNIQUE (star_rating_id)
);
-- SCD Type 1 — chain name/code rarely changes; overwrite
CREATE TABLE DIM_HOTEL_CHAIN (
hotel_chain_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
hotel_chain_id NUMBER(10,0) NOT NULL,
@@ -55,22 +88,31 @@ CREATE TABLE DIM_HOTEL_CHAIN (
CONSTRAINT uq_dim_chain_id UNIQUE (hotel_chain_id)
);
-- SCD Type 2 — hotels can change star rating or chain affiliation over time.
-- source_hotel_id is the natural key from MySQL; hotel_key is the surrogate.
-- One hotel can have multiple rows; IS_CURRENT=1 row is the active version.
-- FACT_ROOM_BOOKING links to the hotel version current at check-in date.
CREATE TABLE DIM_HOTEL (
hotel_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
hotel_id NUMBER(10,0) NOT NULL,
source_hotel_id NUMBER(10,0) NOT NULL,
hotel_chain_key NUMBER(10,0),
country_key NUMBER(10,0) NOT NULL,
star_rating_key NUMBER(10,0) NOT NULL,
code VARCHAR2(20) NOT NULL,
name VARCHAR2(150) NOT NULL,
city VARCHAR2(100) NOT NULL,
-- SCD2 versioning
effective_date DATE NOT NULL,
expiry_date DATE,
is_current NUMBER(1,0) DEFAULT 1 NOT NULL,
CONSTRAINT pk_dim_hotel PRIMARY KEY (hotel_key),
CONSTRAINT uq_dim_hotel_id UNIQUE (hotel_id),
CONSTRAINT ck_dh_current CHECK (is_current IN (0,1)),
CONSTRAINT fk_dh_chain FOREIGN KEY (hotel_chain_key) REFERENCES DIM_HOTEL_CHAIN (hotel_chain_key),
CONSTRAINT fk_dh_country FOREIGN KEY (country_key) REFERENCES DIM_COUNTRY (country_key),
CONSTRAINT fk_dh_star FOREIGN KEY (star_rating_key) REFERENCES DIM_STAR_RATING (star_rating_key)
);
-- SCD Type 1 — room type/floor rarely changes; upsert is sufficient
CREATE TABLE DIM_ROOM (
room_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
room_id NUMBER(10,0) NOT NULL,
@@ -87,6 +129,7 @@ CREATE TABLE DIM_ROOM (
CONSTRAINT ck_dim_room_smk CHECK (smoking_yn IN (0,1))
);
-- SCD Type 1 — guest contact details are overwritten if they change
CREATE TABLE DIM_GUEST (
guest_key NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
guest_id NUMBER(10,0) NOT NULL,
@@ -102,11 +145,13 @@ CREATE TABLE DIM_GUEST (
-- FACT TABLE
-- -----------------------------------------------------------------------------
-- Grain: one row per room_booking
-- Revenue measures: nightly_rate, total_amount, nights_stayed
-- Grain: one row per room_booking.
-- source_rb_id: natural key from MySQL — used for idempotent incremental loads.
-- hotel_key: points to the DIM_HOTEL version active at check-in (SCD2 lookup).
CREATE TABLE FACT_ROOM_BOOKING (
fact_id NUMBER(10,0) GENERATED ALWAYS AS IDENTITY,
-- foreign keys
source_rb_id NUMBER(10,0) NOT NULL,
-- dimension FKs
hotel_key NUMBER(10,0) NOT NULL,
hotel_chain_key NUMBER(10,0),
room_key NUMBER(10,0) NOT NULL,
@@ -115,13 +160,14 @@ CREATE TABLE FACT_ROOM_BOOKING (
star_rating_key NUMBER(10,0) NOT NULL,
checkin_date_key NUMBER(8,0) NOT NULL,
checkout_date_key NUMBER(8,0) NOT NULL,
-- degenerate dimensions
-- degenerate dimension
booking_status VARCHAR2(20) NOT NULL,
-- measures
nights_stayed NUMBER(4,0) NOT NULL,
nightly_rate NUMBER(10,2) NOT NULL,
total_amount NUMBER(12,2) NOT NULL,
CONSTRAINT pk_fact_rb PRIMARY KEY (fact_id),
CONSTRAINT uq_fact_rb_src UNIQUE (source_rb_id),
CONSTRAINT fk_frb_hotel FOREIGN KEY (hotel_key) REFERENCES DIM_HOTEL (hotel_key),
CONSTRAINT fk_frb_chain FOREIGN KEY (hotel_chain_key) REFERENCES DIM_HOTEL_CHAIN (hotel_chain_key),
CONSTRAINT fk_frb_room FOREIGN KEY (room_key) REFERENCES DIM_ROOM (room_key),