109 changes: 109 additions & 0 deletions examples/data_engineering_pipline/nvidia-dask-rapids/README.md
@@ -0,0 +1,109 @@
# 🚀 Dask-CUDA Multi-GPU ETL Template

## 🌟 Overview

This template provides an efficient, scalable solution for performing **Extract, Transform, and Load (ETL)** operations on datasets that are too large to fit in the memory of a single GPU. It uses the **Dask** parallel computing library to orchestrate workers across multiple **NVIDIA GPUs**, leveraging the **RAPIDS cuDF** library for GPU-accelerated operations.

This architecture is designed to accelerate tasks involving filtering, feature engineering, and complex aggregations by keeping both the data and the computation on the GPUs.

### Key Technologies

* **Dask Distributed:** Handles cluster management and task scheduling across all GPUs.
* **cuDF:** The GPU DataFrame library that provides a Pandas-like API for blazing-fast transformations.
* **LocalCUDACluster:** Automatically configures one Dask worker per available GPU.
* **Target Environment:** Linux with **CUDA Toolkit 12.x** installed (e.g., RunPod, AWS, or any **Saturn Cloud GPU instance**).
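
The cluster bootstrap itself is only a few lines. A minimal sketch (not the full pipeline) of how `LocalCUDACluster` and `Client` fit together, assuming `dask-cuda` and `dask.distributed` are installed:

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# One Dask worker per visible GPU by default; pass n_workers to override.
cluster = LocalCUDACluster()
client = Client(cluster)
print(client.dashboard_link)  # dashboard for watching GPU workers and tasks

# ... GPU DataFrame work goes here ...

client.close()
cluster.close()
```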

-----

## 🛠️ How to Run the Code

This template requires a specific setup to ensure the Python packages correctly link to the underlying NVIDIA CUDA libraries.
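
Before creating the environment, it can help to confirm the machine actually exposes the driver and CUDA 12.x toolkit that the `cu12` wheels expect (a quick sanity check, not part of the template itself):

```bash
nvidia-smi         # visible GPUs and installed driver version
nvcc --version     # CUDA toolkit version; this template targets 12.x
python3 --version  # Python 3 is needed for the venv
```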

### 1. Create the Environment

The `setup.sh` script creates an isolated Python virtual environment (`venv`), installs all necessary libraries, and sets the critical CUDA library paths.

```bash
#!/bin/bash

ENV_NAME="dask_rapids_etl_venv"
CUDA_VERSION="12"

# 1. Create venv
rm -rf $ENV_NAME
python3 -m venv $ENV_NAME
echo "✅ Virtual Environment created."

...
```

**Execution:** Run the script once in your terminal:

```bash
bash setup.sh
```

-----

### 2. Procedures (Job Execution)

Once the setup script finishes, follow these steps in your terminal.

#### Step A: Activate the Environment

You must do this every time you open a new terminal or shell session.

```bash
source dask_rapids_etl_venv/bin/activate
```
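
If a new shell later fails to load the CUDA libraries even with the venv active, you may also need to re-export the library path that `setup.sh` sets during installation (the `export` inside the script does not persist into new sessions):

```bash
export LD_LIBRARY_PATH="/usr/lib/x86_64-linux-gnu:/usr/local/cuda/lib64:${LD_LIBRARY_PATH}"
```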

#### Step B: Run the ETL Pipeline

Save the main Python code as `dask_cuda_etl.py` and execute it.

```bash
python dask_cuda_etl.py
```

The script will automatically perform the following steps:

1. **Cluster Startup:** Launch the `LocalCUDACluster` and create Dask workers (one per detected GPU, defaulting to 2 workers if the device count isn't explicitly set; you can change this to suit your available resources).
2. **Extraction:** Generate synthetic data and move it from CPU to GPU memory (`cudf.from_pandas`).
3. **Transformation:** Execute GPU-accelerated filtering (`ddf_filtered`), log transformation (`np.log`), and distributed aggregation (`ddf_grouped`).
4. **Load:** The final result is collected back to a CPU Pandas DataFrame using `.compute()` (see the optional write-out sketch below).
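
The script prints the aggregated result but does not save it. If you want to keep the output, here is a minimal sketch of an optional write-out step after `run_etl` returns (the output filename is illustrative, not part of the template):

```python
# Optional: persist the aggregated result once it is back on the CPU.
# "etl_output.csv" is an example path, not defined in the template.
result_df_cpu.to_csv("etl_output.csv", index=True)  # index holds user_id
print(f"Wrote {len(result_df_cpu):,} aggregated rows to etl_output.csv")
```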

-----

## 🎯 Configuration

The template defaults to **2 GPUs**. If your machine has more or fewer visible GPUs, you can override the variable at the top of the `dask_cuda_etl.py` script:

```python
# --- Configuration Section in dask_cuda_etl.py ---
# To force 3 GPUs, change the default:
# N_GPUS = 3

# try:
# visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES')
# if visible_devices:
# N_GPUS = len(visible_devices.split(','))
# else:
# # Fallback when CUDA_VISIBLE_DEVICES isn't set; put your GPU count here
# N_GPUS = 2
# except Exception:
# N_GPUS = 2
```
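
Alternatively, you can control the worker count without editing the script by restricting which GPUs the process can see; this relies on standard `CUDA_VISIBLE_DEVICES` behaviour rather than anything specific to this template:

```bash
# Expose only GPUs 0 and 1 to the script; N_GPUS is then derived as 2.
CUDA_VISIBLE_DEVICES=0,1 python dask_cuda_etl.py
```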

### Scaling and Deployment on Saturn Cloud

This template is an ideal starting point for production data science workflows. To move beyond local testing and leverage true scalability, we recommend deploying this project on **Saturn Cloud**:

* **Scalable Clusters:** Easily provision GPU clusters with multiple nodes (not just multiple GPUs on a single node) for true cluster computing. [Learn about Saturn Cloud Clusters](https://saturncloud.io/docs/#about-saturn-cloud).
* **Managed Environments:** Avoid complex `LD_LIBRARY_PATH` issues by using Saturn Cloud's fully managed RAPIDS/CUDA environment. [Explore Saturn Cloud Resources](https://saturncloud.io/docs/enterprise/).
* **Production Jobs:** Schedule this exact Python script as a recurring job on Saturn Cloud for automated ETL processing. [Set up a Saturn Cloud Job](https://www.saturncloud.io/docs/).

-----

## 🎉 Conclusion

This template demonstrates the power of parallel, GPU-accelerated computing for ETL. By leveraging the **Dask-CUDA** framework, the pipeline transforms a multi-gigabyte synthetic dataset in seconds, illustrating the speedups available over traditional CPU-bound pipelines. This foundation is essential for processing the large data volumes encountered in modern ML and analytics tasks, providing a seamless pathway to scaling up on **Saturn Cloud**.
139 changes: 139 additions & 0 deletions examples/data_engineering_pipline/nvidia-dask-rapids/dask_cuda_etl.py
@@ -0,0 +1,139 @@
import os
import dask
import dask.dataframe  # explicit import so dask.dataframe.from_pandas is available in generate_data
import pandas as pd
import numpy as np
import time

# --- Dask-CUDA/RAPIDS Imports ---
# These imports rely on the 'cudf-cu12' and 'dask-cuda' packages
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cudf
import dask_cudf

# --- Configuration ---
# LocalCUDACluster will autodetect GPUs, but we calculate the count for partitioning.
# If you wanted to restrict to GPUs 0 and 1, you would use: CUDA_VISIBLE_DEVICES="0,1"
# N_GPUS = int(os.environ.get('CUDA_VISIBLE_DEVICES', '0').count(',')) + 1

try:
visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES')
if visible_devices:
N_GPUS = len(visible_devices.split(','))
else:
        # Fallback when CUDA_VISIBLE_DEVICES isn't set; adjust this to match your GPU count
N_GPUS = 2
except Exception:
N_GPUS = 2

# Define a reasonable synthetic workload size (e.g., 2 GB)
FILE_SIZE_MB = 2048

# --- ETL Logic ---

def generate_data(size_mb, n_partitions):
"""
E: Extract (Simulate large data creation)
Generates synthetic data on the CPU, then transfers it to Dask-cuDF on the GPU.
"""
    # Rough row-count estimate: target bytes / ~8 bytes per value / ~5 values per row
    n_rows = int((size_mb * 1024 * 1024) / 8 / 5)
print(f"Generating ~{n_rows / 1e6:.1f} Million rows of data...")

# Create the base DataFrame using Pandas (on CPU)
df_cpu = pd.DataFrame({
'user_id': np.random.randint(0, 500_000, n_rows),
'timestamp_s': np.random.randint(1609459200, 1640995200, n_rows),
'revenue': np.random.rand(n_rows) * 100,
'region': np.random.choice(['East', 'West', 'Central'], n_rows),
})

# Convert to Dask DataFrame, partitioned by the number of available GPUs
ddf_base = dask.dataframe.from_pandas(df_cpu, npartitions=n_partitions)

# Map partitions to cuDF: CRITICAL STEP to move data to GPU memory
ddf_gpu = ddf_base.map_partitions(cudf.from_pandas).persist()
print(f"✅ Data generated, loaded, and persisted across {n_partitions} GPU(s) as Dask-cuDF.")

return ddf_gpu

def run_etl(ddf_in):
"""
T: Transform (GPU-accelerated operations)
Performs filtering, feature engineering, and aggregation on the GPUs.
"""

print("\n--- Starting Multi-GPU ETL ---")

# 1. Filter and Persist
ddf_filtered = ddf_in[
(ddf_in['revenue'] > 50.0) &
(ddf_in['region'] == 'East')
].persist()
print(" - Filtering complete. Persisting intermediate result.")

# 2. Feature Engineering: Calculate log-transformed revenue
ddf_derived = ddf_filtered.assign(
log_revenue=np.log(ddf_filtered['revenue'])
)

# 3. Aggregation: Use standard dictionary aggregation syntax
ddf_grouped = ddf_derived.groupby('user_id').agg({
'revenue': 'sum',
'log_revenue': 'mean',
'timestamp_s': 'count'
})

# Rename the columns explicitly after aggregation for clarity (L: Load)
ddf_grouped = ddf_grouped.rename(columns={
'revenue': 'total_revenue',
'log_revenue': 'avg_log_revenue',
'timestamp_s': 'transaction_count'
})

# Trigger computation
start_time = time.time()
result_df_cpu = ddf_grouped.compute()
end_time = time.time()

print("--- ETL Complete ---")
print(f"Total GPU processing time: {end_time - start_time:.4f} seconds.")
return result_df_cpu

# --- Main Execution ---
def main():
cluster = None
client = None

# 1. Start the Dask Multi-GPU Cluster
try:
print(f"Starting LocalCUDACluster with {N_GPUS} GPU worker(s)...")
cluster = LocalCUDACluster(n_workers=N_GPUS)
client = Client(cluster)

print(f"🌐 Dask Dashboard link: {client.dashboard_link}")
print(f"Cluster started with {len(client.scheduler_info()['workers'])} GPU worker(s).")

# 2. Extract Data
ddf_in = generate_data(FILE_SIZE_MB, N_GPUS)

# 3. Transform and Load
result_df_cpu = run_etl(ddf_in)

# 4. Final Output and Verification
print("\n=== Final Aggregated Result (CPU Pandas) ===")
print(result_df_cpu.head())

except Exception as e:
print(f"\n❌ An error occurred during Dask-CUDA execution.")
print(f"Error: {e}")

finally:
# 5. Cleanup: Critical step to release GPU memory and resources
if client:
client.close()
if cluster:
cluster.close()
print("\n🛑 Dask Cluster shutdown.")

if __name__ == "__main__":
main()
31 changes: 31 additions & 0 deletions examples/data_engineering_pipline/nvidia-dask-rapids/setup.sh
@@ -0,0 +1,31 @@
#!/bin/bash

ENV_NAME="dask_rapids_etl_venv"
CUDA_VERSION="12" # Based on nvcc output (12.8)

# 1. Create venv
rm -rf $ENV_NAME
python3 -m venv $ENV_NAME

# --- CRITICAL FIX: Set LD_LIBRARY_PATH immediately inside the environment ---
# This ensures this shell session finds the CUDA 12.x libraries that nvcc confirmed exist.
# Note: the export only lasts for this script's session; re-export it in new shells if libraries fail to load.
source $ENV_NAME/bin/activate
export LD_LIBRARY_PATH="/usr/lib/x86_64-linux-gnu:/usr/local/cuda/lib64:${LD_LIBRARY_PATH}"
echo "✅ LD_LIBRARY_PATH set for active environment."

# --- 2. Install Core ETL Packages (Dask + RAPIDS cu12) ---
echo "--- Installing Core ETL Packages (Dask + RAPIDS cu12) ---"

# Install Dask and core libraries
pip install pandas numpy dask distributed

# Install core RAPIDS packages explicitly targeting CUDA 12 (cu12)
# We trust the NVIDIA index to provide the compatible 25.10 wheel for CUDA 12.8.
pip install \
--extra-index-url=https://pypi.nvidia.com \
"cudf-cu${CUDA_VERSION}==25.10.*" \
"dask-cudf-cu${CUDA_VERSION}==25.10.*" \
"dask-cuda"

echo "--- Installation Complete ---"
echo "✅ Core packages installed. Run 'python dask_hybrid_etl.py' now."