diff --git a/redis/_entry_helpers.lua b/redis/_entry_helpers.lua new file mode 100644 index 00000000..c1755278 --- /dev/null +++ b/redis/_entry_helpers.lua @@ -0,0 +1,9 @@ +local function test_id_from_entry(value, delimiter) + if delimiter then + local pos = string.find(value, delimiter, 1, true) + if pos then + return string.sub(value, 1, pos - 1) + end + end + return value +end diff --git a/redis/acknowledge.lua b/redis/acknowledge.lua index 865dd88c..7ba9b6cf 100644 --- a/redis/acknowledge.lua +++ b/redis/acknowledge.lua @@ -3,15 +3,16 @@ local processed_key = KEYS[2] local owners_key = KEYS[3] local error_reports_key = KEYS[4] -local test = ARGV[1] -local error = ARGV[2] -local ttl = ARGV[3] -redis.call('zrem', zset_key, test) -redis.call('hdel', owners_key, test) -- Doesn't matter if it was reclaimed by another workers -local acknowledged = redis.call('sadd', processed_key, test) == 1 +local entry = ARGV[1] +local test_id = ARGV[2] +local error = ARGV[3] +local ttl = ARGV[4] +redis.call('zrem', zset_key, entry) +redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers +local acknowledged = redis.call('sadd', processed_key, test_id) == 1 if acknowledged and error ~= "" then - redis.call('hset', error_reports_key, test, error) + redis.call('hset', error_reports_key, test_id, error) redis.call('expire', error_reports_key, ttl) end diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 6c9c1e9d..6b88fe2b 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -1,17 +1,22 @@ +-- @include _entry_helpers + local zset_key = KEYS[1] local processed_key = KEYS[2] local owners_key = KEYS[3] local worker_queue_key = KEYS[4] local current_time = ARGV[1] -local test = ARGV[2] +local entry = ARGV[2] +local entry_delimiter = ARGV[3] + +local test_id = test_id_from_entry(entry, entry_delimiter) -- already processed, we do not need to bump the timestamp -if redis.call('sismember', processed_key, test) == 1 then +if redis.call('sismember', processed_key, test_id) == 1 then return false end -- we're still the owner of the test, we can bump the timestamp -if redis.call('hget', owners_key, test) == worker_queue_key then - return redis.call('zadd', zset_key, current_time, test) +if redis.call('hget', owners_key, entry) == worker_queue_key then + return redis.call('zadd', zset_key, current_time, entry) end diff --git a/redis/requeue.lua b/redis/requeue.lua index b1dd35ec..d70fcc4c 100644 --- a/redis/requeue.lua +++ b/redis/requeue.lua @@ -8,14 +8,15 @@ local error_reports_key = KEYS[7] local max_requeues = tonumber(ARGV[1]) local global_max_requeues = tonumber(ARGV[2]) -local test = ARGV[3] -local offset = ARGV[4] +local entry = ARGV[3] +local test_id = ARGV[4] +local offset = ARGV[5] -if redis.call('hget', owners_key, test) == worker_queue_key then - redis.call('hdel', owners_key, test) +if redis.call('hget', owners_key, entry) == worker_queue_key then + redis.call('hdel', owners_key, entry) end -if redis.call('sismember', processed_key, test) == 1 then +if redis.call('sismember', processed_key, test_id) == 1 then return false end @@ -24,23 +25,23 @@ if global_requeues and global_requeues >= tonumber(global_max_requeues) then return false end -local requeues = tonumber(redis.call('hget', requeues_count_key, test)) +local requeues = tonumber(redis.call('hget', requeues_count_key, test_id)) if requeues and requeues >= max_requeues then return false end redis.call('hincrby', requeues_count_key, '___total___', 1) -redis.call('hincrby', requeues_count_key, test, 
1) +redis.call('hincrby', requeues_count_key, test_id, 1) -redis.call('hdel', error_reports_key, test) +redis.call('hdel', error_reports_key, test_id) local pivot = redis.call('lrange', queue_key, -1 - offset, 0 - offset)[1] if pivot then - redis.call('linsert', queue_key, 'BEFORE', pivot, test) + redis.call('linsert', queue_key, 'BEFORE', pivot, entry) else - redis.call('lpush', queue_key, test) + redis.call('lpush', queue_key, entry) end -redis.call('zrem', zset_key, test) +redis.call('zrem', zset_key, entry) return true diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index 9dfaa616..f595dd28 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -1,3 +1,5 @@ +-- @include _entry_helpers + local zset_key = KEYS[1] local processed_key = KEYS[2] local worker_queue_key = KEYS[3] @@ -5,10 +7,12 @@ local owners_key = KEYS[4] local current_time = ARGV[1] local timeout = ARGV[2] +local entry_delimiter = ARGV[3] local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout) for _, test in ipairs(lost_tests) do - if redis.call('sismember', processed_key, test) == 0 then + local test_id = test_id_from_entry(test, entry_delimiter) + if redis.call('sismember', processed_key, test_id) == 0 then redis.call('zadd', zset_key, current_time, test) redis.call('lpush', worker_queue_key, test) redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership diff --git a/ruby/README.md b/ruby/README.md index 92ba5b99..463f4aa3 100644 --- a/ruby/README.md +++ b/ruby/README.md @@ -38,6 +38,33 @@ minitest-queue --queue redis://example.com run -Itest test/**/*_test.rb Additionally you can configure the requeue settings (see main README) with `--max-requeues` and `--requeue-tolerance`. +#### Lazy loading (opt-in) + +Lazy loading and streaming are currently supported only by `minitest-queue` (not `rspec-queue`). + +To reduce worker memory usage, you can enable lazy loading so test files are loaded on-demand: + +```bash +minitest-queue --queue redis://example.com --lazy-load run -Itest test/**/*_test.rb +``` + +You can tune streaming with `--lazy-load-stream-batch-size` (default: 5000) and `--lazy-load-stream-timeout` (default 300s). + +Environment variables: + +- `CI_QUEUE_LAZY_LOAD=1` +- `CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE=10000` +- `CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT=300` +- `CI_QUEUE_LAZY_LOAD_TEST_HELPERS=test/test_helper.rb` + +Backward-compatible aliases still work: + +- `CI_QUEUE_STREAM_BATCH_SIZE` +- `CI_QUEUE_STREAM_TIMEOUT` +- `CI_QUEUE_TEST_HELPERS` + +When enabled, file loading stats are printed at the end of the run if debug is enabled. 
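+
+For example, lazy loading can also be enabled entirely through environment variables, optionally pointing workers at a shared test helper (illustrative invocation; adapt the paths to your suite):
+
+```bash
+CI_QUEUE_LAZY_LOAD=1 CI_QUEUE_LAZY_LOAD_TEST_HELPERS=test/test_helper.rb \
+  minitest-queue --queue redis://example.com run -Itest test/**/*_test.rb
+```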
+ If you'd like to centralize the error reporting you can do so with: diff --git a/ruby/lib/ci/queue.rb b/ruby/lib/ci/queue.rb index 145f51db..84124f12 100644 --- a/ruby/lib/ci/queue.rb +++ b/ruby/lib/ci/queue.rb @@ -14,6 +14,9 @@ require 'ci/queue/file' require 'ci/queue/grind' require 'ci/queue/bisect' +require 'ci/queue/queue_entry' +require 'ci/queue/class_resolver' +require 'ci/queue/file_loader' module CI module Queue @@ -22,6 +25,18 @@ module Queue attr_accessor :shuffler, :requeueable Error = Class.new(StandardError) + ClassNotFoundError = Class.new(Error) + + class FileLoadError < Error + attr_reader :file_path, :original_error + + def initialize(file_path, original_error) + @file_path = file_path + @original_error = original_error + super("Failed to load #{file_path}: #{original_error.class}: #{original_error.message}") + set_backtrace(original_error.backtrace) + end + end module Warnings RESERVED_LOST_TEST = :RESERVED_LOST_TEST @@ -48,6 +63,11 @@ def shuffle(tests, random) end end + def debug? + value = ENV['CI_QUEUE_DEBUG'] + value && !value.strip.empty? && !%w[0 false].include?(value.strip.downcase) + end + def from_uri(url, config) uri = URI(url) implementation = case uri.scheme @@ -65,3 +85,4 @@ def from_uri(url, config) end end end + diff --git a/ruby/lib/ci/queue/class_resolver.rb b/ruby/lib/ci/queue/class_resolver.rb new file mode 100644 index 00000000..56516082 --- /dev/null +++ b/ruby/lib/ci/queue/class_resolver.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module CI + module Queue + module ClassResolver + def self.resolve(class_name, file_path: nil, loader: nil) + klass = try_direct_lookup(class_name) + return klass if klass + + if file_path && loader + loader.load_file(file_path) + klass = try_direct_lookup(class_name) + return klass if klass + end + + raise ClassNotFoundError, "Unable to resolve class #{class_name}" + end + + def self.try_direct_lookup(class_name) + parts = class_name.sub(/\A::/, '').split('::') + current = Object + + parts.each do |name| + return nil unless current.const_defined?(name, false) + + current = current.const_get(name, false) + end + + return nil unless current.is_a?(Class) + + current + rescue NameError + nil + end + private_class_method :try_direct_lookup + end + end +end diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 6743742f..09b213da 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -6,12 +6,16 @@ class Configuration attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds + attr_accessor :lazy_load, :lazy_load_stream_batch_size + attr_accessor :lazy_load_streaming_timeout, :lazy_load_test_helpers attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout class << self def from_env(env) + lazy_load_value = env['CI_QUEUE_LAZY_LOAD'] + lazy_load = lazy_load_value && !lazy_load_value.strip.empty? 
&& !%w(0 false).include?(lazy_load_value.strip.downcase) new( build_id: env['CIRCLE_BUILD_URL'] || env['BUILDKITE_BUILD_ID'] || env['TRAVIS_BUILD_ID'] || env['HEROKU_TEST_RUN_ID'] || env['SEMAPHORE_PIPELINE_ID'], worker_id: env['CIRCLE_NODE_INDEX'] || env['BUILDKITE_PARALLEL_JOB'] || env['CI_NODE_INDEX'] || env['SEMAPHORE_JOB_ID'], @@ -22,6 +26,10 @@ def from_env(env) debug_log: env['CI_QUEUE_DEBUG_LOG'], max_requeues: env['CI_QUEUE_MAX_REQUEUES']&.to_i || 0, requeue_tolerance: env['CI_QUEUE_REQUEUE_TOLERANCE']&.to_f || 0, + lazy_load: lazy_load || false, + lazy_load_stream_batch_size: (env['CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE'] || env['CI_QUEUE_STREAM_BATCH_SIZE'])&.to_i, + lazy_load_streaming_timeout: (env['CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT'] || env['CI_QUEUE_STREAM_TIMEOUT'])&.to_i, + lazy_load_test_helpers: env['CI_QUEUE_LAZY_LOAD_TEST_HELPERS'] || env['CI_QUEUE_TEST_HELPERS'], ) end @@ -46,7 +54,8 @@ def initialize( grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil, max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil, queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil, - export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil) + export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil, + lazy_load: false, lazy_load_stream_batch_size: nil, lazy_load_streaming_timeout: nil, lazy_load_test_helpers: nil) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @failure_file = failure_file @@ -73,6 +82,16 @@ def initialize( @warnings_file = warnings_file @debug_log = debug_log @max_missed_heartbeat_seconds = max_missed_heartbeat_seconds + @lazy_load = lazy_load + @lazy_load_stream_batch_size = lazy_load_stream_batch_size || 5_000 + @lazy_load_streaming_timeout = lazy_load_streaming_timeout + @lazy_load_test_helpers = lazy_load_test_helpers + end + + def lazy_load_test_helper_paths + return [] unless @lazy_load_test_helpers + + @lazy_load_test_helpers.split(',').map(&:strip) end def queue_init_timeout @@ -83,6 +102,43 @@ def report_timeout @report_timeout || timeout end + def lazy_load_streaming_timeout + if @lazy_load_streaming_timeout && @lazy_load_streaming_timeout > 0 + @lazy_load_streaming_timeout + else + [queue_init_timeout, 300].max + end + end + + # Backward-compatible aliases for existing callers. + def stream_batch_size + lazy_load_stream_batch_size + end + + def stream_batch_size=(value) + self.lazy_load_stream_batch_size = value + end + + def streaming_timeout + lazy_load_streaming_timeout + end + + def streaming_timeout=(value) + self.lazy_load_streaming_timeout = value + end + + def test_helpers + lazy_load_test_helpers + end + + def test_helpers=(value) + self.lazy_load_test_helpers = value + end + + def test_helper_paths + lazy_load_test_helper_paths + end + def inactive_workers_timeout @inactive_workers_timeout || timeout end diff --git a/ruby/lib/ci/queue/file_loader.rb b/ruby/lib/ci/queue/file_loader.rb new file mode 100644 index 00000000..e18ba42e --- /dev/null +++ b/ruby/lib/ci/queue/file_loader.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require 'set' + +module CI + module Queue + class FileLoader + attr_reader :load_stats + + def initialize + @loaded_files = Set.new + @pid = Process.pid + @forked = false + @load_stats = {} + @loaded_features = nil + end + + def load_file(file_path) + detect_fork! 
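+        # After a fork, `require` can be a no-op because $LOADED_FEATURES is inherited
+        # from the parent process; detect_fork! resets this loader's per-process state so
+        # such files are force-`load`ed below (see should_force_load_after_fork?).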
+ return if @loaded_files.include?(file_path) + + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + error = nil + + begin + required = require file_path + if should_force_load_after_fork?(required, file_path) + with_warning_suppression { load file_path } + end + rescue Exception => e + raise if e.is_a?(SignalException) || e.is_a?(SystemExit) + error = e + ensure + duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start + @load_stats[file_path] = duration + end + + raise FileLoadError.new(file_path, error) if error + + remember_loaded_feature(file_path) + @loaded_files.add(file_path) + nil + end + + def total_load_time + load_stats.values.sum + end + + def slowest_files(limit = 10) + load_stats.sort_by { |_, duration| -duration }.take(limit) + end + + private + + def detect_fork! + return if @pid == Process.pid + + @pid = Process.pid + @forked = true + @loaded_files.clear + @load_stats.clear + @loaded_features = nil + end + + def file_in_loaded_features?(file_path) + loaded_features.include?(::File.expand_path(file_path)) + end + + def loaded_features + @loaded_features ||= Set.new($LOADED_FEATURES.map { |loaded| ::File.expand_path(loaded) }) + end + + def remember_loaded_feature(file_path) + loaded_features.add(::File.expand_path(file_path)) + end + + def should_force_load_after_fork?(required, file_path) + @forked && !required && file_in_loaded_features?(file_path) + end + + def with_warning_suppression + previous = $VERBOSE + $VERBOSE = nil + yield + ensure + $VERBOSE = previous + end + end + end +end diff --git a/ruby/lib/ci/queue/queue_entry.rb b/ruby/lib/ci/queue/queue_entry.rb new file mode 100644 index 00000000..81846f27 --- /dev/null +++ b/ruby/lib/ci/queue/queue_entry.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require 'base64' +require 'json' + +module CI + module Queue + module QueueEntry + DELIMITER = "\t" + LOAD_ERROR_PREFIX = '__ciq_load_error__:'.freeze + + def self.test_id(entry) + pos = entry.index(DELIMITER) + pos ? entry[0, pos] : entry + end + + def self.parse(entry) + return { test_id: entry, file_path: nil } unless entry.include?(DELIMITER) + + test_id, file_path = entry.split(DELIMITER, 2) + file_path = nil if file_path == "" + { test_id: test_id, file_path: file_path } + end + + def self.format(test_id, file_path) + return test_id if file_path.nil? || file_path == "" + + "#{test_id}#{DELIMITER}#{file_path}" + end + + def self.load_error_payload?(file_path) + file_path&.start_with?(LOAD_ERROR_PREFIX) + end + + def self.encode_load_error(file_path, error) + original = error.respond_to?(:original_error) ? 
error.original_error : error + payload = { + 'file_path' => file_path, + 'error_class' => original.class.name, + 'error_message' => original.message, + 'backtrace' => original.backtrace, + } + "#{LOAD_ERROR_PREFIX}#{Base64.strict_encode64(JSON.dump(payload))}" + end + + def self.decode_load_error(file_path) + return nil unless load_error_payload?(file_path) + + encoded = file_path.sub(LOAD_ERROR_PREFIX, '') + JSON.parse(Base64.strict_decode64(encoded)) + rescue ArgumentError, JSON::ParserError + nil + end + end + end +end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 021d419e..8c253aee 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -144,19 +144,26 @@ def test_ids end def to_a - test_ids.reverse.map { |k| index.fetch(k) } + test_ids.reverse.map do |entry| + index.fetch(entry) do + test_id = CI::Queue::QueueEntry.test_id(entry) + index.fetch(test_id) + end + end end def progress - total - size + progress = total - size + progress < 0 ? 0 : progress end - def wait_for_master(timeout: 30) + def wait_for_master(timeout: 30, allow_streaming: false) return true if master? return true if queue_initialized? + return true if allow_streaming && streaming? (timeout * 10 + 1).to_i.times do - if queue_initialized? + if queue_initialized? || (allow_streaming && streaming?) return true else sleep 0.1 @@ -177,6 +184,10 @@ def queue_initialized? end end + def streaming? + master_status == 'streaming' + end + def queue_initializing? master_status == 'setup' end @@ -235,18 +246,31 @@ def load_script(script) end def read_script(name) - ::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua")) + resolve_lua_includes( + ::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua")), + CI::Queue::DEV_SCRIPTS_ROOT, + ) rescue SystemCallError - ::File.read(::File.join(CI::Queue::RELEASE_SCRIPTS_ROOT, "#{name}.lua")) + resolve_lua_includes( + ::File.read(::File.join(CI::Queue::RELEASE_SCRIPTS_ROOT, "#{name}.lua")), + CI::Queue::RELEASE_SCRIPTS_ROOT, + ) + end + + def resolve_lua_includes(script, root) + script.gsub(/^-- @include (\S+)$/) do + ::File.read(::File.join(root, "#{$1}.lua")) + end end class HeartbeatProcess - def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key) + def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:) @redis_url = redis_url @zset_key = zset_key @processed_key = processed_key @owners_key = owners_key @worker_queue_key = worker_queue_key + @entry_delimiter = entry_delimiter end def boot! @@ -261,6 +285,7 @@ def boot! 
@processed_key, @owners_key, @worker_queue_key, + @entry_delimiter, in: child_read, out: child_write, ) @@ -335,6 +360,7 @@ def heartbeat_process key('processed'), key('owners'), key('worker', worker_id, 'queue'), + entry_delimiter: CI::Queue::QueueEntry::DELIMITER, ) end diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index cb45a755..31336a0c 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -108,6 +108,18 @@ def flaky_reports redis.smembers(key('flaky-reports')) end + def record_worker_profile(profile) + redis.pipelined do |pipeline| + pipeline.hset(key('worker-profiles'), config.worker_id, JSON.dump(profile)) + pipeline.expire(key('worker-profiles'), config.redis_ttl) + end + end + + def worker_profiles + raw = redis.hgetall(key('worker-profiles')) + raw.transform_values { |v| JSON.parse(v) } + end + def fetch_stats(stat_names) counts = redis.pipelined do |pipeline| stat_names.each { |c| pipeline.hvals(key(c)) } diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb index bdf1466e..b737690c 100755 --- a/ruby/lib/ci/queue/redis/monitor.rb +++ b/ruby/lib/ci/queue/redis/monitor.rb @@ -13,11 +13,12 @@ class Monitor DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__) RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__) - def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) + def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter) @zset_key = zset_key @processed_key = processed_key @owners_key = owners_key @worker_queue_key = worker_queue_key + @entry_delimiter = entry_delimiter @logger = logger @redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5]) @shutdown = false @@ -40,7 +41,7 @@ def process_tick!(id:) eval_script( :heartbeat, keys: [@zset_key, @processed_key, @owners_key, @worker_queue_key], - argv: [Time.now.to_f, id] + argv: [Time.now.to_f, id, @entry_delimiter] ) rescue => error @logger.info(error) @@ -56,9 +57,21 @@ def load_script(script) end def read_script(name) - ::File.read(::File.join(DEV_SCRIPTS_ROOT, "#{name}.lua")) + resolve_lua_includes( + ::File.read(::File.join(DEV_SCRIPTS_ROOT, "#{name}.lua")), + DEV_SCRIPTS_ROOT, + ) rescue SystemCallError - ::File.read(::File.join(RELEASE_SCRIPTS_ROOT, "#{name}.lua")) + resolve_lua_includes( + ::File.read(::File.join(RELEASE_SCRIPTS_ROOT, "#{name}.lua")), + RELEASE_SCRIPTS_ROOT, + ) + end + + def resolve_lua_includes(script, root) + script.gsub(/^-- @include (\S+)$/) do + ::File.read(::File.join(root, "#{$1}.lua")) + end end HEADER = 'L' @@ -142,9 +155,10 @@ def monitor processed_key = ARGV[2] owners_key = ARGV[3] worker_queue_key = ARGV[4] +entry_delimiter = ARGV[5] logger.debug("Starting monitor: #{redis_url} #{zset_key} #{processed_key}") -manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) +manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter) # Notify the parent we're ready $stdout.puts(".") diff --git a/ruby/lib/ci/queue/redis/supervisor.rb b/ruby/lib/ci/queue/redis/supervisor.rb index 0fcc47d5..385a4325 100644 --- a/ruby/lib/ci/queue/redis/supervisor.rb +++ b/ruby/lib/ci/queue/redis/supervisor.rb @@ -9,7 +9,7 @@ def master? 
end def total - wait_for_master(timeout: config.queue_init_timeout) + wait_for_master(timeout: config.queue_init_timeout, allow_streaming: true) redis.get(key('total')).to_i end @@ -19,7 +19,7 @@ def build def wait_for_workers duration = measure do - wait_for_master(timeout: config.queue_init_timeout) + wait_for_master(timeout: config.queue_init_timeout, allow_streaming: true) end yield if block_given? diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index d311be39..983ab42d 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'ci/queue/static' require 'concurrent/set' +require 'concurrent/map' module CI module Queue @@ -13,11 +14,13 @@ class << self self.max_sleep_time = 2 class Worker < Base - attr_reader :total + attr_accessor :entry_resolver + attr_reader :first_reserve_at def initialize(redis, config) @reserved_tests = Concurrent::Set.new @shutdown_required = false + @first_reserve_at = nil super(redis, config) end @@ -27,15 +30,65 @@ def distributed? def populate(tests, random: Random.new) @index = tests.map { |t| [t.id, t] }.to_h - tests = Queue.shuffle(tests, random) - push(tests.map(&:id)) + entries = Queue.shuffle(tests, random).map { |test| queue_entry_for(test) } + push(entries) self end + def stream_populate(tests, random: Random.new, batch_size: 10_000) + batch_size = batch_size.to_i + batch_size = 1 if batch_size < 1 + + value = key('setup', worker_id) + _, status = redis.pipelined do |pipeline| + pipeline.set(key('master-status'), value, nx: true) + pipeline.get(key('master-status')) + end + + if @master = (value == status) + @total = 0 + puts "Worker elected as leader, streaming tests to the queue." + + duration = measure do + start_streaming! + buffer = [] + + tests.each do |test| + buffer << test + + if buffer.size >= batch_size + push_batch(buffer, random) + buffer.clear + end + end + + push_batch(buffer, random) unless buffer.empty? + finalize_streaming + end + + puts "Streamed #{@total} tests in #{duration.round(2)}s." + $stdout.flush + end + + register + redis.expire(key('workers'), config.redis_ttl) + self + rescue *CONNECTION_ERRORS + raise if @master + end + def populated? !!defined?(@index) end + def total + return @total if defined?(@total) && @total + + redis.get(key('total')).to_i + rescue *CONNECTION_ERRORS + @total || 0 + end + def shutdown! @shutdown_required = true end @@ -51,13 +104,18 @@ def master? DEFAULT_SLEEP_SECONDS = 0.5 def poll - wait_for_master + wait_for_master(timeout: config.queue_init_timeout, allow_streaming: true) attempt = 0 until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed? - if test = reserve + if entry = reserve attempt = 0 - yield index.fetch(test) + yield resolve_entry(entry) else + if still_streaming? + raise LostMaster, "Streaming stalled for more than #{config.lazy_load_streaming_timeout}s" if streaming_stale? + sleep 0.1 + next + end # Adding exponential backoff to avoid hammering Redis # we just stay online here in case a test gets retried or times out so we can afford to wait sleep_time = [DEFAULT_SLEEP_SECONDS * (2 ** attempt), Redis.max_sleep_time].min @@ -89,6 +147,7 @@ def retrying? def retry_queue failures = build.failed_tests.to_set log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1) + log = log.map { |entry| queue_entry_test_id(entry) } log.select! { |id| failures.include?(id) } log.uniq! log.reverse! 
@@ -103,23 +162,38 @@ def build @build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config) end + def file_loader + @file_loader ||= CI::Queue::FileLoader.new + end + + def worker_queue_length + redis.llen(key('worker', worker_id, 'queue')) + rescue *CONNECTION_ERRORS + nil + end + def report_worker_error(error) build.report_worker_error(error) end def acknowledge(test_key, error: nil, pipeline: redis) - raise_on_mismatching_test(test_key) + test_id = normalize_test_id(test_key) + assert_reserved!(test_id) + entry = reserved_entries.fetch(test_id, queue_entry_for(test_key)) + unreserve_entry(test_id) eval_script( :acknowledge, keys: [key('running'), key('processed'), key('owners'), key('error-reports')], - argv: [test_key, error.to_s, config.redis_ttl], + argv: [entry, test_id, error.to_s, config.redis_ttl], pipeline: pipeline, ) == 1 end def requeue(test, offset: Redis.requeue_offset) - test_key = test.id - raise_on_mismatching_test(test_key) + test_id = normalize_test_id(test) + assert_reserved!(test_id) + entry = reserved_entries.fetch(test_id, queue_entry_for(test)) + unreserve_entry(test_id) global_max_requeues = config.global_max_requeues(total) requeued = config.max_requeues > 0 && global_max_requeues > 0 && eval_script( @@ -133,10 +207,13 @@ def requeue(test, offset: Redis.requeue_offset) key('owners'), key('error-reports'), ], - argv: [config.max_requeues, global_max_requeues, test_key, offset], + argv: [config.max_requeues, global_max_requeues, entry, test_id, offset], ) == 1 - reserved_tests << test_key unless requeued + unless requeued + reserved_tests << test_id + reserved_entries[test_id] = entry + end requeued end @@ -157,19 +234,129 @@ def reserved_tests @reserved_tests ||= Concurrent::Set.new end + def reserved_entries + @reserved_entries ||= Concurrent::Map.new + end + + def reserved_entry_ids + @reserved_entry_ids ||= Concurrent::Map.new + end + def worker_id config.worker_id end - def raise_on_mismatching_test(test) - unless reserved_tests.delete?(test) - raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved" + def assert_reserved!(test_id) + unless reserved_tests.include?(test_id) + raise ReservationError, "Acknowledged #{test_id.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved" + end + end + + def reserve_entry(entry) + test_id = queue_entry_test_id(entry) + reserved_tests << test_id + reserved_entries[test_id] = entry + reserved_entry_ids[entry] = test_id + end + + def unreserve_entry(test_id) + entry = reserved_entries.delete(test_id) + reserved_tests.delete(test_id) + reserved_entry_ids.delete(entry) if entry + end + + def normalize_test_id(test_key) + key = test_key.respond_to?(:id) ? test_key.id : test_key + if key.is_a?(String) + cached = reserved_entry_ids[key] + return cached if cached + end + queue_entry_test_id(key) + end + + def queue_entry_test_id(entry) + CI::Queue::QueueEntry.test_id(entry) + end + + def queue_entry_for(test) + return test.queue_entry if test.respond_to?(:queue_entry) + return test.id if test.respond_to?(:id) + + test + end + + def resolve_entry(entry) + test_id = reserved_entry_ids[entry] || queue_entry_test_id(entry) + if populated? + return index[test_id] if index.key?(test_id) + end + + return entry_resolver.call(entry) if entry_resolver + + entry + end + + def still_streaming? + master_status == 'streaming' + end + + def streaming_stale? 
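+          # The leader refreshes `streaming-updated-at` with every batch it pushes;
+          # if the timestamp is missing or has not moved for longer than the configured
+          # timeout, pollers treat the stream as stalled (see poll).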
+ timeout = config.lazy_load_streaming_timeout.to_i + updated_at = redis.get(key('streaming-updated-at')) + return true unless updated_at + + (CI::Queue.time_now.to_f - updated_at.to_f) > timeout + rescue *CONNECTION_ERRORS + false + end + + def start_streaming! + timeout = config.lazy_load_streaming_timeout.to_i + with_redis_timeout(5) do + redis.multi do |transaction| + transaction.set(key('total'), 0) + transaction.set(key('master-status'), 'streaming') + transaction.set(key('streaming-updated-at'), CI::Queue.time_now.to_f) + transaction.expire(key('streaming-updated-at'), timeout) + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end + end + end + + def push_batch(tests, random) + entries = Queue.shuffle(tests, random).map { |test| queue_entry_for(test) } + return if entries.empty? + + @total += entries.size + timeout = config.lazy_load_streaming_timeout.to_i + redis.multi do |transaction| + transaction.lpush(key('queue'), entries) + transaction.incrby(key('total'), entries.size) + transaction.set(key('master-status'), 'streaming') + transaction.set(key('streaming-updated-at'), CI::Queue.time_now.to_f) + transaction.expire(key('streaming-updated-at'), timeout) + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end + end + + def finalize_streaming + redis.multi do |transaction| + transaction.set(key('master-status'), 'ready') + transaction.expire(key('master-status'), config.redis_ttl) + transaction.del(key('streaming-updated-at')) end end def reserve - (try_to_reserve_lost_test || try_to_reserve_test).tap do |test| - reserved_tests << test if test + (try_to_reserve_lost_test || try_to_reserve_test).tap do |entry| + if entry + @first_reserve_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) + reserve_entry(entry) + end end end @@ -198,7 +385,7 @@ def try_to_reserve_lost_test key('worker', worker_id, 'queue'), key('owners'), ], - argv: [CI::Queue.time_now.to_f, timeout], + argv: [CI::Queue.time_now.to_f, timeout, CI::Queue::QueueEntry::DELIMITER], ) if lost_test @@ -208,8 +395,8 @@ def try_to_reserve_lost_test lost_test end - def push(tests) - @total = tests.size + def push(entries) + @total = entries.size # We set a unique value (worker_id) and read it back to make "SET if Not eXists" idempotent in case of a retry. value = key('setup', worker_id) @@ -227,7 +414,7 @@ def push(tests) with_redis_timeout(5) do redis.without_reconnect do redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.lpush(key('queue'), entries) unless entries.empty? 
transaction.set(key('total'), @total) transaction.set(key('master-status'), 'ready') diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 09849cb7..956b7bfa 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -2,6 +2,7 @@ require 'shellwords' require 'minitest' require 'minitest/reporters' +require 'concurrent/map' require 'minitest/queue/failure_formatter' require 'minitest/queue/error_report' @@ -106,6 +107,10 @@ module WithTimestamps attr_accessor :start_timestamp, :finish_timestamp end + module ResultMetadata + attr_accessor :queue_id, :queue_entry + end + module Queue extend ::CI::Queue::OutputHelpers attr_writer :run_command_formatter, :project_root @@ -156,19 +161,29 @@ def queue def run(reporter, *) rescue_run_errors do - queue.poll do |example| - result = queue.with_heartbeat(example.id) do - example.run + begin + queue.poll do |example| + result = queue.with_heartbeat(example.queue_entry) do + example.run + end + + handle_test_result(reporter, example, result) end - handle_test_result(reporter, example, result) + report_load_stats(queue) + ensure + store_worker_profile(queue) end - queue.stop_heartbeat! end end def handle_test_result(reporter, example, result) + if result.respond_to?(:queue_id=) + result.queue_id = example.id + result.queue_entry = example.queue_entry if result.respond_to?(:queue_entry=) + end + failed = !(result.passed? || result.skipped?) if example.flaky? @@ -194,6 +209,85 @@ def handle_test_result(reporter, example, result) private + def report_load_stats(queue) + return unless CI::Queue.debug? + return unless queue.respond_to?(:file_loader) + return unless queue.respond_to?(:config) && queue.config.lazy_load + + loader = queue.file_loader + return if loader.load_stats.empty? + + total_time = loader.total_load_time + file_count = loader.load_stats.size + average = file_count.zero? ? 0 : (total_time / file_count) + + puts + puts "File loading stats:" + puts " Total time: #{total_time.round(2)}s" + puts " Files loaded: #{file_count}" + puts " Average: #{average.round(3)}s per file" + + slowest = loader.slowest_files(5) + return if slowest.empty? + + puts " Slowest files:" + slowest.each do |file_path, duration| + puts " #{duration.round(3)}s - #{Minitest::Queue.relative_path(file_path)}" + end + end + + def store_worker_profile(queue) + debug = CI::Queue.debug? + return unless queue.respond_to?(:config) + config = queue.config + + run_start = Minitest::Queue::Runner.run_start + return unless run_start + + run_end = Process.clock_gettime(Process::CLOCK_MONOTONIC) + profile = { + 'worker_id' => config.worker_id, + 'mode' => config.lazy_load ? 'lazy' : 'eager', + 'role' => queue.master? ? 'leader' : 'non-leader', + 'total_wall_clock' => (run_end - run_start).round(2), + } + + first_test = queue.respond_to?(:first_reserve_at) ? queue.first_reserve_at : nil + profile['time_to_first_test'] = (first_test - run_start).round(2) if first_test + + tests_run = queue.rescue_connection_errors { queue.worker_queue_length } if queue.respond_to?(:worker_queue_length) + profile['tests_run'] = tests_run.to_i if tests_run + + load_tests_duration = Minitest::Queue::Runner.load_tests_duration + profile['load_tests_duration'] = load_tests_duration.round(2) if load_tests_duration + + if queue.respond_to?(:file_loader) && queue.file_loader.load_stats.any? 
+ loader = queue.file_loader + profile['files_loaded'] = loader.load_stats.size + profile['file_load_time'] = loader.total_load_time.round(2) + end + + profile['total_files'] = Minitest::Queue::Runner.total_files if Minitest::Queue::Runner.total_files + + rss_kb = begin + if File.exist?("/proc/#{Process.pid}/statm") + pages = Integer(File.read("/proc/#{Process.pid}/statm").split[1]) + pages * 4 + else + Integer(`ps -o rss= -p #{Process.pid}`.strip) + end + rescue + nil + end + profile['memory_rss_kb'] = rss_kb if rss_kb + + queue.rescue_connection_errors do + queue.build.record_worker_profile(profile) + end + rescue => e + puts "WARNING: Failed to store worker profile: #{e.message}" if debug + end + def rescue_run_errors(&block) block.call rescue Errno::EPIPE @@ -232,6 +326,10 @@ def id @id ||= "#{@runnable}##{@method_name}".freeze end + def queue_entry + id + end + def <=>(other) id <=> other.id end @@ -270,6 +368,149 @@ def current_timestamp end end + class LazySingleExample + attr_reader :class_name, :method_name, :file_path + + def initialize(class_name, method_name, file_path, loader:, resolver:, load_error: nil, queue_entry: nil) + @class_name = class_name + @method_name = method_name + @file_path = file_path + @loader = loader + @resolver = resolver + @load_error = load_error + @queue_entry_override = queue_entry + @runnable = nil + end + + def id + @id ||= "#{@class_name}##{@method_name}".freeze + end + + def queue_entry + @queue_entry ||= @queue_entry_override || CI::Queue::QueueEntry.format(id, file_path) + end + + def <=>(other) + id <=> other.id + end + + RUNNABLE_METHODS_TRIGGERED = Concurrent::Map.new # :nodoc: + + def runnable + @runnable ||= begin + klass = @resolver.resolve(@class_name, file_path: @file_path, loader: @loader) + unless RUNNABLE_METHODS_TRIGGERED[klass] + klass.runnable_methods + RUNNABLE_METHODS_TRIGGERED[klass] = true + end + + # If the method doesn't exist, the class may have been autoloaded by + # Zeitwerk without executing test-specific code (includes, helpers). + # Force load the file so all class-definition-time code executes. + unless klass.method_defined?(@method_name) || klass.private_method_defined?(@method_name) + if @file_path && @loader + @loader.load_file(@file_path) + RUNNABLE_METHODS_TRIGGERED.delete(klass) + klass.runnable_methods + RUNNABLE_METHODS_TRIGGERED[klass] = true + end + end + + klass + end + end + + def with_timestamps + start_timestamp = current_timestamp + result = yield + result + ensure + if result + result.start_timestamp = start_timestamp + result.finish_timestamp = current_timestamp + end + end + + def run + with_timestamps do + begin + return build_error_result(@load_error) if @load_error + Minitest.run_one_method(runnable, @method_name) + rescue StandardError, ScriptError => error + build_error_result(error) + end + end + end + + def flaky? 
+ Minitest.queue.flaky?(self) + end + + def source_location + return nil if @load_error + + runnable.instance_method(@method_name).source_location + rescue NameError, NoMethodError, CI::Queue::FileLoadError, CI::Queue::ClassNotFoundError + nil + end + + def marshal_dump + { + 'class_name' => @class_name, + 'method_name' => @method_name, + 'file_path' => @file_path, + 'load_error' => serialize_error(@load_error), + 'queue_entry' => @queue_entry_override, + } + end + + def marshal_load(payload) + @class_name = payload['class_name'] + @method_name = payload['method_name'] + @file_path = payload['file_path'] + @load_error = deserialize_error(payload['load_error']) + @queue_entry_override = payload['queue_entry'] + @loader = CI::Queue::FileLoader.new + @resolver = CI::Queue::ClassResolver + @runnable = nil + @id = nil + @queue_entry = nil + end + + private + + def serialize_error(error) + return nil unless error + + { + 'class' => error.class.name, + 'message' => error.message, + 'backtrace' => error.backtrace, + } + end + + def deserialize_error(payload) + return nil unless payload + + message = "#{payload['class']}: #{payload['message']}" + error = StandardError.new(message) + error.set_backtrace(payload['backtrace']) if payload['backtrace'] + CI::Queue::FileLoadError.new(@file_path, error) + end + + def build_error_result(error) + result_class = defined?(Minitest::Result) ? Minitest::Result : Minitest::Test + result = result_class.new(@method_name) + result.klass = @class_name if result.respond_to?(:klass=) + result.failures << Minitest::UnexpectedError.new(error) + result + end + + def current_timestamp + CI::Queue.time_now.to_i + end + end + attr_accessor :queue def queue_reporters=(reporters) @@ -310,10 +551,12 @@ def __run(*args) Minitest::Result.prepend(Minitest::Requeueing) Minitest::Result.prepend(Minitest::Flakiness) Minitest::Result.prepend(Minitest::WithTimestamps) + Minitest::Result.prepend(Minitest::ResultMetadata) else Minitest::Test.prepend(Minitest::Requeueing) Minitest::Test.prepend(Minitest::Flakiness) Minitest::Test.prepend(Minitest::WithTimestamps) + Minitest::Test.prepend(Minitest::ResultMetadata) module MinitestBackwardCompatibility def source_location diff --git a/ruby/lib/minitest/queue/build_status_recorder.rb b/ruby/lib/minitest/queue/build_status_recorder.rb index bf50b1f5..3e98d83b 100644 --- a/ruby/lib/minitest/queue/build_status_recorder.rb +++ b/ruby/lib/minitest/queue/build_status_recorder.rb @@ -49,12 +49,17 @@ def record(test) end stats = COUNTERS.zip(COUNTERS.map { |c| send(c) }).to_h + test_id = if test.respond_to?(:queue_id) && test.queue_id + test.queue_id + else + "#{test.klass}##{test.name}" + end if (test.failure || test.error?) && !test.skipped? - build.record_error("#{test.klass}##{test.name}", dump(test), stats: stats) + build.record_error(test_id, dump(test), stats: stats) elsif test.requeued? - build.record_requeue("#{test.klass}##{test.name}", stats: stats) + build.record_requeue(test_id, stats: stats) else - build.record_success("#{test.klass}##{test.name}", stats: stats, skip_flaky_record: test.skipped?) + build.record_success(test_id, stats: stats, skip_flaky_record: test.skipped?) 
end end diff --git a/ruby/lib/minitest/queue/junit_reporter.rb b/ruby/lib/minitest/queue/junit_reporter.rb index a90a80ed..2dd9e470 100644 --- a/ruby/lib/minitest/queue/junit_reporter.rb +++ b/ruby/lib/minitest/queue/junit_reporter.rb @@ -136,9 +136,9 @@ def analyze_suite(tests) result[:time] = 0 tests.each do |test| result[:"#{result(test)}_count"] += 1 - result[:assertion_count] += test.assertions + result[:assertion_count] += test.assertions || 0 result[:test_count] += 1 - result[:time] += test.time + result[:time] += test.time || 0 end result end diff --git a/ruby/lib/minitest/queue/lazy_entry_resolver.rb b/ruby/lib/minitest/queue/lazy_entry_resolver.rb new file mode 100644 index 00000000..a8a7552a --- /dev/null +++ b/ruby/lib/minitest/queue/lazy_entry_resolver.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +module Minitest + module Queue + class LazyEntryResolver + def initialize(loader:, resolver:) + @loader = loader + @resolver = resolver + end + + def call(entry) + parsed = CI::Queue::QueueEntry.parse(entry) + class_name, method_name = parsed.fetch(:test_id).split('#', 2) + if CI::Queue::QueueEntry.load_error_payload?(parsed[:file_path]) + payload = CI::Queue::QueueEntry.decode_load_error(parsed[:file_path]) + if payload + error = StandardError.new("#{payload['error_class']}: #{payload['error_message']}") + error.set_backtrace(payload['backtrace']) if payload['backtrace'] + load_error = CI::Queue::FileLoadError.new(payload['file_path'], error) + return Minitest::Queue::LazySingleExample.new( + class_name, + method_name, + payload['file_path'], + loader: @loader, + resolver: @resolver, + load_error: load_error, + queue_entry: entry, + ) + end + end + + Minitest::Queue::LazySingleExample.new( + class_name, + method_name, + parsed[:file_path], + loader: @loader, + resolver: @resolver, + queue_entry: entry, + ) + end + end + end +end diff --git a/ruby/lib/minitest/queue/lazy_test_discovery.rb b/ruby/lib/minitest/queue/lazy_test_discovery.rb new file mode 100644 index 00000000..e3b978a3 --- /dev/null +++ b/ruby/lib/minitest/queue/lazy_test_discovery.rb @@ -0,0 +1,168 @@ +# frozen_string_literal: true + +require 'set' + +module Minitest + module Queue + class LazyTestDiscovery + def initialize(loader:, resolver:) + @loader = loader + @resolver = resolver + end + + def enumerator(files) + Enumerator.new do |yielder| + each_test(files) do |test| + yielder << test + end + end + end + + def each_test(files) + discovery_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + total_files = 0 + new_runnable_files = 0 + reopened_files = 0 + reopened_candidates = 0 + reopened_scan_time = 0.0 + + seen = Set.new + runnables = Minitest::Test.runnables + known_count = runnables.size + by_full_name = {} + by_short_name = Hash.new { |h, k| h[k] = [] } + index_runnables(runnables, by_full_name, by_short_name) + + files.each do |file| + total_files += 1 + file_path = File.expand_path(file) + begin + @loader.load_file(file_path) + rescue CI::Queue::FileLoadError => error + method_name = "load_file_#{file_path.hash.abs}" + class_name = "CIQueue::FileLoadError" + test_id = "#{class_name}##{method_name}" + entry = CI::Queue::QueueEntry.format( + test_id, + CI::Queue::QueueEntry.encode_load_error(file_path, error), + ) + yield Minitest::Queue::LazySingleExample.new( + class_name, + method_name, + file_path, + loader: @loader, + resolver: @resolver, + load_error: error, + queue_entry: entry, + ) + next + end + + runnables = Minitest::Test.runnables + candidates = [] + if runnables.size > known_count + 
new_runnables = runnables[known_count..] + known_count = runnables.size + index_runnables(new_runnables, by_full_name, by_short_name) + candidates.concat(new_runnables) + new_runnable_files += 1 + else + # Re-opened classes do not increase runnables size. In that case, map + # declared class names in the file to known runnables directly. + reopened_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + reopened = reopened_runnables_for_file(file_path, by_full_name, by_short_name) + reopened_scan_time += Process.clock_gettime(Process::CLOCK_MONOTONIC) - reopened_start + unless reopened.empty? + reopened_files += 1 + reopened_candidates += reopened.size + end + candidates.concat(reopened) + end + + enqueue_discovered_tests(candidates.uniq, file_path, seen) do |test| + yield test + end + end + ensure + debug_discovery_profile( + discovery_start: discovery_start, + total_files: total_files, + new_runnable_files: new_runnable_files, + reopened_files: reopened_files, + reopened_candidates: reopened_candidates, + reopened_scan_time: reopened_scan_time, + ) + end + + private + + def reopened_runnables_for_file(file_path, by_full_name, by_short_name) + declared = declared_class_names(file_path) + return [] if declared.empty? + + declared.each_with_object([]) do |name, runnables| + runnable = by_full_name[name] + if runnable + runnables << runnable + next + end + + short_name = name.split('::').last + runnables.concat(by_short_name[short_name]) + end + end + + def index_runnables(runnables, by_full_name, by_short_name) + runnables.each do |runnable| + name = runnable.name + next unless name + + by_full_name[name] ||= runnable + short_name = name.split('::').last + by_short_name[short_name] << runnable + end + end + + def debug_discovery_profile(discovery_start:, total_files:, new_runnable_files:, reopened_files:, reopened_candidates:, reopened_scan_time:) + return unless CI::Queue.debug? 
+ + total_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - discovery_start + puts "[ci-queue][lazy-discovery] files=#{total_files} new_runnable_files=#{new_runnable_files} " \ + "reopened_files=#{reopened_files} reopened_candidates=#{reopened_candidates} " \ + "reopened_scan_time=#{reopened_scan_time.round(2)}s total_time=#{total_time.round(2)}s" + end + + def enqueue_discovered_tests(runnables, file_path, seen) + runnables.each do |runnable| + runnable.runnable_methods.each do |method_name| + test_id = "#{runnable.name}##{method_name}" + next if seen.include?(test_id) + + seen.add(test_id) + yield Minitest::Queue::LazySingleExample.new( + runnable.name, + method_name, + file_path, + loader: @loader, + resolver: @resolver, + ) + rescue NameError, NoMethodError + next + end + end + end + + def declared_class_names(file_path) + names = Set.new + ::File.foreach(file_path) do |line| + match = line.match(/^\s*class\s+([A-Z]\w*(?:::[A-Z]\w*)*)\b/) + names.add(match[1]) if match + end + names + rescue SystemCallError + Set.new + end + + end + end +end diff --git a/ruby/lib/minitest/queue/order_reporter.rb b/ruby/lib/minitest/queue/order_reporter.rb index 9e409576..eb1f0d71 100644 --- a/ruby/lib/minitest/queue/order_reporter.rb +++ b/ruby/lib/minitest/queue/order_reporter.rb @@ -5,6 +5,9 @@ class Minitest::Queue::OrderReporter < Minitest::Reporters::BaseReporter def initialize(options = {}) @path = options.delete(:path) @file = nil + @flush_every = Integer(ENV.fetch('CI_QUEUE_ORDER_FLUSH_EVERY', '50')) + @flush_every = 1 if @flush_every < 1 + @pending = 0 super end @@ -16,10 +19,15 @@ def start def before_test(test) super file.puts("#{test.class.name}##{test.name}") - file.flush + @pending += 1 + if @pending >= @flush_every + file.flush + @pending = 0 + end end def report + file.flush file.close end @@ -29,4 +37,3 @@ def file @file ||= File.open(@path, 'a+') end end - diff --git a/ruby/lib/minitest/queue/queue_population_strategy.rb b/ruby/lib/minitest/queue/queue_population_strategy.rb new file mode 100644 index 00000000..2d56dadf --- /dev/null +++ b/ruby/lib/minitest/queue/queue_population_strategy.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +require 'minitest/queue/lazy_entry_resolver' +require 'minitest/queue/lazy_test_discovery' + +module Minitest + module Queue + class QueuePopulationStrategy + attr_reader :load_tests_duration, :total_files + + def initialize(queue:, queue_config:, argv:, test_files_file:, ordering_seed:) + @queue = queue + @queue_config = queue_config + @argv = argv + @test_files_file = test_files_file + @ordering_seed = ordering_seed + end + + def load_and_populate! + load_tests + populate_queue + end + + private + + attr_reader :queue, :queue_config, :argv, :test_files_file, :ordering_seed + + def populate_queue + if queue_config.lazy_load && queue.respond_to?(:stream_populate) + configure_lazy_queue + queue.stream_populate(lazy_test_enumerator, random: ordering_seed, batch_size: queue_config.lazy_load_stream_batch_size) + else + queue.populate(Minitest.loaded_tests, random: ordering_seed) + end + end + + def load_tests + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + if queue_config.lazy_load && queue.respond_to?(:stream_populate) + # In lazy-load mode, test files are loaded on-demand by the entry resolver. + # Load test helpers (e.g., test/test_helper.rb via CI_QUEUE_LAZY_LOAD_TEST_HELPERS) + # to boot the app for all workers. 
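+          # CI_QUEUE_LAZY_LOAD_TEST_HELPERS accepts a comma-separated list of helper paths.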
+ queue_config.lazy_load_test_helper_paths.each do |helper_path| + require File.expand_path(helper_path) + end + else + test_file_list.sort.each do |file_path| + require File.expand_path(file_path) + end + end + ensure + @load_tests_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start + @total_files = begin + test_file_list.size + rescue StandardError + nil + end + end + + def configure_lazy_queue + return unless queue.respond_to?(:entry_resolver=) + + queue.entry_resolver = lazy_entry_resolver + end + + def lazy_entry_resolver + loader = queue.respond_to?(:file_loader) ? queue.file_loader : CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + Minitest::Queue::LazyEntryResolver.new(loader: loader, resolver: resolver) + end + + def lazy_test_enumerator + loader = queue.respond_to?(:file_loader) ? queue.file_loader : CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + files = test_file_list.sort + + Minitest::Queue::LazyTestDiscovery.new(loader: loader, resolver: resolver).enumerator(files) + end + + # Returns the list of test files to process. Prefers --test-files FILE + # (reads paths from a file, one per line) over positional argv arguments. + # --test-files avoids ARG_MAX limits for large test suites (36K+ files). + def test_file_list + @test_file_list ||= begin + if test_files_file + File.readlines(test_files_file, chomp: true).reject { |f| f.strip.empty? } + else + argv + end + end + end + end + end +end diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index ef50ada3..5d7fa3e5 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -6,6 +6,8 @@ require 'digest/md5' require 'minitest/reporters/bisect_reporter' require 'minitest/reporters/statsd_reporter' +require 'minitest/queue/queue_population_strategy' +require 'minitest/queue/worker_profile_reporter' module Minitest module Queue @@ -15,6 +17,10 @@ class Runner Error = Class.new(StandardError) MissingParameter = Class.new(Error) + class << self + attr_accessor :run_start, :load_tests_duration, :total_files + end + def self.invoke(argv) new(argv).run! end @@ -48,6 +54,8 @@ def retry_command end def run_command + @run_start = Runner.run_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + require_worker_id! # if it's an automatic job retry we should process the main queue if manual_retry? @@ -97,19 +105,27 @@ def run_command if remaining <= running puts green("Queue almost empty, exiting early...") else - load_tests - populate_queue + prepare_queue_for_execution end else - load_tests - populate_queue + prepare_queue_for_execution end end - at_exit { + if queue_config.lazy_load + # In lazy-load mode, run minitest explicitly instead of relying on + # minitest/autorun's at_exit hook, which may not be registered since + # test files haven't been loaded yet. exit! prevents double-execution + # if minitest/autorun was loaded by the leader during streaming. + passed = Minitest.run [] verify_reporters!(reporters) - } - # Let minitest's at_exit hook trigger + exit!(passed ? 0 : 1) + else + at_exit { + verify_reporters!(reporters) + } + # Let minitest's at_exit hook trigger + end end def verify_reporters!(reporters) @@ -170,11 +186,9 @@ def grind_command trap('TERM') { Minitest.queue.shutdown! } trap('INT') { Minitest.queue.shutdown! 
} - load_tests - @queue = CI::Queue::Grind.new(grind_list, queue_config) Minitest.queue = queue - populate_queue + prepare_queue_for_execution # Let minitest's at_exit hook trigger end @@ -183,10 +197,9 @@ def bisect_command invalid_usage! "Missing the FAILING_TEST argument." unless queue_config.failing_test set_load_path - load_tests @queue = CI::Queue::Bisect.new(queue_url, queue_config) Minitest.queue = queue - populate_queue + prepare_queue_for_execution step("Testing the failing test in isolation") unless queue.failing_test_present? @@ -284,6 +297,7 @@ def report_command reporter.write_failure_file(queue_config.failure_file) if queue_config.failure_file reporter.write_flaky_tests_file(queue_config.export_flaky_tests_file) if queue_config.export_flaky_tests_file exit_code = reporter.report + print_worker_profiles(supervisor) exit! exit_code end @@ -321,7 +335,7 @@ def report_grind_command attr_reader :queue_config, :options, :command, :argv attr_writer :queue_url - attr_accessor :queue, :grind_list, :grind_count, :load_paths, :verbose + attr_accessor :queue, :grind_list, :grind_count, :load_paths, :verbose, :test_files_file def require_worker_id! if queue.distributed? @@ -357,10 +371,6 @@ def reset_counters queue.build.reset_worker_error end - def populate_queue - Minitest.queue.populate(Minitest.loaded_tests, random: ordering_seed) - end - def set_load_path if paths = load_paths paths.split(':').reverse.each do |path| @@ -369,10 +379,21 @@ def set_load_path end end - def load_tests - argv.sort.each do |f| - require File.expand_path(f) - end + def prepare_queue_for_execution + strategy = queue_population_strategy + strategy.load_and_populate! + Runner.load_tests_duration = strategy.load_tests_duration + Runner.total_files = strategy.total_files + end + + def queue_population_strategy + Minitest::Queue::QueuePopulationStrategy.new( + queue: queue, + queue_config: queue_config, + argv: argv, + test_files_file: test_files_file, + ordering_seed: ordering_seed, + ) end def parse(argv) @@ -381,6 +402,10 @@ def parse(argv) return command, argv end + def print_worker_profiles(supervisor) + Minitest::Queue::WorkerProfileReporter.new(supervisor).print_summary + end + def parser @parser ||= OptionParser.new do |opts| opts.banner = "Usage: minitest-queue [options] COMMAND [ARGS]" @@ -486,6 +511,41 @@ def parser self.load_paths = [load_paths, paths].compact.join(':') end + help = <<~EOS + Load test files on demand instead of eagerly loading all tests. + EOS + opts.separator "" + opts.on('--lazy-load', help) do + queue_config.lazy_load = true + end + + help = <<~EOS + Batch size for streaming tests to Redis in lazy mode. + Defaults to 5000. + EOS + opts.separator "" + opts.on('--lazy-load-stream-batch-size SIZE', Integer, help) do |size| + queue_config.lazy_load_stream_batch_size = size + end + + help = <<~EOS + Read test file paths from FILE (one per line) instead of positional arguments. + Useful for large test suites to avoid ARG_MAX limits. + EOS + opts.separator "" + opts.on('--test-files FILE', help) do |file| + self.test_files_file = file + end + + help = <<~EOS + Timeout in seconds without new streamed batches before failing. + Defaults to the max of --queue-init-timeout and 300 seconds. + EOS + opts.separator "" + opts.on('--lazy-load-stream-timeout SECONDS', Integer, help) do |seconds| + queue_config.lazy_load_streaming_timeout = seconds + end + help = <<~EOS Sepcify a seed used to shuffle the test suite. 
On Buildkite, CircleCI, Heroku CI, and Travis, the commit revision will be used by default. diff --git a/ruby/lib/minitest/queue/worker_profile_reporter.rb b/ruby/lib/minitest/queue/worker_profile_reporter.rb new file mode 100644 index 00000000..9d795220 --- /dev/null +++ b/ruby/lib/minitest/queue/worker_profile_reporter.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +module Minitest + module Queue + class WorkerProfileReporter + def initialize(supervisor, out: $stdout) + @supervisor = supervisor + @out = out + end + + def print_summary + return unless CI::Queue.debug? + + expected = @supervisor.workers_count + profiles = {} + 3.times do + profiles = @supervisor.build.worker_profiles + break if profiles.size >= expected + sleep 1 + end + return if profiles.empty? + + sorted = profiles.values.sort_by { |p| p['worker_id'].to_s } + mode = sorted.first&.dig('mode') || 'unknown' + + @out.puts + @out.puts "Worker profile summary (#{sorted.size} workers, mode: #{mode}):" + @out.puts " %-12s %-12s %8s %14s %14s %14s %14s %10s" % ['Worker', 'Role', 'Tests', '1st Test', 'Wall Clock', 'Load Tests', 'File Load', 'Memory'] + @out.puts " #{'-' * 100}" + + sorted.each do |profile| + @out.puts format_profile_row(profile) + end + + print_first_test_summary(sorted) + rescue StandardError + # Don't fail the build if profile printing fails + end + + private + + def format_profile_row(profile) + tests = profile['tests_run'] ? profile['tests_run'].to_s : 'n/a' + first_test = profile['time_to_first_test'] ? "#{profile['time_to_first_test']}s" : 'n/a' + wall = "#{profile['total_wall_clock']}s" + load_tests = profile['load_tests_duration'] ? "#{profile['load_tests_duration']}s" : 'n/a' + files = if profile['files_loaded'] && profile['total_files'] + "#{profile['file_load_time']}s (#{profile['files_loaded']}/#{profile['total_files']})" + elsif profile['file_load_time'] + "#{profile['file_load_time']}s" + else + 'n/a' + end + mem = profile['memory_rss_kb'] ? "#{(profile['memory_rss_kb'] / 1024.0).round(0)} MB" : 'n/a' + + " %-12s %-12s %8s %14s %14s %14s %14s %10s" % [ + profile['worker_id'], profile['role'], tests, first_test, wall, load_tests, files, mem + ] + end + + def print_first_test_summary(sorted) + leaders = sorted.select { |p| p['role'] == 'leader' } + non_leaders = sorted.select { |p| p['role'] == 'non-leader' } + return unless leaders.any? && non_leaders.any? + + leader_first = leaders.filter_map { |p| p['time_to_first_test'] }.min + nl_firsts = non_leaders.filter_map { |p| p['time_to_first_test'] } + return unless leader_first && nl_firsts.any? 
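+        # Surface the gap in time-to-first-test between the streaming leader and the other workers.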
+ + avg_nl = (nl_firsts.sum / nl_firsts.size).round(2) + @out.puts + @out.puts " Leader time to 1st test: #{leader_first}s" + @out.puts " Avg non-leader time to 1st test: #{avg_nl}s" + end + end + end +end diff --git a/ruby/test/ci/queue/class_resolver_test.rb b/ruby/test/ci/queue/class_resolver_test.rb new file mode 100644 index 00000000..0ecbf1e5 --- /dev/null +++ b/ruby/test/ci/queue/class_resolver_test.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true +require 'test_helper' + +class CI::Queue::ClassResolverTest < Minitest::Test + def test_resolves_fully_qualified_class + resolver_test = Module.new + inner = Module.new + foo_class = Class.new + inner.const_set(:FooTest, foo_class) + resolver_test.const_set(:Inner, inner) + Object.const_set(:ResolverTest, resolver_test) + + klass = CI::Queue::ClassResolver.resolve("ResolverTest::Inner::FooTest") + assert_equal ResolverTest::Inner::FooTest, klass + ensure + if Object.const_defined?(:ResolverTest) + ResolverTest::Inner.send(:remove_const, :FooTest) if ResolverTest::Inner.const_defined?(:FooTest) + ResolverTest.send(:remove_const, :Inner) if ResolverTest.const_defined?(:Inner) + Object.send(:remove_const, :ResolverTest) + end + end + + def test_does_not_leak_to_top_level_const + resolver_test = Module.new + resolver_test.const_set(:Inner, Module.new) + Object.const_set(:ResolverTest, resolver_test) + + Object.const_set(:FooTest, Class.new) + + assert_raises(CI::Queue::ClassNotFoundError) do + CI::Queue::ClassResolver.resolve("ResolverTest::Inner::FooTest") + end + ensure + Object.send(:remove_const, :FooTest) if Object.const_defined?(:FooTest) + if Object.const_defined?(:ResolverTest) + ResolverTest.send(:remove_const, :Inner) if ResolverTest.const_defined?(:Inner) + Object.send(:remove_const, :ResolverTest) + end + end + + def test_raises_for_module + resolver_test = Module.new + resolver_test.const_set(:OnlyModule, Module.new) + Object.const_set(:ResolverTest, resolver_test) + + assert_raises(CI::Queue::ClassNotFoundError) do + CI::Queue::ClassResolver.resolve("ResolverTest::OnlyModule") + end + ensure + if Object.const_defined?(:ResolverTest) + ResolverTest.send(:remove_const, :OnlyModule) if ResolverTest.const_defined?(:OnlyModule) + Object.send(:remove_const, :ResolverTest) + end + end + + def test_resolves_with_loader + Object.send(:remove_const, :ResolverLoaded) if Object.const_defined?(:ResolverLoaded) + + loader = Class.new do + def load_file(_path) + Object.const_set(:ResolverLoaded, Class.new) + end + end.new + + klass = CI::Queue::ClassResolver.resolve("ResolverLoaded", file_path: "./dummy", loader: loader) + assert_equal ResolverLoaded, klass + ensure + Object.send(:remove_const, :ResolverLoaded) if Object.const_defined?(:ResolverLoaded) + end +end diff --git a/ruby/test/ci/queue/configuration_test.rb b/ruby/test/ci/queue/configuration_test.rb index 18fce529..b39bc427 100644 --- a/ruby/test/ci/queue/configuration_test.rb +++ b/ruby/test/ci/queue/configuration_test.rb @@ -143,5 +143,62 @@ def test_report_timeout_set assert_equal 45, config.report_timeout end + def test_lazy_load_defaults + config = Configuration.new + refute config.lazy_load + assert_equal 5000, config.lazy_load_stream_batch_size + end + + def test_lazy_load_from_env + config = Configuration.from_env( + "CI_QUEUE_LAZY_LOAD" => "true", + "CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE" => "1500", + ) + assert config.lazy_load + assert_equal 1500, config.lazy_load_stream_batch_size + end + + def test_lazy_load_from_env_false + config = Configuration.from_env( + 
"CI_QUEUE_LAZY_LOAD" => "false", + ) + refute config.lazy_load + end + + def test_lazy_load_streaming_timeout_defaults + config = Configuration.new + assert_equal 300, config.lazy_load_streaming_timeout + end + + def test_lazy_load_streaming_timeout_defaults_to_queue_init_timeout_when_higher + config = Configuration.new(queue_init_timeout: 900) + assert_equal 900, config.lazy_load_streaming_timeout + end + + def test_lazy_load_streaming_timeout_from_env + config = Configuration.from_env( + "CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT" => "120", + ) + assert_equal 120, config.lazy_load_streaming_timeout + end + + def test_legacy_lazy_load_env_aliases + config = Configuration.from_env( + "CI_QUEUE_STREAM_BATCH_SIZE" => "1600", + "CI_QUEUE_STREAM_TIMEOUT" => "90", + "CI_QUEUE_TEST_HELPERS" => "test/test_helper.rb, test/support/helper.rb", + ) + assert_equal 1600, config.lazy_load_stream_batch_size + assert_equal 90, config.lazy_load_streaming_timeout + assert_equal ["test/test_helper.rb", "test/support/helper.rb"], config.lazy_load_test_helper_paths + end + + def test_new_lazy_load_test_helpers_env + config = Configuration.from_env( + "CI_QUEUE_LAZY_LOAD_TEST_HELPERS" => "test/test_helper.rb, test/support/helper.rb", + ) + assert_equal ["test/test_helper.rb", "test/support/helper.rb"], config.lazy_load_test_helper_paths + end + end end diff --git a/ruby/test/ci/queue/file_loader_test.rb b/ruby/test/ci/queue/file_loader_test.rb new file mode 100644 index 00000000..72bf5561 --- /dev/null +++ b/ruby/test/ci/queue/file_loader_test.rb @@ -0,0 +1,92 @@ +# frozen_string_literal: true +require 'test_helper' + +class CI::Queue::FileLoaderTest < Minitest::Test + def test_load_file_records_stats + loader = CI::Queue::FileLoader.new + + Dir.mktmpdir do |dir| + path = File.join(dir, "sample_test.rb") + File.write(path, "module FileLoaderSample; class Sample; end; end\n") + + loader.load_file(path) + + assert Object.const_defined?(:FileLoaderSample) + assert FileLoaderSample.const_defined?(:Sample) + assert_includes loader.load_stats.keys, path + assert loader.load_stats[path] >= 0 + ensure + if Object.const_defined?(:FileLoaderSample) + FileLoaderSample.send(:remove_const, :Sample) if FileLoaderSample.const_defined?(:Sample) + Object.send(:remove_const, :FileLoaderSample) + end + end + end + + def test_load_file_raises_file_load_error + loader = CI::Queue::FileLoader.new + missing_path = File.join(Dir.tmpdir, "missing_test_#{Process.pid}.rb") + + error = assert_raises(CI::Queue::FileLoadError) do + loader.load_file(missing_path) + end + + assert_equal missing_path, error.file_path + assert_includes loader.load_stats.keys, missing_path + end + + # Verifies that non-StandardError exceptions (e.g., StrictWarning::Offense + # which inherits from Exception) are caught and wrapped as FileLoadError + # instead of crashing the worker process. 
+ def test_load_file_catches_non_standard_error_exceptions + loader = CI::Queue::FileLoader.new + + Dir.mktmpdir do |dir| + path = File.join(dir, "raises_exception_test.rb") + File.write(path, "raise Class.new(Exception), 'non-standard exception'\n") + + error = assert_raises(CI::Queue::FileLoadError) do + loader.load_file(path) + end + + assert_equal path, error.file_path + assert_includes error.message, "non-standard exception" + assert_includes loader.load_stats.keys, path + end + end + + def test_load_file_does_not_catch_signal_exceptions + loader = CI::Queue::FileLoader.new + + Dir.mktmpdir do |dir| + path = File.join(dir, "raises_signal_test.rb") + File.write(path, "raise SignalException, 'TERM'\n") + + assert_raises(SignalException) do + loader.load_file(path) + end + end + end + + def test_load_file_forces_load_when_feature_is_inherited_after_fork + loader = CI::Queue::FileLoader.new + + Dir.mktmpdir do |dir| + class_name = "ForkLoaded#{Process.pid}#{rand(1000)}" + path = File.join(dir, "fork_loaded_test.rb") + File.write(path, "class #{class_name}; end\n") + + expanded = File.expand_path(path) + $LOADED_FEATURES << expanded unless $LOADED_FEATURES.include?(expanded) + loader.instance_variable_set(:@pid, Process.pid - 1) + + loader.load_file(path) + + assert Object.const_defined?(class_name), "class should be defined by forced load fallback" + assert_includes loader.load_stats.keys, path + ensure + Object.send(:remove_const, class_name) if Object.const_defined?(class_name) + $LOADED_FEATURES.delete(expanded) + end + end +end diff --git a/ruby/test/ci/queue/queue_entry_test.rb b/ruby/test/ci/queue/queue_entry_test.rb new file mode 100644 index 00000000..86a96aa5 --- /dev/null +++ b/ruby/test/ci/queue/queue_entry_test.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true +require 'test_helper' + +class CI::Queue::QueueEntryTest < Minitest::Test + DELIMITER = CI::Queue::QueueEntry::DELIMITER + + def test_parse_without_file_path + entry = "FooTest#test_bar" + parsed = CI::Queue::QueueEntry.parse(entry) + assert_equal "FooTest#test_bar", parsed[:test_id] + assert_nil parsed[:file_path] + end + + def test_parse_with_file_path + entry = "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb" + parsed = CI::Queue::QueueEntry.parse(entry) + assert_equal "FooTest#test_bar", parsed[:test_id] + assert_equal "/tmp/foo_test.rb", parsed[:file_path] + end + + def test_format_without_file_path + assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.format("FooTest#test_bar", nil) + assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.format("FooTest#test_bar", "") + end + + def test_format_with_file_path + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb") + assert_equal "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb", entry + end + + def test_parse_with_pipe_in_test_name + test_id = "FooTest#test_status=[published_|_visible]_tag:elasticsearch:true" + entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb") + parsed = CI::Queue::QueueEntry.parse(entry) + assert_equal test_id, parsed[:test_id] + assert_equal "/tmp/foo_test.rb", parsed[:file_path] + end + + def test_round_trip_preserves_test_id + test_id = "FooTest#test_bar" + file_path = "/tmp/foo_test.rb" + entry = CI::Queue::QueueEntry.format(test_id, file_path) + parsed = CI::Queue::QueueEntry.parse(entry) + assert_equal test_id, parsed[:test_id] + assert_equal file_path, parsed[:file_path] + end + + def test_encode_decode_load_error + error = StandardError.new("boom") + error.set_backtrace(["/tmp/test.rb:10"]) + encoded 
= CI::Queue::QueueEntry.encode_load_error("/tmp/test.rb", error) + assert CI::Queue::QueueEntry.load_error_payload?(encoded) + + payload = CI::Queue::QueueEntry.decode_load_error(encoded) + assert_equal "/tmp/test.rb", payload['file_path'] + assert_equal "StandardError", payload['error_class'] + assert_equal "boom", payload['error_message'] + assert_equal ["/tmp/test.rb:10"], payload['backtrace'] + end +end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index fed9d3bd..55cd7d02 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -4,6 +4,9 @@ class CI::Queue::RedisTest < Minitest::Test include SharedQueueAssertions + DELIMITER = CI::Queue::QueueEntry::DELIMITER + EntryTest = Struct.new(:id, :queue_entry) + def setup @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') @redis = ::Redis.new(url: @redis_url) @@ -234,6 +237,105 @@ def test_timeout_warning end end + def test_streaming_waits_for_batches + leader = worker(1, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: 'streaming') + consumer = worker(2, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: 'streaming') + consumer.entry_resolver = ->(entry) { entry } + + tests = [ + EntryTest.new('ATest#test_foo', "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"), + EntryTest.new('ATest#test_bar', "ATest#test_bar#{DELIMITER}/tmp/a_test.rb"), + ] + + streamed = Enumerator.new do |yielder| + sleep 0.2 + tests.each { |test| yielder << test } + end + + leader_thread = Thread.new do + leader.stream_populate(streamed, random: Random.new(0), batch_size: 1) + end + + timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1 + loop do + status = @redis.get(leader.send(:key, 'master-status')) + break if status == 'streaming' || status == 'ready' + raise "streaming status not set" if Process.clock_gettime(Process::CLOCK_MONOTONIC) > timeout_at + sleep 0.01 + end + + consumed = [] + consumer_thread = Thread.new do + consumer.poll do |entry| + consumed << entry + consumer.acknowledge(entry) + end + end + + sleep 0.05 + + leader_thread.join + consumer_thread.join(2) + + assert_equal tests.map(&:queue_entry).sort, consumed.sort + assert_predicate consumer, :exhausted? 
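+ # The consumer received the delimiter-formatted entries verbatim and drained the queue even though it started polling while the leader was still streaming.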
+ end + + def test_reserve_lost_ignores_processed_entry_with_path + queue = worker(1, populate: false) + entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb" + test_id = 'ATest#test_foo' + + @redis.zadd(queue.send(:key, 'running'), 0, entry) + @redis.sadd(queue.send(:key, 'completed'), test_id) + @redis.hset(queue.send(:key, 'owners'), entry, queue.send(:key, 'worker', queue.config.worker_id, 'queue')) + + lost = queue.send(:try_to_reserve_lost_test) + assert_nil lost + end + + def test_streaming_timeout_raises_lost_master + queue = worker(1, populate: false, lazy_load_streaming_timeout: 1, queue_init_timeout: 1) + @redis.set(queue.send(:key, 'master-status'), 'streaming') + @redis.set(queue.send(:key, 'streaming-updated-at'), CI::Queue.time_now.to_f - 5) + + assert_raises(CI::Queue::Redis::LostMaster) do + queue.poll { |_entry| } + end + end + + def test_heartbeat_uses_test_id_for_processed_check + queue = worker(1, populate: false) + entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb" + test_id = 'ATest#test_foo' + + @redis.sadd(queue.send(:key, 'processed'), test_id) + + result = queue.send( + :eval_script, + :heartbeat, + keys: [ + queue.send(:key, 'running'), + queue.send(:key, 'processed'), + queue.send(:key, 'owners'), + queue.send(:key, 'worker', queue.config.worker_id, 'queue'), + ], + argv: [CI::Queue.time_now.to_f, entry, DELIMITER], + ) + + assert_nil result + end + + def test_resolve_entry_falls_back_to_resolver + queue = worker(1, populate: false) + queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok }) + queue.entry_resolver = ->(entry) { "resolved:#{entry}" } + + resolved = queue.send(:resolve_entry, "MissingTest#test_bar#{DELIMITER}/tmp/missing.rb") + + assert_equal "resolved:MissingTest#test_bar#{DELIMITER}/tmp/missing.rb", resolved + end + def test_continuously_timing_out_tests 3.times do @redis.flushdb @@ -270,6 +372,62 @@ def test_initialise_from_rediss_uri assert_instance_of CI::Queue::Redis::Worker, queue end + def test_first_reserve_at_is_set_on_first_reserve + queue = worker(1) + assert_nil queue.first_reserve_at + + queue.poll do |_test| + assert queue.first_reserve_at, "first_reserve_at should be set after first reserve" + break + end + end + + def test_first_reserve_at_does_not_change_on_subsequent_reserves + queue = worker(1) + first_value = nil + count = 0 + + queue.poll do |_test| + first_value ||= queue.first_reserve_at + assert_equal first_value, queue.first_reserve_at + count += 1 + break if count >= 3 + end + + assert_operator count, :>=, 2, "Should have reserved multiple tests" + end + + def test_record_and_read_worker_profiles + queue = worker(1) + profile = { + 'worker_id' => '1', + 'mode' => 'lazy', + 'role' => 'leader', + 'total_wall_clock' => 12.34, + 'time_to_first_test' => 1.23, + 'memory_rss_kb' => 512_000, + } + + queue.build.record_worker_profile(profile) + + profiles = queue.build.worker_profiles + assert_equal 1, profiles.size + assert_equal profile, profiles['1'] + end + + def test_worker_profiles_aggregates_multiple_workers + q1 = worker(1) + q2 = worker(2) + + q1.build.record_worker_profile({ 'worker_id' => '1', 'role' => 'leader' }) + q2.build.record_worker_profile({ 'worker_id' => '2', 'role' => 'non-leader' }) + + profiles = q1.build.worker_profiles + assert_equal 2, profiles.size + assert_equal 'leader', profiles['1']['role'] + assert_equal 'non-leader', profiles['2']['role'] + end + private def shuffled_test_list diff --git a/ruby/test/fixtures/log/bisect_test_details.log b/ruby/test/fixtures/log/bisect_test_details.log new 
file mode 100644 index 00000000..e69de29b diff --git a/ruby/test/fixtures/test/dynamic_test.rb b/ruby/test/fixtures/test/dynamic_test.rb new file mode 100644 index 00000000..fe759ace --- /dev/null +++ b/ruby/test/fixtures/test/dynamic_test.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true +require 'test_helper' + +# Simulates frameworks that dynamically generate test methods in runnable_methods +# (e.g., Shopify's Verdict FLAGS). The generated methods only exist after +# runnable_methods is called, NOT at class definition time. +class DynamicTest < Minitest::Test + VARIANTS = %w[alpha beta gamma].freeze + + # Static method - always exists + def test_static + assert true + end + + singleton_class.prepend(Module.new do + def runnable_methods + # Generate variant methods on first call (idempotent) + VARIANTS.each do |variant| + method_name = "test_dynamic_VARIANT:#{variant}" + unless instance_methods(false).include?(method_name.to_sym) + define_method(method_name) do + assert true + end + end + end + super + end + end) +end diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index dbb74d9f..3f05d045 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -94,6 +94,204 @@ def test_lost_test_with_heartbeat_monitor end end + def test_lazy_loading_streaming + out, err = capture_subprocess_io do + threads = 2.times.map do |i| + Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', 'lazy-stream', + '--worker', i.to_s, + '--timeout', '1', + '--lazy-load', + '--lazy-load-stream-batch-size', '1', + '--lazy-load-stream-timeout', '5', + '-Itest', + 'test/passing_test.rb', + chdir: 'test/fixtures/', + ) + end + end + threads.each(&:join) + end + + assert_empty err + + out, err = capture_subprocess_io do + system( + @exe, 'report', + '--queue', @redis_url, + '--build', 'lazy-stream', + '--timeout', '1', + chdir: 'test/fixtures/', + ) + end + + assert_empty err + result = normalize(out.lines[1].strip) + assert_equal 'Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)', result + end + + # Reproduces the "No leader was elected" bug in lazy-load mode. + # When using --test-files (no positional args), non-leader workers must still + # enter queue.poll. Without the fix, non-leaders exit immediately with status 0 + # because minitest's at_exit hook never fires. + # + # We verify the fix by checking that BOTH workers processed tests (via their + # worker queue keys in Redis), not just the leader. + def test_lazy_loading_with_test_files_option + build_id = 'lazy-test-files' + test_files = File.expand_path('../../fixtures/test/passing_test.rb', __FILE__) + Tempfile.open('test_files_list') do |f| + f.write(test_files) + f.flush + + out, err = capture_subprocess_io do + threads = 2.times.map do |i| + Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', build_id, + '--worker', i.to_s, + '--timeout', '5', + '--queue-init-timeout', '10', + '--lazy-load', + '--test-files', f.path, + '--lazy-load-stream-batch-size', '1', + '--lazy-load-stream-timeout', '10', + '-Itest', + chdir: 'test/fixtures/', + ) + end + end + threads.each(&:join) + end + + assert_empty err + + # Verify the non-leader actually entered queue.poll and processed tests. 
+ # The leader may process 0 tests if the non-leader is fast enough to drain + # the queue before the leader finishes streaming. + worker_0_count = @redis.llen(CI::Queue::Redis::KeyShortener.key(build_id, 'worker', '0', 'queue')) + worker_1_count = @redis.llen(CI::Queue::Redis::KeyShortener.key(build_id, 'worker', '1', 'queue')) + + assert_operator worker_0_count + worker_1_count, :>=, 100, "All tests should have been processed" + assert_operator [worker_0_count, worker_1_count].max, :>, 0, "At least one worker should have processed tests (non-leader likely exited without running minitest if both are 0)" + + out, err = capture_subprocess_io do + system( + @exe, 'report', + '--queue', @redis_url, + '--build', build_id, + '--timeout', '5', + chdir: 'test/fixtures/', + ) + end + + assert_empty err + result = normalize(out.lines[1].strip) + assert_equal 'Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)', result + end + end + + # Verifies that dynamically generated test methods (defined in runnable_methods, + # not at class load time) work correctly in lazy-load mode. This catches issues + # like Shopify's Verdict FLAGS methods not being found on non-leader workers. + def test_lazy_loading_dynamic_test_methods + build_id = 'lazy-dynamic' + test_files = File.expand_path('../../fixtures/test/dynamic_test.rb', __FILE__) + Tempfile.open('test_files_list') do |f| + f.write(test_files) + f.flush + + out, err = capture_subprocess_io do + threads = 2.times.map do |i| + Thread.start do + system( + { 'BUILDKITE' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', build_id, + '--worker', i.to_s, + '--timeout', '5', + '--queue-init-timeout', '10', + '--lazy-load', + '--test-files', f.path, + '--lazy-load-stream-batch-size', '1', + '--lazy-load-stream-timeout', '10', + '-Itest', + chdir: 'test/fixtures/', + ) + end + end + threads.each(&:join) + end + + assert_empty err + + out, err = capture_subprocess_io do + system( + @exe, 'report', + '--queue', @redis_url, + '--build', build_id, + '--timeout', '5', + chdir: 'test/fixtures/', + ) + end + + assert_empty err + # 1 static + 3 dynamic variants = 4 tests + result = normalize(out.lines[1].strip) + assert_equal 'Ran 4 tests, 4 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)', result + end + end + + def test_worker_profile_in_report + build_id = 'profile-report' + out, err = capture_subprocess_io do + system( + { 'BUILDKITE' => '1', 'CI_QUEUE_DEBUG' => '1' }, + @exe, 'run', + '--queue', @redis_url, + '--seed', 'foobar', + '--build', build_id, + '--worker', '0', + '--timeout', '5', + '--lazy-load', + '--lazy-load-stream-batch-size', '10', + '--lazy-load-stream-timeout', '5', + '-Itest', + 'test/passing_test.rb', + chdir: 'test/fixtures/', + ) + end + + assert_empty err + + out, err = capture_subprocess_io do + system( + { 'CI_QUEUE_DEBUG' => '1' }, + @exe, 'report', + '--queue', @redis_url, + '--build', build_id, + '--timeout', '5', + chdir: 'test/fixtures/', + ) + end + + assert_empty err + assert_includes out, 'Worker profile summary' + assert_includes out, 'leader' + assert_includes out, 'Wall Clock' + end + def test_verbose_reporter out, err = capture_subprocess_io do system( diff --git a/ruby/test/minitest/queue/lazy_entry_resolver_test.rb b/ruby/test/minitest/queue/lazy_entry_resolver_test.rb new file mode 100644 index 00000000..20483376 --- /dev/null +++ b/ruby/test/minitest/queue/lazy_entry_resolver_test.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: 
true +require 'test_helper' +require 'minitest/queue/lazy_entry_resolver' + +module Minitest::Queue + class LazyEntryResolverTest < Minitest::Test + def test_builds_lazy_single_example_for_regular_entry + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb") + + resolved = LazyEntryResolver.new(loader: loader, resolver: resolver).call(entry) + + assert_instance_of Minitest::Queue::LazySingleExample, resolved + assert_equal "FooTest#test_bar", resolved.id + assert_equal entry, resolved.queue_entry + end + + def test_builds_lazy_single_example_with_load_error + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + error = StandardError.new("boom") + encoded = CI::Queue::QueueEntry.encode_load_error("/tmp/foo_test.rb", error) + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", encoded) + + resolved = LazyEntryResolver.new(loader: loader, resolver: resolver).call(entry) + result = resolved.run + + assert_instance_of Minitest::Queue::LazySingleExample, resolved + assert result.error? + assert_instance_of Minitest::UnexpectedError, result.failure + end + end +end diff --git a/ruby/test/minitest/queue/lazy_single_example_test.rb b/ruby/test/minitest/queue/lazy_single_example_test.rb new file mode 100644 index 00000000..ad45cf2d --- /dev/null +++ b/ruby/test/minitest/queue/lazy_single_example_test.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true +require 'test_helper' + +module Minitest::Queue + class LazySingleExampleTest < Minitest::Test + def test_run_executes_test + Dir.mktmpdir do |dir| + class_name = "LazyExample#{Process.pid}#{rand(1000)}" + file_path = File.join(dir, "lazy_example_test.rb") + File.write( + file_path, + "class #{class_name} < Minitest::Test\n" \ + " def test_works\n" \ + " assert true\n" \ + " end\n" \ + "end\n" + ) + + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + example = LazySingleExample.new(class_name, 'test_works', file_path, loader: loader, resolver: resolver) + + result = example.run + + assert result.passed? + assert_equal [file_path, 2], example.source_location + ensure + Object.send(:remove_const, class_name) if Object.const_defined?(class_name) + end + end + + def test_run_returns_error_for_load_error + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + error = StandardError.new('boom') + example = LazySingleExample.new('MissingClass', 'test_missing', '/tmp/missing.rb', loader: loader, resolver: resolver, load_error: error) + + result = example.run + + assert result.error? + assert_instance_of Minitest::UnexpectedError, result.failure + assert_nil example.source_location + end + + def test_run_handles_script_error + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + example = LazySingleExample.new('MissingClass', 'test_missing', '/tmp/missing.rb', loader: loader, resolver: resolver) + + example.stub(:runnable, -> { raise LoadError, 'boom' }) do + result = example.run + assert result.error? 
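+ # A ScriptError (here LoadError) raised while resolving the runnable is wrapped into a Minitest::UnexpectedError result instead of escaping the worker.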
+ assert_instance_of Minitest::UnexpectedError, result.failure + end + end + + def test_marshal_round_trip + Dir.mktmpdir do |dir| + class_name = "LazyMarshal#{Process.pid}#{rand(1000)}" + file_path = File.join(dir, "lazy_marshal_test.rb") + File.write( + file_path, + "class #{class_name} < Minitest::Test\n" \ + " def test_works\n" \ + " assert true\n" \ + " end\n" \ + "end\n" + ) + + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + example = LazySingleExample.new(class_name, 'test_works', file_path, loader: loader, resolver: resolver) + dumped = Marshal.dump(example) + loaded = Marshal.load(dumped) + + assert_equal example.queue_entry, loaded.queue_entry + assert loaded.run.passed? + ensure + Object.send(:remove_const, class_name) if Object.const_defined?(class_name) + end + end + end +end diff --git a/ruby/test/minitest/queue/lazy_test_discovery_test.rb b/ruby/test/minitest/queue/lazy_test_discovery_test.rb new file mode 100644 index 00000000..0299d9cf --- /dev/null +++ b/ruby/test/minitest/queue/lazy_test_discovery_test.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true +require 'test_helper' +require 'minitest/queue/lazy_test_discovery' + +module Minitest::Queue + class LazyTestDiscoveryTest < Minitest::Test + def test_discovers_methods_added_by_reopened_class + loader = CI::Queue::FileLoader.new + resolver = CI::Queue::ClassResolver + discovery = LazyTestDiscovery.new(loader: loader, resolver: resolver) + class_name = "DiscoveryLazy#{Process.pid}#{rand(1000)}" + discovered = [] + + Dir.mktmpdir do |dir| + first_file = File.join(dir, "first_test.rb") + second_file = File.join(dir, "second_test.rb") + File.write(first_file, "class #{class_name} < Minitest::Test\n def test_one\n assert true\n end\nend\n") + File.write(second_file, "class #{class_name}\n def test_two\n assert true\n end\nend\n") + + discovery.each_test([first_file, second_file]) do |test| + discovered << test.id + end + end + + assert_includes discovered, "#{class_name}#test_one" + assert_includes discovered, "#{class_name}#test_two" + ensure + Object.send(:remove_const, class_name) if Object.const_defined?(class_name) + end + end +end diff --git a/ruby/test/minitest/queue/queue_population_strategy_test.rb b/ruby/test/minitest/queue/queue_population_strategy_test.rb new file mode 100644 index 00000000..16dbf1e5 --- /dev/null +++ b/ruby/test/minitest/queue/queue_population_strategy_test.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true +require 'test_helper' +require 'minitest/queue/queue_population_strategy' + +module Minitest::Queue + class QueuePopulationStrategyTest < Minitest::Test + class FakeQueue + attr_accessor :entry_resolver + attr_reader :populated_with, :streamed_with + + def initialize + @file_loader = CI::Queue::FileLoader.new + end + + def file_loader + @file_loader + end + + def populate(tests, random:) + @populated_with = { tests: tests, random: random } + end + + def stream_populate(tests, random:, batch_size:) + @streamed_with = { tests: tests, random: random, batch_size: batch_size } + end + end + + def test_eager_mode_populates_loaded_tests + queue = FakeQueue.new + config = CI::Queue::Configuration.new(lazy_load: false) + class_name = "StrategyEager#{Process.pid}#{rand(1000)}" + file = nil + strategy = QueuePopulationStrategy.new( + queue: queue, + queue_config: config, + argv: nil, + test_files_file: nil, + ordering_seed: Random.new(123), + ) + + Dir.mktmpdir do |dir| + file = File.join(dir, "strategy_eager_test.rb") + File.write(file, "class #{class_name} < 
Minitest::Test\n def test_strategy_eager\n assert true\n end\nend\n") + strategy = QueuePopulationStrategy.new( + queue: queue, + queue_config: config, + argv: [file], + test_files_file: nil, + ordering_seed: Random.new(123), + ) + strategy.load_and_populate! + end + + assert queue.populated_with + ids = queue.populated_with[:tests].map(&:id) + assert_includes ids, "#{class_name}#test_strategy_eager" + assert_nil queue.streamed_with + ensure + Object.send(:remove_const, class_name) if class_name && Object.const_defined?(class_name) + end + + def test_lazy_mode_sets_resolver_and_streams + queue = FakeQueue.new + config = CI::Queue::Configuration.new(lazy_load: true, lazy_load_stream_batch_size: 7) + strategy = QueuePopulationStrategy.new( + queue: queue, + queue_config: config, + argv: [], + test_files_file: nil, + ordering_seed: Random.new(456), + ) + + strategy.load_and_populate! + + assert_instance_of Minitest::Queue::LazyEntryResolver, queue.entry_resolver + assert queue.streamed_with + assert_instance_of Enumerator, queue.streamed_with[:tests] + assert_equal 7, queue.streamed_with[:batch_size] + assert_nil queue.populated_with + end + end +end diff --git a/ruby/test/minitest/queue/worker_profile_reporter_test.rb b/ruby/test/minitest/queue/worker_profile_reporter_test.rb new file mode 100644 index 00000000..0d6a9d47 --- /dev/null +++ b/ruby/test/minitest/queue/worker_profile_reporter_test.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true +require 'test_helper' +require 'stringio' +require 'minitest/queue/worker_profile_reporter' + +module Minitest::Queue + class WorkerProfileReporterTest < Minitest::Test + def test_print_summary_is_silent_without_debug_env + profiles = { + '0' => { 'worker_id' => '0', 'mode' => 'lazy', 'role' => 'leader' }, + } + supervisor = Struct.new(:workers_count, :build).new( + 1, + Struct.new(:worker_profiles).new(profiles), + ) + out = StringIO.new + original = ENV['CI_QUEUE_DEBUG'] + ENV.delete('CI_QUEUE_DEBUG') + + WorkerProfileReporter.new(supervisor, out: out).print_summary + + assert_equal "", out.string + ensure + ENV['CI_QUEUE_DEBUG'] = original + end + + def test_print_summary_outputs_table + profiles = { + '0' => { + 'worker_id' => '0', + 'mode' => 'lazy', + 'role' => 'leader', + 'tests_run' => 10, + 'time_to_first_test' => 1.2, + 'total_wall_clock' => 12.3, + 'load_tests_duration' => 0.4, + 'file_load_time' => 2.0, + 'files_loaded' => 3, + 'total_files' => 10, + 'memory_rss_kb' => 512_000, + }, + '1' => { + 'worker_id' => '1', + 'mode' => 'lazy', + 'role' => 'non-leader', + 'tests_run' => 9, + 'time_to_first_test' => 2.2, + 'total_wall_clock' => 11.1, + }, + } + + supervisor = Struct.new(:workers_count, :build).new( + 2, + Struct.new(:worker_profiles).new(profiles), + ) + out = StringIO.new + + original = ENV['CI_QUEUE_DEBUG'] + ENV['CI_QUEUE_DEBUG'] = '1' + WorkerProfileReporter.new(supervisor, out: out).print_summary + + text = out.string + assert_includes text, "Worker profile summary (2 workers, mode: lazy):" + assert_includes text, "Leader time to 1st test: 1.2s" + assert_includes text, "Avg non-leader time to 1st test: 2.2s" + ensure + ENV['CI_QUEUE_DEBUG'] = original + end + end +end
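As a companion to the tests above, here is a minimal, illustrative sketch of the lazy-resolution path they exercise: a worker receives a delimiter-formatted queue entry, parses it, and only loads the test file and resolves the class when the entry is about to run. The class name and file path are placeholders rather than part of the change, and the snippet assumes the gem's entry point is loaded via require 'ci/queue'.

require 'ci/queue'
require 'minitest/queue/lazy_entry_resolver'

# Entries streamed to Redis carry "TestClass#test_method<DELIMITER>file_path".
# FooTest and /tmp/foo_test.rb are illustrative placeholders.
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")

parsed = CI::Queue::QueueEntry.parse(entry)
parsed[:test_id]   # => "FooTest#test_bar"
parsed[:file_path] # => "/tmp/foo_test.rb"

# A worker resolves the entry lazily: the file is loaded and the class is
# looked up only when the example actually runs.
loader   = CI::Queue::FileLoader.new
resolver = CI::Queue::ClassResolver
example  = Minitest::Queue::LazyEntryResolver.new(loader: loader, resolver: resolver).call(entry)

example.id          # => "FooTest#test_bar"
example.queue_entry # => entry
result = example.run # loads the file on demand, then runs FooTest#test_bar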