5 changes: 5 additions & 0 deletions Cargo.toml
@@ -49,6 +49,7 @@ tonic = { version = "0.10", features = ["tls", "gzip"] }

[dev-dependencies]
clap = "2"
criterion = "0.7.0"
env_logger = "0.10"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
@@ -64,3 +65,7 @@ tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
name = "failpoint_tests"
path = "tests/failpoint_tests.rs"
required-features = ["fail/failpoints"]

[[bench]]
name = "tso_waker_policy"
harness = false
120 changes: 120 additions & 0 deletions benches/tso_waker_policy.rs
@@ -0,0 +1,120 @@
use std::hint::black_box;
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::task::AtomicWaker;

const MAX_PENDING_COUNT: usize = 1 << 16;
// Every `FULL_EVERY` iterations, the synthetic queue is reset to "full".
const FULL_EVERY: u64 = 1024;
// Within each `FULL_EVERY` window, keep queue full for `FULL_WINDOW` iterations.
const FULL_WINDOW: u64 = 16;

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
    fn wake_by_ref(self: &Arc<Self>) {}
}

fn response_policy_old(iterations: u64) -> Duration {
    let atomic_waker = AtomicWaker::new();
    let waker = Waker::from(Arc::new(NoopWake));
    atomic_waker.register(&waker);

    let mut pending_len = 0usize;
    let start = Instant::now();
    for i in 0..iterations {
        if i % FULL_EVERY == 0 {
            pending_len = MAX_PENDING_COUNT;
        }
        black_box(pending_len >= MAX_PENDING_COUNT);
        pending_len = pending_len.saturating_sub(1);
        // Old behavior: wake unconditionally for every response batch.
        atomic_waker.wake();
    }
    start.elapsed()
}

fn response_policy_new(iterations: u64) -> Duration {
    let atomic_waker = AtomicWaker::new();
    let waker = Waker::from(Arc::new(NoopWake));
    atomic_waker.register(&waker);

    let mut pending_len = 0usize;
    let start = Instant::now();
    for i in 0..iterations {
        if i % FULL_EVERY == 0 {
            pending_len = MAX_PENDING_COUNT;
        }
        let was_full = pending_len >= MAX_PENDING_COUNT;
        pending_len = pending_len.saturating_sub(1);
        let should_wake = was_full && pending_len < MAX_PENDING_COUNT;
        // New behavior: wake only for full -> non-full transition.
        if black_box(should_wake) {
            atomic_waker.wake();
        }
    }
    start.elapsed()
}

fn register_policy_old(iterations: u64) -> Duration {
    let atomic_waker = AtomicWaker::new();
    let waker = Waker::from(Arc::new(NoopWake));

    let start = Instant::now();
    for i in 0..iterations {
        let pending_len = if i % FULL_EVERY < FULL_WINDOW {
            MAX_PENDING_COUNT
        } else {
            MAX_PENDING_COUNT - 1
        };
        black_box(pending_len);
        // Old behavior: re-register the self-waker on every iteration,
        // regardless of whether the pending queue is full.
        atomic_waker.register(&waker);
    }
    start.elapsed()
}

fn register_policy_new(iterations: u64) -> Duration {
    let atomic_waker = AtomicWaker::new();
    let waker = Waker::from(Arc::new(NoopWake));

    let start = Instant::now();
    for i in 0..iterations {
        let pending_len = if i % FULL_EVERY < FULL_WINDOW {
            MAX_PENDING_COUNT
        } else {
            MAX_PENDING_COUNT - 1
        };
        // New behavior: register the self-waker only while the queue is full.
        if black_box(pending_len >= MAX_PENDING_COUNT) {
            atomic_waker.register(&waker);
        }
    }
    start.elapsed()
}

fn bench_tso_waker_policy(c: &mut Criterion) {
    let mut group = c.benchmark_group("tso_waker_policy");
    group.warm_up_time(Duration::from_secs(2));
    group.measurement_time(Duration::from_secs(6));

    group.bench_function(BenchmarkId::new("response", "old"), |b| {
        b.iter_custom(response_policy_old);
    });
    group.bench_function(BenchmarkId::new("response", "new"), |b| {
        b.iter_custom(response_policy_new);
    });
    group.bench_function(BenchmarkId::new("register", "old"), |b| {
        b.iter_custom(register_policy_old);
    });
    group.bench_function(BenchmarkId::new("register", "new"), |b| {
        b.iter_custom(register_policy_new);
    });

    group.finish();
}

criterion_group!(benches, bench_tso_waker_policy);
criterion_main!(benches);
66 changes: 66 additions & 0 deletions doc/tso_waker_criterion.md
@@ -0,0 +1,66 @@
# TSO Waker Criterion Benchmark

Date: 2026-02-09
Repo: `tikv/client-rust`
Branch: `mingley/tso-waker-criterion`
Host: macOS 26.2 (Darwin 25.2.0), Apple M4 Pro, arm64
Rust toolchain: 1.84.1

## Goal

Quantify the latency impact of reducing TSO stream wake/registration churn in
`src/pd/timestamp.rs`.

These numbers are a point-in-time snapshot from this branch. Re-run the
benchmark after meaningful changes to either `src/pd/timestamp.rs` or
`benches/tso_waker_policy.rs`.

## Method

Benchmark framework:
- Criterion (`cargo bench`)

Bench target:
- `benches/tso_waker_policy.rs`

Command used:

```bash
cargo bench --bench tso_waker_policy -- --noplot
```

Criterion configuration in the benchmark (see the note after this list):
- warmup: 2 seconds
- measurement: 6 seconds
- samples: 100
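
The sample count is not set explicitly in `benches/tso_waker_policy.rs`; 100 is
Criterion's default. If it ever needs to be pinned, the group setup would look
roughly like this (illustrative only; `configure_group` is a hypothetical name
and this code is not part of the committed benchmark):

```rust
use std::time::Duration;

use criterion::Criterion;

fn configure_group(c: &mut Criterion) {
    let mut group = c.benchmark_group("tso_waker_policy");
    group.warm_up_time(Duration::from_secs(2));
    group.measurement_time(Duration::from_secs(6));
    // Criterion already defaults to 100 samples; setting it explicitly keeps
    // the documented configuration stable across Criterion upgrades.
    group.sample_size(100);
    group.finish();
}
```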

The benchmark compares the old and new policies in two isolated hot paths:
- `response/*`: the wake policy used when processing response batches
- `register/*`: the self-waker registration policy in the no-request branch

Note: the old and new response benchmarks intentionally do asymmetric work
(always wake vs. transition-only wake), so the speedup reflects the amortized
benefit of skipping redundant wake calls under this simulation pattern. A
sketch of the new policy follows.
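
To make the two hot paths concrete, here is a minimal sketch of the new
behavior as it would sit in a stream's poll loop. It is illustrative only: the
struct, field, and method names (`TsoStreamState`, `pending_len`,
`on_response_batch`, `on_no_request`) are hypothetical and not the actual
identifiers in `src/pd/timestamp.rs`; only `MAX_PENDING_COUNT` is taken from
the benchmark above.

```rust
use std::task::Waker;

const MAX_PENDING_COUNT: usize = 1 << 16;

// Hypothetical bookkeeping for the TSO stream; not the real
// `src/pd/timestamp.rs` types.
struct TsoStreamState {
    pending_len: usize,
    sender_waker: Option<Waker>,
}

impl TsoStreamState {
    // New response policy: wake the request-sending task only when this batch
    // takes the pending queue from full to non-full. If the queue was not
    // full, the sender was never blocked on capacity, so waking it is
    // redundant work.
    fn on_response_batch(&mut self, completed: usize) {
        let was_full = self.pending_len >= MAX_PENDING_COUNT;
        self.pending_len = self.pending_len.saturating_sub(completed);
        if was_full && self.pending_len < MAX_PENDING_COUNT {
            if let Some(waker) = self.sender_waker.take() {
                waker.wake();
            }
        }
    }

    // New registration policy: in the no-request branch, keep a waker only
    // while the queue is full, instead of re-registering on every poll.
    fn on_no_request(&mut self, current: &Waker) {
        if self.pending_len >= MAX_PENDING_COUNT {
            self.sender_waker = Some(current.clone());
        }
    }
}
```

The old policy corresponds to waking unconditionally in `on_response_batch` and
re-registering the waker on every poll in `on_no_request`, which is what the
`*/old` benchmark variants simulate.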

## Results (Absolute Latency)

From Criterion output (`time` line):

- `tso_waker_policy/response/old`: `[3.4869 ns 3.4909 ns 3.4954 ns]`
- `tso_waker_policy/response/new`: `[797.61 ps 798.59 ps 799.71 ps]`

- `tso_waker_policy/register/old`: `[2.3964 ns 2.4009 ns 2.4058 ns]`
- `tso_waker_policy/register/new`: `[290.92 ps 291.35 ps 291.85 ps]`

Median-based speedups:
- response path: `3.4909 ns / 0.79859 ns = 4.37x`
- registration path: `2.4009 ns / 0.29135 ns = 8.24x`

## Interpretation

The new policy materially reduces per-operation latency in both isolated paths,
with sub-nanosecond median latency for the optimized variants in this synthetic
microbenchmark.

This benchmark is intentionally focused on internal policy overhead. It does not
by itself measure end-to-end PD/TSO RPC latency in a real TiKV deployment.