Nifi quide
This commit is contained in:
359
nifi/NIFI_SETUP.md
Normal file
359
nifi/NIFI_SETUP.md
Normal file
@@ -0,0 +1,359 @@
|
||||
# NiFi Data Mart Setup Guide
|
||||
|
||||
This guide walks through building the ETL flow that reads from the MySQL OLTP database and loads the star schema into Oracle.
|
||||
|
||||
---
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Apache NiFi is running and you can open the UI in a browser
|
||||
- MySQL container is up (`./docker/start.sh`)
|
||||
- You have your Oracle credentials from the lab (host, port, service name, username, password)
|
||||
- You have the MySQL JDBC driver JAR and Oracle JDBC driver JAR available on the NiFi machine
|
||||
|
||||
> **JDBC drivers** — NiFi does not ship with database drivers. Download:
|
||||
> - MySQL: `mysql-connector-j-8.x.x.jar` from [dev.mysql.com](https://dev.mysql.com/downloads/connector/j/)
|
||||
> - Oracle: `ojdbc11.jar` from [oracle.com](https://www.oracle.com/database/technologies/appdev/jdbc-downloads.html)
|
||||
>
|
||||
> Place both JARs in a folder NiFi can reach, e.g. `/opt/nifi/extra-jars/`.
|
||||
|
||||
---
|
||||
|
||||
## Step 1 — Create the two Database Connection Pools
|
||||
|
||||
Connection pools are shared resources. You create them once and all processors reuse them.
|
||||
|
||||
### 1.1 Open Controller Services
|
||||
|
||||
1. In the NiFi canvas, click the **≡ menu** (top right) → **Controller Settings**
|
||||
2. Go to the **Controller Services** tab
|
||||
3. Click the **+** button to add a new service
|
||||
|
||||
### 1.2 MySQL Connection Pool
|
||||
|
||||
1. Search for `DBCPConnectionPool` → Add
|
||||
2. Click the **gear icon** on the new service to configure it:
|
||||
|
||||
| Property | Value |
|
||||
|---|---|
|
||||
| Database Connection URL | `jdbc:mysql://127.0.0.1:13306/ewc2025` |
|
||||
| Database Driver Class Name | `com.mysql.cj.jdbc.Driver` |
|
||||
| Database Driver Location(s) | `/opt/nifi/extra-jars/mysql-connector-j-8.x.x.jar` |
|
||||
| Database User | `root` |
|
||||
| Password | `ewc2025root` |
|
||||
|
||||
3. Click **Apply**, then click the **lightning bolt** icon to enable the service.
|
||||
|
||||
### 1.3 Oracle Connection Pool
|
||||
|
||||
1. Add another `DBCPConnectionPool`
|
||||
2. Configure it:
|
||||
|
||||
| Property | Value |
|
||||
|---|---|
|
||||
| Database Connection URL | `jdbc:oracle:thin:@<host>:<port>/<service>` |
|
||||
| Database Driver Class Name | `oracle.jdbc.OracleDriver` |
|
||||
| Database Driver Location(s) | `/opt/nifi/extra-jars/ojdbc11.jar` |
|
||||
| Database User | *(your lab username)* |
|
||||
| Password | *(your lab password)* |
|
||||
|
||||
3. Apply and enable.
|
||||
|
||||
---
|
||||
|
||||
## Step 2 — Understand the Processor Chain
|
||||
|
||||
Every pipeline in this flow follows the same 5-processor pattern:
|
||||
|
||||
```
|
||||
ExecuteSQL ──► ConvertAvroToJSON ──► SplitJson ──► EvaluateJsonPath ──► PutSQL
|
||||
(MySQL) (Oracle)
|
||||
```
|
||||
|
||||
| Processor | What it does |
|
||||
|---|---|
|
||||
| **ExecuteSQL** | Runs the extract SQL query on MySQL, outputs Avro binary |
|
||||
| **ConvertAvroToJSON** | Converts Avro to a JSON array of records |
|
||||
| **SplitJson** | Splits the array into one FlowFile per record |
|
||||
| **EvaluateJsonPath** | Reads each JSON field and saves it as a FlowFile attribute |
|
||||
| **PutSQL** | Runs the load SQL on Oracle; NiFi fills in `${attribute}` placeholders |
|
||||
|
||||
You will build this chain **8 times** — once per table. The only things that change between pipelines are the SQL files and the EvaluateJsonPath field list.
|
||||
|
||||
---
|
||||
|
||||
## Step 3 — Build the First Pipeline (DIM_DATE)
|
||||
|
||||
We'll build this one in detail. The others follow the same steps.
|
||||
|
||||
### 3.1 Add processors to the canvas
|
||||
|
||||
Right-click on an empty area → **Add Processor**. Add these five in a row:
|
||||
|
||||
1. `ExecuteSQL`
|
||||
2. `ConvertAvroToJSON`
|
||||
3. `SplitJson`
|
||||
4. `EvaluateJsonPath`
|
||||
5. `PutSQL`
|
||||
|
||||
### 3.2 Configure ExecuteSQL
|
||||
|
||||
Double-click → **Properties**:
|
||||
|
||||
| Property | Value |
|
||||
|---|---|
|
||||
| Database Connection Pooling Service | *(select your MySQL pool)* |
|
||||
| SQL select query | *(paste content of `nifi/sql/extract/01_dim_date.sql`)* |
|
||||
|
||||
In **Relationships**, auto-terminate the `failure` relationship (right-click the processor → check Failure).
|
||||
|
||||
### 3.3 Configure ConvertAvroToJSON
|
||||
|
||||
No changes needed from defaults. Just connect it.
|
||||
|
||||
### 3.4 Configure SplitJson
|
||||
|
||||
| Property | Value |
|
||||
|---|---|
|
||||
| JsonPath Expression | `$.*` |
|
||||
|
||||
Auto-terminate `original` and `failure`.
|
||||
|
||||
### 3.5 Configure EvaluateJsonPath
|
||||
|
||||
| Property | Value |
|
||||
|---|---|
|
||||
| Destination | `flowfile-attribute` |
|
||||
|
||||
Then add one property per field using the **+** button:
|
||||
|
||||
| Property Name | Value |
|
||||
|---|---|
|
||||
| `date_key` | `$.date_key` |
|
||||
| `full_date` | `$.full_date` |
|
||||
| `year` | `$.year` |
|
||||
| `quarter` | `$.quarter` |
|
||||
| `month` | `$.month` |
|
||||
| `month_name` | `$.month_name` |
|
||||
| `week_number` | `$.week_number` |
|
||||
| `day_of_month` | `$.day_of_month` |
|
||||
| `day_name` | `$.day_name` |
|
||||
|
||||
Auto-terminate `failure` and `unmatched`.
|
||||
|
||||
### 3.6 Configure PutSQL
|
||||
|
||||
| Property | Value |
|
||||
|---|---|
|
||||
| JDBC Connection Pool | *(select your Oracle pool)* |
|
||||
| SQL Statement | *(paste content of `nifi/sql/load/01_dim_date.sql`)* |
|
||||
|
||||
Auto-terminate `failure`, `retry`, `success`.
|
||||
|
||||
### 3.7 Connect the processors
|
||||
|
||||
Hover over a processor until the arrow icon appears, then drag to the next:
|
||||
|
||||
```
|
||||
ExecuteSQL ──(success)──► ConvertAvroToJSON ──(success)──► SplitJson ──(splits)──► EvaluateJsonPath ──(matched)──► PutSQL
|
||||
```
|
||||
|
||||
Use the `success` relationship on each connection unless noted otherwise. For SplitJson use `splits`.
|
||||
|
||||
---
|
||||
|
||||
## Step 4 — Build the Remaining Pipelines
|
||||
|
||||
Repeat Step 3 for each table below. The only differences are which SQL files you paste and which fields you add to EvaluateJsonPath.
|
||||
|
||||
### DIM_MEDAL (pipeline 2)
|
||||
|
||||
This one is different — it has **no extract step**. The 3 medal rows are inserted directly.
|
||||
|
||||
1. Add a single `ExecuteSQL` processor
|
||||
2. Connect it to the **Oracle** pool
|
||||
3. Paste the content of `nifi/sql/load/02_dim_medal.sql` as the SQL query
|
||||
4. Run it once manually (right-click → **Run Once**)
|
||||
|
||||
### DIM_GAME (pipeline 3)
|
||||
|
||||
**Extract:** `nifi/sql/extract/02_dim_game.sql` on MySQL
|
||||
|
||||
**EvaluateJsonPath fields:**
|
||||
|
||||
| Name | Path |
|
||||
|---|---|
|
||||
| `game_id` | `$.game_id` |
|
||||
| `name` | `$.name` |
|
||||
| `game_type` | `$.game_type` |
|
||||
| `platform` | `$.platform` |
|
||||
|
||||
**Load:** `nifi/sql/load/03_dim_game.sql` on Oracle
|
||||
|
||||
---
|
||||
|
||||
### DIM_COUNTRY (pipeline 4)
|
||||
|
||||
**Extract:** `nifi/sql/extract/03_dim_country.sql` on MySQL
|
||||
|
||||
**EvaluateJsonPath fields:**
|
||||
|
||||
| Name | Path |
|
||||
|---|---|
|
||||
| `country_id` | `$.country_id` |
|
||||
| `name` | `$.name` |
|
||||
| `region` | `$.region` |
|
||||
|
||||
**Load:** `nifi/sql/load/04_dim_country.sql` on Oracle
|
||||
|
||||
---
|
||||
|
||||
### DIM_ORGANIZATION (pipeline 5)
|
||||
|
||||
**Extract:** `nifi/sql/extract/04_dim_organization.sql` on MySQL
|
||||
|
||||
**EvaluateJsonPath fields:**
|
||||
|
||||
| Name | Path |
|
||||
|---|---|
|
||||
| `organization_id` | `$.organization_id` |
|
||||
| `name` | `$.name` |
|
||||
| `region` | `$.region` |
|
||||
| `country` | `$.country` |
|
||||
| `club_partner_status` | `$.club_partner_status` |
|
||||
| `founded_year` | `$.founded_year` |
|
||||
| `social_media_followers_m` | `$.social_media_followers_m` |
|
||||
|
||||
**Load:** `nifi/sql/load/05_dim_organization.sql` on Oracle
|
||||
|
||||
---
|
||||
|
||||
### FACT_TOURNAMENT (pipeline 6)
|
||||
|
||||
**Extract:** `nifi/sql/extract/05_fact_tournament.sql` on MySQL
|
||||
|
||||
**EvaluateJsonPath fields:**
|
||||
|
||||
| Name | Path |
|
||||
|---|---|
|
||||
| `game_id` | `$.game_id` |
|
||||
| `start_date_key` | `$.start_date_key` |
|
||||
| `end_date_key` | `$.end_date_key` |
|
||||
| `winner_org_id` | `$.winner_org_id` |
|
||||
| `event_name` | `$.event_name` |
|
||||
| `gender` | `$.gender` |
|
||||
| `prize_pool_usd` | `$.prize_pool_usd` |
|
||||
| `num_participants` | `$.num_participants` |
|
||||
| `duration_days` | `$.duration_days` |
|
||||
| `has_club_points` | `$.has_club_points` |
|
||||
|
||||
**Load:** `nifi/sql/load/06_fact_tournament.sql` on Oracle
|
||||
|
||||
---
|
||||
|
||||
### FACT_MEDAL_AWARD (pipeline 7)
|
||||
|
||||
**Extract:** `nifi/sql/extract/06_fact_medal_award.sql` on MySQL
|
||||
|
||||
**EvaluateJsonPath fields:**
|
||||
|
||||
| Name | Path |
|
||||
|---|---|
|
||||
| `game_id` | `$.game_id` |
|
||||
| `country_id` | `$.country_id` |
|
||||
| `organization_id` | `$.organization_id` |
|
||||
| `medal_type` | `$.medal_type` |
|
||||
| `date_key` | `$.date_key` |
|
||||
| `player_name` | `$.player_name` |
|
||||
| `medal_count` | `$.medal_count` |
|
||||
| `medal_points` | `$.medal_points` |
|
||||
|
||||
**Load:** `nifi/sql/load/07_fact_medal_award.sql` on Oracle
|
||||
|
||||
---
|
||||
|
||||
### FACT_CLUB_STANDING (pipeline 8)
|
||||
|
||||
**Extract:** `nifi/sql/extract/07_fact_club_standing.sql` on MySQL
|
||||
|
||||
**EvaluateJsonPath fields:**
|
||||
|
||||
| Name | Path |
|
||||
|---|---|
|
||||
| `organization_id` | `$.organization_id` |
|
||||
| `final_rank` | `$.final_rank` |
|
||||
| `total_points` | `$.total_points` |
|
||||
| `prize_money_usd` | `$.prize_money_usd` |
|
||||
| `tournament_wins` | `$.tournament_wins` |
|
||||
| `top_8_finishes` | `$.top_8_finishes` |
|
||||
| `eligible_to_win` | `$.eligible_to_win` |
|
||||
|
||||
**Load:** `nifi/sql/load/08_fact_club_standing.sql` on Oracle
|
||||
|
||||
---
|
||||
|
||||
## Step 5 — Run the Pipelines in Order
|
||||
|
||||
**The order matters** — facts reference dimension keys, so all dims must be loaded first.
|
||||
|
||||
Run each pipeline by right-clicking the **ExecuteSQL** processor → **Run Once**. Wait for all queues between processors to drain to 0 before starting the next pipeline.
|
||||
|
||||
```
|
||||
1. DIM_DATE
|
||||
2. DIM_MEDAL ← ExecuteSQL on Oracle directly, run once
|
||||
3. DIM_GAME
|
||||
4. DIM_COUNTRY
|
||||
5. DIM_ORGANIZATION ← must be after DIM_COUNTRY
|
||||
6. FACT_TOURNAMENT ← must be after all dims
|
||||
7. FACT_MEDAL_AWARD ← must be after all dims
|
||||
8. FACT_CLUB_STANDING ← must be after DIM_ORGANIZATION
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Step 6 — Verify the Load
|
||||
|
||||
After all pipelines complete, run these queries in your Oracle SQL client to confirm row counts:
|
||||
|
||||
```sql
|
||||
SELECT 'DIM_DATE' AS tbl, COUNT(*) AS cnt FROM DIM_DATE UNION ALL
|
||||
SELECT 'DIM_GAME' AS tbl, COUNT(*) AS cnt FROM DIM_GAME UNION ALL
|
||||
SELECT 'DIM_COUNTRY' AS tbl, COUNT(*) AS cnt FROM DIM_COUNTRY UNION ALL
|
||||
SELECT 'DIM_ORGANIZATION' AS tbl, COUNT(*) AS cnt FROM DIM_ORGANIZATION UNION ALL
|
||||
SELECT 'DIM_MEDAL' AS tbl, COUNT(*) AS cnt FROM DIM_MEDAL UNION ALL
|
||||
SELECT 'FACT_TOURNAMENT' AS tbl, COUNT(*) AS cnt FROM FACT_TOURNAMENT UNION ALL
|
||||
SELECT 'FACT_MEDAL_AWARD' AS tbl, COUNT(*) AS cnt FROM FACT_MEDAL_AWARD UNION ALL
|
||||
SELECT 'FACT_CLUB_STANDING' AS tbl, COUNT(*) AS cnt FROM FACT_CLUB_STANDING;
|
||||
```
|
||||
|
||||
Expected row counts:
|
||||
|
||||
| Table | Rows |
|
||||
|---|---|
|
||||
| DIM_DATE | 48 |
|
||||
| DIM_GAME | 25 |
|
||||
| DIM_COUNTRY | 36+ |
|
||||
| DIM_ORGANIZATION | 60+ |
|
||||
| DIM_MEDAL | 3 |
|
||||
| FACT_TOURNAMENT | 27 |
|
||||
| FACT_MEDAL_AWARD | 257 |
|
||||
| FACT_CLUB_STANDING | 24 |
|
||||
|
||||
---
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
**FlowFiles stuck in a queue**
|
||||
Click the queue between two processors to inspect the FlowFiles. Click a FlowFile → **View** to see its content or attributes. This shows exactly what data is being passed and helps spot bad values.
|
||||
|
||||
**PutSQL fails with ORA- error**
|
||||
Right-click the failed FlowFile in the queue → **View** → check the `sql.error.message` attribute. Common causes:
|
||||
- `ORA-00001` — duplicate key, the table already has data; truncate the Oracle table and re-run
|
||||
- `ORA-01722` — invalid number, a numeric field contains an empty string; check EvaluateJsonPath matched correctly
|
||||
- `ORA-00904` — invalid identifier, the column name in the SQL doesn't match the table definition
|
||||
|
||||
**EvaluateJsonPath produces empty attributes**
|
||||
The JSON field name from ConvertAvroToJSON must exactly match the alias used in the extract SQL. Check that the SELECT alias (e.g. `AS game_id`) matches the JsonPath property name (`game_id`).
|
||||
|
||||
**ExecuteSQL on MySQL fails**
|
||||
Verify the MySQL container is running (`docker ps`) and that the JDBC URL uses port `13306`.
|
||||
Reference in New Issue
Block a user