From 761543ff34995d88b8f3400fde1a7d7751288ce8 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Fri, 19 Sep 2025 16:30:00 -0400 Subject: [PATCH 01/23] much more comprehensive perf test with analysis: ezmsg-perf --- pyproject.toml | 4 +- src/ezmsg/util/perf/__init__.py | 0 src/ezmsg/util/perf/analysis.py | 113 ++++++++++ src/ezmsg/util/perf/command.py | 55 +++++ src/ezmsg/util/perf/eval.py | 91 ++++++++ src/ezmsg/util/perf/util.py | 356 ++++++++++++++++++++++++++++++++ src/ezmsg/util/perf_test.py | 220 -------------------- 7 files changed, 618 insertions(+), 221 deletions(-) create mode 100644 src/ezmsg/util/perf/__init__.py create mode 100644 src/ezmsg/util/perf/analysis.py create mode 100644 src/ezmsg/util/perf/command.py create mode 100644 src/ezmsg/util/perf/eval.py create mode 100644 src/ezmsg/util/perf/util.py delete mode 100644 src/ezmsg/util/perf_test.py diff --git a/pyproject.toml b/pyproject.toml index df1f469a..961271b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,8 @@ test = [ "pytest>=8.4.1", "pytest-asyncio>=1.1.0", "pytest-cov>=6.2.1", - "xarray>=2023.1.0;python_version<'3.13'" + "xarray>=2023.1.0;python_version<'3.13'", + "psutil>=7.1.0", ] docs = [ "ezmsg-sigproc>=2.2.0", @@ -44,6 +45,7 @@ docs = [ [project.scripts] ezmsg = "ezmsg.core.command:cmdline" +ezmsg-perf = "ezmsg.util.perf.command:command" [project.optional-dependencies] axisarray = [ diff --git a/src/ezmsg/util/perf/__init__.py b/src/ezmsg/util/perf/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py new file mode 100644 index 00000000..48dd800e --- /dev/null +++ b/src/ezmsg/util/perf/analysis.py @@ -0,0 +1,113 @@ +import json +import typing +import dataclasses + +from pathlib import Path + +from ..messagecodec import MessageDecoder +from .util import ( + TestEnvironmentInfo, + TestParameters, + Metrics, +) + +import ezmsg.core as ez + +try: + import xarray as xr +except ImportError: + ez.logger.error('ezmsg perf analysis requires xarray') + raise + +try: + import numpy as np +except ImportError: + ez.logger.error('ezmsg perf analysis requires numpy') + raise + + +def load_perf(perf: Path) -> xr.Dataset: + + params: typing.List[TestParameters] = [] + results: typing.List[Metrics] = [] + + with open(perf, 'r') as perf_f: + info: TestEnvironmentInfo = json.loads(next(perf_f), cls = MessageDecoder) + + for line in perf_f: + obj = json.loads(line, cls = MessageDecoder) + params.append(obj['params']) + results.append(obj['results']) + + n_clients_axis = list(sorted(set([p.n_clients for p in params]))) + msg_size_axis = list(sorted(set([p.msg_size for p in params]))) + comms_axis = list(sorted(set([p.comms for p in params]))) + config_axis = list(sorted(set([p.config for p in params]))) + + dims = ['n_clients', 'msg_size', 'comms', 'config'] + coords = { + 'n_clients': n_clients_axis, + 'msg_size': msg_size_axis, + 'comms': comms_axis, + 'config': config_axis + } + + data_vars = {} + for field in dataclasses.fields(Metrics): + m = np.zeros(( + len(n_clients_axis), + len(msg_size_axis), + len(comms_axis), + len(config_axis) + )) * np.nan + for p, r in zip(params, results): + m[ + n_clients_axis.index(p.n_clients), + msg_size_axis.index(p.msg_size), + comms_axis.index(p.comms), + config_axis.index(p.config) + ] = getattr(r, field.name) + data_vars[field.name] = xr.DataArray(m, dims = dims, coords = coords) + + dataset = xr.Dataset(data_vars, attrs = dict(info = info)) + return dataset + +@dataclasses.dataclass +class 
SummaryArgs: + perf: Path + baseline: Path | None + +def summary(args: SummaryArgs): + + perf_dataset = load_perf(args.perf) + + for config, config_ds in perf_dataset.groupby('config'): + for comms, comms_ds in config_ds.groupby('comms'): + print(f'{config}: {comms}') + print(comms_ds.sample_rate.data) + + if args.baseline is not None: + baseline_dataset = load_perf(args.baseline) + + +def command() -> None: + import argparse + + parser = argparse.ArgumentParser() + + parser.add_argument( + "perf", + type=lambda x: Path(x), + help="perf test", + ) + + parser.add_argument( + "--baseline", "-b", + type=lambda x: Path(x), + default = None, + help="baseline perf test for comparison" + ) + + args = parser.parse_args(namespace=SummaryArgs) + + summary(args) diff --git a/src/ezmsg/util/perf/command.py b/src/ezmsg/util/perf/command.py new file mode 100644 index 00000000..5269a1d0 --- /dev/null +++ b/src/ezmsg/util/perf/command.py @@ -0,0 +1,55 @@ +from pathlib import Path + +from .analysis import summary, SummaryArgs +from .eval import perf_eval, PerfEvalArgs + +def command() -> None: + import argparse + + parser = argparse.ArgumentParser(description = 'ezmsg perf test utility') + subparsers = parser.add_subparsers(dest="command", required=True) + + p_run = subparsers.add_parser("run", help="run performance test") + p_run.add_argument( + "--duration", + type=float, + default=2.0, + help="individual test duration in seconds (default = 2.0)", + ) + p_run.add_argument( + "--num-buffers", + type=int, + default=32, + help="shared memory buffers (default = 32)", + ) + + p_run.set_defaults(_handler=lambda ns: perf_eval( + PerfEvalArgs( + duration = ns.duration, + num_buffers = ns.num_buffers + ) + )) + + p_summary = subparsers.add_parser("summary", help = "summarise performance results") + p_summary.add_argument( + "perf", + type=Path, + help="perf test", + ) + p_summary.add_argument( + "--baseline", + "-b", + type=Path, + default=None, + help="baseline perf test for comparison", + ) + + p_summary.set_defaults(_handler=lambda ns: summary( + SummaryArgs( + perf = ns.perf, + baseline = ns.baseline + ) + )) + + ns = parser.parse_args() + ns._handler(ns) diff --git a/src/ezmsg/util/perf/eval.py b/src/ezmsg/util/perf/eval.py new file mode 100644 index 00000000..894c45db --- /dev/null +++ b/src/ezmsg/util/perf/eval.py @@ -0,0 +1,91 @@ +import json +import datetime +import itertools + +from dataclasses import dataclass + +from ..messagecodec import MessageEncoder +from .util import ( + TestEnvironmentInfo, + TestParameters, + perform_test, + Communication, + CONFIGS, +) + +import ezmsg.core as ez + +def get_datestamp() -> str: + return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + +@dataclass +class PerfEvalArgs: + duration: float + num_buffers: int + +def perf_eval(args: PerfEvalArgs) -> None: + + msg_sizes = [2 ** exp for exp in range(4, 25, 4)] + n_clients = [2 ** exp for exp in range(0, 6)] + comms = [c for c in Communication] + + test_list = list(itertools.product(msg_sizes, n_clients, CONFIGS, comms)) + + with open(f'perf_{get_datestamp()}.txt', 'w') as out_f: + + out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") + + for test_idx, (msg_size, n_clients, config, comms) in enumerate(test_list): + + ez.logger.info(f"RUNNING TEST {test_idx + 1} / {len(test_list)} ({(test_idx / len(test_list)) * 100.0:0.2f} %)") + + params = TestParameters( + msg_size = msg_size, + n_clients = n_clients, + config = config.__name__, + comms = comms.value, + duration = args.duration, + num_buffers 
= args.num_buffers + ) + + results = perform_test( + n_clients = n_clients, + duration = args.duration, + msg_size = msg_size, + buffers = args.num_buffers, + comms = comms, + config = config, + ) + + output = dict( + params = params, + results = results + ) + + out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") + + +def command() -> None: + import argparse + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--duration", + type=float, + default=2.0, + help="How long to run each load test (seconds) (default = 2.0)", + ) + + parser.add_argument( + "--num-buffers", + type=int, + default=32, + help="shared memory buffers (default = 32)" + ) + + args = parser.parse_args(namespace=PerfEvalArgs) + + perf_eval(args) + + diff --git a/src/ezmsg/util/perf/util.py b/src/ezmsg/util/perf/util.py new file mode 100644 index 00000000..8883591b --- /dev/null +++ b/src/ezmsg/util/perf/util.py @@ -0,0 +1,356 @@ +import asyncio +import dataclasses +import datetime +import os +import platform +import time +import typing +import enum +import sys +import subprocess + +import ezmsg.core as ez + +try: + import numpy as np +except ImportError: + ez.logger.error("ezmsg perf requires numpy") + raise + +try: + import psutil +except ImportError: + ez.logger.error("ezmsg perf requires psutil") + raise + + +def collect( + components: typing.Optional[typing.Mapping[str, ez.Component]] = None, + network: ez.NetworkDefinition = (), + process_components: typing.Collection[ez.Component] | None = None, + **components_kwargs: ez.Component, +) -> ez.Collection: + """ collect a grouping of pre-configured components into a new "Collection" """ + from ezmsg.core.util import either_dict_or_kwargs + + components = either_dict_or_kwargs(components, components_kwargs, "collect") + if components is None: + raise ValueError("Must supply at least one component to run") + + out = ez.Collection() + for name, comp in components.items(): + comp._set_name(name) + out._components = components # FIXME: Component._components should be typehinted as a Mapping + out.network = lambda: network + out.process_components = lambda: (out,) if process_components is None else process_components + return out + + +@dataclasses.dataclass +class Metrics: + num_msgs: int + sample_rate: float + latency_mean: float + latency_total: float + data_rate: float + + +class LoadTestSettings(ez.Settings): + duration: float + dynamic_size: int + buffers: int + force_tcp: bool + + +@dataclasses.dataclass +class LoadTestSample: + _timestamp: float + counter: int + dynamic_data: np.ndarray + key: str + + +class LoadTestSender(ez.Unit): + OUTPUT = ez.OutputStream(LoadTestSample) + SETTINGS = LoadTestSettings + + async def initialize(self) -> None: + self.running = True + self.counter = 0 + + self.OUTPUT.num_buffers = self.SETTINGS.buffers + self.OUTPUT.force_tcp = self.SETTINGS.force_tcp + + @ez.publisher(OUTPUT) + async def publish(self) -> typing.AsyncGenerator: + ez.logger.info(f"Load test publisher started. 
(PID: {os.getpid()})") + start_time = time.perf_counter() + while self.running: + current_time = time.perf_counter() + if current_time - start_time >= self.SETTINGS.duration: + break + + yield ( + self.OUTPUT, + LoadTestSample( + _timestamp=time.perf_counter(), + counter=self.counter, + dynamic_data=np.zeros( + int(self.SETTINGS.dynamic_size // 8), dtype=np.float32 + ), + key = self.name, + ), + ) + self.counter += 1 + ez.logger.info("Exiting publish") + raise ez.Complete + +class LoadTestSource(LoadTestSender): + async def shutdown(self) -> None: + self.running = False + ez.logger.info(f"Samples sent: {self.counter}") + + +class LoadTestRelay(ez.Unit): + INPUT = ez.InputStream(LoadTestSample) + OUTPUT = ez.OutputStream(LoadTestSample) + + @ez.subscriber(INPUT, zero_copy = True) + @ez.publisher(OUTPUT) + async def on_msg(self, msg: LoadTestSample) -> typing.AsyncGenerator: + yield self.OUTPUT, msg + + +class LoadTestReceiverState(ez.State): + # Tuples of sent timestamp, received timestamp, counter, dynamic size + received_data: typing.List[typing.Tuple[float, float, int]] = dataclasses.field( + default_factory=list + ) + counters: typing.Dict[str, int] = dataclasses.field(default_factory=dict) + + +class LoadTestReceiver(ez.Unit): + INPUT = ez.InputStream(LoadTestSample) + SETTINGS = LoadTestSettings + STATE = LoadTestReceiverState + + @ez.subscriber(INPUT, zero_copy=True) + async def receive(self, sample: LoadTestSample) -> None: + counter = self.STATE.counters.get(sample.key, -1) + if sample.counter != counter + 1: + ez.logger.warning( + f"{sample.counter - counter - 1} samples skipped!" + ) + self.STATE.received_data.append( + (sample._timestamp, time.perf_counter(), sample.counter) + ) + self.STATE.counters[sample.key] = sample.counter + + +class LoadTestSink(LoadTestReceiver): + + @ez.task + async def terminate(self) -> None: + ez.logger.info(f"Load test subscriber started. 
(PID: {os.getpid()})") + + # Wait for the duration of the load test + await asyncio.sleep(self.SETTINGS.duration) + raise ez.NormalTermination + + +### TEST CONFIGURATIONS + +@dataclasses.dataclass +class ConfigSettings: + n_clients: int + settings: LoadTestSettings + source: LoadTestSource + sink: LoadTestSink + +Configuration = typing.Tuple[typing.Iterable[ez.Component], ez.NetworkDefinition] +Configurator = typing.Callable[[ConfigSettings], Configuration] + +def fanout(config: ConfigSettings) -> Configuration: + """ one pub to many subs """ + connections: ez.NetworkDefinition = [(config.source.OUTPUT, config.sink.INPUT)] + subs = [LoadTestReceiver(config.settings) for _ in range(config.n_clients)] + for sub in subs: + connections.append((config.source.OUTPUT, sub.INPUT)) + + return subs, connections + +def fanin(config: ConfigSettings) -> Configuration: + """ many pubs to one sub """ + connections: ez.NetworkDefinition = [(config.source.OUTPUT, config.sink.INPUT)] + pubs = [LoadTestSender(config.settings) for _ in range(config.n_clients)] + for pub in pubs: + connections.append((pub.OUTPUT, config.sink.INPUT)) + return pubs, connections + + +def relay(config: ConfigSettings) -> Configuration: + """ one pub to one sub through many relays """ + connections: ez.NetworkDefinition = [] + + relays = [LoadTestRelay(config.settings) for _ in range(config.n_clients)] + connections.append((config.source.OUTPUT, relays[0].INPUT)) + for from_relay, to_relay in zip(relays[:-1], relays[1:]): + connections.append((from_relay.OUTPUT, to_relay.INPUT)) + connections.append((relays[-1].OUTPUT, config.sink.INPUT)) + + return relays, connections + +CONFIGS: typing.Iterable[Configurator] = [fanin, fanout, relay] + +class Communication(enum.StrEnum): + LOCAL = "local" + SHM = "shm" + SHM_SPREAD = "shm_spread" + TCP = "tcp" + TCP_SPREAD = "tcp_spread" + +def perform_test( + n_clients: int, + duration: float, + msg_size: int, + buffers: int, + comms: Communication, + config: Configurator +) -> Metrics: + + settings = LoadTestSettings( + dynamic_size = int(msg_size), + duration = duration, + buffers = buffers, + force_tcp = (comms == Communication.TCP), + ) + + source = LoadTestSource(settings) + sink = LoadTestSink(settings) + + components: typing.Mapping[str, ez.Component] = dict( + SINK = sink, + ) + + clients, connections = config(ConfigSettings(n_clients, settings, source, sink)) + + # The 'sink' MUST remain in this process for us to pull its state. + process_components: typing.Iterable[ez.Component] = [] + if comms == Communication.LOCAL: + # Every component in the same process (this one) + components["SOURCE"] = source + for i, client in enumerate(clients): + components[f"CLIENT_{i+1}"] = client + + else: + + if comms in (Communication.SHM_SPREAD, Communication.TCP_SPREAD): + # Every component in its own process. + components["SOURCE"] = source + process_components.append(source) + for i, client in enumerate(clients): + components[f'CLIENT_{i+1}'] = client + process_components.append(client) + + else: + # All clients and the source in ONE other process. 
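+ # (the collect() helper defined above wraps the source and every client into a
+ # single Collection that is passed as the lone process_component, so this whole
+ # group shares one extra child process while the sink stays in the main process)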
+ collect_comps: typing.Mapping[str, ez.Component] = dict() + collect_comps["SOURCE"] = source + for i, client in enumerate(clients): + collect_comps[f"CLIENT_{i+1}"] = client + proc_collection = collect(components = collect_comps) + components["PROC"] = proc_collection + process_components = [proc_collection] + + ez.run( + components = components, + connections = connections, + process_components = process_components, + ) + + return calculate_metrics(sink) + + +def calculate_metrics(sink: LoadTestSink) -> Metrics: + + # Log some useful summary statistics + min_timestamp = min(timestamp for timestamp, _, _ in sink.STATE.received_data) + max_timestamp = max(timestamp for timestamp, _, _ in sink.STATE.received_data) + total_latency = abs( + sum( + receive_timestamp - send_timestamp + for send_timestamp, receive_timestamp, _ in sink.STATE.received_data + ) + ) + + counters = list(sorted(t[2] for t in sink.STATE.received_data)) + dropped_samples = sum( + [max((x1 - x0) - 1, 0) for x1, x0 in zip(counters[1:], counters[:-1])] + ) + + num_samples = len(sink.STATE.received_data) + ez.logger.info(f"Samples received: {num_samples}") + sample_rate = num_samples / (max_timestamp - min_timestamp) + ez.logger.info(f"Sample rate: {sample_rate} Hz") + latency_mean = total_latency / num_samples + ez.logger.info(f"Mean latency: {latency_mean} s") + ez.logger.info(f"Total latency: {total_latency} s") + + total_data = num_samples * sink.SETTINGS.dynamic_size + data_rate = total_data / (max_timestamp - min_timestamp) + ez.logger.info(f"Data rate: {data_rate * 1e-6} MB/s") + + if dropped_samples: + ez.logger.error( + f"Dropped samples: {dropped_samples} ({dropped_samples / (dropped_samples + num_samples)}%)", + ) + + return Metrics( + num_msgs = num_samples, + sample_rate = sample_rate, + latency_mean = latency_mean, + latency_total = total_latency, + data_rate = data_rate + ) + + +@dataclasses.dataclass +class TestParameters: + msg_size: int + n_clients: int + config: str + comms: str + duration: float + num_buffers: int + +def _git_commit() -> str: + try: + return subprocess.check_output( + ["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL + ).decode().strip() + except: + return "unknown" + +def _git_branch() -> str: + try: + return subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL + ).decode().strip() + except: + return "unknown" + +@dataclasses.dataclass +class TestEnvironmentInfo: + ezmsg_version: str = dataclasses.field(default_factory=lambda: ez.__version__) + numpy_version: str = dataclasses.field(default_factory=lambda: np.__version__) + python_version: str = dataclasses.field(default_factory=lambda: sys.version.replace("\n", " ")) + os: str = dataclasses.field(default_factory=lambda: platform.system()) + os_version: str = dataclasses.field(default_factory=lambda: platform.version()) + machine: str = dataclasses.field(default_factory=lambda: platform.machine()) + processor: str = dataclasses.field(default_factory=lambda: platform.processor()) + cpu_count_logical: int | None = dataclasses.field(default_factory=lambda: psutil.cpu_count(logical=True)) + cpu_count_physical: int | None = dataclasses.field(default_factory=lambda: psutil.cpu_count(logical=False)) + memory_gb: float = dataclasses.field(default_factory=lambda:round(psutil.virtual_memory().total / (1024**3), 2)) + start_time: str = dataclasses.field(default_factory=lambda: datetime.datetime.now().isoformat(timespec="seconds")) + git_commit: str = 
dataclasses.field(default_factory=_git_commit) + git_branch: str = dataclasses.field(default_factory=_git_branch) \ No newline at end of file diff --git a/src/ezmsg/util/perf_test.py b/src/ezmsg/util/perf_test.py deleted file mode 100644 index b7536b5e..00000000 --- a/src/ezmsg/util/perf_test.py +++ /dev/null @@ -1,220 +0,0 @@ -import asyncio -import dataclasses -import datetime -import os -import platform -import time - -from typing import List, Tuple, AsyncGenerator - -import ezmsg.core as ez - -# We expect this test to generate LOTS of backpressure warnings -# PERF_LOGLEVEL = os.environ.get("EZMSG_LOGLEVEL", "ERROR") -# ez.logger.setLevel(PERF_LOGLEVEL) - -PLATFORM = { - "Darwin": "mac", - "Linux": "linux", - "Windows": "win", -}[platform.system()] -SAMPLE_SUMMARY_DATASET_PREFIX = "sample_summary" -COUNT_DATASET_NAME = "count" - - -try: - import numpy as np -except ImportError: - ez.logger.error("This test requires Numpy to run.") - raise - - -def get_datestamp() -> str: - return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - - -class LoadTestSettings(ez.Settings): - duration: float = 30.0 - dynamic_size: int = 8 - buffers: int = 32 - - -@dataclasses.dataclass -class LoadTestSample: - _timestamp: float - counter: int - dynamic_data: np.ndarray - - -class LoadTestPublisher(ez.Unit): - OUTPUT = ez.OutputStream(LoadTestSample) - SETTINGS = LoadTestSettings - - async def initialize(self) -> None: - self.running = True - self.counter = 0 - self.OUTPUT.num_buffers = self.SETTINGS.buffers - - @ez.publisher(OUTPUT) - async def publish(self) -> AsyncGenerator: - ez.logger.info(f"Load test publisher started. (PID: {os.getpid()})") - start_time = time.time() - while self.running: - current_time = time.time() - if current_time - start_time >= self.SETTINGS.duration: - break - - yield ( - self.OUTPUT, - LoadTestSample( - _timestamp=time.time(), - counter=self.counter, - dynamic_data=np.zeros( - int(self.SETTINGS.dynamic_size // 8), dtype=np.float32 - ), - ), - ) - self.counter += 1 - ez.logger.info("Exiting publish") - raise ez.Complete - - async def shutdown(self) -> None: - self.running = False - ez.logger.info(f"Samples sent: {self.counter}") - - -class LoadTestSubscriberState(ez.State): - # Tuples of sent timestamp, received timestamp, counter, dynamic size - received_data: List[Tuple[float, float, int]] = dataclasses.field( - default_factory=list - ) - counter: int = -1 - - -class LoadTestSubscriber(ez.Unit): - INPUT = ez.InputStream(LoadTestSample) - SETTINGS = LoadTestSettings - STATE = LoadTestSubscriberState - - @ez.subscriber(INPUT, zero_copy=True) - async def receive(self, sample: LoadTestSample) -> None: - if sample.counter != self.STATE.counter + 1: - ez.logger.warning( - f"{sample.counter - self.STATE.counter - 1} samples skipped!" - ) - self.STATE.received_data.append( - (sample._timestamp, time.time(), sample.counter) - ) - self.STATE.counter = sample.counter - - @ez.task - async def log_result(self) -> None: - ez.logger.info(f"Load test subscriber started. 
(PID: {os.getpid()})") - - # Wait for the duration of the load test - await asyncio.sleep(self.SETTINGS.duration) - # logger.info(f"STATE = {self.STATE.received_data}") - - # Log some useful summary statistics - min_timestamp = min(timestamp for timestamp, _, _ in self.STATE.received_data) - max_timestamp = max(timestamp for timestamp, _, _ in self.STATE.received_data) - total_latency = abs( - sum( - receive_timestamp - send_timestamp - for send_timestamp, receive_timestamp, _ in self.STATE.received_data - ) - ) - - counters = list(sorted(t[2] for t in self.STATE.received_data)) - dropped_samples = sum( - [(x1 - x0) - 1 for x1, x0 in zip(counters[1:], counters[:-1])] - ) - - num_samples = len(self.STATE.received_data) - ez.logger.info(f"Samples received: {num_samples}") - ez.logger.info( - f"Sample rate: {num_samples / (max_timestamp - min_timestamp)} Hz" - ) - ez.logger.info(f"Mean latency: {total_latency / num_samples} s") - ez.logger.info(f"Total latency: {total_latency} s") - - total_data = num_samples * self.SETTINGS.dynamic_size - ez.logger.info( - f"Data rate: {total_data / (max_timestamp - min_timestamp) * 1e-6} MB/s" - ) - ez.logger.info( - f"Dropped samples: {dropped_samples} ({dropped_samples / (dropped_samples + num_samples)}%)", - ) - - raise ez.NormalTermination - - -class LoadTest(ez.Collection): - SETTINGS = LoadTestSettings - - PUBLISHER = LoadTestPublisher() - SUBSCRIBER = LoadTestSubscriber() - - def configure(self) -> None: - self.PUBLISHER.apply_settings(self.SETTINGS) - self.SUBSCRIBER.apply_settings(self.SETTINGS) - - def network(self) -> ez.NetworkDefinition: - return ((self.PUBLISHER.OUTPUT, self.SUBSCRIBER.INPUT),) - - def process_components(self): - return ( - self.PUBLISHER, - self.SUBSCRIBER, - ) - - -def get_time() -> float: - # time.perf_counter() isn't system-wide on Windows Python 3.6: - # https://bugs.python.org/issue37205 - return time.time() if PLATFORM == "win" else time.perf_counter() - - -def test_performance(duration, size, buffers) -> None: - ez.logger.info(f"Running load test for dynamic size: {size} bytes") - system = LoadTest( - LoadTestSettings(dynamic_size=int(size), duration=duration, buffers=buffers) - ) - ez.run(SYSTEM=system) - - -def run_many_dynamic_sizes(duration, buffers) -> None: - for exp in range(5, 22, 4): - test_performance(duration, 2**exp, buffers) - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser() - parser.add_argument( - "--many-dynamic-sizes", - action="store_true", - help="Run load test for many dynamic sizes", - ) - parser.add_argument( - "--duration", - type=int, - default=2, - help="How long to run the load test (seconds)", - ) - parser.add_argument( - "--num-buffers", type=int, default=32, help="Shared memory buffers" - ) - - class Args: - many_dynamic_sizes: bool - duration: int - num_buffers: int - - args = parser.parse_args(namespace=Args) - - if args.many_dynamic_sizes: - run_many_dynamic_sizes(args.duration, args.num_buffers) - else: - test_performance(args.duration, 8, args.num_buffers) From 0722bc9c8c47544df2377cb8b93e5564136678c2 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sat, 20 Sep 2025 08:50:38 -0400 Subject: [PATCH 02/23] less aggressive test strategy --- src/ezmsg/util/perf/eval.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ezmsg/util/perf/eval.py b/src/ezmsg/util/perf/eval.py index 894c45db..f7f7e27b 100644 --- a/src/ezmsg/util/perf/eval.py +++ b/src/ezmsg/util/perf/eval.py @@ -25,8 +25,8 @@ class PerfEvalArgs: def perf_eval(args: 
PerfEvalArgs) -> None: - msg_sizes = [2 ** exp for exp in range(4, 25, 4)] - n_clients = [2 ** exp for exp in range(0, 6)] + msg_sizes = [2 ** exp for exp in range(4, 25, 8)] + n_clients = [2 ** exp for exp in range(0, 6, 2)] comms = [c for c in Communication] test_list = list(itertools.product(msg_sizes, n_clients, CONFIGS, comms)) From d1fc6e3874b9506420e0abd94c6fbd2c9359ede2 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sat, 20 Sep 2025 10:00:16 -0400 Subject: [PATCH 03/23] slight refactor --- src/ezmsg/util/perf/analysis.py | 60 ++++++++--------- src/ezmsg/util/perf/command.py | 53 ++------------- src/ezmsg/util/perf/envinfo.py | 83 ++++++++++++++++++++++++ src/ezmsg/util/perf/{util.py => impl.py} | 41 ------------ src/ezmsg/util/perf/{eval.py => run.py} | 42 ++++++------ 5 files changed, 139 insertions(+), 140 deletions(-) create mode 100644 src/ezmsg/util/perf/envinfo.py rename src/ezmsg/util/perf/{util.py => impl.py} (84%) rename src/ezmsg/util/perf/{eval.py => run.py} (72%) diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 48dd800e..0be97744 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -1,12 +1,13 @@ import json import typing import dataclasses +import argparse from pathlib import Path from ..messagecodec import MessageDecoder -from .util import ( - TestEnvironmentInfo, +from .envinfo import TestEnvironmentInfo +from .impl import ( TestParameters, Metrics, ) @@ -72,42 +73,41 @@ def load_perf(perf: Path) -> xr.Dataset: dataset = xr.Dataset(data_vars, attrs = dict(info = info)) return dataset -@dataclasses.dataclass -class SummaryArgs: - perf: Path - baseline: Path | None -def summary(args: SummaryArgs): +def summary(perf_path: Path, baseline_path: Path | None) -> None: + """ print perf test results and comparisons to the console """ - perf_dataset = load_perf(args.perf) + perf = load_perf(perf_path) + info = perf.attrs['info'] + if baseline_path is not None: + baseline = load_perf(baseline_path) + perf = (perf / baseline) * 100.0 - for config, config_ds in perf_dataset.groupby('config'): - for comms, comms_ds in config_ds.groupby('comms'): - print(f'{config}: {comms}') - print(comms_ds.sample_rate.data) + print(info) - if args.baseline is not None: - baseline_dataset = load_perf(args.baseline) + for _, config_ds in perf.groupby('config'): + for _, comms_ds in config_ds.groupby('comms'): + print(comms_ds.squeeze().to_dataframe()) + print("\n") + print("\n") -def command() -> None: - import argparse - - parser = argparse.ArgumentParser() - - parser.add_argument( +def setup_summary_cmdline(subparsers: argparse._SubParsersAction) -> None: + p_summary = subparsers.add_parser("summary", help = "summarize performance results") + p_summary.add_argument( "perf", - type=lambda x: Path(x), + type=Path, help="perf test", ) - - parser.add_argument( - "--baseline", "-b", - type=lambda x: Path(x), - default = None, - help="baseline perf test for comparison" + p_summary.add_argument( + "--baseline", + "-b", + type=Path, + default=None, + help="baseline perf test for comparison", ) - args = parser.parse_args(namespace=SummaryArgs) - - summary(args) + p_summary.set_defaults(_handler=lambda ns: summary( + perf_path = ns.perf, + baseline_path = ns.baseline + )) \ No newline at end of file diff --git a/src/ezmsg/util/perf/command.py b/src/ezmsg/util/perf/command.py index 5269a1d0..a100863d 100644 --- a/src/ezmsg/util/perf/command.py +++ b/src/ezmsg/util/perf/command.py @@ -1,55 +1,14 @@ -from pathlib import Path +import 
argparse -from .analysis import summary, SummaryArgs -from .eval import perf_eval, PerfEvalArgs +from .analysis import setup_summary_cmdline +from .run import setup_run_cmdline def command() -> None: - import argparse - parser = argparse.ArgumentParser(description = 'ezmsg perf test utility') subparsers = parser.add_subparsers(dest="command", required=True) - p_run = subparsers.add_parser("run", help="run performance test") - p_run.add_argument( - "--duration", - type=float, - default=2.0, - help="individual test duration in seconds (default = 2.0)", - ) - p_run.add_argument( - "--num-buffers", - type=int, - default=32, - help="shared memory buffers (default = 32)", - ) - - p_run.set_defaults(_handler=lambda ns: perf_eval( - PerfEvalArgs( - duration = ns.duration, - num_buffers = ns.num_buffers - ) - )) - - p_summary = subparsers.add_parser("summary", help = "summarise performance results") - p_summary.add_argument( - "perf", - type=Path, - help="perf test", - ) - p_summary.add_argument( - "--baseline", - "-b", - type=Path, - default=None, - help="baseline perf test for comparison", - ) - - p_summary.set_defaults(_handler=lambda ns: summary( - SummaryArgs( - perf = ns.perf, - baseline = ns.baseline - ) - )) - + setup_run_cmdline(subparsers) + setup_summary_cmdline(subparsers) + ns = parser.parse_args() ns._handler(ns) diff --git a/src/ezmsg/util/perf/envinfo.py b/src/ezmsg/util/perf/envinfo.py new file mode 100644 index 00000000..66fff18b --- /dev/null +++ b/src/ezmsg/util/perf/envinfo.py @@ -0,0 +1,83 @@ +import dataclasses +import datetime +import platform +import typing +import sys +import subprocess + +import ezmsg.core as ez + +try: + import numpy as np +except ImportError: + ez.logger.error("ezmsg perf requires numpy") + raise + +try: + import psutil +except ImportError: + ez.logger.error("ezmsg perf requires psutil") + raise + + +def _git_commit() -> str: + try: + return subprocess.check_output( + ["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL + ).decode().strip() + except: + return "unknown" + +def _git_branch() -> str: + try: + return subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL + ).decode().strip() + except: + return "unknown" + +@dataclasses.dataclass +class TestEnvironmentInfo: + ezmsg_version: str = dataclasses.field(default_factory=lambda: ez.__version__) + numpy_version: str = dataclasses.field(default_factory=lambda: np.__version__) + python_version: str = dataclasses.field(default_factory=lambda: sys.version.replace("\n", " ")) + os: str = dataclasses.field(default_factory=lambda: platform.system()) + os_version: str = dataclasses.field(default_factory=lambda: platform.version()) + machine: str = dataclasses.field(default_factory=lambda: platform.machine()) + processor: str = dataclasses.field(default_factory=lambda: platform.processor()) + cpu_count_logical: int | None = dataclasses.field(default_factory=lambda: psutil.cpu_count(logical=True)) + cpu_count_physical: int | None = dataclasses.field(default_factory=lambda: psutil.cpu_count(logical=False)) + memory_gb: float = dataclasses.field(default_factory=lambda:round(psutil.virtual_memory().total / (1024**3), 2)) + start_time: str = dataclasses.field(default_factory=lambda: datetime.datetime.now().isoformat(timespec="seconds")) + git_commit: str = dataclasses.field(default_factory=_git_commit) + git_branch: str = dataclasses.field(default_factory=_git_branch) + + def __str__(self) -> str: + fields = dataclasses.asdict(self) + width = max(len(k) for k in 
fields) + lines = ["TestEnvironmentInfo:"] + for key, value in fields.items(): + lines.append(f" {key.ljust(width)} : {value}") + return "\n".join(lines) + + def diff(self, other: "TestEnvironmentInfo") -> typing.Dict[str, typing.Tuple[typing.Any, typing.Any]]: + """Return a structured diff: {field: (self_value, other_value)} for changed fields.""" + a = dataclasses.asdict(self) + b = dataclasses.asdict(other) + keys = set(a) | set(b) + return {k: (a.get(k), b.get(k)) for k in keys if a.get(k) != b.get(k)} + + +def format_env_diff(diffs: typing.Dict[str, typing.Tuple[typing.Any, typing.Any]]) -> str: + """Pretty-print the structured diff in the same aligned style.""" + if not diffs: + return "No differences." + width = max(len(k) for k in diffs) + lines = ["Differences in TestEnvironmentInfo:"] + for k in sorted(diffs): + left, right = diffs[k] + lines.append(f" {k.ljust(width)} : {left} != {right}") + return "\n".join(lines) + +def diff_envs(a: TestEnvironmentInfo, b: TestEnvironmentInfo) -> str: + return format_env_diff(a.diff(b)) \ No newline at end of file diff --git a/src/ezmsg/util/perf/util.py b/src/ezmsg/util/perf/impl.py similarity index 84% rename from src/ezmsg/util/perf/util.py rename to src/ezmsg/util/perf/impl.py index 8883591b..58357b8c 100644 --- a/src/ezmsg/util/perf/util.py +++ b/src/ezmsg/util/perf/impl.py @@ -1,13 +1,9 @@ import asyncio import dataclasses -import datetime import os -import platform import time import typing import enum -import sys -import subprocess import ezmsg.core as ez @@ -17,12 +13,6 @@ ez.logger.error("ezmsg perf requires numpy") raise -try: - import psutil -except ImportError: - ez.logger.error("ezmsg perf requires psutil") - raise - def collect( components: typing.Optional[typing.Mapping[str, ez.Component]] = None, @@ -323,34 +313,3 @@ class TestParameters: duration: float num_buffers: int -def _git_commit() -> str: - try: - return subprocess.check_output( - ["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL - ).decode().strip() - except: - return "unknown" - -def _git_branch() -> str: - try: - return subprocess.check_output( - ["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL - ).decode().strip() - except: - return "unknown" - -@dataclasses.dataclass -class TestEnvironmentInfo: - ezmsg_version: str = dataclasses.field(default_factory=lambda: ez.__version__) - numpy_version: str = dataclasses.field(default_factory=lambda: np.__version__) - python_version: str = dataclasses.field(default_factory=lambda: sys.version.replace("\n", " ")) - os: str = dataclasses.field(default_factory=lambda: platform.system()) - os_version: str = dataclasses.field(default_factory=lambda: platform.version()) - machine: str = dataclasses.field(default_factory=lambda: platform.machine()) - processor: str = dataclasses.field(default_factory=lambda: platform.processor()) - cpu_count_logical: int | None = dataclasses.field(default_factory=lambda: psutil.cpu_count(logical=True)) - cpu_count_physical: int | None = dataclasses.field(default_factory=lambda: psutil.cpu_count(logical=False)) - memory_gb: float = dataclasses.field(default_factory=lambda:round(psutil.virtual_memory().total / (1024**3), 2)) - start_time: str = dataclasses.field(default_factory=lambda: datetime.datetime.now().isoformat(timespec="seconds")) - git_commit: str = dataclasses.field(default_factory=_git_commit) - git_branch: str = dataclasses.field(default_factory=_git_branch) \ No newline at end of file diff --git a/src/ezmsg/util/perf/eval.py b/src/ezmsg/util/perf/run.py 
similarity index 72% rename from src/ezmsg/util/perf/eval.py rename to src/ezmsg/util/perf/run.py index f7f7e27b..d36dd02c 100644 --- a/src/ezmsg/util/perf/eval.py +++ b/src/ezmsg/util/perf/run.py @@ -1,12 +1,13 @@ import json import datetime import itertools +import argparse from dataclasses import dataclass from ..messagecodec import MessageEncoder -from .util import ( - TestEnvironmentInfo, +from .envinfo import TestEnvironmentInfo +from .impl import ( TestParameters, perform_test, Communication, @@ -19,11 +20,11 @@ def get_datestamp() -> str: return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @dataclass -class PerfEvalArgs: +class PerfRunArgs: duration: float num_buffers: int -def perf_eval(args: PerfEvalArgs) -> None: +def perf_run(args: PerfRunArgs) -> None: msg_sizes = [2 ** exp for exp in range(4, 25, 8)] n_clients = [2 ** exp for exp in range(0, 6, 2)] @@ -64,28 +65,25 @@ def perf_eval(args: PerfEvalArgs) -> None: out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") +def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: -def command() -> None: - import argparse - - parser = argparse.ArgumentParser() - - parser.add_argument( + p_run = subparsers.add_parser("run", help="run performance test") + p_run.add_argument( "--duration", type=float, default=2.0, - help="How long to run each load test (seconds) (default = 2.0)", + help="individual test duration in seconds (default = 2.0)", ) - - parser.add_argument( - "--num-buffers", - type=int, - default=32, - help="shared memory buffers (default = 32)" + p_run.add_argument( + "--num-buffers", + type=int, + default=32, + help="shared memory buffers (default = 32)", ) - args = parser.parse_args(namespace=PerfEvalArgs) - - perf_eval(args) - - + p_run.set_defaults(_handler=lambda ns: perf_run( + PerfRunArgs( + duration = ns.duration, + num_buffers = ns.num_buffers + ) + )) \ No newline at end of file From b8841180299e52bc560a53854cd8b15dbd398fa0 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sat, 20 Sep 2025 13:28:20 -0400 Subject: [PATCH 04/23] minor --- src/ezmsg/util/perf/ai.py | 111 ++++++++++++++++++++++++++++++++ src/ezmsg/util/perf/analysis.py | 38 ++++++++--- 2 files changed, 141 insertions(+), 8 deletions(-) create mode 100644 src/ezmsg/util/perf/ai.py diff --git a/src/ezmsg/util/perf/ai.py b/src/ezmsg/util/perf/ai.py new file mode 100644 index 00000000..a212b81c --- /dev/null +++ b/src/ezmsg/util/perf/ai.py @@ -0,0 +1,111 @@ +import os +import json +import textwrap +import urllib.request + +DEFAULT_TEST_DESCRIPTION = """\ +You are analyzing performance test results for the ezmsg pub/sub system. 
+ +Configurations (config): +- fanin: Many publishers to one subscriber +- fanout: one publisher to many subscribers +- relay: one publisher to one subscriber through many relays + +Communication strategies (comms): +- local: all subs, relays, and pubs are in the SAME process +- shm / tcp: some clients move to a second process; comms via shared memory / TCP + * fanin: all publishers moved + * fanout: all subscribers moved + * relay: the publisher and all relay nodes moved +- shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively + +Variables: +- n_clients: pubs (fanin), subs (fanout), or relays (relay) +- msg_size: nominal message size (bytes) + +Metrics: +- sample_rate: messages/sec at the sink (higher = better) +- data_rate: bytes/sec at the sink (higher = better) +- latency_mean: average send -> receive latency in seconds (lower = better) + +Task: +Summarize performance test results. Explain trade-offs by comms/config, call out anomalies/outliers. +Keep the tone concise, technical, and actionable. +Please format output such that it will display nicely in a terminal output. + +If the rest results are a "PERFORMANCE COMPARISON": +- Metrics are in percentages (100.0 = 100 percent = no change) and do not reflect the ground-truth physical units. +- Summarize key improvements/regressions +- Performance differences +/- 5 percent are likely in the noise. +""" + +def chatgpt_analyze_results( + results_text: str, + *, + prompt: str | None = None, + model: str | None = None, + max_chars: int = 120_000, + temperature: float = 0.2, +) -> str: + """ + Send results + a test description to OpenAI's Responses API and print the analysis. + + Env vars: + - OPENAI_API_KEY (required) + - OPENAI_MODEL (optional; e.g., 'gpt-4o-mini' or a newer model) + """ + api_key = os.getenv("OPENAI_API_KEY") + if not api_key: + raise RuntimeError("Please set OPENAI_API_KEY in your environment.") + + model = model or os.getenv("OPENAI_MODEL", "gpt-4o-mini") + + # Keep requests reasonable in size + results_snippet = results_text if len(results_text) <= max_chars else ( + results_text[:max_chars] + "\n\n[...truncated for token budget...]" + ) + + # You can tweak the system instruction here to steer tone/format + system_instruction = "You are a senior performance engineer. Prefer precise, structured analysis." 
+ + user_payload = textwrap.dedent(f"""\ + {prompt or DEFAULT_TEST_DESCRIPTION} + + === BEGIN RESULTS === + {results_snippet} + === END RESULTS === + """) + + body = { + "model": model, + "temperature": temperature, + "input": [ + {"role": "system", "content": [{"type": "text", "text": system_instruction}]}, + {"role": "user", "content": [{"type": "text", "text": user_payload}]}, + ], + } + + req = urllib.request.Request( + "https://api.openai.com/v1/responses", + data=json.dumps(body).encode("utf-8"), + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + }, + method="POST", + ) + + with urllib.request.urlopen(req) as resp: + data = json.load(resp) + + # Robust extraction: prefer output_text; fall back to concatenating output content + text = data.get("output_text") + if not text: + parts = [] + for item in data.get("output", []) or []: + for c in item.get("content", []) or []: + if c.get("type") in ("output_text", "text") and "text" in c: + parts.append(c["text"]) + text = "\n".join(parts) if parts else json.dumps(data, indent=2) + + return text diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 0be97744..be0cb1e7 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -6,7 +6,8 @@ from pathlib import Path from ..messagecodec import MessageDecoder -from .envinfo import TestEnvironmentInfo +from .envinfo import TestEnvironmentInfo, format_env_diff +from .ai import chatgpt_analyze_results from .impl import ( TestParameters, Metrics, @@ -74,22 +75,37 @@ def load_perf(perf: Path) -> xr.Dataset: return dataset -def summary(perf_path: Path, baseline_path: Path | None) -> None: +def summary(perf_path: Path, baseline_path: Path | None, ai: bool = False) -> None: """ print perf test results and comparisons to the console """ + output = '' + perf = load_perf(perf_path) - info = perf.attrs['info'] + info: TestEnvironmentInfo = perf.attrs['info'] + output += str(info) + '\n\n' + + if baseline_path is not None: + output += "PERFORMANCE COMPARISON\n\n" baseline = load_perf(baseline_path) perf = (perf / baseline) * 100.0 + baseline_info: TestEnvironmentInfo = baseline.attrs['info'] + output += format_env_diff(info.diff(baseline_info)) + '\n\n' - print(info) + # These raw stats are still valuable to have, but are confusing + # when making relative comparisons + perf = perf.drop_vars(['latency_total', 'num_msgs']) for _, config_ds in perf.groupby('config'): for _, comms_ds in config_ds.groupby('comms'): - print(comms_ds.squeeze().to_dataframe()) - print("\n") - print("\n") + output += str(comms_ds.squeeze().to_dataframe()) + '\n\n' + output += '\n' + + print(output) + + if ai: + print('Querying ChatGPT for AI-assisted analysis of performance test results') + print(chatgpt_analyze_results(output)) def setup_summary_cmdline(subparsers: argparse._SubParsersAction) -> None: @@ -106,8 +122,14 @@ def setup_summary_cmdline(subparsers: argparse._SubParsersAction) -> None: default=None, help="baseline perf test for comparison", ) + p_summary.add_argument( + "--ai", + action="store_true", + help="ask chatgpt for an analysis of the results. 
requires OPENAI_API_KEY set in environment" + ) p_summary.set_defaults(_handler=lambda ns: summary( perf_path = ns.perf, - baseline_path = ns.baseline + baseline_path = ns.baseline, + ai = ns.ai )) \ No newline at end of file From 6beb4baf90f8b39794f8d56bfa46770f34296b1d Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sat, 20 Sep 2025 17:40:44 -0400 Subject: [PATCH 05/23] html analysis output; ai was a bust --- src/ezmsg/util/perf/ai.py | 111 ------------ src/ezmsg/util/perf/analysis.py | 300 ++++++++++++++++++++++++++++++-- src/ezmsg/util/perf/run.py | 23 +++ 3 files changed, 312 insertions(+), 122 deletions(-) delete mode 100644 src/ezmsg/util/perf/ai.py diff --git a/src/ezmsg/util/perf/ai.py b/src/ezmsg/util/perf/ai.py deleted file mode 100644 index a212b81c..00000000 --- a/src/ezmsg/util/perf/ai.py +++ /dev/null @@ -1,111 +0,0 @@ -import os -import json -import textwrap -import urllib.request - -DEFAULT_TEST_DESCRIPTION = """\ -You are analyzing performance test results for the ezmsg pub/sub system. - -Configurations (config): -- fanin: Many publishers to one subscriber -- fanout: one publisher to many subscribers -- relay: one publisher to one subscriber through many relays - -Communication strategies (comms): -- local: all subs, relays, and pubs are in the SAME process -- shm / tcp: some clients move to a second process; comms via shared memory / TCP - * fanin: all publishers moved - * fanout: all subscribers moved - * relay: the publisher and all relay nodes moved -- shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively - -Variables: -- n_clients: pubs (fanin), subs (fanout), or relays (relay) -- msg_size: nominal message size (bytes) - -Metrics: -- sample_rate: messages/sec at the sink (higher = better) -- data_rate: bytes/sec at the sink (higher = better) -- latency_mean: average send -> receive latency in seconds (lower = better) - -Task: -Summarize performance test results. Explain trade-offs by comms/config, call out anomalies/outliers. -Keep the tone concise, technical, and actionable. -Please format output such that it will display nicely in a terminal output. - -If the rest results are a "PERFORMANCE COMPARISON": -- Metrics are in percentages (100.0 = 100 percent = no change) and do not reflect the ground-truth physical units. -- Summarize key improvements/regressions -- Performance differences +/- 5 percent are likely in the noise. -""" - -def chatgpt_analyze_results( - results_text: str, - *, - prompt: str | None = None, - model: str | None = None, - max_chars: int = 120_000, - temperature: float = 0.2, -) -> str: - """ - Send results + a test description to OpenAI's Responses API and print the analysis. - - Env vars: - - OPENAI_API_KEY (required) - - OPENAI_MODEL (optional; e.g., 'gpt-4o-mini' or a newer model) - """ - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - raise RuntimeError("Please set OPENAI_API_KEY in your environment.") - - model = model or os.getenv("OPENAI_MODEL", "gpt-4o-mini") - - # Keep requests reasonable in size - results_snippet = results_text if len(results_text) <= max_chars else ( - results_text[:max_chars] + "\n\n[...truncated for token budget...]" - ) - - # You can tweak the system instruction here to steer tone/format - system_instruction = "You are a senior performance engineer. Prefer precise, structured analysis." 
- - user_payload = textwrap.dedent(f"""\ - {prompt or DEFAULT_TEST_DESCRIPTION} - - === BEGIN RESULTS === - {results_snippet} - === END RESULTS === - """) - - body = { - "model": model, - "temperature": temperature, - "input": [ - {"role": "system", "content": [{"type": "text", "text": system_instruction}]}, - {"role": "user", "content": [{"type": "text", "text": user_payload}]}, - ], - } - - req = urllib.request.Request( - "https://api.openai.com/v1/responses", - data=json.dumps(body).encode("utf-8"), - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - method="POST", - ) - - with urllib.request.urlopen(req) as resp: - data = json.load(resp) - - # Robust extraction: prefer output_text; fall back to concatenating output content - text = data.get("output_text") - if not text: - parts = [] - for item in data.get("output", []) or []: - for c in item.get("content", []) or []: - if c.get("type") in ("output_text", "text") and "text" in c: - parts.append(c["text"]) - text = "\n".join(parts) if parts else json.dumps(data, indent=2) - - return text diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index be0cb1e7..2ddc7062 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -2,12 +2,16 @@ import typing import dataclasses import argparse +import html +import math +import webbrowser +import tempfile from pathlib import Path from ..messagecodec import MessageDecoder from .envinfo import TestEnvironmentInfo, format_env_diff -from .ai import chatgpt_analyze_results +from .run import get_datestamp from .impl import ( TestParameters, Metrics, @@ -17,6 +21,7 @@ try: import xarray as xr + import pandas as pd # xarray depends on pandas except ImportError: ez.logger.error('ezmsg perf analysis requires xarray') raise @@ -74,8 +79,186 @@ def load_perf(perf: Path) -> xr.Dataset: dataset = xr.Dataset(data_vars, attrs = dict(info = info)) return dataset +NOISE_BAND_PCT = 5.0 # +/-5% is "in the noise" for comparisons -def summary(perf_path: Path, baseline_path: Path | None, ai: bool = False) -> None: +def _escape(s: str) -> str: + return html.escape(str(s), quote=True) + +def _env_block(title: str, body: str) -> str: + return f""" +
+

{_escape(title)}

+
{_escape(body).strip()}
+
+ """ + +def _legend_block() -> str: + return """ +
+

Legend

+
    +
  • Comparison mode: values are percentages (100 = no change).
  • +
  • Noise band: ±5% considered negligible (no color).
  • +
  • Green: improvement (↑ sample/data rate, ↓ latency).
  • +
  • Red: regression (↓ sample/data rate, ↑ latency).
  • +
+
+ """ + +def _base_css() -> str: + # Minimal, print-friendly CSS + color scales for cells. + return """ + + """ + +def _color_for_comparison(value: float, metric: str) -> str: + """ + Returns inline CSS background for a comparison % value. + value: e.g., 97.3, 104.8, etc. + For sample_rate/data_rate: improvement > 100 (good). + For latency_mean: improvement < 100 (good). + Noise band ±5% around 100 is neutral. + """ + if not (isinstance(value, (int, float)) and math.isfinite(value)): + return "" + + delta = value - 100.0 + # Determine direction: + is good for sample/data; - is good for latency + if metric in ("sample_rate", "data_rate"): + # positive delta good, negative bad + magnitude = abs(delta) + sign_good = delta > 0 + elif metric == "latency_mean": + # negative delta good (lower latency) + magnitude = abs(delta) + sign_good = delta < 0 + else: + return "" + + # Noise band: keep neutral + if magnitude <= NOISE_BAND_PCT: + return "" + + # Scale 5%..50% across 0..1; clamp + scale = max(0.0, min(1.0, (magnitude - NOISE_BAND_PCT) / 45.0)) + + # Choose hue and lightness; use HSL with gentle saturation + hue = "var(--green)" if sign_good else "var(--red)" + # opacity via alpha blend on lightness via HSLa + # Use saturation ~70%, lightness around 40–50% blended with table bg + alpha = 0.15 + 0.35 * scale # 0.15..0.50 + return f"background-color: hsla({hue}, 70%, 45%, {alpha});" + +def _format_number(x) -> str: + if isinstance(x, (int,)) and not isinstance(x, bool): + return f"{x:d}" + try: + xf = float(x) + except Exception: + return _escape(str(x)) + # Heuristic: for comparison percentages, 1 decimal is nice; for absolute, 3 decimals for latency. + return f"{xf:.3f}" + + +def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> None: """ print perf test results and comparisons to the console """ output = '' @@ -84,17 +267,21 @@ def summary(perf_path: Path, baseline_path: Path | None, ai: bool = False) -> No info: TestEnvironmentInfo = perf.attrs['info'] output += str(info) + '\n\n' - + relative = False + env_diff = None if baseline_path is not None: + relative = True output += "PERFORMANCE COMPARISON\n\n" baseline = load_perf(baseline_path) perf = (perf / baseline) * 100.0 baseline_info: TestEnvironmentInfo = baseline.attrs['info'] - output += format_env_diff(info.diff(baseline_info)) + '\n\n' + env_diff = format_env_diff(info.diff(baseline_info)) + output += env_diff + '\n\n' # These raw stats are still valuable to have, but are confusing # when making relative comparisons perf = perf.drop_vars(['latency_total', 'num_msgs']) + df = perf.squeeze().to_dataframe() for _, config_ds in perf.groupby('config'): for _, comms_ds in config_ds.groupby('comms'): @@ -103,9 +290,100 @@ def summary(perf_path: Path, baseline_path: Path | None, ai: bool = False) -> No print(output) - if ai: - print('Querying ChatGPT for AI-assisted analysis of performance test results') - print(chatgpt_analyze_results(output)) + if html: + # Ensure expected columns exist + expected_cols = {"sample_rate", "data_rate", "latency_mean"} + missing = expected_cols - set(df.columns) + if missing: + raise ValueError(f"Missing expected columns in dataset: {missing}") + + # We'll render a table per (config, comms) group. 
+ groups = df.reset_index().sort_values( + by=["config", "comms", "n_clients", "msg_size"] + ).groupby(["config", "comms"], sort=False) + + # Build HTML + parts: list[str] = [] + parts.append("") + parts.append("") + parts.append("ezmsg perf report") + parts.append(_base_css()) + parts.append("
") + + parts.append("
") + parts.append("

ezmsg Performance Report

") + sub = str(perf_path) + if baseline_path is not None: + sub += f' relative to {str(baseline_path)}' + parts.append(f"
{_escape(sub)}
") + parts.append("
") + + if info is not None: + parts.append(_env_block("Test Environment", str(info))) + + if env_diff is not None: + # Show diffs using your helper + parts.append("
") + parts.append("

Environment Differences vs Baseline

") + parts.append(f"
{_escape(env_diff)}
") + parts.append("
") + parts.append(_legend_block()) + + # Render each group + for (config, comms), g in groups: + # Keep only expected columns in order + cols = ["n_clients", "msg_size", "sample_rate", "data_rate", "latency_mean"] + g = g[cols].copy() + + # String format some columns (msg_size with separators) + g["msg_size"] = g["msg_size"].map(lambda x: f"{int(x):,}" if pd.notna(x) else x) + + # Build table manually so we can inject inline cell styles easily + # (pandas Styler is great but produces bulky HTML; manual keeps it clean) + header = f""" + + + n_clients + msg_size {'' if relative else '(b)'} + sample_rate {'' if relative else '(msgs/s)'} + data_rate {'' if relative else '(MB/s)'} + latency_mean {'' if relative else '(us)'} + + + """ + body_rows: list[str] = [] + for _, row in g.iterrows(): + sr, dr, lt = row["sample_rate"], row["data_rate"], row["latency_mean"] + dr = dr if relative else dr / 2**20 + lt = lt if relative else lt * 1e6 + sr_style = _color_for_comparison(sr, "sample_rate") if relative else "" + dr_style = _color_for_comparison(dr, "data_rate") if relative else "" + lt_style = _color_for_comparison(lt, "latency_mean") if relative else "" + + body_rows.append( + "" + f"{_format_number(row['n_clients'])}" + f"{_escape(row['msg_size'])}" + f"{_format_number(sr)}" + f"{_format_number(dr)}" + f"{_format_number(lt)}" + "" + ) + table_html = f"{header}{''.join(body_rows)}
" + + parts.append( + f"

" + f"{_escape(config)}" + f"{_escape(comms)}" + f"

{table_html}
" + ) + + parts.append("
") + html_text = "".join(parts) + + out_path = Path(f'report_{get_datestamp()}.html') + out_path.write_text(html_text, encoding="utf-8") + webbrowser.open(out_path.resolve().as_uri()) def setup_summary_cmdline(subparsers: argparse._SubParsersAction) -> None: @@ -123,13 +401,13 @@ def setup_summary_cmdline(subparsers: argparse._SubParsersAction) -> None: help="baseline perf test for comparison", ) p_summary.add_argument( - "--ai", - action="store_true", - help="ask chatgpt for an analysis of the results. requires OPENAI_API_KEY set in environment" + "--html", + action = 'store_true', + help = "generate an html output file and render results in browser", ) p_summary.set_defaults(_handler=lambda ns: summary( perf_path = ns.perf, baseline_path = ns.baseline, - ai = ns.ai + html = ns.html )) \ No newline at end of file diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index d36dd02c..1856f2a6 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -25,6 +25,29 @@ class PerfRunArgs: num_buffers: int def perf_run(args: PerfRunArgs) -> None: + """ + Configurations (config): + - fanin: Many publishers to one subscriber + - fanout: one publisher to many subscribers + - relay: one publisher to one subscriber through many relays + + Communication strategies (comms): + - local: all subs, relays, and pubs are in the SAME process + - shm / tcp: some clients move to a second process; comms via shared memory / TCP + * fanin: all publishers moved + * fanout: all subscribers moved + * relay: the publisher and all relay nodes moved + - shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively + + Variables: + - n_clients: pubs (fanin), subs (fanout), or relays (relay) + - msg_size: nominal message size (bytes) + + Metrics: + - sample_rate: messages/sec at the sink (higher = better) + - data_rate: bytes/sec at the sink (higher = better) + - latency_mean: average send -> receive latency in seconds (lower = better) + """ msg_sizes = [2 ** exp for exp in range(4, 25, 8)] n_clients = [2 ** exp for exp in range(0, 6, 2)] From a661ce5bef1d70be552338b9d2a52c92782aed23 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sat, 20 Sep 2025 17:49:44 -0400 Subject: [PATCH 06/23] report tweaks --- src/ezmsg/util/perf/analysis.py | 27 ++++++++++++++++++++++++++- src/ezmsg/util/perf/run.py | 24 ------------------------ 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 2ddc7062..9d33a068 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -5,7 +5,6 @@ import html import math import webbrowser -import tempfile from pathlib import Path @@ -32,6 +31,30 @@ ez.logger.error('ezmsg perf analysis requires numpy') raise +TEST_DESCRIPTION = """ +Configurations (config): +- fanin: many publishers to one subscriber +- fanout: one publisher to many subscribers +- relay: one publisher to one subscriber through many relays + +Communication strategies (comms): +- local: all subs, relays, and pubs are in the SAME process +- shm / tcp: some clients move to a second process; comms via shared memory / TCP + * fanin: all publishers moved + * fanout: all subscribers moved + * relay: the publisher and all relay nodes moved +- shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively + +Variables: +- n_clients: pubs (fanin), subs (fanout), or relays (relay) +- msg_size: nominal message size (bytes) + +Metrics: +- sample_rate: 
messages/sec at the sink (higher = better) +- data_rate: bytes/sec at the sink (higher = better) +- latency_mean: average send -> receive latency in seconds (lower = better) +""" + def load_perf(perf: Path) -> xr.Dataset: @@ -321,6 +344,8 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> if info is not None: parts.append(_env_block("Test Environment", str(info))) + parts.append(_env_block("Test Details", TEST_DESCRIPTION)) + if env_diff is not None: # Show diffs using your helper parts.append("
") diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 1856f2a6..d7280e51 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -25,30 +25,6 @@ class PerfRunArgs: num_buffers: int def perf_run(args: PerfRunArgs) -> None: - """ - Configurations (config): - - fanin: Many publishers to one subscriber - - fanout: one publisher to many subscribers - - relay: one publisher to one subscriber through many relays - - Communication strategies (comms): - - local: all subs, relays, and pubs are in the SAME process - - shm / tcp: some clients move to a second process; comms via shared memory / TCP - * fanin: all publishers moved - * fanout: all subscribers moved - * relay: the publisher and all relay nodes moved - - shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively - - Variables: - - n_clients: pubs (fanin), subs (fanout), or relays (relay) - - msg_size: nominal message size (bytes) - - Metrics: - - sample_rate: messages/sec at the sink (higher = better) - - data_rate: bytes/sec at the sink (higher = better) - - latency_mean: average send -> receive latency in seconds (lower = better) - """ - msg_sizes = [2 ** exp for exp in range(4, 25, 8)] n_clients = [2 ** exp for exp in range(0, 6, 2)] comms = [c for c in Communication] From 5464887ca95f58b75ec728ab1ef0defb49f53e0e Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Sun, 21 Sep 2025 09:52:18 -0400 Subject: [PATCH 07/23] added test iters --- src/ezmsg/util/perf/analysis.py | 23 ++++++++------- src/ezmsg/util/perf/impl.py | 5 ++++ src/ezmsg/util/perf/run.py | 52 +++++++++++++++++++-------------- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 9d33a068..681c9dac 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -14,6 +14,7 @@ from .impl import ( TestParameters, Metrics, + TestLogEntry, ) import ezmsg.core as ez @@ -59,15 +60,15 @@ def load_perf(perf: Path) -> xr.Dataset: params: typing.List[TestParameters] = [] - results: typing.List[Metrics] = [] + results: typing.List[typing.List[Metrics]] = [] with open(perf, 'r') as perf_f: info: TestEnvironmentInfo = json.loads(next(perf_f), cls = MessageDecoder) for line in perf_f: - obj = json.loads(line, cls = MessageDecoder) - params.append(obj['params']) - results.append(obj['results']) + obj: TestLogEntry = json.loads(line, cls = MessageDecoder) + params.append(obj.params) + results.append(obj.results) n_clients_axis = list(sorted(set([p.n_clients for p in params]))) msg_size_axis = list(sorted(set([p.msg_size for p in params]))) @@ -91,19 +92,20 @@ def load_perf(perf: Path) -> xr.Dataset: len(config_axis) )) * np.nan for p, r in zip(params, results): + # tests are run multiple times; get the median value for each metric + values = list(sorted([getattr(v, field.name) for v in r])) + value = values[len(values)//2] m[ n_clients_axis.index(p.n_clients), msg_size_axis.index(p.msg_size), comms_axis.index(p.comms), config_axis.index(p.config) - ] = getattr(r, field.name) + ] = value data_vars[field.name] = xr.DataArray(m, dims = dims, coords = coords) dataset = xr.Dataset(data_vars, attrs = dict(info = info)) return dataset -NOISE_BAND_PCT = 5.0 # +/-5% is "in the noise" for comparisons - def _escape(s: str) -> str: return html.escape(str(s), quote=True) @@ -121,7 +123,6 @@ def _legend_block() -> str:

Legend

  • Comparison mode: values are percentages (100 = no change).
  • -
  • Noise band: ±5% considered negligible (no color).
  • Green: improvement (↑ sample/data rate, ↓ latency).
  • Red: regression (↓ sample/data rate, ↑ latency).
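The legend above reduces to a simple threshold rule on the comparison percentage. A minimal sketch in plain Python (illustrative only — the helper name is hypothetical; the ±5% band mirrors the legend as it stands at this point in the series):

def classify_comparison(value: float, metric: str, noise_band_pct: float = 5.0) -> str:
    # value is new/baseline * 100, so 100.0 means "no change"
    delta = value - 100.0
    # higher is better for sample/data rates; lower is better for latencies
    improved = delta > 0 if "rate" in metric else delta < 0
    if abs(delta) <= noise_band_pct:
        return "neutral"  # inside the noise band: no color
    return "improvement" if improved else "regression"

# classify_comparison(104.8, "sample_rate")  -> "neutral"     (within ±5%)
# classify_comparison(112.0, "data_rate")    -> "improvement" (green)
# classify_comparison(112.0, "latency_mean") -> "regression"  (red)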
@@ -232,7 +233,7 @@ def _base_css() -> str: """ -def _color_for_comparison(value: float, metric: str) -> str: +def _color_for_comparison(value: float, metric: str, noise_band_pct: float = 5.0) -> str: """ Returns inline CSS background for a comparison % value. value: e.g., 97.3, 104.8, etc. @@ -257,11 +258,11 @@ def _color_for_comparison(value: float, metric: str) -> str: return "" # Noise band: keep neutral - if magnitude <= NOISE_BAND_PCT: + if magnitude <= noise_band_pct: return "" # Scale 5%..50% across 0..1; clamp - scale = max(0.0, min(1.0, (magnitude - NOISE_BAND_PCT) / 45.0)) + scale = max(0.0, min(1.0, (magnitude - noise_band_pct) / 45.0)) # Choose hue and lightness; use HSL with gentle saturation hue = "var(--green)" if sign_good else "var(--red)" diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 58357b8c..5edc0474 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -313,3 +313,8 @@ class TestParameters: duration: float num_buffers: int + +@dataclasses.dataclass +class TestLogEntry: + params: TestParameters + results: typing.List[Metrics] \ No newline at end of file diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index d7280e51..599f3b02 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -8,7 +8,8 @@ from ..messagecodec import MessageEncoder from .envinfo import TestEnvironmentInfo from .impl import ( - TestParameters, + TestParameters, + TestLogEntry, perform_test, Communication, CONFIGS, @@ -19,12 +20,12 @@ def get_datestamp() -> str: return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") -@dataclass -class PerfRunArgs: - duration: float - num_buffers: int -def perf_run(args: PerfRunArgs) -> None: +def perf_run( + duration: float, + num_buffers: int, + iters: int, +) -> None: msg_sizes = [2 ** exp for exp in range(4, 25, 8)] n_clients = [2 ** exp for exp in range(0, 6, 2)] comms = [c for c in Communication] @@ -44,20 +45,22 @@ def perf_run(args: PerfRunArgs) -> None: n_clients = n_clients, config = config.__name__, comms = comms.value, - duration = args.duration, - num_buffers = args.num_buffers + duration = duration, + num_buffers = num_buffers ) - results = perform_test( - n_clients = n_clients, - duration = args.duration, - msg_size = msg_size, - buffers = args.num_buffers, - comms = comms, - config = config, - ) - - output = dict( + results = [ + perform_test( + n_clients = n_clients, + duration = duration, + msg_size = msg_size, + buffers = num_buffers, + comms = comms, + config = config, + ) for _ in range(iters) + ] + + output = TestLogEntry( params = params, results = results ) @@ -79,10 +82,15 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: default=32, help="shared memory buffers (default = 32)", ) + p_run.add_argument( + "--iters", "-i", + type = int, + default = 3, + help = "number of times to run each test" + ) p_run.set_defaults(_handler=lambda ns: perf_run( - PerfRunArgs( - duration = ns.duration, - num_buffers = ns.num_buffers - ) + duration = ns.duration, + num_buffers = ns.num_buffers, + iters = ns.iters )) \ No newline at end of file From 460ca9d230dcc55bc66dd1bf6236027e7296d734 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 22 Sep 2025 13:06:30 -0400 Subject: [PATCH 08/23] more cmdline --- src/ezmsg/util/perf/impl.py | 8 ++- src/ezmsg/util/perf/run.py | 99 +++++++++++++++++++++++++++++++------ 2 files changed, 90 insertions(+), 17 deletions(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 
5edc0474..6bb0ded7 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -190,7 +190,13 @@ def relay(config: ConfigSettings) -> Configuration: return relays, connections -CONFIGS: typing.Iterable[Configurator] = [fanin, fanout, relay] +CONFIGS: typing.Mapping[str, Configurator] = { + c.__name__: c for c in [ + fanin, + fanout, + relay + ] +} class Communication(enum.StrEnum): LOCAL = "local" diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 599f3b02..094389c2 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -2,8 +2,7 @@ import datetime import itertools import argparse - -from dataclasses import dataclass +import typing from ..messagecodec import MessageEncoder from .envinfo import TestEnvironmentInfo @@ -17,6 +16,10 @@ import ezmsg.core as ez +DEFAULT_MSG_SIZES = [2 ** exp for exp in range(4, 25, 8)] +DEFAULT_N_CLIENTS = [2 ** exp for exp in range(0, 6, 2)] +DEFAULT_COMMS = [c for c in Communication] + def get_datestamp() -> str: return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -25,38 +28,62 @@ def perf_run( duration: float, num_buffers: int, iters: int, + msg_sizes: typing.Iterable[int] | None, + n_clients: typing.Iterable[int] | None, + comms: typing.Iterable[str] | None, + configs: typing.Iterable[str] | None, ) -> None: - msg_sizes = [2 ** exp for exp in range(4, 25, 8)] - n_clients = [2 ** exp for exp in range(0, 6, 2)] - comms = [c for c in Communication] - - test_list = list(itertools.product(msg_sizes, n_clients, CONFIGS, comms)) + + if n_clients is None: + n_clients = DEFAULT_N_CLIENTS + if any(c <= 0 for c in n_clients): + ez.logger.error('All tests must have >0 clients') + return + + if msg_sizes is None: + msg_sizes = DEFAULT_MSG_SIZES + if any(s <= 0 for s in msg_sizes): + ez.logger.error('All msg_sizes must be >0 bytes') + + try: + communications = DEFAULT_COMMS if comms is None else [Communication(c) for c in comms] + except ValueError: + ez.logger.error(f"Invalid test communications requested. Valid communications: {', '.join([c.value for c in Communication])}") + return + + try: + configurators = list(CONFIGS.values()) if configs is None else [CONFIGS[c] for c in configs] + except ValueError: + ez.logger.error(f"Invalid test configuration requested. 
Valid configurations: {', '.join([c for c in CONFIGS])}") + return + + test_list = list(itertools.product(msg_sizes, n_clients, configurators, communications)) with open(f'perf_{get_datestamp()}.txt', 'w') as out_f: out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") - for test_idx, (msg_size, n_clients, config, comms) in enumerate(test_list): + for test_idx, (msg_size, clients, conf, comm) in enumerate(test_list): ez.logger.info(f"RUNNING TEST {test_idx + 1} / {len(test_list)} ({(test_idx / len(test_list)) * 100.0:0.2f} %)") params = TestParameters( msg_size = msg_size, - n_clients = n_clients, - config = config.__name__, - comms = comms.value, + n_clients = clients, + config = conf.__name__, + comms = comm.value, duration = duration, num_buffers = num_buffers ) results = [ perform_test( - n_clients = n_clients, + n_clients = clients, duration = duration, msg_size = msg_size, buffers = num_buffers, - comms = comms, - config = config, + comms = comm, + config = conf, ) for _ in range(iters) ] @@ -70,27 +97,67 @@ def perf_run( def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: p_run = subparsers.add_parser("run", help="run performance test") + p_run.add_argument( "--duration", type=float, default=2.0, help="individual test duration in seconds (default = 2.0)", ) + p_run.add_argument( "--num-buffers", type=int, default=32, help="shared memory buffers (default = 32)", ) + p_run.add_argument( "--iters", "-i", type = int, default = 3, - help = "number of times to run each test" + help = "number of times to run each test (default = 3)" ) + p_run.add_argument( + "--msg-sizes", + type = int, + default = None, + nargs = "*", + help = f"message sizes in bytes (default = {DEFAULT_MSG_SIZES})" + ) + + p_run.add_argument( + "--n-clients", + type = int, + default = None, + nargs = "*", + help = f"number of clients (default = {DEFAULT_N_CLIENTS})" + ) + + p_run.add_argument( + "--comms", + type = str, + default = None, + nargs = "*", + help = f"communication strategies to test (default = {[c.value for c in DEFAULT_COMMS]})" + ) + + p_run.add_argument( + "--configs", + type = str, + default = None, + nargs = "*", + help = f"configurations to test (default = {[c for c in CONFIGS]})" + ) + + p_run.set_defaults(_handler=lambda ns: perf_run( duration = ns.duration, num_buffers = ns.num_buffers, - iters = ns.iters + iters = ns.iters, + msg_sizes = ns.msg_sizes, + n_clients = ns.n_clients, + comms = ns.comms, + configs = ns.configs, )) \ No newline at end of file From 173c09609941b72b80aae478c674da25904f9035 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 22 Sep 2025 15:00:11 -0400 Subject: [PATCH 09/23] changing sample_rate calculation --- src/ezmsg/util/perf/impl.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 6bb0ded7..8421a51b 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -264,10 +264,10 @@ def perform_test( process_components = process_components, ) - return calculate_metrics(sink) + return calculate_metrics(sink, duration) -def calculate_metrics(sink: LoadTestSink) -> Metrics: +def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: # Log some useful summary statistics min_timestamp = min(timestamp for timestamp, _, _ in sink.STATE.received_data) @@ -286,7 +286,7 @@ def calculate_metrics(sink: LoadTestSink) -> Metrics: num_samples = len(sink.STATE.received_data) ez.logger.info(f"Samples received: {num_samples}") - 
sample_rate = num_samples / (max_timestamp - min_timestamp) + sample_rate = num_samples / duration ez.logger.info(f"Sample rate: {sample_rate} Hz") latency_mean = total_latency / num_samples ez.logger.info(f"Mean latency: {latency_mean} s") From d17a4fe17dfc229d79c6e4ded3473d4a4639dce7 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 22 Sep 2025 16:52:50 -0400 Subject: [PATCH 10/23] fix: n-clients = 0 is useful and works --- src/ezmsg/util/perf/run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 094389c2..3ff70e91 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -36,8 +36,8 @@ def perf_run( if n_clients is None: n_clients = DEFAULT_N_CLIENTS - if any(c <= 0 for c in n_clients): - ez.logger.error('All tests must have >0 clients') + if any(c < 0 for c in n_clients): + ez.logger.error('All tests must have >=0 clients') return if msg_sizes is None: From 01212d3608d4dbdcba1e1ea34c8d556e6af2230f Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 22 Sep 2025 19:10:46 -0400 Subject: [PATCH 11/23] better support for msg_size = 0 and n_clients = 0 --- src/ezmsg/util/perf/impl.py | 10 ++++++---- src/ezmsg/util/perf/run.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 8421a51b..faec5d19 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -183,10 +183,12 @@ def relay(config: ConfigSettings) -> Configuration: connections: ez.NetworkDefinition = [] relays = [LoadTestRelay(config.settings) for _ in range(config.n_clients)] - connections.append((config.source.OUTPUT, relays[0].INPUT)) - for from_relay, to_relay in zip(relays[:-1], relays[1:]): - connections.append((from_relay.OUTPUT, to_relay.INPUT)) - connections.append((relays[-1].OUTPUT, config.sink.INPUT)) + if len(relays): + connections.append((config.source.OUTPUT, relays[0].INPUT)) + for from_relay, to_relay in zip(relays[:-1], relays[1:]): + connections.append((from_relay.OUTPUT, to_relay.INPUT)) + connections.append((relays[-1].OUTPUT, config.sink.INPUT)) + else: connections.append((config.source.OUTPUT, config.sink.INPUT)) return relays, connections diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 3ff70e91..80bf80cb 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -42,8 +42,8 @@ def perf_run( if msg_sizes is None: msg_sizes = DEFAULT_MSG_SIZES - if any(s <= 0 for s in msg_sizes): - ez.logger.error('All msg_sizes must be >0 bytes') + if any(s < 0 for s in msg_sizes): + ez.logger.error('All msg_sizes must be >=0 bytes') try: communications = DEFAULT_COMMS if comms is None else [Communication(c) for c in comms] From 30234ae07827666c5f26645a33fd31179de584a1 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 22 Sep 2025 19:49:13 -0400 Subject: [PATCH 12/23] also force_tcp on tcp_spread oops --- src/ezmsg/util/perf/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index faec5d19..114a8b7a 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -220,7 +220,7 @@ def perform_test( dynamic_size = int(msg_size), duration = duration, buffers = buffers, - force_tcp = (comms == Communication.TCP), + force_tcp = (comms in (Communication.TCP, Communication.TCP_SPREAD)), ) source = LoadTestSource(settings) From e8d1a3dbe1d9cd6bcb7853ca557bb684f04f1934 Mon Sep 17 
00:00:00 2001 From: Griffin Milsap Date: Mon, 22 Sep 2025 20:41:09 -0400 Subject: [PATCH 13/23] viztracer is useful for profiling --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 961271b4..c0888f86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dev = [ {include-group = "lint"}, {include-group = "test"}, "pre-commit>=4.3.0", + "viztracer>=1.0.4", ] lint = [ "flake8>=7.3.0", From a7913656790da097fbbd119036923ed185df6087 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 23 Sep 2025 09:22:54 -0400 Subject: [PATCH 14/23] fix for rare exception on system shutdown --- src/ezmsg/core/subclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ezmsg/core/subclient.py b/src/ezmsg/core/subclient.py index 50e1aa0c..9a28d9ee 100644 --- a/src/ezmsg/core/subclient.py +++ b/src/ezmsg/core/subclient.py @@ -210,7 +210,7 @@ async def _handle_publisher( self._incoming.put_nowait((id, msg_id)) - except (ConnectionResetError, BrokenPipeError): + except (ConnectionResetError, BrokenPipeError, asyncio.IncompleteReadError): logger.debug(f"connection fail: sub:{self.id} -> pub:{id}") finally: From 77e38e9e48c2e9b289d424ff4cd78a46ec575e27 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 23 Sep 2025 13:09:54 -0400 Subject: [PATCH 15/23] added median latency --- src/ezmsg/util/perf/impl.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 114a8b7a..99d11046 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -41,6 +41,7 @@ class Metrics: num_msgs: int sample_rate: float latency_mean: float + latency_median: float latency_total: float data_rate: float @@ -274,12 +275,11 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: # Log some useful summary statistics min_timestamp = min(timestamp for timestamp, _, _ in sink.STATE.received_data) max_timestamp = max(timestamp for timestamp, _, _ in sink.STATE.received_data) - total_latency = abs( - sum( - receive_timestamp - send_timestamp - for send_timestamp, receive_timestamp, _ in sink.STATE.received_data - ) - ) + latency = [ + receive_timestamp - send_timestamp + for send_timestamp, receive_timestamp, _ in sink.STATE.received_data + ] + total_latency = abs(sum(latency)) counters = list(sorted(t[2] for t in sink.STATE.received_data)) dropped_samples = sum( @@ -291,7 +291,9 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: sample_rate = num_samples / duration ez.logger.info(f"Sample rate: {sample_rate} Hz") latency_mean = total_latency / num_samples + latency_median = list(sorted(latency))[len(latency) // 2] ez.logger.info(f"Mean latency: {latency_mean} s") + ez.logger.info(f"Median latency: {latency_median} s") ez.logger.info(f"Total latency: {total_latency} s") total_data = num_samples * sink.SETTINGS.dynamic_size @@ -307,6 +309,7 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: num_msgs = num_samples, sample_rate = sample_rate, latency_mean = latency_mean, + latency_median = latency_median, latency_total = total_latency, data_rate = data_rate ) From 4693d5cfd5d04d52300468ad77297ab6a936c3c4 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 23 Sep 2025 15:30:46 -0400 Subject: [PATCH 16/23] added median latency to performance report --- src/ezmsg/util/perf/analysis.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git 
a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 681c9dac..ed729187 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -246,11 +246,11 @@ def _color_for_comparison(value: float, metric: str, noise_band_pct: float = 5.0 delta = value - 100.0 # Determine direction: + is good for sample/data; - is good for latency - if metric in ("sample_rate", "data_rate"): + if 'rate' in metric: # positive delta good, negative bad magnitude = abs(delta) sign_good = delta > 0 - elif metric == "latency_mean": + elif 'latency' in metric: # negative delta good (lower latency) magnitude = abs(delta) sign_good = delta < 0 @@ -316,7 +316,7 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> if html: # Ensure expected columns exist - expected_cols = {"sample_rate", "data_rate", "latency_mean"} + expected_cols = {"sample_rate", "data_rate", "latency_mean", "latency_median"} missing = expected_cols - set(df.columns) if missing: raise ValueError(f"Missing expected columns in dataset: {missing}") @@ -358,7 +358,7 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> # Render each group for (config, comms), g in groups: # Keep only expected columns in order - cols = ["n_clients", "msg_size", "sample_rate", "data_rate", "latency_mean"] + cols = ["n_clients", "msg_size", "sample_rate", "data_rate", "latency_mean", "latency_median"] g = g[cols].copy() # String format some columns (msg_size with separators) @@ -374,17 +374,20 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> sample_rate {'' if relative else '(msgs/s)'} data_rate {'' if relative else '(MB/s)'} latency_mean {'' if relative else '(us)'} + latency_median {'' if relative else '(us)'} """ body_rows: list[str] = [] for _, row in g.iterrows(): - sr, dr, lt = row["sample_rate"], row["data_rate"], row["latency_mean"] + sr, dr, lt, lm = row["sample_rate"], row["data_rate"], row["latency_mean"], row["latency_median"] dr = dr if relative else dr / 2**20 lt = lt if relative else lt * 1e6 + lm = lm if relative else lm * 1e6 sr_style = _color_for_comparison(sr, "sample_rate") if relative else "" dr_style = _color_for_comparison(dr, "data_rate") if relative else "" lt_style = _color_for_comparison(lt, "latency_mean") if relative else "" + lm_style = _color_for_comparison(lm, "latency_median") if relative else "" body_rows.append( "" @@ -393,6 +396,7 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> f"{_format_number(sr)}" f"{_format_number(dr)}" f"{_format_number(lt)}" + f"{_format_number(lm)}" "" ) table_html = f"{header}{''.join(body_rows)}
" From e8f49947cb71eddb1b0f8970bca3950770ad0563 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 23 Sep 2025 16:07:10 -0400 Subject: [PATCH 17/23] try using time.time --- src/ezmsg/util/perf/impl.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 99d11046..9fdfe740 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -75,16 +75,16 @@ async def initialize(self) -> None: @ez.publisher(OUTPUT) async def publish(self) -> typing.AsyncGenerator: ez.logger.info(f"Load test publisher started. (PID: {os.getpid()})") - start_time = time.perf_counter() + start_time = time.time() while self.running: - current_time = time.perf_counter() + current_time = time.time() if current_time - start_time >= self.SETTINGS.duration: break yield ( self.OUTPUT, LoadTestSample( - _timestamp=time.perf_counter(), + _timestamp=time.time(), counter=self.counter, dynamic_data=np.zeros( int(self.SETTINGS.dynamic_size // 8), dtype=np.float32 @@ -133,7 +133,7 @@ async def receive(self, sample: LoadTestSample) -> None: f"{sample.counter - counter - 1} samples skipped!" ) self.STATE.received_data.append( - (sample._timestamp, time.perf_counter(), sample.counter) + (sample._timestamp, time.time(), sample.counter) ) self.STATE.counters[sample.key] = sample.counter From dbd48b1b81a2f655700a05bcd60ac41dfcb6e00b Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 28 Oct 2025 11:56:31 -0400 Subject: [PATCH 18/23] more diagnostic test results in less time --- src/ezmsg/util/perf/command.py | 3 +++ src/ezmsg/util/perf/run.py | 27 ++++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/ezmsg/util/perf/command.py b/src/ezmsg/util/perf/command.py index a100863d..ab919999 100644 --- a/src/ezmsg/util/perf/command.py +++ b/src/ezmsg/util/perf/command.py @@ -12,3 +12,6 @@ def command() -> None: ns = parser.parse_args() ns._handler(ns) + +if __name__ == "__main__": + command() diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 80bf80cb..855a94f2 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -3,6 +3,7 @@ import itertools import argparse import typing +import random from ..messagecodec import MessageEncoder from .envinfo import TestEnvironmentInfo @@ -32,6 +33,7 @@ def perf_run( n_clients: typing.Iterable[int] | None, comms: typing.Iterable[str] | None, configs: typing.Iterable[str] | None, + grid: bool, ) -> None: if n_clients is None: @@ -45,6 +47,13 @@ def perf_run( if any(s < 0 for s in msg_sizes): ez.logger.error('All msg_sizes must be >=0 bytes') + if not grid and len(list(n_clients)) != len(list(msg_sizes)): + ez.logger.warning( + "Not performing a grid test of all combinations of n_clients and msg_sizes, but " + \ + "len(n_clients) != len(msg_sizes). " + \ + "If you want to perform all combinations of n_clients and msg_sizes, use --grid" + ) + try: communications = DEFAULT_COMMS if comms is None else [Communication(c) for c in comms] except ValueError: @@ -56,8 +65,16 @@ def perf_run( except ValueError: ez.logger.error(f"Invalid test configuration requested. 
Valid configurations: {', '.join([c for c in CONFIGS])}") return + + subitr = itertools.product if grid else zip + + test_list = [ + (clients, msg_size, config, comms) + for clients, msg_size in subitr(n_clients, msg_sizes) + for config, comms in itertools.product(configurators, communications) + ] - test_list = list(itertools.product(msg_sizes, n_clients, configurators, communications)) + random.shuffle(test_list) with open(f'perf_{get_datestamp()}.txt', 'w') as out_f: @@ -151,6 +168,13 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: help = f"configurations to test (default = {[c for c in CONFIGS]})" ) + p_run.add_argument( + "--grid", + action = "store_true", + help = "perform all combinations of msg_sizes and n_clients " + \ + "(default: False; msg_sizes and n_clients must match in length)" + ) + p_run.set_defaults(_handler=lambda ns: perf_run( duration = ns.duration, @@ -160,4 +184,5 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: n_clients = ns.n_clients, comms = ns.comms, configs = ns.configs, + grid = ns.grid )) \ No newline at end of file From 7fcfaa864427427eaaeb3db025bd7902d5b56c3a Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 28 Oct 2025 16:25:10 -0400 Subject: [PATCH 19/23] early quit and test randomization --- src/ezmsg/util/perf/analysis.py | 21 +++--- src/ezmsg/util/perf/impl.py | 4 +- src/ezmsg/util/perf/run.py | 113 ++++++++++++++++++++++++-------- 3 files changed, 101 insertions(+), 37 deletions(-) diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index ed729187..c8699ab9 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -59,21 +59,21 @@ def load_perf(perf: Path) -> xr.Dataset: - params: typing.List[TestParameters] = [] - results: typing.List[typing.List[Metrics]] = [] + all_results: typing.Dict[TestParameters, typing.List[Metrics]] = dict() with open(perf, 'r') as perf_f: info: TestEnvironmentInfo = json.loads(next(perf_f), cls = MessageDecoder) for line in perf_f: obj: TestLogEntry = json.loads(line, cls = MessageDecoder) - params.append(obj.params) - results.append(obj.results) + metrics = all_results.get(obj.params, list()) + metrics.append(obj.results) + all_results[obj.params] = metrics - n_clients_axis = list(sorted(set([p.n_clients for p in params]))) - msg_size_axis = list(sorted(set([p.msg_size for p in params]))) - comms_axis = list(sorted(set([p.comms for p in params]))) - config_axis = list(sorted(set([p.config for p in params]))) + n_clients_axis = list(sorted(set([p.n_clients for p in all_results.keys()]))) + msg_size_axis = list(sorted(set([p.msg_size for p in all_results.keys()]))) + comms_axis = list(sorted(set([p.comms for p in all_results.keys()]))) + config_axis = list(sorted(set([p.config for p in all_results.keys()]))) dims = ['n_clients', 'msg_size', 'comms', 'config'] coords = { @@ -91,7 +91,7 @@ def load_perf(perf: Path) -> xr.Dataset: len(comms_axis), len(config_axis) )) * np.nan - for p, r in zip(params, results): + for p, r in all_results.items(): # tests are run multiple times; get the median value for each metric values = list(sorted([getattr(v, field.name) for v in r])) value = values[len(values)//2] @@ -305,7 +305,10 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> # These raw stats are still valuable to have, but are confusing # when making relative comparisons perf = perf.drop_vars(['latency_total', 'num_msgs']) + perf = perf.stack(params = ['n_clients', 
'msg_size']).dropna('params') df = perf.squeeze().to_dataframe() + df = df.drop('n_clients', axis = 1) + df = df.drop('msg_size', axis = 1) for _, config_ds in perf.groupby('config'): for _, comms_ds in config_ds.groupby('comms'): diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 9fdfe740..289dde8c 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -315,7 +315,7 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: ) -@dataclasses.dataclass +@dataclasses.dataclass(unsafe_hash=True) class TestParameters: msg_size: int n_clients: int @@ -328,4 +328,4 @@ class TestParameters: @dataclasses.dataclass class TestLogEntry: params: TestParameters - results: typing.List[Metrics] \ No newline at end of file + results: Metrics \ No newline at end of file diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 855a94f2..8ae5eda7 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -1,9 +1,14 @@ +import os +import sys import json import datetime import itertools import argparse import typing import random +import time + +from contextlib import contextmanager, redirect_stdout, redirect_stderr from ..messagecodec import MessageEncoder from .envinfo import TestEnvironmentInfo @@ -21,10 +26,66 @@ DEFAULT_N_CLIENTS = [2 ** exp for exp in range(0, 6, 2)] DEFAULT_COMMS = [c for c in Communication] +# --- Output Suppression Context Manager (from the previous solution) --- +@contextmanager +def suppress_output(verbose: bool = False): + """Context manager to redirect stdout and stderr to os.devnull""" + if verbose: + yield + else: + # Open the null device for writing + with open(os.devnull, 'w') as fnull: + # Redirect both stdout and stderr to the null device + with redirect_stderr(fnull): + with redirect_stdout(fnull): + yield + +CHECK_FOR_QUIT = lambda: False + +if sys.platform.startswith('win'): + import msvcrt + def _check_for_quit_win() -> bool: + """ + Checks for the 'q' key press in a non-blocking way. + Returns True if 'q' is pressed (case-insensitive), False otherwise. + """ + # Windows: Use msvcrt for non-blocking keyboard hit detection + if msvcrt.kbhit(): # type: ignore + # Read the key press (returns bytes) + key = msvcrt.getch() # type: ignore + try: + # Decode and check for 'q' + return key.decode().lower() == 'q' + except UnicodeDecodeError: + # Handle potential non-text key presses gracefully + return False + return False + + CHECK_FOR_QUIT = _check_for_quit_win + +else: + import select + def _check_for_quit() -> bool: + """ + Checks for the 'q' key press in a non-blocking way. + Returns True if 'q' is pressed (case-insensitive), False otherwise. 
+ """ + # Linux/macOS: Use select to check if stdin has data + # select.select(rlist, wlist, xlist, timeout) + # timeout=0 makes it non-blocking + if sys.stdin.isatty(): + i, o, e = select.select([sys.stdin], [], [], 0) # type: ignore + if i: + # Read the buffered character + key = sys.stdin.read(1) + return key.lower() == 'q' + return False + + CHECK_FOR_QUIT = _check_for_quit + def get_datestamp() -> str: return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - def perf_run( duration: float, num_buffers: int, @@ -69,10 +130,10 @@ def perf_run( subitr = itertools.product if grid else zip test_list = [ - (clients, msg_size, config, comms) - for clients, msg_size in subitr(n_clients, msg_sizes) - for config, comms in itertools.product(configurators, communications) - ] + (msg_size, clients, conf, comm) + for msg_size, clients in subitr(msg_sizes, n_clients) + for conf, comm in itertools.product(configurators, communications) + ] * iters random.shuffle(test_list) @@ -80,33 +141,34 @@ def perf_run( out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") + ez.logger.info("Starting perf tests. Press 'q' + enter to quit tests early.") + time.sleep(3.0) # Give user an opportunity to read message. + for test_idx, (msg_size, clients, conf, comm) in enumerate(test_list): - ez.logger.info(f"RUNNING TEST {test_idx + 1} / {len(test_list)} ({(test_idx / len(test_list)) * 100.0:0.2f} %)") - - params = TestParameters( - msg_size = msg_size, - n_clients = clients, - config = conf.__name__, - comms = comm.value, - duration = duration, - num_buffers = num_buffers - ) - - results = [ - perform_test( + if CHECK_FOR_QUIT(): + ez.logger.info("Stopping tests early...") + break + + ez.logger.info(f"TEST {test_idx + 1}/{len(test_list)}: {clients=}, {msg_size=}, conf={conf.__name__}, comm={comm.value}") + + output = TestLogEntry( + params = TestParameters( + msg_size = msg_size, + n_clients = clients, + config = conf.__name__, + comms = comm.value, + duration = duration, + num_buffers = num_buffers + ), + results = perform_test( n_clients = clients, duration = duration, msg_size = msg_size, buffers = num_buffers, comms = comm, config = conf, - ) for _ in range(iters) - ] - - output = TestLogEntry( - params = params, - results = results + ) ) out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") @@ -175,7 +237,6 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: "(default: False; msg_sizes and n_clients must match in length)" ) - p_run.set_defaults(_handler=lambda ns: perf_run( duration = ns.duration, num_buffers = ns.num_buffers, @@ -184,5 +245,5 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: n_clients = ns.n_clients, comms = ns.comms, configs = ns.configs, - grid = ns.grid + grid = ns.grid, )) \ No newline at end of file From 2489c783e95686c75aba3d5833d8d4455274902d Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Wed, 5 Nov 2025 11:45:00 -0500 Subject: [PATCH 20/23] attempting to stabilize perf test results --- src/ezmsg/util/perf/analysis.py | 28 ++-- src/ezmsg/util/perf/impl.py | 40 +++-- src/ezmsg/util/perf/run.py | 111 +++++++----- src/ezmsg/util/perf/util.py | 288 ++++++++++++++++++++++++++++++++ 4 files changed, 399 insertions(+), 68 deletions(-) create mode 100644 src/ezmsg/util/perf/util.py diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index c8699ab9..7f731e6d 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -93,14 +93,12 @@ def load_perf(perf: Path) -> 
xr.Dataset: )) * np.nan for p, r in all_results.items(): # tests are run multiple times; get the median value for each metric - values = list(sorted([getattr(v, field.name) for v in r])) - value = values[len(values)//2] m[ n_clients_axis.index(p.n_clients), msg_size_axis.index(p.msg_size), comms_axis.index(p.comms), config_axis.index(p.config) - ] = value + ] = np.median([getattr(v, field.name) for v in r]) data_vars[field.name] = xr.DataArray(m, dims = dims, coords = coords) dataset = xr.Dataset(data_vars, attrs = dict(info = info)) @@ -233,13 +231,13 @@ def _base_css() -> str: """ -def _color_for_comparison(value: float, metric: str, noise_band_pct: float = 5.0) -> str: +def _color_for_comparison(value: float, metric: str, noise_band_pct: float = 10.0) -> str: """ Returns inline CSS background for a comparison % value. value: e.g., 97.3, 104.8, etc. For sample_rate/data_rate: improvement > 100 (good). For latency_mean: improvement < 100 (good). - Noise band ±5% around 100 is neutral. + Noise band ±10% around 100 is neutral. """ if not (isinstance(value, (int, float)) and math.isfinite(value)): return "" @@ -302,9 +300,10 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> env_diff = format_env_diff(info.diff(baseline_info)) output += env_diff + '\n\n' - # These raw stats are still valuable to have, but are confusing - # when making relative comparisons - perf = perf.drop_vars(['latency_total', 'num_msgs']) + # These raw stats are still valuable to have, but are confusing + # when making relative comparisons + perf = perf.drop_vars(['latency_total', 'num_msgs']) + perf = perf.stack(params = ['n_clients', 'msg_size']).dropna('params') df = perf.squeeze().to_dataframe() df = df.drop('n_clients', axis = 1) @@ -319,7 +318,7 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> if html: # Ensure expected columns exist - expected_cols = {"sample_rate", "data_rate", "latency_mean", "latency_median"} + expected_cols = {"sample_rate_mean", "sample_rate_median", "data_rate", "latency_mean", "latency_median"} missing = expected_cols - set(df.columns) if missing: raise ValueError(f"Missing expected columns in dataset: {missing}") @@ -361,7 +360,7 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> # Render each group for (config, comms), g in groups: # Keep only expected columns in order - cols = ["n_clients", "msg_size", "sample_rate", "data_rate", "latency_mean", "latency_median"] + cols = ["n_clients", "msg_size", "sample_rate_mean", "sample_rate_median", "data_rate", "latency_mean", "latency_median"] g = g[cols].copy() # String format some columns (msg_size with separators) @@ -374,7 +373,8 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> n_clients msg_size {'' if relative else '(b)'} - sample_rate {'' if relative else '(msgs/s)'} + sample_rate_mean {'' if relative else '(msgs/s)'} + sample_rate_median {'' if relative else '(msgs/s)'} data_rate {'' if relative else '(MB/s)'} latency_mean {'' if relative else '(us)'} latency_median {'' if relative else '(us)'} @@ -383,11 +383,12 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> """ body_rows: list[str] = [] for _, row in g.iterrows(): - sr, dr, lt, lm = row["sample_rate"], row["data_rate"], row["latency_mean"], row["latency_median"] + sr, srm, dr, lt, lm = row["sample_rate_mean"], row["sample_rate_median"], row["data_rate"], row["latency_mean"], row["latency_median"] dr = dr if relative else 
dr / 2**20 lt = lt if relative else lt * 1e6 lm = lm if relative else lm * 1e6 - sr_style = _color_for_comparison(sr, "sample_rate") if relative else "" + sr_style = _color_for_comparison(sr, "sample_rate_mean") if relative else "" + srm_style = _color_for_comparison(srm, "sample_rate_median") if relative else "" dr_style = _color_for_comparison(dr, "data_rate") if relative else "" lt_style = _color_for_comparison(lt, "latency_mean") if relative else "" lm_style = _color_for_comparison(lm, "latency_median") if relative else "" @@ -397,6 +398,7 @@ def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> f"{_format_number(row['n_clients'])}" f"{_escape(row['msg_size'])}" f"{_format_number(sr)}" + f"{_format_number(srm)}" f"{_format_number(dr)}" f"{_format_number(lt)}" f"{_format_number(lm)}" diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 289dde8c..4788cc2c 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -7,12 +7,16 @@ import ezmsg.core as ez +from ezmsg.core.netprotocol import Address + try: import numpy as np except ImportError: ez.logger.error("ezmsg perf requires numpy") raise +from .util import stable_perf + def collect( components: typing.Optional[typing.Mapping[str, ez.Component]] = None, @@ -39,7 +43,8 @@ def collect( @dataclasses.dataclass class Metrics: num_msgs: int - sample_rate: float + sample_rate_mean: float + sample_rate_median: float latency_mean: float latency_median: float latency_total: float @@ -214,7 +219,8 @@ def perform_test( msg_size: int, buffers: int, comms: Communication, - config: Configurator + config: Configurator, + graph_address: Address ) -> Metrics: settings = LoadTestSettings( @@ -261,11 +267,13 @@ def perform_test( components["PROC"] = proc_collection process_components = [proc_collection] - ez.run( - components = components, - connections = connections, - process_components = process_components, - ) + with stable_perf(): + ez.run( + components = components, + connections = connections, + process_components = process_components, + graph_address = graph_address + ) return calculate_metrics(sink, duration) @@ -286,18 +294,21 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: [max((x1 - x0) - 1, 0) for x1, x0 in zip(counters[1:], counters[:-1])] ) + rx_timestamps = np.array([rx_ts for _, rx_ts, _ in sink.STATE.received_data]) num_samples = len(sink.STATE.received_data) - ez.logger.info(f"Samples received: {num_samples}") - sample_rate = num_samples / duration - ez.logger.info(f"Sample rate: {sample_rate} Hz") + samplerate_mean = num_samples / duration + samplerate_median = 1.0 / float(np.median(np.diff(rx_timestamps))) latency_mean = total_latency / num_samples latency_median = list(sorted(latency))[len(latency) // 2] + total_data = num_samples * sink.SETTINGS.dynamic_size + data_rate = total_data / (max_timestamp - min_timestamp) + + ez.logger.info(f"Samples received: {num_samples}") + ez.logger.info(f"Mean sample rate: {samplerate_mean} Hz") + ez.logger.info(f"Median sample rate: {samplerate_median} Hz") ez.logger.info(f"Mean latency: {latency_mean} s") ez.logger.info(f"Median latency: {latency_median} s") ez.logger.info(f"Total latency: {total_latency} s") - - total_data = num_samples * sink.SETTINGS.dynamic_size - data_rate = total_data / (max_timestamp - min_timestamp) ez.logger.info(f"Data rate: {data_rate * 1e-6} MB/s") if dropped_samples: @@ -307,7 +318,8 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: return Metrics( 
num_msgs = num_samples, - sample_rate = sample_rate, + sample_rate_mean = samplerate_mean, + sample_rate_median = samplerate_median, latency_mean = latency_mean, latency_median = latency_median, latency_total = total_latency, diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 8ae5eda7..c23913f2 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -6,12 +6,16 @@ import argparse import typing import random -import time +from datetime import datetime, timedelta from contextlib import contextmanager, redirect_stdout, redirect_stderr +import ezmsg.core as ez +from ezmsg.core.graphserver import GraphServer + from ..messagecodec import MessageEncoder from .envinfo import TestEnvironmentInfo +from .util import warmup from .impl import ( TestParameters, TestLogEntry, @@ -20,8 +24,6 @@ CONFIGS, ) -import ezmsg.core as ez - DEFAULT_MSG_SIZES = [2 ** exp for exp in range(4, 25, 8)] DEFAULT_N_CLIENTS = [2 ** exp for exp in range(0, 6, 2)] DEFAULT_COMMS = [c for c in Communication] @@ -84,7 +86,7 @@ def _check_for_quit() -> bool: CHECK_FOR_QUIT = _check_for_quit def get_datestamp() -> str: - return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + return datetime.now().strftime("%Y%m%d_%H%M%S") def perf_run( duration: float, @@ -95,6 +97,7 @@ def perf_run( comms: typing.Iterable[str] | None, configs: typing.Iterable[str] | None, grid: bool, + warmup_dur: float, ) -> None: if n_clients is None: @@ -137,41 +140,59 @@ def perf_run( random.shuffle(test_list) - with open(f'perf_{get_datestamp()}.txt', 'w') as out_f: - - out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") - - ez.logger.info("Starting perf tests. Press 'q' + enter to quit tests early.") - time.sleep(3.0) # Give user an opportunity to read message. - - for test_idx, (msg_size, clients, conf, comm) in enumerate(test_list): - - if CHECK_FOR_QUIT(): - ez.logger.info("Stopping tests early...") - break - - ez.logger.info(f"TEST {test_idx + 1}/{len(test_list)}: {clients=}, {msg_size=}, conf={conf.__name__}, comm={comm.value}") - - output = TestLogEntry( - params = TestParameters( - msg_size = msg_size, - n_clients = clients, - config = conf.__name__, - comms = comm.value, - duration = duration, - num_buffers = num_buffers - ), - results = perform_test( - n_clients = clients, - duration = duration, - msg_size = msg_size, - buffers = num_buffers, - comms = comm, - config = conf, + server = GraphServer() + server.start() + + d = datetime(1,1,1) + timedelta(seconds = len(test_list) * duration) + total_dur_str = ':'.join([str(n) for n in [d.day - 1, d.hour, d.minute, d.second] if n != 0]) + ez.logger.info(f"About to run {len(test_list)} tests of {duration} sec each.") + ez.logger.info(f"Expected total duration ~{total_dur_str})") + ez.logger.info(f"Please try to avoid running other taxing software while this perf test runs.") + ez.logger.info(f"NOTE: Tests swallow interrupt. 
After warmup, use 'q' then [enter] to quit tests early.") + + try: + ez.logger.info(f"Warming up for {warmup_dur} seconds...") + warmup(warmup_dur) + + with open(f'perf_{get_datestamp()}.txt', 'w') as out_f: + out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") + + for test_idx, (msg_size, clients, conf, comm) in enumerate(test_list): + + if CHECK_FOR_QUIT(): + ez.logger.info("Stopping tests early...") + break + + ez.logger.info( + f"TEST {test_idx + 1}/{len(test_list)}: " \ + f"{clients=}, {msg_size=}, conf={conf.__name__}, " \ + f"comm={comm.value}" + ) + + output = TestLogEntry( + params = TestParameters( + msg_size = msg_size, + n_clients = clients, + config = conf.__name__, + comms = comm.value, + duration = duration, + num_buffers = num_buffers + ), + results = perform_test( + n_clients = clients, + duration = duration, + msg_size = msg_size, + buffers = num_buffers, + comms = comm, + config = conf, + graph_address = server.address + ) ) - ) - out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") + out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") + finally: + server.stop() + def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: @@ -180,8 +201,8 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: p_run.add_argument( "--duration", type=float, - default=2.0, - help="individual test duration in seconds (default = 2.0)", + default=0.5, + help="individual test duration in seconds (default = 0.5)", ) p_run.add_argument( @@ -194,8 +215,8 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: p_run.add_argument( "--iters", "-i", type = int, - default = 3, - help = "number of times to run each test (default = 3)" + default = 50, + help = "number of times to run each test (default = 50)" ) p_run.add_argument( @@ -237,6 +258,13 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: "(default: False; msg_sizes and n_clients must match in length)" ) + p_run.add_argument( + "--warmup", + type = float, + default = 60.0, + help = "warmup CPU with busy task for some number of seconds (default = 60.0)" + ) + p_run.set_defaults(_handler=lambda ns: perf_run( duration = ns.duration, num_buffers = ns.num_buffers, @@ -246,4 +274,5 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: comms = ns.comms, configs = ns.configs, grid = ns.grid, + warmup_dur = ns.warmup, )) \ No newline at end of file diff --git a/src/ezmsg/util/perf/util.py b/src/ezmsg/util/perf/util.py new file mode 100644 index 00000000..d22d1496 --- /dev/null +++ b/src/ezmsg/util/perf/util.py @@ -0,0 +1,288 @@ +import os +import sys +import gc +import time +import statistics as stats +import contextlib +import subprocess +import platform +from dataclasses import dataclass +from typing import Iterable, List, Optional + +try: + import psutil # optional but helpful +except Exception: + psutil = None + +_IS_WIN = os.name == "nt" +_IS_MAC = sys.platform == "darwin" +_IS_LINUX = sys.platform.startswith("linux") + +# ---------- Utilities ---------- + +def _set_env_threads(single_thread: bool = True): + """ + Normalize math/threading libs so they don't spawn surprise worker threads. 
+ """ + if single_thread: + os.environ.setdefault("OMP_NUM_THREADS", "1") + os.environ.setdefault("MKL_NUM_THREADS", "1") + os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1") + os.environ.setdefault("OPENBLAS_NUM_THREADS", "1") + os.environ.setdefault("NUMEXPR_NUM_THREADS", "1") + # Keep PYTHONHASHSEED stable for deterministic dict/set iteration costs + os.environ.setdefault("PYTHONHASHSEED", "0") + +# ---------- Priority & Affinity ---------- + +@contextlib.contextmanager +def _process_priority(): + """ + Elevate process priority in a cross-platform best-effort way. + """ + if psutil is None: + yield + return + + p = psutil.Process() + orig_nice = None + if _IS_WIN: + try: + import ctypes, ctypes.wintypes as wt + kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + ABOVE_NORMAL_PRIORITY_CLASS = 0x00008000 + HIGH_PRIORITY_CLASS = 0x00000080 + # Try High, fall back to Above Normal + if not kernel32.SetPriorityClass(kernel32.GetCurrentProcess(), HIGH_PRIORITY_CLASS): + kernel32.SetPriorityClass(kernel32.GetCurrentProcess(), ABOVE_NORMAL_PRIORITY_CLASS) + except Exception: + pass + else: + try: + orig_nice = p.nice() + # Negative nice may need privileges; try smaller magnitude first + for nice_val in (-10, -5, 0): + try: + p.nice(nice_val) + break + except Exception: + continue + except Exception: + pass + try: + yield + finally: + # restore nice if we changed it + if psutil is not None and not _IS_WIN and orig_nice is not None: + try: + p.nice(orig_nice) + except Exception: + pass + +@contextlib.contextmanager +def _cpu_affinity(prefer_isolation: bool = True): + """ + Set CPU affinity to a small, stable set of CPUs (where supported). + macOS does not support affinity via psutil; we no-op there. + """ + if psutil is None or _IS_MAC: + yield + return + + p = psutil.Process() + original = None + try: + if hasattr(p, "cpu_affinity"): + original = p.cpu_affinity() + cpus = original + if prefer_isolation and len(cpus) > 2: + # Pick two middle CPUs to avoid 0 which often handles interrupts + mid = len(cpus) // 2 + cpus = [cpus[mid-1], cpus[mid]] + p.cpu_affinity(cpus) + yield + finally: + try: + if original is not None and hasattr(p, "cpu_affinity"): + p.cpu_affinity(original) + except Exception: + pass + +# ---------- Platform-specific helpers ---------- + +@contextlib.contextmanager +def _mac_caffeinate(): + """ + Keep macOS awake during the run via a background caffeinate process. + """ + if not _IS_MAC: + yield + return + proc = None + try: + proc = subprocess.Popen(["caffeinate", "-dimsu"]) + except Exception: + proc = None + try: + yield + finally: + if proc is not None: + try: + proc.terminate() + except Exception: + pass + +@contextlib.contextmanager +def _win_timer_resolution(ms: int = 1): + """ + On Windows, request a finer system timer to stabilize sleeps and scheduling slices. + """ + if not _IS_WIN: + yield + return + import ctypes + winmm = ctypes.WinDLL("winmm") + timeBeginPeriod = winmm.timeBeginPeriod + timeEndPeriod = winmm.timeEndPeriod + try: + timeBeginPeriod(ms) + except Exception: + pass + try: + yield + finally: + try: + timeEndPeriod(ms) + except Exception: + pass + +# ---------- Warm-up & GC ---------- + +def warmup(seconds: float = 60.0, fn=None, *args, **kwargs): + """ + Optional warm-up to reach steady clocks/caches. + If fn is provided, call it in a loop for the given time. 
+ """ + if seconds <= 0: + return + end = time.perf_counter() + target = end + seconds + if fn is None: + # Busy wait / sleep mix to heat up without heavy CPU + while time.perf_counter() < target: + x = 0 + for _ in range(10000): + x += 1 + time.sleep(0) + else: + while time.perf_counter() < target: + fn(*args, **kwargs) + +@contextlib.contextmanager +def gc_pause(): + """ + Disable GC inside timing windows; re-enable and collect after. + """ + was_enabled = gc.isenabled() + try: + gc.disable() + yield + finally: + if was_enabled: + gc.enable() + gc.collect() + +# ---------- Robust statistics ---------- + +def median_of_means(samples: Iterable[float], k: int = 5) -> float: + """ + Robust estimate: split samples into k buckets (round-robin), average each, take median of bucket means. + """ + samples = list(samples) + if not samples: + return float("nan") + k = max(1, min(k, len(samples))) + buckets = [[] for _ in range(k)] + for i, v in enumerate(samples): + buckets[i % k].append(v) + means = [sum(b)/len(b) for b in buckets if b] + means.sort() + return means[len(means)//2] + +def coef_var(samples: Iterable[float]) -> float: + vals = list(samples) + if len(vals) < 2: + return 0.0 + m = sum(vals)/len(vals) + if m == 0: + return 0.0 + sd = stats.pstdev(vals) + return sd / m + +# ---------- Public context manager ---------- + +@dataclass +class PerfOptions: + single_thread_math: bool = True + prefer_isolated_cpus: bool = True + warmup_seconds: float = 0.0 + adjust_priority: bool = True + tweak_timer_windows: bool = True + keep_mac_awake: bool = True + +@contextlib.contextmanager +def stable_perf(opts: PerfOptions = PerfOptions()): + """ + Wrap your perf runs with this context manager for a stabler environment. + """ + _set_env_threads(opts.single_thread_math) + + cm_stack = contextlib.ExitStack() + try: + if opts.adjust_priority: + cm_stack.enter_context(_process_priority()) + if opts.tweak_timer_windows: + cm_stack.enter_context(_win_timer_resolution(1)) + if opts.prefer_isolated_cpus: + cm_stack.enter_context(_cpu_affinity(True)) + if opts.keep_mac_awake: + cm_stack.enter_context(_mac_caffeinate()) + + if opts.warmup_seconds > 0: + warmup(opts.warmup_seconds) + + with gc_pause(): + yield + finally: + cm_stack.close() + +# ---------- Example runners ---------- + +def run_interleaved(configs: List[dict], run_fn, trials: int = 5, trial_seconds: float = 30.0, seed: int = 42): + """ + Interleave scenarios to cancel slow drift. `run_fn(config, seconds, seed_offset)` should return a dict of metrics. + Returns a list of per-trial results per config. + """ + import random + random.seed(seed) + results = [ [] for _ in configs ] + order = list(range(len(configs))) + # fixed order per pass; you'll re-use the same order for stability + for t in range(trials): + for idx in order: + res = run_fn(configs[idx], trial_seconds, seed + t) + results[idx].append(res) + return results + +def summarize_metric(trials: List[dict], key: str, mom_buckets: int = 5): + """ + Extract a metric across trial dicts and summarize with median-of-means and CV. 
+ """ + vals = [float(tr[key]) for tr in trials if key in tr] + return { + "count": len(vals), + "mom": median_of_means(vals, mom_buckets), + "mean": sum(vals)/len(vals) if vals else float("nan"), + "p50": stats.median(vals) if vals else float("nan"), + "cv": coef_var(vals) if vals else float("nan"), + } \ No newline at end of file From f3e2560c625ca63b49efcda945fcf099109695a8 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Thu, 6 Nov 2025 09:51:06 -0500 Subject: [PATCH 21/23] tests stabilized with median of means and num-buffers = 1 --- src/ezmsg/util/perf/analysis.py | 24 ++++--- src/ezmsg/util/perf/impl.py | 60 +++++++++++----- src/ezmsg/util/perf/run.py | 120 ++++++++++++++++++++------------ 3 files changed, 133 insertions(+), 71 deletions(-) diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 7f731e6d..2df4bebb 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -59,16 +59,22 @@ def load_perf(perf: Path) -> xr.Dataset: - all_results: typing.Dict[TestParameters, typing.List[Metrics]] = dict() + all_results: typing.Dict[TestParameters, typing.Dict[int, typing.List[Metrics]]] = dict() + + run_idx = 0 with open(perf, 'r') as perf_f: info: TestEnvironmentInfo = json.loads(next(perf_f), cls = MessageDecoder) - for line in perf_f: - obj: TestLogEntry = json.loads(line, cls = MessageDecoder) - metrics = all_results.get(obj.params, list()) - metrics.append(obj.results) - all_results[obj.params] = metrics + obj = json.loads(line, cls = MessageDecoder) + if isinstance(obj, TestEnvironmentInfo): + run_idx += 1 + elif isinstance(obj, TestLogEntry): + runs = all_results.get(obj.params, dict()) + metrics = runs.get(run_idx, list()) + metrics.append(obj.results) + runs[run_idx] = metrics + all_results[obj.params] = runs n_clients_axis = list(sorted(set([p.n_clients for p in all_results.keys()]))) msg_size_axis = list(sorted(set([p.msg_size for p in all_results.keys()]))) @@ -91,14 +97,14 @@ def load_perf(perf: Path) -> xr.Dataset: len(comms_axis), len(config_axis) )) * np.nan - for p, r in all_results.items(): - # tests are run multiple times; get the median value for each metric + for p, a in all_results.items(): + # tests are run multiple times; get the median of means m[ n_clients_axis.index(p.n_clients), msg_size_axis.index(p.msg_size), comms_axis.index(p.comms), config_axis.index(p.config) - ] = np.median([getattr(v, field.name) for v in r]) + ] = np.median([np.mean([getattr(v, field.name) for v in r]) for r in a.values()]) data_vars[field.name] = xr.DataArray(m, dims = dims, coords = coords) dataset = xr.Dataset(data_vars, attrs = dict(info = info)) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index 4788cc2c..a5eb18c2 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -7,6 +7,7 @@ import ezmsg.core as ez +from ezmsg.util.messages.util import replace from ezmsg.core.netprotocol import Address try: @@ -52,7 +53,8 @@ class Metrics: class LoadTestSettings(ez.Settings): - duration: float + max_duration: float + num_msgs: int dynamic_size: int buffers: int force_tcp: bool @@ -66,7 +68,7 @@ class LoadTestSample: key: str -class LoadTestSender(ez.Unit): +class LoadTestSource(ez.Unit): OUTPUT = ez.OutputStream(LoadTestSample) SETTINGS = LoadTestSettings @@ -81,9 +83,12 @@ async def initialize(self) -> None: async def publish(self) -> typing.AsyncGenerator: ez.logger.info(f"Load test publisher started. 
(PID: {os.getpid()})") start_time = time.time() - while self.running: + for _ in range(self.SETTINGS.num_msgs): + if not self.running: + break + current_time = time.time() - if current_time - start_time >= self.SETTINGS.duration: + if current_time - start_time >= self.SETTINGS.max_duration: break yield ( @@ -92,21 +97,22 @@ async def publish(self) -> typing.AsyncGenerator: _timestamp=time.time(), counter=self.counter, dynamic_data=np.zeros( - int(self.SETTINGS.dynamic_size // 8), dtype=np.float32 + int(self.SETTINGS.dynamic_size // 4), dtype=np.float32 ), key = self.name, ), ) self.counter += 1 + ez.logger.info("Exiting publish") raise ez.Complete - -class LoadTestSource(LoadTestSender): + async def shutdown(self) -> None: self.running = False ez.logger.info(f"Samples sent: {self.counter}") + class LoadTestRelay(ez.Unit): INPUT = ez.InputStream(LoadTestSample) OUTPUT = ez.OutputStream(LoadTestSample) @@ -130,6 +136,9 @@ class LoadTestReceiver(ez.Unit): SETTINGS = LoadTestSettings STATE = LoadTestReceiverState + async def initialize(self) -> None: + ez.logger.info(f"Load test subscriber started. (PID: {os.getpid()})") + @ez.subscriber(INPUT, zero_copy=True) async def receive(self, sample: LoadTestSample) -> None: counter = self.STATE.counters.get(sample.key, -1) @@ -145,12 +154,19 @@ async def receive(self, sample: LoadTestSample) -> None: class LoadTestSink(LoadTestReceiver): + INPUT = ez.InputStream(LoadTestSample) + + @ez.subscriber(INPUT, zero_copy=True) + async def receive(self, sample: LoadTestSample) -> None: + await super().receive(sample) + if len(self.STATE.received_data) == self.SETTINGS.num_msgs: + raise ez.NormalTermination + @ez.task async def terminate(self) -> None: - ez.logger.info(f"Load test subscriber started. (PID: {os.getpid()})") - - # Wait for the duration of the load test - await asyncio.sleep(self.SETTINGS.duration) + # Wait for the max duration of the load test + await asyncio.sleep(self.SETTINGS.max_duration) + ez.logger.warning("TIMEOUT -- terminating test.") raise ez.NormalTermination @@ -178,7 +194,9 @@ def fanout(config: ConfigSettings) -> Configuration: def fanin(config: ConfigSettings) -> Configuration: """ many pubs to one sub """ connections: ez.NetworkDefinition = [(config.source.OUTPUT, config.sink.INPUT)] - pubs = [LoadTestSender(config.settings) for _ in range(config.n_clients)] + pubs = [LoadTestSource(config.settings) for _ in range(config.n_clients)] + expected_num_msgs = config.sink.SETTINGS.num_msgs * len(pubs) + config.sink.SETTINGS = replace(config.sink.SETTINGS, num_msgs = expected_num_msgs) # type: ignore for pub in pubs: connections.append((pub.OUTPUT, config.sink.INPUT)) return pubs, connections @@ -215,7 +233,8 @@ class Communication(enum.StrEnum): def perform_test( n_clients: int, - duration: float, + max_duration: float, + num_msgs: int, msg_size: int, buffers: int, comms: Communication, @@ -225,7 +244,8 @@ def perform_test( settings = LoadTestSettings( dynamic_size = int(msg_size), - duration = duration, + num_msgs = num_msgs, + max_duration = max_duration, buffers = buffers, force_tcp = (comms in (Communication.TCP, Communication.TCP_SPREAD)), ) @@ -275,10 +295,10 @@ def perform_test( graph_address = graph_address ) - return calculate_metrics(sink, duration) + return calculate_metrics(sink) -def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: +def calculate_metrics(sink: LoadTestSink) -> Metrics: # Log some useful summary statistics min_timestamp = min(timestamp for timestamp, _, _ in sink.STATE.received_data) @@ 
-295,13 +315,14 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: ) rx_timestamps = np.array([rx_ts for _, rx_ts, _ in sink.STATE.received_data]) + runtime = max_timestamp - min_timestamp num_samples = len(sink.STATE.received_data) - samplerate_mean = num_samples / duration + samplerate_mean = num_samples / runtime samplerate_median = 1.0 / float(np.median(np.diff(rx_timestamps))) latency_mean = total_latency / num_samples latency_median = list(sorted(latency))[len(latency) // 2] total_data = num_samples * sink.SETTINGS.dynamic_size - data_rate = total_data / (max_timestamp - min_timestamp) + data_rate = total_data / runtime ez.logger.info(f"Samples received: {num_samples}") ez.logger.info(f"Mean sample rate: {samplerate_mean} Hz") @@ -330,10 +351,11 @@ def calculate_metrics(sink: LoadTestSink, duration: float) -> Metrics: @dataclasses.dataclass(unsafe_hash=True) class TestParameters: msg_size: int + num_msgs: int n_clients: int config: str comms: str - duration: float + max_duration: float num_buffers: int diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index c23913f2..3c391bdc 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -6,6 +6,7 @@ import argparse import typing import random +import time from datetime import datetime, timedelta from contextlib import contextmanager, redirect_stdout, redirect_stderr @@ -89,9 +90,11 @@ def get_datestamp() -> str: return datetime.now().strftime("%Y%m%d_%H%M%S") def perf_run( - duration: float, + max_duration: float, + num_msgs: int, num_buffers: int, iters: int, + repeats: int, msg_sizes: typing.Iterable[int] | None, n_clients: typing.Iterable[int] | None, comms: typing.Iterable[str] | None, @@ -143,55 +146,70 @@ def perf_run( server = GraphServer() server.start() - d = datetime(1,1,1) + timedelta(seconds = len(test_list) * duration) - total_dur_str = ':'.join([str(n) for n in [d.day - 1, d.hour, d.minute, d.second] if n != 0]) - ez.logger.info(f"About to run {len(test_list)} tests of {duration} sec each.") - ez.logger.info(f"Expected total duration ~{total_dur_str})") + ez.logger.info(f"About to run {len(test_list)} tests (repeated {repeats} times) of {max_duration} sec (max) each.") + ez.logger.info(f"During each test, source will attempt to send {num_msgs} messages to the sink.") ez.logger.info(f"Please try to avoid running other taxing software while this perf test runs.") ez.logger.info(f"NOTE: Tests swallow interrupt. 
After warmup, use 'q' then [enter] to quit tests early.") + quitting = False + + start_time = time.time() + try: ez.logger.info(f"Warming up for {warmup_dur} seconds...") warmup(warmup_dur) with open(f'perf_{get_datestamp()}.txt', 'w') as out_f: - out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") + for _ in range(repeats): + out_f.write(json.dumps(TestEnvironmentInfo(), cls = MessageEncoder) + "\n") - for test_idx, (msg_size, clients, conf, comm) in enumerate(test_list): + for test_idx, (msg_size, clients, conf, comm) in enumerate(test_list): - if CHECK_FOR_QUIT(): - ez.logger.info("Stopping tests early...") - break + if CHECK_FOR_QUIT(): + ez.logger.info("Stopping tests early...") + quitting = True + break + + ez.logger.info( + f"TEST {test_idx + 1}/{len(test_list)}: " \ + f"{clients=}, {msg_size=}, conf={conf.__name__}, " \ + f"comm={comm.value}" + ) - ez.logger.info( - f"TEST {test_idx + 1}/{len(test_list)}: " \ - f"{clients=}, {msg_size=}, conf={conf.__name__}, " \ - f"comm={comm.value}" - ) - - output = TestLogEntry( - params = TestParameters( - msg_size = msg_size, - n_clients = clients, - config = conf.__name__, - comms = comm.value, - duration = duration, - num_buffers = num_buffers - ), - results = perform_test( - n_clients = clients, - duration = duration, - msg_size = msg_size, - buffers = num_buffers, - comms = comm, - config = conf, - graph_address = server.address + output = TestLogEntry( + params = TestParameters( + msg_size = msg_size, + num_msgs = num_msgs, + n_clients = clients, + config = conf.__name__, + comms = comm.value, + max_duration = max_duration, + num_buffers = num_buffers + ), + results = perform_test( + n_clients = clients, + max_duration = max_duration, + num_msgs = num_msgs, + msg_size = msg_size, + buffers = num_buffers, + comms = comm, + config = conf, + graph_address = server.address + ) ) - ) - out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") + out_f.write(json.dumps(output, cls = MessageEncoder) + "\n") + + if quitting: + break + finally: server.stop() + d = datetime(1,1,1) + timedelta(seconds = time.time() - start_time) + dur_str = ':'.join([str(n) for n in [d.day - 1, d.hour, d.minute, d.second] if n != 0]) + ez.logger.info(f"Tests concluded. 
Wallclock Runtime: {dur_str}s") + + def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: @@ -199,24 +217,38 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: p_run = subparsers.add_parser("run", help="run performance test") p_run.add_argument( - "--duration", + "--max-duration", type=float, - default=0.5, - help="individual test duration in seconds (default = 0.5)", + default=5.0, + help="maximum individual test duration in seconds (default = 5.0)", + ) + + p_run.add_argument( + "--num-msgs", + type=int, + default=1000, + help = "number of messages to send per-test (default = 1000)" ) p_run.add_argument( "--num-buffers", type=int, - default=32, - help="shared memory buffers (default = 32)", + default=1, + help="shared memory buffers (default = 1)", ) p_run.add_argument( "--iters", "-i", type = int, - default = 50, - help = "number of times to run each test (default = 50)" + default = 5, + help = "number of times to run each test (default = 5)" + ) + + p_run.add_argument( + "--repeats", "-r", + type = int, + default = 10, + help = "number of times to repeat the perf (default = 10)" ) p_run.add_argument( @@ -266,9 +298,11 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: ) p_run.set_defaults(_handler=lambda ns: perf_run( - duration = ns.duration, + max_duration = ns.max_duration, + num_msgs = ns.num_msgs, num_buffers = ns.num_buffers, iters = ns.iters, + repeats = ns.repeats, msg_sizes = ns.msg_sizes, n_clients = ns.n_clients, comms = ns.comms, From c6b985f4a612963194478530344cd56d4242af71 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 10 Nov 2025 11:59:18 -0500 Subject: [PATCH 22/23] further simplifying tests --- src/ezmsg/util/perf/run.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 3c391bdc..ce6c2071 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -25,11 +25,11 @@ CONFIGS, ) -DEFAULT_MSG_SIZES = [2 ** exp for exp in range(4, 25, 8)] -DEFAULT_N_CLIENTS = [2 ** exp for exp in range(0, 6, 2)] +DEFAULT_MSG_SIZES = [2**4, 2**20] +DEFAULT_N_CLIENTS = [1, 16] DEFAULT_COMMS = [c for c in Communication] -# --- Output Suppression Context Manager (from the previous solution) --- +# --- Output Suppression Context Manager --- @contextmanager def suppress_output(verbose: bool = False): """Context manager to redirect stdout and stderr to os.devnull""" @@ -95,8 +95,8 @@ def perf_run( num_buffers: int, iters: int, repeats: int, - msg_sizes: typing.Iterable[int] | None, - n_clients: typing.Iterable[int] | None, + msg_sizes: typing.List[int] | None, + n_clients: typing.List[int] | None, comms: typing.Iterable[str] | None, configs: typing.Iterable[str] | None, grid: bool, @@ -117,8 +117,7 @@ def perf_run( if not grid and len(list(n_clients)) != len(list(msg_sizes)): ez.logger.warning( "Not performing a grid test of all combinations of n_clients and msg_sizes, but " + \ - "len(n_clients) != len(msg_sizes). " + \ - "If you want to perform all combinations of n_clients and msg_sizes, use --grid" + f"{len(n_clients)=} which is not equal to {len(msg_sizes)=}. 
" ) try: @@ -230,6 +229,19 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: help = "number of messages to send per-test (default = 1000)" ) + # NOTE: We default num-buffers = 1 because this degenerate perf test scenario (blasting + # messages as fast as possible through the system) results in one of two scenerios: + # 1. A (few) messages is/are enqueued and dequeued before another message is posted + # 2. The buffer fills up before being FULLY emptied resulting in longer latency. + # (once a channel enters this condition, it tends to stay in this condition) + # + # This _indeterminate_ behavior results in bimodal distributions of runtimes that make + # A/B performance comparisons difficult. The perf test is not representative of the vast + # majority of production ezmsg systems where publishing is generally rate-limited. + # + # A flow-control algorithm could stabilize perf-test results with num_buffers > 1, but is + # generally implemented by enforcing delays on the publish side which simply degrades + # performance in the vast majority of ezmsg systems. - Griff p_run.add_argument( "--num-buffers", type=int, @@ -283,13 +295,6 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: help = f"configurations to test (default = {[c for c in CONFIGS]})" ) - p_run.add_argument( - "--grid", - action = "store_true", - help = "perform all combinations of msg_sizes and n_clients " + \ - "(default: False; msg_sizes and n_clients must match in length)" - ) - p_run.add_argument( "--warmup", type = float, @@ -307,6 +312,6 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: n_clients = ns.n_clients, comms = ns.comms, configs = ns.configs, - grid = ns.grid, + grid = True, warmup_dur = ns.warmup, )) \ No newline at end of file From 7b7cdceed64b4b26b8fb253a6b674b1d87965bcd Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Tue, 11 Nov 2025 12:51:48 -0500 Subject: [PATCH 23/23] tweak to remove nonstandard handling of state vars --- src/ezmsg/util/perf/impl.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/ezmsg/util/perf/impl.py b/src/ezmsg/util/perf/impl.py index a5eb18c2..51ae600b 100644 --- a/src/ezmsg/util/perf/impl.py +++ b/src/ezmsg/util/perf/impl.py @@ -67,15 +67,15 @@ class LoadTestSample: dynamic_data: np.ndarray key: str +class LoadTestSourceState(ez.State): + counter: int = 0 class LoadTestSource(ez.Unit): OUTPUT = ez.OutputStream(LoadTestSample) SETTINGS = LoadTestSettings + STATE = LoadTestSourceState async def initialize(self) -> None: - self.running = True - self.counter = 0 - self.OUTPUT.num_buffers = self.SETTINGS.buffers self.OUTPUT.force_tcp = self.SETTINGS.force_tcp @@ -84,8 +84,6 @@ async def publish(self) -> typing.AsyncGenerator: ez.logger.info(f"Load test publisher started. 
(PID: {os.getpid()})") start_time = time.time() for _ in range(self.SETTINGS.num_msgs): - if not self.running: - break current_time = time.time() if current_time - start_time >= self.SETTINGS.max_duration: @@ -95,21 +93,20 @@ async def publish(self) -> typing.AsyncGenerator: self.OUTPUT, LoadTestSample( _timestamp=time.time(), - counter=self.counter, + counter=self.STATE.counter, dynamic_data=np.zeros( int(self.SETTINGS.dynamic_size // 4), dtype=np.float32 ), key = self.name, ), ) - self.counter += 1 + self.STATE.counter += 1 ez.logger.info("Exiting publish") raise ez.Complete async def shutdown(self) -> None: - self.running = False - ez.logger.info(f"Samples sent: {self.counter}") + ez.logger.info(f"Samples sent: {self.STATE.counter}")
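
For reference, the aggregation that analysis.py applies per parameter set after these changes (mean within each repeated run, then median across runs) can be sketched in isolation. The numbers below are hypothetical and only illustrate why a single run that falls into the slow bimodal mode described above does not skew the estimate:

import numpy as np

# Hypothetical per-run samples of one metric (e.g. sample_rate), keyed by run
# index, mirroring all_results[params][run_idx] -> [Metrics, ...] in load_perf().
runs = {
    0: [980.0, 1010.0, 995.0],
    1: [410.0, 1005.0, 990.0],  # one iteration landed in the slow bimodal mode
    2: [1000.0, 1015.0, 985.0],
}

per_run_means = [np.mean(vals) for vals in runs.values()]  # [995.0, ~801.7, 1000.0]
estimate = float(np.median(per_run_means))                 # 995.0

With these numbers the median-of-means estimate stays at 995.0 even though run 1's mean is dragged down to roughly 802 by its one slow iteration.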