From 928f0b270a0a04cd8951ec42f76526dcaa7d936b Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Fri, 6 Feb 2026 16:02:24 +0000 Subject: [PATCH 1/3] Only increment counts when we acknowledge --- ruby/lib/ci/queue/redis/build_record.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index cb45a755..3c23de95 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -57,12 +57,13 @@ def record_warning(type, attributes) end def record_error(id, payload, stats: nil) - acknowledged, _ = redis.pipelined do |pipeline| - @queue.acknowledge(id, error: payload, pipeline: pipeline) - record_stats(stats, pipeline: pipeline) - end + acknowledged = @queue.acknowledge(id, error: payload) - @queue.increment_test_failed if acknowledged == 1 + if acknowledged + # if another worker already acknowledged the test, we don't need to update the global stats or increment the test failed count + record_stats(stats) + @queue.increment_test_failed + end nil end From 14cf9e89c34f8df3ce1105636070b182bb4f89b5 Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Sun, 8 Feb 2026 20:41:55 +0000 Subject: [PATCH 2/3] Fix appending to warning file --- ruby/lib/minitest/queue/runner.rb | 25 ++++++++++++++++---- ruby/test/integration/minitest_redis_test.rb | 4 ++-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index ef50ada3..8515ca71 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'optparse' require 'json' +require 'fileutils' require 'minitest/queue' require 'ci/queue' require 'digest/md5' @@ -242,16 +243,16 @@ def bisect_command puts File.write('log/test_order.log', failing_order.to_a.map(&:id).join("\n")) - + bisect_test_details = failing_order.to_a.map do |test| source_location = test.source_location file_path = source_location&.first || 'unknown' line_number = source_location&.last || -1 "#{test.id} #{file_path}:#{line_number}" end - + File.write('log/bisect_test_details.log', bisect_test_details.join("\n")) - + exit! 0 end end @@ -336,8 +337,22 @@ def display_warnings(build) warnings = build.pop_warnings.map do |type, attributes| attributes.merge(type: type) end.compact - File.open(queue_config.warnings_file, 'w') do |f| - JSON.dump(warnings, f) + + return if warnings.empty? + + begin + # Ensure directory exists + dir = File.dirname(queue_config.warnings_file) + FileUtils.mkdir_p(dir) unless File.directory?(dir) + + # Write each warning as a separate JSON line (JSONL format) + File.open(queue_config.warnings_file, 'a') do |f| + warnings.each do |warning| + f.puts(JSON.dump(warning)) + end + end + rescue => error + STDERR.puts "Failed to write warnings: #{error.message}" end end diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index dbb74d9f..2a24438a 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -89,7 +89,7 @@ def test_lost_test_with_heartbeat_monitor assert_empty err result = normalize(out.lines[1].strip) assert_equal "Ran 1 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result - warnings = JSON.parse(warnings_file.read) + warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } assert_equal 1, warnings.size end end @@ -927,7 +927,7 @@ def test_redis_reporter end warnings_file.rewind - content = JSON.parse(warnings_file.read) + content = warnings_file.read.lines.map { |line| JSON.parse(line) } assert_equal 1, content.size assert_equal "RESERVED_LOST_TEST", content[0]["type"] assert_equal "Atest#test_bar", content[0]["test"] From b1ea42b47c4f281282ebc805f55ffd6975b0bbfe Mon Sep 17 00:00:00 2001 From: Kangze Jia Date: Fri, 13 Feb 2026 19:31:52 -0800 Subject: [PATCH 3/3] Only increment stats on ack; delta-based stats; stat correction on replace - Record stats only when worker acknowledges; duplicate acks do not increment - Redis: record_stats_delta (HINCRBY); record_success returns true when ack'd or replaced - Stat correction when success replaces failure; real assertion count (test.assertions) in delta - Test helper: Requeue before Skip when both set; test_aggregation and integration expectations updated - Remove [stats] debug logging from Redis BuildRecord; test_redis_reporter assertions = 8 --- ruby/lib/ci/queue/build_record.rb | 32 +++++-- ruby/lib/ci/queue/redis/build_record.rb | 92 +++++++++++++++---- ruby/lib/ci/queue/redis/grind_record.rb | 28 +++--- ruby/lib/ci/queue/redis/key_shortener.rb | 1 + .../minitest/queue/build_status_recorder.rb | 46 +++++++--- ruby/lib/minitest/queue/grind_recorder.rb | 6 +- ruby/test/integration/minitest_redis_test.rb | 5 +- .../queue/build_status_recorder_test.rb | 56 ++++++++++- ruby/test/support/reporter_test_helper.rb | 3 +- 9 files changed, 205 insertions(+), 64 deletions(-) diff --git a/ruby/lib/ci/queue/build_record.rb b/ruby/lib/ci/queue/build_record.rb index 52b1a33c..f864f2a5 100644 --- a/ruby/lib/ci/queue/build_record.rb +++ b/ruby/lib/ci/queue/build_record.rb @@ -18,18 +18,35 @@ def queue_exhausted? @queue.exhausted? end - def record_error(id, payload, stats: nil) + def record_error(id, payload, stat_delta: nil) error_reports[id] = payload - record_stats(stats) + true end - def record_success(id, stats: nil, skip_flaky_record: false, acknowledge: true) + def record_success(id, skip_flaky_record: false, acknowledge: true) error_reports.delete(id) - record_stats(stats) + true + end + + def record_requeue(id) + true + end + + def record_stats(builds_stats) + return unless builds_stats + stats.merge!(builds_stats) + end + + def record_stats_delta(delta, pipeline: nil) + return if delta.nil? || delta.empty? + delta.each do |stat_name, value| + next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/) + stats[stat_name] = (stats[stat_name] || 0).to_f + value.to_f + end end def fetch_stats(stat_names) - stat_names.zip(stats.values_at(*stat_names).map(&:to_f)) + stat_names.zip(stats.values_at(*stat_names).map(&:to_f)).to_h end def reset_stats(stat_names) @@ -47,11 +64,6 @@ def worker_errors private attr_reader :stats - - def record_stats(builds_stats) - return unless builds_stats - stats.merge!(builds_stats) - end end end end diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index 3c23de95..d894dacc 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -56,31 +56,71 @@ def record_warning(type, attributes) redis.rpush(key('warnings'), Marshal.dump([type, attributes])) end - def record_error(id, payload, stats: nil) + def record_error(id, payload, stat_delta: nil) + # Run acknowledge first so we know whether we're the first to ack acknowledged = @queue.acknowledge(id, error: payload) if acknowledged - # if another worker already acknowledged the test, we don't need to update the global stats or increment the test failed count - record_stats(stats) + # We were the first to ack; another worker already ack'd would get falsy from SADD @queue.increment_test_failed + # Only the acknowledging worker's stats include this failure (others skip increment when ack=false). + # Store so we can subtract it if another worker records success later. + store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any? end - nil + # Return so caller can roll back local counter when not acknowledged + !!acknowledged end - def record_success(id, stats: nil, skip_flaky_record: false) - _, error_reports_deleted_count, requeued_count, _ = redis.multi do |transaction| + def record_success(id, skip_flaky_record: false) + acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction| @queue.acknowledge(id, pipeline: transaction) transaction.hdel(key('error-reports'), id) transaction.hget(key('requeues-count'), id) - record_stats(stats, pipeline: transaction) + transaction.hget(key('error-report-deltas'), id) + end + # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution + if error_reports_deleted_count.to_i > 0 && delta_json + apply_error_report_delta_correction(delta_json) + redis.hdel(key('error-report-deltas'), id) end record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0) - nil + # Count this run when we ack'd or when we replaced a failure (so stats delta is applied) + !!(acknowledged || error_reports_deleted_count.to_i > 0) end - def record_requeue(id, stats: nil) - redis.pipelined do |pipeline| - record_stats(stats, pipeline: pipeline) + def record_requeue(id) + true + end + + def record_stats(stats = nil, pipeline: nil) + return unless stats + if pipeline + stats.each do |stat_name, stat_value| + pipeline.hset(key(stat_name), config.worker_id, stat_value) + pipeline.expire(key(stat_name), config.redis_ttl) + end + else + redis.pipelined do |p| + record_stats(stats, pipeline: p) + end + end + end + + # Apply a delta to this worker's stats in Redis (HINCRBY). Use this instead of + # record_stats when recording per-test so we never overwrite and correction sticks. + def record_stats_delta(delta, pipeline: nil) + return if delta.nil? || delta.empty? + apply_delta = lambda do |p| + delta.each do |stat_name, value| + next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/) + p.hincrbyfloat(key(stat_name), config.worker_id.to_s, value.to_f) + p.expire(key(stat_name), config.redis_ttl) + end + end + if pipeline + apply_delta.call(pipeline) + else + redis.pipelined { |p| apply_delta.call(p) } end end @@ -131,17 +171,31 @@ def reset_stats(stat_names) attr_reader :config, :redis - def record_stats(stats, pipeline: redis) - return unless stats - stats.each do |stat_name, stat_value| - pipeline.hset(key(stat_name), config.worker_id, stat_value) - pipeline.expire(key(stat_name), config.redis_ttl) - end - end - def key(*args) KeyShortener.key(config.build_id, *args) end + + def store_error_report_delta(test_id, stat_delta) + # Only the acknowledging worker's stats include this test; store their delta for correction on success + payload = { 'worker_id' => config.worker_id.to_s }.merge(stat_delta) + redis.hset(key('error-report-deltas'), test_id, JSON.generate(payload)) + redis.expire(key('error-report-deltas'), config.redis_ttl) + end + + def apply_error_report_delta_correction(delta_json) + delta = JSON.parse(delta_json) + worker_id = delta.delete('worker_id')&.to_s + return if worker_id.nil? || worker_id.empty? || delta.empty? + + redis.pipelined do |pipeline| + delta.each do |stat_name, value| + next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/) + + pipeline.hincrbyfloat(key(stat_name), worker_id, -value.to_f) + pipeline.expire(key(stat_name), config.redis_ttl) + end + end + end end end end diff --git a/ruby/lib/ci/queue/redis/grind_record.rb b/ruby/lib/ci/queue/redis/grind_record.rb index e4ed59a9..c96164f8 100644 --- a/ruby/lib/ci/queue/redis/grind_record.rb +++ b/ruby/lib/ci/queue/redis/grind_record.rb @@ -10,20 +10,32 @@ def initialize(queue, redis, config) @config = config end - def record_error(payload, stats: nil) + def record_error(payload) redis.pipelined do |pipeline| pipeline.lpush( key('error-reports'), payload, ) pipeline.expire(key('error-reports'), config.redis_ttl) - record_stats(stats, pipeline: pipeline) end nil end - def record_success(stats: nil) - record_stats(stats) + def record_success + end + + def record_stats(stats, pipeline: nil) + return unless stats + if pipeline + stats.each do |stat_name, stat_value| + pipeline.hset(key(stat_name), config.worker_id, stat_value) + pipeline.expire(key(stat_name), config.redis_ttl) + end + else + redis.pipelined do |p| + record_stats(stats, pipeline: p) + end + end end def record_warning(_,_) @@ -54,14 +66,6 @@ def pop_warnings def key(*args) KeyShortener.key(config.build_id, *args) end - - def record_stats(stats, pipeline: redis) - return unless stats - stats.each do |stat_name, stat_value| - pipeline.hset(key(stat_name), config.worker_id, stat_value) - pipeline.expire(key(stat_name), config.redis_ttl) - end - end end end end diff --git a/ruby/lib/ci/queue/redis/key_shortener.rb b/ruby/lib/ci/queue/redis/key_shortener.rb index a6ae1475..60e348bd 100644 --- a/ruby/lib/ci/queue/redis/key_shortener.rb +++ b/ruby/lib/ci/queue/redis/key_shortener.rb @@ -12,6 +12,7 @@ module KeyShortener 'queue' => 'q', 'owners' => 'o', 'error-reports' => 'e', + 'error-report-deltas' => 'ed', 'requeues-count' => 'rc', 'assertions' => 'a', 'errors' => 'er', diff --git a/ruby/lib/minitest/queue/build_status_recorder.rb b/ruby/lib/minitest/queue/build_status_recorder.rb index bf50b1f5..35cc4919 100644 --- a/ruby/lib/minitest/queue/build_status_recorder.rb +++ b/ruby/lib/minitest/queue/build_status_recorder.rb @@ -38,28 +38,46 @@ def record(test) super self.total_time = Minitest.clock_time - start_time - if test.requeued? - self.requeues += 1 - elsif test.skipped? - self.skips += 1 - elsif test.error? - self.errors += 1 - elsif test.failure - self.failures += 1 - end + + # Determine what type of result this is and record it + test_id = "#{test.klass}##{test.name}" + delta = delta_for(test) - stats = COUNTERS.zip(COUNTERS.map { |c| send(c) }).to_h - if (test.failure || test.error?) && !test.skipped? - build.record_error("#{test.klass}##{test.name}", dump(test), stats: stats) + acknowledged = if (test.failure || test.error?) && !test.skipped? + build.record_error(test_id, dump(test), stat_delta: delta) elsif test.requeued? - build.record_requeue("#{test.klass}##{test.name}", stats: stats) + build.record_requeue(test_id) else - build.record_success("#{test.klass}##{test.name}", stats: stats, skip_flaky_record: test.skipped?) + build.record_success(test_id, skip_flaky_record: test.skipped?) + end + + if acknowledged + if (test.failure || test.error?) && !test.skipped? + test.error? ? self.errors += 1 : self.failures += 1 + elsif test.requeued? + self.requeues += 1 + elsif test.skipped? + self.skips += 1 + end + # Apply delta to Redis (record_success returns true when ack'd or when we replaced a failure) + build.record_stats_delta(delta) end end private + def delta_for(test) + h = { 'assertions' => (test.assertions || 0).to_i, 'errors' => 0, 'failures' => 0, 'skips' => 0, 'requeues' => 0, 'total_time' => test.time.to_f } + if (test.failure || test.error?) && !test.skipped? + test.error? ? h['errors'] = 1 : h['failures'] = 1 + elsif test.requeued? + h['requeues'] = 1 + elsif test.skipped? + h['skips'] = 1 + end + h + end + def dump(test) ErrorReport.new(self.class.failure_formatter.new(test).to_h).dump end diff --git a/ruby/lib/minitest/queue/grind_recorder.rb b/ruby/lib/minitest/queue/grind_recorder.rb index 3b5c8b22..8d414f5f 100644 --- a/ruby/lib/minitest/queue/grind_recorder.rb +++ b/ruby/lib/minitest/queue/grind_recorder.rb @@ -32,12 +32,12 @@ def record(test) private def record_test(test) - stats = self.class.counters if (test.failure || test.error?) && !test.skipped? - build.record_error(dump(test), stats: stats) + build.record_error(dump(test)) else - build.record_success(stats: stats) + build.record_success end + build.record_stats(self.class.counters) end def increment_counter(test) diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index 2a24438a..35bb8bdf 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -88,6 +88,7 @@ def test_lost_test_with_heartbeat_monitor assert_empty err result = normalize(out.lines[1].strip) + # lost_test.rb test_foo has no assertions (only sleep) assert_equal "Ran 1 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } assert_equal 1, warnings.size @@ -513,7 +514,8 @@ def test_retry_report error_reports.keys.each_with_index do |test_id, index| queue.instance_variable_set(:@reserved_tests, Concurrent::Set.new([test_id])) - queue.build.record_success(test_id.dup, stats: { + queue.build.record_success(test_id.dup) + queue.build.record_stats({ 'assertions' => index + 1, 'errors' => 0, 'failures' => 0, @@ -912,6 +914,7 @@ def test_redis_reporter end assert_empty err output = normalize(out.lines.last.strip) + # 8 = sum of test.assertions from Minitest (skip counts as 1 in some versions) assert_equal 'Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', output Tempfile.open('warnings') do |warnings_file| diff --git a/ruby/test/minitest/queue/build_status_recorder_test.rb b/ruby/test/minitest/queue/build_status_recorder_test.rb index c78833a8..3ea258eb 100644 --- a/ruby/test/minitest/queue/build_status_recorder_test.rb +++ b/ruby/test/minitest/queue/build_status_recorder_test.rb @@ -39,14 +39,21 @@ def test_aggregation second_reporter.record(result('g', requeued: true)) reserve(second_queue, "h") second_reporter.record(result('h', skipped: true, requeued: true)) + # W2 passes "b" (W1 had errored): stat correction subtracts W1's error. W2 records h as requeue (not success), so h stays in error_reports. + reserve(second_queue, "b") + second_reporter.record(result('b')) + # 9 runs × 1 assertion each (helper sets runnable.assertions += 1); real assertion count from delta_for assert_equal 9, summary.assertions + # W1: a, h (2 failures). W2: c. h's error report is not replaced (W2 recorded requeue), so no correction for h. assert_equal 3, summary.failures - assert_equal 3, summary.errors - assert_equal 2, summary.skips - assert_equal 1, summary.requeues + assert_equal 2, summary.errors + assert_equal 1, summary.skips + assert_equal 2, summary.requeues + # a, c, d, f, and h (W2 recorded h as requeue, so h's error report was not replaced/deleted) assert_equal 5, summary.error_reports.size - assert_equal 0, summary.flaky_reports.size + # W2's success on "b" replaced W1's error, so record_flaky("b") was called + assert_equal 1, summary.flaky_reports.size end def test_retrying_test @@ -76,6 +83,8 @@ def test_retrying_test second_reporter.record(result("a")) assert_equal 0, summary.error_reports.size assert_equal 1, @queue.test_failed + # Second worker's record_success returned false (duplicate ack), so local counters were not incremented + assert_equal 0, second_reporter.skips end def test_retrying_test_reverse @@ -105,6 +114,8 @@ def test_retrying_test_reverse second_reporter.record(result("a", failure: "Something went wrong")) assert_equal 0, summary.error_reports.size assert_equal 0, @queue.test_failed + # Second worker's record_error returned false (duplicate ack), so local counters were not incremented + assert_equal 0, second_reporter.failures end def test_static_queue_record_success @@ -121,6 +132,43 @@ def test_static_queue_record_success assert_equal 0, static_reporter.requeues end + def test_duplicate_success_does_not_increment_skips + # Worker 1 records success for "a" first + reserve(@queue, "a") + @reporter.record(result("a", skipped: true)) + assert_equal 1, @reporter.skips + + # Worker 2 records success for same test "a" (duplicate ack) + second_queue = worker(2) + second_reporter = BuildStatusRecorder.new(build: second_queue.build) + second_reporter.start + reserve(second_queue, "a") + second_reporter.record(result("a", skipped: true)) + + # Second reporter did not increment skips because record_success returned false + assert_equal 0, second_reporter.skips + end + + def test_build_record_methods_return_boolean + # Redis build: first to ack returns true, duplicate returns false + reserve(@queue, "a") + assert_equal true, @queue.build.record_success("Minitest::Test#a") + assert_equal true, @queue.build.record_requeue("Minitest::Test#b") + + second_queue = worker(2) + reserve(second_queue, "a") + assert_equal false, second_queue.build.record_success("Minitest::Test#a") + end + + def test_static_build_record_returns_true + static_queue = CI::Queue::Static.new(['test_example'], CI::Queue::Configuration.new(build_id: '42', worker_id: '1')) + build = static_queue.build + + assert_equal true, build.record_success("test_example") + assert_equal true, build.record_requeue("test_example") + assert_equal true, build.record_error("test_example", "payload") + end + private def reserve(queue, method_name) diff --git a/ruby/test/support/reporter_test_helper.rb b/ruby/test/support/reporter_test_helper.rb index b2bcd6a4..0271847b 100644 --- a/ruby/test/support/reporter_test_helper.rb +++ b/ruby/test/support/reporter_test_helper.rb @@ -11,9 +11,10 @@ def result(name, **kwargs) def runnable(name, failure: nil, requeued: false, skipped: false, unexpected_error: false) runnable = Minitest::Test.new(name) runnable.failures << generate_assertion(failure) if failure + # Push Requeue before Skip when both set so requeued? is true (recorder checks requeued? before success/skip) + runnable.failures << Minitest::Requeue.new(generate_assertion("Failed")) if requeued runnable.failures << Minitest::Skip.new if skipped runnable.failures << generate_unexpected_error if unexpected_error - runnable.failures << Minitest::Requeue.new(generate_assertion("Failed")) if requeued runnable.assertions += 1 runnable.time = 0.12 runnable