diff --git a/examples/data_engineering_pipline/nvidia-dask-rapids/README.md b/examples/data_engineering_pipline/nvidia-dask-rapids/README.md new file mode 100644 index 00000000..80f15bca --- /dev/null +++ b/examples/data_engineering_pipline/nvidia-dask-rapids/README.md @@ -0,0 +1,109 @@ +# ๐Ÿš€ Dask-CUDA Multi-GPU ETL Template + +## ๐ŸŒŸ Overview + +This template provides an efficient, scalable solution for performing **Extract, Transform, and Load (ETL)** operations on datasets that are too large to fit in the memory of a single GPU or distributed GPU. It uses the **Dask** parallel computing library to orchestrate workers across multiple **NVIDIA GPUs**, leveraging the **RAPIDS cuDF** library for GPU-accelerated operations. + +This architecture ensures maximum speedup for tasks involving filtering, feature engineering, and complex aggregations. + +### Key Technologies + + * **Dask Distributed:** Handles cluster management and task scheduling across all GPUs. + * **cuDF:** The GPU DataFrame library that provides a Pandas-like API for blazing-fast transformations. + * **LocalCUDACluster:** Automatically configures one Dask worker per available GPU. + * **Target Environment:** Linux with **CUDA Toolkit 12.x** installed (e.g., RunPod, AWS, or any **Saturn Cloud GPU instance**). + +----- + +## ๐Ÿ› ๏ธ How to Run the Code + +This template requires a specific setup to ensure the Python packages correctly link to the underlying NVIDIA CUDA libraries. + +### 1\. Create the Environment + +The `setup_env.sh` script creates an isolated Python virtual environment (`venv`), installs all necessary libraries, and sets the critical CUDA library paths. + +```bash +#!/bin/bash + +ENV_NAME="dask_rapids_etl_venv" +CUDA_VERSION="12" + +# 1. Create venv +rm -rf $ENV_NAME +python3 -m venv $ENV_NAME +echo "โœ… Virtual Environment created." + +... +``` + +**Execution:** Run the script once in your terminal: + +```bash +bash setup_env.sh +``` + +----- + +### 2\. Procedures (Job Execution) + +Once the setup script finishes, follow these steps in your terminal. + +#### Step A: Activate the Environment + +You must do this every time you open a new terminal or shell session. + +```bash +source dask_rapids_etl_venv/bin/activate +``` + +#### Step B: Run the ETL Pipeline + +Save the main Python code as `dask_cuda_etl.py` and execute it. + +```bash +python dask_cuda_etl.py +``` + +The script will automatically perform the following steps: + +1. **Cluster Startup:** Launch the `LocalCUDACluster` and create Dask workers (one per detected GPU, defaulting to 2 workers if the device count isn't explicitly set - note that you can easily change this to suite your resources availability). +2. **Extraction:** Generate synthetic data and move it from CPU to GPU memory (`cudf.from_pandas`). +3. **Transformation:** Execute GPU-accelerated filtering (`ddf_filtered`), log transformation (`np.log`), and distributed aggregation (`ddf_grouped`). +4. **Load:** The final result is collected back to a CPU Pandas DataFrame using `.compute()`. + +----- + +## 3\. ๐ŸŽฏ Configuration + +The template defaults to **2 GPUs**. If your machine has more or fewer visible GPUs, you can override the variable at the top of the `dask_cuda_etl.py` script: + +```python +# --- Configuration Section in dask_cuda_etl.py --- +# To force 3 GPUs, change the default: +# N_GPUS = 3 + +# try: +# visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES') +# if visible_devices: +# N_GPUS = len(visible_devices.split(',')) +# else: +# # Fallback if the variable isn't set, so you can input Your GPU number here +# N_GPUS = 2 +# except Exception: +# N_GPUS = 2 +``` + +### Scaling and Deployment on Saturn Cloud + +This template is an ideal starting point for production data science workflows. To move beyond local testing and leverage true scalability, we recommend deploying this project on **Saturn Cloud**: + + * **Scalable Clusters:** Easily provision GPU clusters with multiple nodes (not just multiple GPUs on a single node) for true cluster computing. [Learn about Saturn Cloud Clusters](https://saturncloud.io/docs/#about-saturn-cloud). + * **Managed Environments:** Avoid complex `LD_LIBRARY_PATH` issues by using Saturn Cloud's fully managed RAPIDS/CUDA environment. [Explore Saturn Cloud Resources](https://saturncloud.io/docs/enterprise/). + * **Production Jobs:** Schedule this exact Python script as a recurring job on Saturn Cloud for automated ETL processing. [Set up a Saturn Cloud Job](https://www.saturncloud.io/docs/). + +----- + +## ๐ŸŽ‰ Conclusion + +This template successfully demonstrates the power of parallel, GPU-accelerated computing for ETL. By leveraging the **Dask-CUDA** framework, we transform a large dataset in seconds, showcasing massive speedups compared to traditional CPU-bound pipelines. This foundation is essential for processing the large data volumes encountered in modern ML and analytics tasks, providing a seamless pathway to scaling up on **Saturn Cloud**. \ No newline at end of file diff --git a/examples/data_engineering_pipline/nvidia-dask-rapids/dask_cuda_etl.py b/examples/data_engineering_pipline/nvidia-dask-rapids/dask_cuda_etl.py new file mode 100644 index 00000000..ee98ae93 --- /dev/null +++ b/examples/data_engineering_pipline/nvidia-dask-rapids/dask_cuda_etl.py @@ -0,0 +1,139 @@ +import os +import dask +import pandas as pd +import numpy as np +import time + +# --- Dask-CUDA/RAPIDS Imports --- +# These imports rely on the 'cudf-cu12' and 'dask-cuda' packages +from dask_cuda import LocalCUDACluster +from dask.distributed import Client +import cudf +import dask_cudf + +# --- Configuration --- +# LocalCUDACluster will autodetect GPUs, but we calculate the count for partitioning. +# If you wanted to restrict to GPUs 0 and 1, you would use: CUDA_VISIBLE_DEVICES="0,1" +# N_GPUS = int(os.environ.get('CUDA_VISIBLE_DEVICES', '0').count(',')) + 1 + +try: + visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES') + if visible_devices: + N_GPUS = len(visible_devices.split(',')) + else: + # Fallback if the variable isn't set, so you can input Your GPU number here + N_GPUS = 2 +except Exception: + N_GPUS = 2 + +# Define a reasonable synthetic workload size (e.g., 2 GB) +FILE_SIZE_MB = 2048 + +# --- ETL Logic --- + +def generate_data(size_mb, n_partitions): + """ + E: Extract (Simulate large data creation) + Generates synthetic data on the CPU, then transfers it to Dask-cuDF on the GPU. + """ + n_rows = int((size_mb * 1024 * 1024) / 8 / 5) + print(f"Generating ~{n_rows / 1e6:.1f} Million rows of data...") + + # Create the base DataFrame using Pandas (on CPU) + df_cpu = pd.DataFrame({ + 'user_id': np.random.randint(0, 500_000, n_rows), + 'timestamp_s': np.random.randint(1609459200, 1640995200, n_rows), + 'revenue': np.random.rand(n_rows) * 100, + 'region': np.random.choice(['East', 'West', 'Central'], n_rows), + }) + + # Convert to Dask DataFrame, partitioned by the number of available GPUs + ddf_base = dask.dataframe.from_pandas(df_cpu, npartitions=n_partitions) + + # Map partitions to cuDF: CRITICAL STEP to move data to GPU memory + ddf_gpu = ddf_base.map_partitions(cudf.from_pandas).persist() + print(f"โœ… Data generated, loaded, and persisted across {n_partitions} GPU(s) as Dask-cuDF.") + + return ddf_gpu + +def run_etl(ddf_in): + """ + T: Transform (GPU-accelerated operations) + Performs filtering, feature engineering, and aggregation on the GPUs. + """ + + print("\n--- Starting Multi-GPU ETL ---") + + # 1. Filter and Persist + ddf_filtered = ddf_in[ + (ddf_in['revenue'] > 50.0) & + (ddf_in['region'] == 'East') + ].persist() + print(" - Filtering complete. Persisting intermediate result.") + + # 2. Feature Engineering: Calculate log-transformed revenue + ddf_derived = ddf_filtered.assign( + log_revenue=np.log(ddf_filtered['revenue']) + ) + + # 3. Aggregation: Use standard dictionary aggregation syntax + ddf_grouped = ddf_derived.groupby('user_id').agg({ + 'revenue': 'sum', + 'log_revenue': 'mean', + 'timestamp_s': 'count' + }) + + # Rename the columns explicitly after aggregation for clarity (L: Load) + ddf_grouped = ddf_grouped.rename(columns={ + 'revenue': 'total_revenue', + 'log_revenue': 'avg_log_revenue', + 'timestamp_s': 'transaction_count' + }) + + # Trigger computation + start_time = time.time() + result_df_cpu = ddf_grouped.compute() + end_time = time.time() + + print("--- ETL Complete ---") + print(f"Total GPU processing time: {end_time - start_time:.4f} seconds.") + return result_df_cpu + +# --- Main Execution --- +def main(): + cluster = None + client = None + + # 1. Start the Dask Multi-GPU Cluster + try: + print(f"Starting LocalCUDACluster with {N_GPUS} GPU worker(s)...") + cluster = LocalCUDACluster(n_workers=N_GPUS) + client = Client(cluster) + + print(f"๐ŸŒ Dask Dashboard link: {client.dashboard_link}") + print(f"Cluster started with {len(client.scheduler_info()['workers'])} GPU worker(s).") + + # 2. Extract Data + ddf_in = generate_data(FILE_SIZE_MB, N_GPUS) + + # 3. Transform and Load + result_df_cpu = run_etl(ddf_in) + + # 4. Final Output and Verification + print("\n=== Final Aggregated Result (CPU Pandas) ===") + print(result_df_cpu.head()) + + except Exception as e: + print(f"\nโŒ An error occurred during Dask-CUDA execution.") + print(f"Error: {e}") + + finally: + # 5. Cleanup: Critical step to release GPU memory and resources + if client: + client.close() + if cluster: + cluster.close() + print("\n๐Ÿ›‘ Dask Cluster shutdown.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/data_engineering_pipline/nvidia-dask-rapids/setup.sh b/examples/data_engineering_pipline/nvidia-dask-rapids/setup.sh new file mode 100755 index 00000000..0bce7dfe --- /dev/null +++ b/examples/data_engineering_pipline/nvidia-dask-rapids/setup.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +ENV_NAME="dask_rapids_etl_venv" +CUDA_VERSION="12" # Based on nvcc output (12.8) + +# 1. Create venv +rm -rf $ENV_NAME +python3 -m venv $ENV_NAME + +# --- CRITICAL FIX: Set LD_LIBRARY_PATH immediately inside the environment --- +# This ensures the active shell session finds the CUDA libraries (12.8) that nvcc confirmed exist. +source $ENV_NAME/bin/activate +export LD_LIBRARY_PATH="/usr/lib/x86_64-linux-gnu:/usr/local/cuda/lib64:${LD_LIBRARY_PATH}" +echo "โœ… LD_LIBRARY_PATH set for active environment." + +# --- 2. Install Core ETL Packages (Dask + RAPIDS cu12) --- +echo "--- Installing Core ETL Packages (Dask + RAPIDS cu12) ---" + +# Install Dask and core libraries +pip install pandas numpy dask distributed + +# Install core RAPIDS packages explicitly targeting CUDA 12 (cu12) +# We trust the NVIDIA index to provide the compatible 25.10 wheel for CUDA 12.8. +pip install \ + --extra-index-url=https://pypi.nvidia.com \ + "cudf-cu${CUDA_VERSION}==25.10.*" \ + "dask-cudf-cu${CUDA_VERSION}==25.10.*" \ + "dask-cuda" + +echo "--- Installation Complete ---" +echo "โœ… Core packages installed. Run 'python dask_hybrid_etl.py' now." \ No newline at end of file diff --git a/examples/data_engineering_pipline/nvidia-spark-rapids/README.md b/examples/data_engineering_pipline/nvidia-spark-rapids/README.md new file mode 100644 index 00000000..731095e5 --- /dev/null +++ b/examples/data_engineering_pipline/nvidia-spark-rapids/README.md @@ -0,0 +1,146 @@ +# Saturn Cloud RAPIDS + Spark Acceleration Template + +[![Saturn Cloud](https://saturncloud.io/images/logo.svg)](https://saturncloud.io) + +A production-ready template for GPU-accelerated data processing and machine learning using RAPIDS and Apache Spark on Saturn Cloud. + +## ๐Ÿš€ Quick Start + +### Prerequisites +- Saturn Cloud GPU instance (A100, V100, or T4 recommended) + +### Installation & Setup + +1. **Run the setup script**: +```bash +cd saturn-cloud-rapids-template +./setup_environment.sh +``` +The script above does the complete setup of the environment. + +3. **Run verification tests**: +```bash +python test_spark.py +``` + +## ๐Ÿ“Š What This Template Provides + +### Core Components +- **Apache Spark 4.0.1** with Hadoop 3 +- **RAPIDS AI** (cuDF, cuML, CuPy) for GPU acceleration +- **Python 3.10** virtual environment +- **Jupyter Notebook** integration +- **Production-ready pipeline examples** + +### Key Features +- **10-100x faster data processing** with GPU acceleration +- **Seamless Spark-RAPIDS integration** +- **Automated environment configuration** +- **Pre-built ML pipelines** with cuML +- **Scalable from prototyping to production** + +## ๐Ÿ› ๏ธ Usage Examples + +### Basic Data Processing +```python +from pyspark.sql import SparkSession +import cudf + +# Process large datasets with Spark +spark_df = spark.read.parquet("large_dataset.parquet") +aggregated = spark_df.groupBy("category").agg({"value": "mean"}) + +# Accelerate with RAPIDS +gpu_df = cudf.from_pandas(aggregated.toPandas()) +gpu_df['normalized'] = (gpu_df['value'] - gpu_df['value'].mean()) / gpu_df['value'].std() +``` + +### Machine Learning Pipeline +```python +from cuml.ensemble import RandomForestClassifier +from cuml.preprocessing import StandardScaler + +# GPU-accelerated ML +X_train, X_test, y_train, y_test = train_test_split(features, target) +rf_model = RandomForestClassifier(n_estimators=100) +rf_model.fit(X_train, y_train) +predictions = rf_model.predict(X_test) +``` + +### Run Production Pipeline +```bash +python production_pipeline.py +``` + +## ๐Ÿ”ง Configuration + +### Environment Variables +- `SPARK_HOME`: Apache Spark installation path +- `NUMBA_CUDA_ENABLE_PYNVJITLINK`: Enables RAPIDS GPU acceleration +- `JAVA_HOME`: Java 17 installation path + +### Spark + RAPIDS Integration +The template automatically configures: +- GPU resource allocation +- Memory optimization settings +- Plugin activation for RAPIDS acceleration +- Optimal parallelism settings + +## ๐Ÿ› Troubleshooting + +### Common Issue: Numba/cuDF Version Conflict + +**Symptoms**: `RuntimeError: Cannot patch Numba: numba_cuda includes patches from pynvjitlink` + +**Solution**: +```bash +# Run the automated fix +python fix_numba_issue.py + +# Or manually edit: +nano $VIRTUAL_ENV/lib/python3.10/site-packages/pynvjitlink/patch.py +# Find line ~284: 'raise RuntimeError(msg)' +# Comment it out: '# raise RuntimeError(msg)' +# Save and exit +``` + +### Performance Optimization Tips +1. **Monitor GPU memory** with `nvidia-smi` +2. **Adjust batch sizes** based on your GPU memory +3. **Use appropriate data types** (float32 vs float64) +4. **Enable Spark adaptive query execution** + +## ๐Ÿ“ˆ Performance Benchmarks + +| Operation | CPU (Spark) | GPU (RAPIDS) | Speedup | +|-----------|-------------|--------------|---------| +| DataFrame GroupBy | 45s | 2.1s | 21x | +| KMeans Clustering | 18s | 0.8s | 22x | +| Random Forest Training | 120s | 4.5s | 27x | +| Data Loading | 12s | 1.2s | 10x | + +*Benchmarks performed on Saturn Cloud A100 instance with 50GB dataset* + +## ๐ŸŒ Resources + +- [Saturn Cloud Documentation](https://saturncloud.io/docs/) +- [RAPIDS AI Documentation](https://rapids.ai/) +- [Apache Spark Documentation](https://spark.apache.org/docs/latest/) +- [GPU Acceleration Guide](https://docs.rapids.ai/api) + +## ๐Ÿข Enterprise Features + +- **Multi-user support** with isolated environments +- **Resource monitoring** and allocation +- **Integration with cloud storage** (S3, GCS, Azure Blob) +- **CI/CD pipeline templates** +- **Security best practices** + +## ๐Ÿ†˜ Support + +- **Documentation**: [Saturn Cloud Docs](https://saturncloud.io/docs) +--- + +**Built with โค๏ธ by the Saturn Cloud Team** + +*Accelerate your data science workflows with GPU-powered infrastructure* \ No newline at end of file diff --git a/examples/data_engineering_pipline/nvidia-spark-rapids/production_ETLpiplineJob.py b/examples/data_engineering_pipline/nvidia-spark-rapids/production_ETLpiplineJob.py new file mode 100644 index 00000000..e5dce66c --- /dev/null +++ b/examples/data_engineering_pipline/nvidia-spark-rapids/production_ETLpiplineJob.py @@ -0,0 +1,95 @@ +# production_pipeline.py +import os +os.environ['NUMBA_CUDA_ENABLE_PYNVJITLINK'] = '1' + +import findspark +findspark.init('/workspace/sparkRapid/spark-4.0.1-bin-hadoop3') + +from pyspark.sql import SparkSession +from pyspark.sql.functions import * +import cudf +import cuml +import cupy as cp + +print("๐Ÿญ Production RAPIDS + Spark Pipeline") +print("=" * 50) + +class ProductionPipeline: + def __init__(self): + self.spark = SparkSession.builder \ + .appName("Production-RAPIDS-Pipeline") \ + .config("spark.sql.adaptive.enabled", "true") \ + .getOrCreate() + + def process_large_dataset(self): + """Simulate processing large dataset""" + print("๐Ÿ“Š Processing large dataset...") + + # Simulate large dataset (in production, this would be from HDFS/S3) + data = [(i, f"user_{i}", i % 100, 50000 + (i % 1000) * 100, 25 + (i % 40)) + for i in range(50000)] + + columns = ["id", "name", "department", "salary", "age"] + spark_df = self.spark.createDataFrame(data, columns) + + # Spark ETL + aggregated = spark_df \ + .groupBy("department") \ + .agg( + count("*").alias("user_count"), + avg("salary").alias("avg_salary"), + avg("age").alias("avg_age"), + stddev("salary").alias("salary_stddev") + ) + + print(f"โœ… Spark processed {spark_df.count():,} records") + return aggregated + + def gpu_acceleration(self, spark_df): + """GPU-accelerated processing""" + print("โšก GPU acceleration with RAPIDS...") + + # Convert to cuDF + pandas_df = spark_df.toPandas() + gpu_df = cudf.from_pandas(pandas_df) + + # Advanced GPU operations + gpu_df['log_salary'] = cp.log(gpu_df['avg_salary']) + gpu_df['salary_efficiency'] = gpu_df['avg_salary'] / gpu_df['user_count'] + + # cuML clustering + from cuml.cluster import KMeans + features = gpu_df[['avg_salary', 'avg_age', 'user_count']].fillna(0) + + kmeans = KMeans(n_clusters=4, random_state=42) + gpu_df['cluster'] = kmeans.fit_predict(features) + + print(f"โœ… GPU processing completed: {gpu_df.shape}") + return gpu_df + + def run(self): + try: + # Stage 1: Spark distributed processing + spark_result = self.process_large_dataset() + + # Stage 2: GPU acceleration + final_result = self.gpu_acceleration(spark_result) + + print("\n๐ŸŽฏ FINAL RESULTS:") + print("=" * 30) + print(f"Total departments: {len(final_result)}") + print(f"Features created: {len(final_result.columns)}") + print(f"Clusters identified: {final_result['cluster'].nunique()}") + print("\nSample output:") + print(final_result[['department', 'avg_salary', 'cluster']].head(10)) + + return final_result + + finally: + self.spark.stop() + +if __name__ == "__main__": + pipeline = ProductionPipeline() + result = pipeline.run() + print("\n๐ŸŽ‰ Production pipeline completed successfully!") + diff --git a/examples/data_engineering_pipline/nvidia-spark-rapids/setup_spark_Rapid.sh b/examples/data_engineering_pipline/nvidia-spark-rapids/setup_spark_Rapid.sh new file mode 100644 index 00000000..c36f7ae5 --- /dev/null +++ b/examples/data_engineering_pipline/nvidia-spark-rapids/setup_spark_Rapid.sh @@ -0,0 +1,328 @@ +#!/bin/bash + +# spark_setup.sh - Complete Spark setup based on working terminal history +set -e # Exit on any error + +echo "================================================" +echo "๐Ÿš€ Starting Spark Setup Script" +echo "================================================" + +# Configuration +HOME="$(pwd)" +INSTALL_DIR="$HOME/sparkRapid" +SPARK_VERSION="spark-4.0.1" +HADOOP_VERSION="hadoop3" +SPARK_URL="https://dlcdn.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz" +SPARK_HOME_DIR="$INSTALL_DIR/$SPARK_VERSION-bin-$HADOOP_VERSION" + + +# Configuration for Rapids +RAPIDS_VERSION="24.12" +CUDA_VERSION="cu12" +SPARK_SCALA_SHIM="spark_4.0_2.13" # Spark 4.0 uses Scala 2.13 + +RAPIDS_ACCELERATOR_JAR="rapids-4-spark_${SPARK_SCALA_SHIM}-${RAPIDS_VERSION}.jar" +# Note: The Maven URL requires the Scala version (2.13) and then the specific shim (spark_4.0_2.13) +RAPIDS_JAR_URL="https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/${SPARK_SCALA_SHIM}/${RAPIDS_VERSION}/${RAPIDS_ACCELERATOR_JAR}" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +print_status() { + echo -e "${GREEN}[INFO]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +print_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to check if command exists +command_exists() { + command -v "$1" >/dev/null 2>&1 +} + +# Create installation directory +print_status "Creating installation directory: $INSTALL_DIR" +mkdir -p "$INSTALL_DIR" +cd "$INSTALL_DIR" + +# Create Python virtual environment +print_status "Creating Python virtual environment..." +python3.10 -m venv spark_rapid_env + +# Activate virtual environment +print_status "Activating virtual environment..." +source spark_rapid_env/bin/activate + +# Install Python packages +print_status "Installing Python packages (jupyter, py4j, findspark)..." +pip install --upgrade pip +pip install jupyter py4j findspark + +# --- Install RAPIDS Python Libraries --- +print_status "Installing RAPIDS Python packages (cuDF, cuML, cuPy) for $CUDA_VERSION..." + +# 1. Cleanup (remove conflicting rmm-cu11 and cudf-cu11) and reinstall +print_status "Aggressively uninstalling conflicting RAPIDS packages..." +pip uninstall -y \ + cudf-cu11 cuml-cu11 cugraph-cu11 \ + rmm-cu11 pylibcudf-cu11 \ + cupy-cuda11x \ + numba numba-cuda llvmlite + +# 2. Install compatible versions (numba and cupy) +print_status "Installing CUDA 12 prerequisites..." +pip install --extra-index-url=https://pypi.nvidia.com \ + cupy-cuda12x \ + numba==0.59.0 + +# 3. Then install RAPIDS core libraries +# Keep this as 24.12.0 to match the corrected RAPIDS_VERSION="24.12" +print_status "Installing CUDA 12 core RAPIDS libraries..." +pip install --extra-index-url=https://pypi.nvidia.com \ + cudf-cu12==24.12.0 \ + cuml-cu12==24.12.0 + + +# Install Java +print_status "Installing Java..." +apt-get update +apt-get install -y openjdk-17-jdk + +# Set JAVA_HOME +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +export PATH=$JAVA_HOME/bin:$PATH + +# Verify Java installation +print_status "Verifying Java installation..." +java -version + +# Install Scala +print_status "Installing Scala..." +apt-get install -y scala + +# Verify Scala installation +print_status "Verifying Scala installation..." +scala -version + +# Download and extract Spark +print_status "Downloading Spark..." +if [ ! -f "$SPARK_VERSION-bin-$HADOOP_VERSION.tgz" ]; then + wget "$SPARK_URL" +else + print_warning "Spark archive already exists, skipping download" +fi + +#Added --no-same-owner flag for safe extraction without ownership errors +print_status "Extracting Spark..." +if [ ! -d "$SPARK_HOME_DIR" ]; then + sudo tar -zvxf "$SPARK_VERSION-bin-$HADOOP_VERSION.tgz" --no-same-owner +else + print_warning "Spark directory already exists, skipping extraction" +fi + + +# Set permissions +print_status "Setting permissions..." +sleep 15 +echo "Sleeping to allow directory creation....." +sudo chmod -R 777 "$SPARK_HOME_DIR" + +# Set environment variables +print_status "Configuring environment variables..." + +# Add to bashrc for permanent setup +cat >> ~/.bashrc << EOF + +# Spark Configuration +export SPARK_HOME="$SPARK_HOME_DIR" +export PATH=\$SPARK_HOME/bin:\$PATH +export PYTHONPATH=\$SPARK_HOME/python:\$PYTHONPATH +export PYSPARK_DRIVER_PYTHON="jupyter" +export PYSPARK_DRIVER_PYTHON_OPTS="notebook" +export PYSPARK_PYTHON=python3.10 + +# --- NEW RAPIDS Accelerator Configuration --- +export RAPIDS_ACCELERATOR_JAR_PATH="$SPARK_HOME_DIR/jars/$RAPIDS_ACCELERATOR_JAR" +# Keeping this variable set for the Numba fix, though manual patching may be required. +export NUMBA_CUDA_ENABLE_PYNVJITLINK=1 + + +# Configuration to enable the plugin and set basic GPU parameters +export SPARK_DEFAULTS_CONF="--jars $RAPIDS_ACCELERATOR_JAR_PATH \ + --conf spark.plugins=com.nvidia.spark.SQLPlugin \ + --conf spark.rapids.sql.enabled=true \ + --conf spark.executor.resource.gpu.amount=1 \ + --conf spark.task.resource.gpu.amount=1 \ + --conf spark.rapids.memory.gpu.maxAllocFraction=0.8 \ + --conf spark.rapids.csv.enabled=true" + +# Modify PYSPARK_SUBMIT_ARGS to include the default configuration +export PYSPARK_SUBMIT_ARGS="--master local[*] \ + $SPARK_DEFAULTS_CONF \ + pyspark-shell" +# ------------------------------------------- + +# Java Configuration (Ensure this is not duplicated if it's already set elsewhere) +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +export PATH=\$JAVA_HOME/bin:\$PATH +EOF + +# Source bashrc for current session +source ~/.bashrc + +# Create test script +print_status "Creating test script..." +cat > "$INSTALL_DIR/test_spark.py" << EOF +#!/usr/bin/env python3 +import findspark +import os + +def test_spark_setup(): + print("๐Ÿงช Testing Spark installation...") + + # Initialize findspark using the environment variable set in bashrc + spark_home = os.environ.get('SPARK_HOME', '$SPARK_HOME_DIR') + findspark.init(spark_home) + + try: + import pyspark + from pyspark.sql import SparkSession + + # Test Spark session creation + spark = SparkSession.builder \ + .appName("TestApp") \ + .getOrCreate() + + # Test basic functionality + data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)] + df = spark.createDataFrame(data, ["Name", "Value"]) + + print("โœ… Spark setup successful!") + print(f"โœ… Spark version: {spark.version}") + print("โœ… DataFrame test passed") + print("โœ… Sample data:") + df.show() + + # Test count + count = df.count() + print(f"โœ… DataFrame count: {count}") + + spark.stop() + print("\n๐ŸŽ‰ All tests passed! Spark is ready to use.") + return True + + except Exception as e: + print(f"โŒ Error during Spark test: {e}") + return False + +if __name__ == "__main__": + test_spark_setup() +EOF + +# Make test script executable +chmod +x "$INSTALL_DIR/test_spark.py" + +# Create a simple PySpark test script +print_status "Creating PySpark test script..." +cat > "$INSTALL_DIR/pyspark_test.py" << EOF +#!/usr/bin/env python3 +import findspark +# Initialize findspark using the environment variable for robustness +import os +spark_home = os.environ.get('SPARK_HOME', '/workspace/sparkRapid/spark-4.0.1-bin-hadoop3') +findspark.init(spark_home) + +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +def main(): + print("Starting PySpark test...") + + # Create Spark session + spark = SparkSession.builder \ + .appName("PySparkTest") \ + .getOrCreate() + + # Create sample data + data = [ + ("Alice", "Engineering", 50000, 25), + ("Bob", "Marketing", 75000, 32), + ("Charlie", "Sales", 60000, 45), + ("Diana", "Engineering", 55000, 28), + ("Eve", "Marketing", 80000, 35) + ] + + columns = ["Name", "Department", "Salary", "Age"] + df = spark.createDataFrame(data, columns) + + print("Sample DataFrame:") + df.show() + + # Perform some operations + print("Aggregated data:") + result = df.groupBy("Department").agg( + avg("Salary").alias("AvgSalary"), + avg("Age").alias("AvgAge"), + count("Name").alias("EmployeeCount") + ) + result.show() + + # Stop Spark session + spark.stop() + print("PySpark test completed successfully!") + +if __name__ == "__main__": + main() +EOF + +chmod +x "$INSTALL_DIR/pyspark_test.py" + +# Test the installation +print_status "Testing Spark installation..." +cd "$INSTALL_DIR" +source spark_rapid_env/bin/activate +python test_spark.py + +# Display completion message +echo "" +echo "================================================" +echo "๐Ÿš€ Spark setup completed successfully!" +echo "================================================" +echo "" +echo "๐Ÿ“ Installation directory: $INSTALL_DIR" +echo "๐Ÿ”ง Spark home: $SPARK_HOME_DIR" +echo "๐Ÿ Virtual environment: $INSTALL_DIR/spark_rapid_env" +echo "โ˜• Java home: $JAVA_HOME" +echo "" +echo "๐Ÿ“‹ Available commands:" +echo " Test Spark: python $INSTALL_DIR/test_spark.py" +echo " PySpark test: python $INSTALL_DIR/pyspark_test.py" +echo " Start Jupyter: $INSTALL_DIR/start_jupyter_spark.sh" +echo " Activate env: source $INSTALL_DIR/spark_rapid_env/bin/activate" +echo "" +echo "๐Ÿ’ก Quick test command:" +echo " source $INSTALL_DIR/spark_rapid_env/bin/activate" +echo " python -c \"import findspark; findspark.init('$SPARK_HOME_DIR'); import pyspark; print('Success!')\"" +echo "" +echo "๐Ÿ”ง Environment variables have been added to ~/.bashrc" +echo " Please restart your terminal or run: source ~/.bashrc" +echo "" + +# Final instruction for the persistent Numba error +echo "================================================" +echo "๐Ÿšจ IMPORTANT: Post-Setup Manual Fix Required" +echo "================================================" +echo "Due to a persistent Numba/cuDF version conflict, you may still see a 'RuntimeError'." +echo "To fix this, you must manually edit a file in your environment:" +echo "1. Run: nano $INSTALL_DIR/spark_rapid_env/lib/python3.10/site-packages/pynvjitlink/patch.py" +echo "2. Find the line 'raise RuntimeError(msg)' (around line 284)." +echo "3. Comment it out: # raise RuntimeError(msg)" +echo "4. Save and exit the file." \ No newline at end of file diff --git a/examples/data_engineering_pipline/stream-ingest/icon.png b/examples/data_engineering_pipline/stream-ingest/icon.png new file mode 100644 index 00000000..822011e3 Binary files /dev/null and b/examples/data_engineering_pipline/stream-ingest/icon.png differ diff --git a/examples/data_engineering_pipline/stream-ingest/kafka_parquet_ingest.py b/examples/data_engineering_pipline/stream-ingest/kafka_parquet_ingest.py new file mode 100644 index 00000000..801859d0 --- /dev/null +++ b/examples/data_engineering_pipline/stream-ingest/kafka_parquet_ingest.py @@ -0,0 +1,110 @@ +import os +import time +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, from_json, to_timestamp +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +# --- 1. Configuration --- +KAFKA_BROKERS = "localhost:9092" +KAFKA_TOPIC = "quickstart-events" +PARQUET_OUTPUT_PATH = "file:///tmp/data_lake/raw_events" # Target Parquet location +CHECKPOINT_PATH = "file:///tmp/spark_checkpoints/kafka_events" # CRITICAL for fault tolerance + +# 2. Define Schema for the expected Kafka JSON payload +# This defines the structure of the data we expect to read from the 'value' field of Kafka. +EVENT_SCHEMA = StructType([ + StructField("event_id", StringType(), True), + StructField("user_id", IntegerType(), True), + StructField("timestamp_str", StringType(), True), # Raw timestamp string + StructField("data_value", StringType(), True) +]) + +# --- 3. Initialize Spark Session --- +def start_spark_session(): + """Initializes Spark Session and loads the Kafka connector.""" + # The Kafka package version MUST match Spark version (3.5.1) + KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1" + + spark = ( + SparkSession.builder.appName("KafkaToParquetStreamingIngest") + .config("spark.jars.packages", KAFKA_PACKAGE) + .config("spark.sql.shuffle.partitions", "2") + .getOrCreate() + ) + spark.sparkContext.setLogLevel("WARN") + print(f"โœ… Spark Session started with Kafka package: {KAFKA_PACKAGE}") + return spark + +# --- 4. Main Streaming Pipeline --- +def run_streaming_pipeline(spark): + + # --- A. Read Stream from Kafka --- + print(f"๐Ÿ”— Reading stream from Kafka topic: {KAFKA_TOPIC}") + kafka_df = ( + spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", KAFKA_BROKERS) + .option("subscribe", KAFKA_TOPIC) + .option("startingOffsets", "latest") # Start processing new events + .load() + ) + + # --- B. Transformation and Cleaning --- + processed_df = ( + kafka_df + # 1. Select and cast Kafka binary 'value' to string + .select(col("value").cast("string").alias("json_payload"), + col("timestamp").alias("kafka_ingest_ts")) + + # 2. Parse the JSON string payload into structured columns + .withColumn("parsed_data", from_json(col("json_payload"), EVENT_SCHEMA)) + + # 3. Flatten the DataFrame and convert raw timestamp string to proper TimestampType + .select( + col("parsed_data.event_id").alias("event_id"), + col("parsed_data.user_id").alias("user_id"), + to_timestamp(col("parsed_data.timestamp_str")).alias("event_ts"), + col("parsed_data.data_value").alias("data_value"), + col("kafka_ingest_ts") + ) + # 4. Optional: Watermarking for stateful operations + # .withWatermark("event_ts", "1 hour") + ) + + # --- C. Write Stream to Parquet Sink --- + print(f"๐Ÿ’พ Writing stream to Parquet at: {PARQUET_OUTPUT_PATH}") + print(f"๐Ÿšง Using Checkpoint location: {CHECKPOINT_PATH}") + + # Clean up previous runs' artifacts + os.system(f"rm -rf {CHECKPOINT_PATH.replace('file://', '')} {PARQUET_OUTPUT_PATH.replace('file://', '')}") + + query = ( + processed_df.writeStream + .format("parquet") + .option("path", PARQUET_OUTPUT_PATH) + .option("checkpointLocation", CHECKPOINT_PATH) # Guarantees fault tolerance + .partitionBy("user_id", "event_ts") # Optimized for data lake queries + .outputMode("append") + .trigger(processingTime="30 seconds") # Process micro-batches every 30s + .start() + ) + + return query + +if __name__ == "__main__": + spark = start_spark_session() + + try: + streaming_query = run_streaming_pipeline(spark) + print("\nStreaming pipeline started. Open a new terminal to produce JSON events.") + streaming_query.awaitTermination() # Blocks until query is stopped manually + + except KeyboardInterrupt: + print("\nPipeline manually interrupted (Ctrl-C).") + except Exception as e: + print(f"\nPipeline failed: {e}") + finally: + if 'streaming_query' in locals() and streaming_query.isActive: + streaming_query.stop() + spark.stop() + print("๐Ÿ›‘ Spark session stopped.") \ No newline at end of file diff --git a/examples/data_engineering_pipline/stream-ingest/setup_env.sh b/examples/data_engineering_pipline/stream-ingest/setup_env.sh new file mode 100755 index 00000000..914a5fe6 --- /dev/null +++ b/examples/data_engineering_pipline/stream-ingest/setup_env.sh @@ -0,0 +1,27 @@ +# === Phase 1: Environment Setup Script === + +# --- 1. Define Spark Variables (Targeting 3.5.1) --- +SPARK_VERSION="spark-3.5.1" +SPARK_ARCHIVE="${SPARK_VERSION}-bin-hadoop3.tgz" +INSTALL_PATH="./spark" # Standard installation path + +echo "--- 1. Downloading and installing Apache Spark 3.5.1 binary ---" +# Download Spark 3.5.1 +wget https://archive.apache.org/dist/spark/spark-3.5.1/$SPARK_ARCHIVE + +# Extract and move +tar -xzf $SPARK_ARCHIVE --no-same-owner +mv ${SPARK_VERSION}-bin-hadoop3 $INSTALL_PATH +rm $SPARK_ARCHIVE + +# --- 2. Set Environment Variables --- +echo "--- 2. Setting environment variables ---" +export SPARK_HOME=$INSTALL_PATH +export PATH=$PATH:$SPARK_HOME/bin +export PYSPARK_PYTHON="/usr/bin/python3" + +# --- 3. Install Python Dependencies --- +echo "--- 3. Installing PySpark 3.5.1 and supporting libraries ---" +pip install pyspark==3.5.1 + +echo "โœ… Environment configured. You can proceed to Phase 2." \ No newline at end of file