Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions vortex-array/src/arrays/dict/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ pub struct DictArray {
pub(super) all_values_referenced: bool,
}

pub struct DictArrayParts {
pub codes: ArrayRef,
pub values: ArrayRef,
pub dtype: DType,
}

impl DictArray {
/// Build a new `DictArray` without validating the codes or values.
///
Expand Down Expand Up @@ -114,8 +120,12 @@ impl DictArray {
Ok(unsafe { Self::new_unchecked(codes, values) })
}

pub fn into_parts(self) -> (ArrayRef, ArrayRef) {
(self.codes, self.values)
pub fn into_parts(self) -> DictArrayParts {
DictArrayParts {
codes: self.codes,
values: self.values,
dtype: self.dtype,
}
}

#[inline]
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/arrow/executor/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use vortex_error::vortex_bail;
use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::arrays::DictArray;
use crate::arrays::DictArrayParts;
use crate::arrays::DictVTable;
use crate::arrow::ArrowArrayExecutor;

Expand Down Expand Up @@ -47,7 +48,7 @@ fn dict_to_dict(
values_type: &DataType,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrowArrayRef> {
let (codes, values) = array.into_parts();
let DictArrayParts { codes, values, .. } = array.into_parts();
let codes = codes.execute_arrow(Some(codes_type), ctx)?;
let values = values.execute_arrow(Some(values_type), ctx)?;

Expand Down
8 changes: 4 additions & 4 deletions vortex-cuda/benches/for_cuda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ fn benchmark_for_u8(c: &mut Criterion) {
&for_array,
|b, for_array| {
b.iter_custom(|iters| {
let mut cuda_ctx = CudaSession::new_ctx(VortexSession::empty())
let mut cuda_ctx = CudaSession::create_execution_ctx(VortexSession::empty())
.vortex_expect("failed to create execution context");

let encoded = for_array.encoded();
Expand Down Expand Up @@ -248,7 +248,7 @@ fn benchmark_for_u16(c: &mut Criterion) {
&for_array,
|b, for_array| {
b.iter_custom(|iters| {
let mut cuda_ctx = CudaSession::new_ctx(VortexSession::empty())
let mut cuda_ctx = CudaSession::create_execution_ctx(VortexSession::empty())
.vortex_expect("failed to create execution context");

let encoded = for_array.encoded();
Expand Down Expand Up @@ -297,7 +297,7 @@ fn benchmark_for_u32(c: &mut Criterion) {
&for_array,
|b, for_array| {
b.iter_custom(|iters| {
let mut cuda_ctx = CudaSession::new_ctx(VortexSession::empty())
let mut cuda_ctx = CudaSession::create_execution_ctx(VortexSession::empty())
.vortex_expect("failed to create execution context");

let encoded = for_array.encoded();
Expand Down Expand Up @@ -346,7 +346,7 @@ fn benchmark_for_u64(c: &mut Criterion) {
&for_array,
|b, for_array| {
b.iter_custom(|iters| {
let mut cuda_ctx = CudaSession::new_ctx(VortexSession::empty())
let mut cuda_ctx = CudaSession::create_execution_ctx(VortexSession::empty())
.vortex_expect("failed to create execution context");

let encoded = for_array.encoded();
Expand Down
105 changes: 8 additions & 97 deletions vortex-cuda/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ use cudarc::driver::LaunchArgs;
use cudarc::driver::result;
use cudarc::driver::result::memcpy_htod_async;
use cudarc::driver::sys;
use cudarc::driver::sys::CUevent_flags;
use futures::future::BoxFuture;
use kanal::Sender;
use result::stream;
use vortex_array::Array;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::VortexSessionExecute;
use vortex_array::ExecutionCtx;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Buffer;
use vortex_dtype::PType;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use crate::CudaDeviceBuffer;
use crate::CudaSession;
Expand Down Expand Up @@ -115,109 +113,23 @@ pub struct CudaKernelEvents {
pub after_launch: CudaEvent,
}

/// Convenience macro to launch a CUDA kernel.
///
/// The kernel gets launched on the stream of the execution context.
///
/// The kernel launch config:
/// LaunchConfig {
/// grid_dim: (array.len() / 2048, 1, 1),
/// block_dim: (64, 1, 1),
/// shared_mem_bytes: 0,
/// };
/// 64 threads are used per block which corresponds to 2 warps.
/// Each block handles 2048 elements. Each thread handles 32 elements.
/// The last block and thread are allowed to have less elements.
///
/// Note: A macro is necessary to unroll the launch builder arguments.
///
/// # Returns
///
/// A pair of CUDA events submitted before and after the kernel.
/// Depending on `CUevent_flags` these events can contain timestamps. Use
/// `CU_EVENT_DISABLE_TIMING` for minimal overhead and `CU_EVENT_DEFAULT` to
/// enable timestamps.
#[macro_export]
macro_rules! launch_cuda_kernel {
(
execution_ctx: $ctx:expr,
module: $module:expr,
ptypes: $ptypes:expr,
launch_args: [$($arg:expr),* $(,)?],
event_recording: $event_recording:expr,
array_len: $len:expr
) => {{
let cuda_function = $ctx.load_function($module, $ptypes)?;
let mut launch_builder = $ctx.launch_builder(&cuda_function);

$(
launch_builder.arg(&$arg);
)*

$crate::executor::launch_cuda_kernel_impl(&mut launch_builder, $event_recording, $len)?
}};
}

/// Launches a CUDA kernel with the passed launch builder.
///
/// # Arguments
///
/// * `launch_builder` - Configured launch builder
/// * `array_len` - Length of the array to process
///
/// # Returns
///
/// A pair of CUDA events submitted before and after the kernel.
/// Depending on `CUevent_flags` these events can contain timestamps. Use
/// `CU_EVENT_DISABLE_TIMING` for minimal overhead and `CU_EVENT_DEFAULT` to
/// enable timestamps.
pub fn launch_cuda_kernel_impl(
launch_builder: &mut LaunchArgs,
event_flags: CUevent_flags,
array_len: usize,
) -> VortexResult<CudaKernelEvents> {
let num_chunks = u32::try_from(array_len.div_ceil(2048))?;

let config = cudarc::driver::LaunchConfig {
grid_dim: (num_chunks, 1, 1),
block_dim: (64, 1, 1),
shared_mem_bytes: 0,
};

launch_builder.record_kernel_launch(event_flags);

unsafe {
launch_builder
.launch(config)
.map_err(|e| vortex_err!("Failed to launch kernel: {}", e))
.and_then(|events| {
events
.ok_or_else(|| vortex_err!("CUDA events not recorded"))
.map(|(before_launch, after_launch)| CudaKernelEvents {
before_launch,
after_launch,
})
})
}
}

/// CUDA execution context.
///
/// Provides access to the CUDA context and stream for kernel execution.
/// Handles memory allocation and data transfers between host and device.
pub struct CudaExecutionCtx {
stream: Arc<CudaStream>,
vortex_session: VortexSession,
ctx: ExecutionCtx,
cuda_session: CudaSession,
}

impl CudaExecutionCtx {
/// Creates a new CUDA execution context.
pub(crate) fn new(stream: Arc<CudaStream>, vortex_session: VortexSession) -> Self {
let cuda_session = vortex_session.cuda_session().clone();
pub(crate) fn new(stream: Arc<CudaStream>, ctx: ExecutionCtx) -> Self {
let cuda_session = ctx.session().cuda_session().clone();
Self {
stream,
vortex_session,
ctx,
cuda_session,
}
}
Expand Down Expand Up @@ -351,17 +263,16 @@ pub trait CudaArrayExt: Array {
#[async_trait]
impl CudaArrayExt for ArrayRef {
async fn execute_cuda(self, ctx: &mut CudaExecutionCtx) -> VortexResult<Canonical> {
if self.is_canonical() {
return self.to_canonical();
if self.is_canonical() || self.is_empty() {
return self.execute(&mut ctx.ctx);
}

let Some(support) = ctx.cuda_session.kernel(&self.encoding_id()) else {
tracing::debug!(
encoding = %self.encoding_id(),
"No CUDA support registered for encoding, falling back to CPU execution"
);
let mut array_ctx = ctx.vortex_session.create_execution_ctx();
return self.execute(&mut array_ctx);
return self.execute(&mut ctx.ctx);
};

tracing::debug!(
Expand Down
32 changes: 32 additions & 0 deletions vortex-cuda/src/kernel/arrays/dict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use async_trait::async_trait;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::arrays::DictVTable;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::executor::CudaExecute;
use crate::executor::CudaExecutionCtx;

/// CUDA executor for dictionary-encoded arrays.
#[derive(Debug)]
pub struct DictExecutor;

#[async_trait]
impl CudaExecute for DictExecutor {
async fn execute(
&self,
array: ArrayRef,
_ctx: &mut CudaExecutionCtx,
) -> VortexResult<Canonical> {
let _dict_array = array
.try_into::<DictVTable>()
.ok()
.vortex_expect("Array is not a Dict array");

todo!()
}
}
5 changes: 5 additions & 0 deletions vortex-cuda/src/kernel/arrays/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

mod dict;
pub use dict::DictExecutor;
Loading
Loading