
Conversation

@andygrove (Member) commented Jan 19, 2026

Summary

This PR adds a native (Rust-based) implementation of ColumnarToRowExec that converts Arrow columnar data to Spark UnsafeRow format.

Note that this is an experimental feature that is disabled by default. Some known bugs are still being fixed in the follow-on PR #3228, which enables the feature by default and therefore achieves much higher test coverage.

Performance

Although this native implementation does not provide significant speed improvements, its main benefit is reduced GC pressure: conversion happens in native memory and avoids Java heap allocations.

Benchmark          Spark          CometColumnarToRowExec   CometNativeColumnarToRowExec
Fixed Width Only   82ms  (1.0X)   77ms  (1.1X)             77ms  (1.1X)
Primitive Types    102ms (1.0X)   92ms  (1.1X)             114ms (0.9X)
String Types       165ms (1.0X)   117ms (1.4X)             124ms (1.3X)
Struct Types       227ms (1.0X)   249ms (0.9X)             239ms (0.9X)
Array Types        214ms (1.0X)   153ms (1.4X)             162ms (1.3X)
Map Types          406ms (1.0X)   323ms (1.3X)             300ms (1.4X)
Complex Nested     477ms (1.0X)   405ms (1.2X)             406ms (1.2X)
Wide Rows          762ms (1.0X)   634ms (1.2X)             663ms (1.1X)

Configuration

The feature is disabled by default and can be enabled by setting:

spark.comet.exec.columnarToRow.native.enabled=true

This implementation converts Arrow columnar batches to Spark's UnsafeRow format in native Rust code. The design draws inspiration from the columnar-to-row conversion approach in Gluten and Velox.

Key Optimizations

  • Pre-downcast Pattern: Following Velox's approach, all type dispatch happens once at batch initialization rather than once per row.
    Column arrays are pre-downcast into typed enums (TypedArray for top-level columns, TypedElements for nested elements), eliminating virtual dispatch overhead in the inner loop (see the sketch after this list).
  • Direct Offset Access: For arrays and maps, we access the underlying offset buffers directly (list_array.value_offsets()) instead of creating per-row sliced ArrayRefs. This avoids allocation overhead for each element access.
  • Bulk Memory Copy: Fixed-width primitive arrays without nulls use direct memcpy from Arrow buffers to the output buffer, bypassing per-element processing entirely.
  • Zero-Copy Variable-Length Data: Strings and binary data are written directly to the output buffer without intermediate Java object allocation.
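
As a rough illustration of the pre-downcast pattern, here is a minimal sketch in Rust. The name TypedArray comes from the PR description; the variants, methods, and from_array constructor shown here are assumptions, not the actual code:

    use arrow::array::{Array, ArrayRef, AsArray, Int32Array, Int64Array, StringArray};
    use arrow::datatypes::{DataType, Int32Type, Int64Type};

    // Sketch: downcast each column once per batch so the per-row inner loop
    // matches on a concrete enum instead of calling through `dyn Array`.
    enum TypedArray<'a> {
        Int32(&'a Int32Array),
        Int64(&'a Int64Array),
        Utf8(&'a StringArray),
        // ... one variant per supported type
    }

    impl<'a> TypedArray<'a> {
        fn from_array(array: &'a ArrayRef) -> Option<Self> {
            match array.data_type() {
                DataType::Int32 => Some(Self::Int32(array.as_primitive::<Int32Type>())),
                DataType::Int64 => Some(Self::Int64(array.as_primitive::<Int64Type>())),
                DataType::Utf8 => Some(Self::Utf8(array.as_string::<i32>())),
                _ => None, // unsupported type: fall back to a generic path
            }
        }
    }

With columns pre-downcast like this, the hot loop over rows is monomorphic: the single match per column happens at batch setup, not once per row per column.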

Supported Types

  • Primitives: Boolean, Int8/16/32/64, Float32/64, Date32, Timestamp (microseconds)
  • Decimal128 (both compact and large precision)
  • Variable-length: String, Binary (including Large variants)
  • Complex: Struct, Array, Map with arbitrary nesting depth

Test plan

  • Rust unit tests for conversion logic
  • Comprehensive Scala test suite (CometNativeColumnarToRowSuite) with 25 tests covering:
    • All primitive types with and without nulls
    • Variable-length types (String, Binary, large Decimal)
    • Complex types (Struct, Array, Map)
    • Deeply nested types (Array of Arrays, Map with Array values, Struct containing Array of Maps)
    • Fuzz testing with randomly generated nested schemas
    • Edge cases (empty batches, all nulls, large batches)
  • Performance benchmarks comparing native vs JVM implementation

🤖 Generated with Claude Code

This PR adds an experimental native (Rust-based) implementation of
ColumnarToRowExec that converts Arrow columnar data to Spark UnsafeRow
format.

Benefits over the current Scala implementation:
- Zero-copy for variable-length types: String and Binary data is written
  directly to the output buffer without intermediate Java object allocation
- Vectorized processing: The native implementation processes data in a
  columnar fashion, improving CPU cache utilization
- Reduced GC pressure: All conversion happens in native memory, avoiding
  the creation of temporary Java objects that would need garbage collection
- Buffer reuse: The output buffer is allocated once and reused across
  batches, minimizing memory allocation overhead (see the sketch below)
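
A tiny sketch of the buffer-reuse idea; RowBuffer and begin_batch are hypothetical names, not the actual code:

    // Sketch: keep one output buffer alive across batches. `clear()` resets
    // the length but keeps the allocated capacity, so steady-state conversion
    // performs no new allocations.
    struct RowBuffer {
        data: Vec<u8>,
    }

    impl RowBuffer {
        fn begin_batch(&mut self, estimated_size: usize) -> &mut Vec<u8> {
            self.data.clear();
            self.data.reserve(estimated_size);
            &mut self.data
        }
    }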

The feature is disabled by default and can be enabled by setting:
  spark.comet.exec.columnarToRow.native.enabled=true

Supported data types:
- Primitive types: Boolean, Byte, Short, Int, Long, Float, Double
- Date and Timestamp (microseconds)
- Decimal (inline for precision <= 18, variable-length for precision > 18)
- String and Binary
- Complex types: Struct, Array, Map (nested)

This is an experimental feature for evaluation and benchmarking purposes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@andygrove changed the title from "[EXPERIMENTAL] Native columnar to row conversion" to "feat: [EXPERIMENTAL] Native columnar to row conversion" on Jan 19, 2026
andygrove and others added 13 commits January 19, 2026 13:31
Spark's UnsafeArrayData uses the actual primitive size for elements (e.g.,
4 bytes for INT32), not always 8 bytes like UnsafeRow fields. This fix:

- Added get_element_size() to determine correct sizes for each type
- Added write_array_element() to write values with type-specific widths
- Updated write_list_data() and write_map_data() to use correct sizes
- Added LargeUtf8/LargeBinary support for struct fields
- Added comprehensive test suite (CometNativeColumnarToRowSuite)
- Updated compatibility documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
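
A minimal sketch of the element-size mapping described in the commit above. get_element_size is named in the commit; this body is an assumption based on Spark's UnsafeArrayData layout:

    use arrow::datatypes::DataType;

    // Sketch: UnsafeArrayData packs elements at their natural width, unlike
    // UnsafeRow, which gives every field an 8-byte slot.
    fn get_element_size(dt: &DataType) -> usize {
        match dt {
            DataType::Boolean | DataType::Int8 => 1,
            DataType::Int16 => 2,
            DataType::Int32 | DataType::Float32 | DataType::Date32 => 4,
            // Int64, Float64, timestamps, and the offset-and-length words
            // used by variable-length elements all occupy 8 bytes
            _ => 8,
        }
    }
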
Add a fuzz test using FuzzDataGenerator to test the native columnar to row
conversion with randomly generated schemas containing arrays, structs,
and maps.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add tests verifying that native columnar to row conversion correctly
handles complex nested types:
- Array<Array<Int>>
- Map<String, Array<Int>>
- Struct<Array<Map<String, Int>>, String>

These tests confirm the recursive conversion logic works for arbitrary
nesting depth.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add a fuzz test using FuzzDataGenerator.generateNestedSchema to test
native columnar to row conversion with deeply nested random schemas
(depth 1-3, with arrays, structs, and maps).

The test uses only primitive types supported by native C2R (excludes
TimestampNTZType which is not yet supported).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use actual array type for dispatching instead of schema type to
  handle type mismatches between serialized schema and FFI arrays
- Add support for LargeList (64-bit offsets) arrays
- Replace .unwrap() with proper error handling to provide clear
  error messages instead of panics
- Add tests for LargeList handling

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
When Parquet data is read, string columns may be dictionary-encoded
for efficiency. The schema says Utf8 but the actual Arrow array is
Dictionary(Int32, Utf8). This caused a type mismatch error.

- Add support for Dictionary-encoded arrays in get_variable_length_data
- Handle all common key types (Int8, Int16, Int32, Int64, UInt8-64)
- Support Utf8, LargeUtf8, Binary, and LargeBinary value types
- Add tests for dictionary-encoded string arrays

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
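
For orientation, a minimal sketch of resolving one dictionary-encoded string value with arrow-rs. The helper name and shape are assumptions; the actual code handles more key and value types:

    use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
    use arrow::datatypes::Int32Type;

    // Sketch: look a row up through the dictionary's keys, then read the
    // string from the (much smaller) values array.
    fn dict_string_at(array: &ArrayRef, row: usize) -> Option<&str> {
        let dict = array.as_any().downcast_ref::<DictionaryArray<Int32Type>>()?;
        if dict.is_null(row) {
            return None;
        }
        let key = dict.keys().value(row) as usize;
        let values = dict.values().as_any().downcast_ref::<StringArray>()?;
        Some(values.value(key))
    }
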
@codecov-commenter commented Jan 20, 2026

Codecov Report

❌ Patch coverage is 69.44444% with 33 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.08%. Comparing base (f09f8af) to head (377214a).
⚠️ Report is 859 commits behind head on main.

Files with missing lines                                Patch %   Lines
...rg/apache/comet/NativeColumnarToRowConverter.scala   63.88%    7 Missing and 6 partials ⚠️
...spark/sql/comet/CometNativeColumnarToRowExec.scala   81.39%    4 Missing and 4 partials ⚠️
...ain/scala/org/apache/comet/vector/NativeUtil.scala   0.00%     6 Missing ⚠️
...he/comet/rules/EliminateRedundantTransitions.scala   63.63%    2 Missing and 2 partials ⚠️
...java/org/apache/comet/NativeColumnarToRowInfo.java   83.33%    0 Missing and 1 partial ⚠️
...n/scala/org/apache/comet/ExtendedExplainInfo.scala   0.00%     0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3221      +/-   ##
============================================
+ Coverage     56.12%   60.08%   +3.96%     
- Complexity      976     1453     +477     
============================================
  Files           119      173      +54     
  Lines         11743    15852    +4109     
  Branches       2251     2624     +373     
============================================
+ Hits           6591     9525    +2934     
- Misses         4012     4995     +983     
- Partials       1140     1332     +192     


@andygrove changed the title from "feat: [EXPERIMENTAL] Native columnar to row conversion" to "feat: Native columnar to row conversion" on Jan 20, 2026
andygrove and others added 12 commits January 20, 2026 06:08
Add CometColumnarToRowBenchmark to compare performance of:
- Spark's default ColumnarToRowExec
- Comet's JVM-based CometColumnarToRowExec
- Comet's Native CometNativeColumnarToRowExec

Benchmark covers:
- Primitive types (int, long, double, string, boolean, date)
- String-heavy workloads (short, medium, long strings)
- Struct types (simple, nested, deeply nested)
- Array types (primitives and strings)
- Map types (various key/value combinations)
- Complex nested types (arrays of structs, maps with arrays)
- Wide rows (50 columns of mixed types)

Run with:
SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometColumnarToRowBenchmark

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The native columnar-to-row conversion was allocating intermediate Vec<u8>
for every variable-length field (strings, binary). This change:

- Adds write_variable_length_to_buffer() that writes directly to the
  output buffer instead of returning a Vec
- Adds write_dictionary_to_buffer() functions for dictionary-encoded arrays
- Adds #[inline] hints to hot-path functions
- Removes intermediate allocations for Utf8, LargeUtf8, Binary, LargeBinary

Benchmark results for String Types:
- Before: Native was slower than Spark
- After: Native matches Spark (1.0X)

Primitive types and complex nested types (struct, array, map) still have
overhead from JNI/FFI and remaining intermediate allocations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
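
For context, UnsafeRow stores each variable-length field as an 8-byte word of (offset relative to the row start << 32) | length, with the bytes appended after the fixed-width region. A minimal sketch of a direct write; the helper name and signature are assumptions:

    // Sketch: append `bytes` at the end of the output buffer and record the
    // (relative offset, length) word in the field's fixed-width slot.
    // `row_base` is the row's start offset; `field_slot` is the absolute
    // offset of the field's 8-byte slot. Little-endian is assumed here;
    // Spark writes with native byte order.
    fn write_var_len(buf: &mut Vec<u8>, row_base: usize, field_slot: usize, bytes: &[u8]) {
        let rel_offset = (buf.len() - row_base) as u64;
        buf.extend_from_slice(bytes);
        // pad the variable-length region to an 8-byte boundary
        let padded = (bytes.len() + 7) & !7;
        buf.resize(buf.len() + padded - bytes.len(), 0);
        let word = (rel_offset << 32) | bytes.len() as u64;
        buf[field_slot..field_slot + 8].copy_from_slice(&word.to_le_bytes());
    }
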
Inspired by Velox UnsafeRowFast, add optimizations for all-fixed-width schemas:

- Add is_fixed_width() and is_all_fixed_width() detection functions
- Add convert_fixed_width() fast path that:
  - Pre-allocates entire buffer at once (row_size * num_rows)
  - Pre-fills offsets/lengths arrays (constant row size)
  - Processes column-by-column for better cache locality
- Add write_column_fixed_width() for type-specific column processing
- Add tests for fixed-width fast path detection

Limitations:
- UnsafeRow format stores 8-byte fields per row (not columnar), so
  bulk memcpy of entire columns is not possible
- JNI/FFI boundary crossing still has overhead
- The "primitive types" benchmark includes strings, so it doesn't
  trigger the fixed-width fast path

For schemas with only fixed-width columns (no strings, arrays, maps,
structs), this reduces allocations and improves cache locality.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
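
As a sketch of the fast path's pre-allocation step; convert_fixed_width is named in the commit above, but this body is an assumption:

    // Sketch: with an all-fixed-width schema every row has the same size, so
    // the whole output buffer and all offsets/lengths can be computed up front.
    fn convert_fixed_width(num_rows: usize, row_size: usize) -> (Vec<u8>, Vec<i32>, Vec<i32>) {
        let buf = vec![0u8; row_size * num_rows];
        let offsets: Vec<i32> = (0..num_rows).map(|i| (i * row_size) as i32).collect();
        let lengths = vec![row_size as i32; num_rows];
        // the caller then fills `buf` column by column for cache locality
        (buf, offsets, lengths)
    }
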
- Add fixedWidthOnlyBenchmark() with only fixed-width types (no strings)
  to test the native C2R fast path that pre-allocates buffers
- Refactor all benchmark methods to use addC2RBenchmarkCases() helper,
  reducing ~110 lines of duplicated code

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…e allocations

- Add direct-write functions (write_struct_to_buffer, write_list_to_buffer,
  write_map_to_buffer) that write directly to output buffer
- Remove legacy functions that returned intermediate Vec<u8> objects
- Eliminates memory allocation per complex type value

Benchmark improvements:
- Struct: 604ms → 330ms (1.8x faster)
- Array: 580ms → 410ms (1.4x faster)
- Map: 1141ms → 705ms (1.6x faster)
- Complex Nested: 1434ms → 798ms (1.8x faster)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add memcpy-style bulk copying for arrays of primitive types without nulls.
When array elements are fixed-width primitives (Int8, Int16, Int32, Int64,
Float32, Float64, Date32, Timestamp) and have no null values, copy the
entire values buffer at once instead of iterating element by element.

Benchmark improvement for Array Types:
- Before: 410ms (0.5X of Spark)
- After: 301ms (0.7X of Spark)
- 27% faster

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
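
A minimal sketch of the bulk copy for a null-free Int32 child array; the function name is hypothetical, and the actual code covers all the primitive types listed above:

    use arrow::array::{Array, Int32Array};

    // Sketch: a null-free primitive values buffer is already laid out exactly
    // as UnsafeArrayData expects, so one byte-level copy replaces the
    // element-by-element loop.
    fn bulk_copy_i32(elements: &Int32Array, start: usize, end: usize, out: &mut Vec<u8>) {
        debug_assert_eq!(elements.null_count(), 0);
        let values: &[i32] = &elements.values()[start..end];
        // reinterpret the i32 slice as raw bytes (i32 has no padding bytes)
        let bytes = unsafe {
            std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
        };
        out.extend_from_slice(bytes);
    }
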
Move type dispatch outside the inner row loop by pre-downcasting all
arrays to typed variants before processing. This eliminates the O(rows *
columns * type_dispatch_cost) overhead in the general path.

Adds TypedArray enum with variants for all supported types, with
methods for null checking, fixed-value extraction, and variable-length
writing that operate directly on concrete array types.

Benchmark improvements:
- Primitive Types: 201ms → 126ms (37% faster, 0.5X → 0.7X)
- String Types: 164ms → 120ms (27% faster, 1.0X → 1.4X)
- Wide Rows: 1242ms → 737ms (41% faster, 0.6X → 1.0X)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use correct Arrow array types for bulk copy (Date32Array instead of
  Int32Array, TimestampMicrosecondArray instead of Int64Array)
- Add Boolean array support to bulk copy path (element-by-element but
  still avoiding type dispatch overhead)
- Enable bulk copy for arrays with nulls - copy values buffer then set
  null bits separately (null slots contain garbage but won't be read)
- Restore fixed-width value writing in slow path for unsupported types
  (e.g., Decimal128 in arrays)

This fixes the fuzz test failure where Date32 arrays in maps were
producing incorrect values due to failed downcast falling through
to an incomplete slow path.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Implements Velox-style optimizations for array and map conversion:

1. **TypedElements enum**: Pre-downcast element arrays once to avoid
   type dispatch in inner loops

2. **Direct offset access**: Use ListArray/MapArray offsets directly
   instead of calling value(row_idx) which allocates a sliced ArrayRef

3. **Range-based bulk copy**: Copy element ranges directly from the
   underlying values buffer using pointer arithmetic

Benchmark improvements:
- Array Types: 274ms → 163ms (40% faster, 0.8X → 1.4X)
- Map Types: 605ms → 292ms (52% faster, 0.6X → 1.4X)
- Complex Nested: 701ms → 410ms (42% faster, 0.6X → 1.2X)

Native C2R now matches or beats Comet JVM for array/map types.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
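
A sketch of the direct offset access described above. ListArray::value_offsets is a real arrow-rs API; the helper around it is an assumption:

    use arrow::array::{Array, ListArray};

    // Sketch: read each row's element range straight from the offsets buffer
    // instead of calling list.value(row), which allocates a sliced ArrayRef.
    fn element_range(list: &ListArray, row: usize) -> Option<(usize, usize)> {
        if list.is_null(row) {
            return None;
        }
        let offsets = list.value_offsets(); // &[i32] of length num_rows + 1
        Some((offsets[row] as usize, offsets[row + 1] as usize))
    }
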
Remove Vec allocation overhead by using inline type dispatch for struct
fields instead of pre-collecting into a Vec<TypedElements>. This improves
struct type performance from 357ms to 272ms (24% faster).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pre-downcast all struct field columns into TypedElements at batch
initialization time (in TypedArray::from_array). This eliminates
per-row type dispatch overhead for struct fields.

Performance improvement for struct types:
- Before: 272ms (0.8X of Spark)
- After: 220ms (1.0X of Spark, matching Spark performance)

The pre-downcast pattern is now consistently applied to:
- Top-level columns (TypedArray)
- Array/List elements (TypedElements)
- Map keys/values (TypedElements)
- Struct fields (TypedElements) - NEW

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pre-compute variable-length column indices once per batch instead of
calling is_variable_length() for every column in every row. In pass 2,
only iterate over variable-length columns using the pre-computed indices.

Also skip writing placeholder values for variable-length columns in pass 1,
since they will be overwritten in pass 2.

Performance improvement for primitive types (mixed with strings):
- Before: 131ms (0.8X of Spark)
- After: ~114ms (0.9X of Spark)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
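
A sketch of the per-batch pre-computation described above; the fixed-width set shown is illustrative only (e.g., compact decimals are also fixed-width in UnsafeRow):

    use arrow::datatypes::{DataType, Schema};

    // Sketch: compute the variable-length column indices once per batch, so
    // pass 2 only visits columns that need variable-length writes.
    fn var_len_columns(schema: &Schema) -> Vec<usize> {
        schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(_, f)| {
                !matches!(
                    f.data_type(),
                    DataType::Boolean
                        | DataType::Int8
                        | DataType::Int16
                        | DataType::Int32
                        | DataType::Int64
                        | DataType::Float32
                        | DataType::Float64
                        | DataType::Date32
                        | DataType::Timestamp(_, _)
                )
            })
            .map(|(i, _)| i)
            .collect()
    }
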
@andygrove marked this pull request as ready for review January 20, 2026 17:15
- Add #[allow(clippy::too_many_arguments)] to write_elements_slow
- Remove unused functions that were added during development:
  - write_variable_length_to_buffer
  - get_element_size
  - try_bulk_copy_primitive_array_with_nulls
  - write_array_data_to_buffer
  - write_array_data_to_buffer_for_map
- Remove #[inline] from write_struct_to_buffer (too large/complex)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Address review feedback: the #[inline] hint doesn't make sense for a
function with macro-generated match arms. Let the compiler decide.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@andygrove (Member, Author) commented:

Claude's response to @mbutrovich's comment on #[inline]:

Good point. I've removed the #[inline] hint in ab074bd and will let the compiler decide. If profiling shows it's a bottleneck, we can revisit with benchmarks.

@andygrove requested review from comphead and wForget January 21, 2026 13:55
@andygrove added this to the 0.13.0 milestone Jan 22, 2026
val (arrayAddrs, schemaAddrs, exportedNumRows) = nativeUtil.exportBatchToAddresses(batch)

// Call native conversion
val info = nativeLib.columnarToRowConvert(c2rHandle, arrayAddrs, schemaAddrs, exportedNumRows)
Review comment from a Member:
Do we need to consider exportedNumRows > batchSize?

numOutputRows += numRows

val startTime = System.nanoTime()
val result = converter.convert(batch)
Review comment from a Member:
Does the columnar batch need to be closed after converting?

@andygrove removed this from the 0.13.0 milestone Jan 23, 2026