From 7c284954a454b9f0f613ec69cdf92eeeadc53494 Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Mon, 13 Jan 2025 22:17:56 +0200 Subject: [PATCH] {get,set}_pool_size, to allow growing/shrinking pools at runtime This changeset introduces three new functions: * `poolboy:set_pool_size/2,3` and `poolboy:get_pool_size/1`, which can be used to resize a running poolboy process dynamically; * `poolboy:status_ext/1`, which in addition to the items returned by `status/1`, also reports the values of `size` and `latched_size`. The new size is assigned to a new state record field, `latched_size`. If it is greater than the current size, new checkouts will succeed even in a `full` state, until the actual size, incremented per checkout, grows and matches the latched_size. If `latched_size` is smaller than current size, no immediate action is taken on those currently checked-out workers which are now in excess of the latched size. Instead, as they are checked in, they are dismissed until the actual size decreases to match latched size. The `set_pool_size/3` variant also allows to set the value of max_overflow. One limitation is that after setting a new size, the state of the poolboy process (as returned by `poolboy:status` or `status_ext`), may not reflect the actual state of the pooloy fsm wrt its ability to accept new checkouts. This condition is temporary and the status will resume normal reporting once `size` converges with `latched_size`. This changeset is needed for tiot/openriak-3.4/tictacaae-and-nextgenrepl-cli-commands branches in riak_kv and riak_core. pool dynamic resize for overflow = 0 pool dynamic resize for overflow > 0, too wip towards more comprehensive tests wip reasonably good wip spurious wip need to tweak asserts in test wip finalizing more extensive test, with fixes resulting therefrom address possible case of pool stop returning too early before pool start --- src/poolboy.erl | 207 +++++++++++++++++++++++++++++++++-------- test/poolboy_tests.erl | 104 +++++++++++++++++++++ 2 files changed, 270 insertions(+), 41 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 532362c..756ec7b 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -12,8 +12,9 @@ {gen_fsm, sync_send_all_state_event, 2}]}). -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, + get_pool_size/1, set_pool_size/2, set_pool_size/3, child_spec/2, child_spec/3, start/1, start/2, start_link/1, - start_link/2, stop/1, status/1]). + start_link/2, stop/1, status/1, status_ext/1]). -export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). @@ -39,6 +40,7 @@ waiting :: poolboy_queue(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), + latched_size = 5 :: non_neg_integer(), %% as resized; size to converge eventually with that overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer() }). @@ -70,6 +72,17 @@ transaction(Pool, Fun) -> ok = poolboy:checkin(Pool, Worker) end. +-spec get_pool_size(pid()) -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}. +get_pool_size(Pid) -> + gen_fsm:sync_send_all_state_event(Pid, get_pool_size). + +-spec set_pool_size(pid(), non_neg_integer()) -> ok. +set_pool_size(Pid, NewSize) -> + gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize}). +-spec set_pool_size(pid(), non_neg_integer(), non_neg_integer()) -> ok. +set_pool_size(Pid, NewSize, NewMaxOverflow) -> + gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize, NewMaxOverflow}). + -spec child_spec(Pool :: node(), PoolArgs :: proplists:proplist()) -> supervisor:child_spec(). child_spec(Pool, PoolArgs) -> @@ -113,6 +126,10 @@ stop(Pool) -> -spec status(Pool :: node()) -> {atom(), integer(), integer(), integer()}. status(Pool) -> gen_fsm:sync_send_all_state_event(Pool, status). +-spec status_ext(Pool :: node()) -> {atom(), integer(), integer(), integer(), + non_neg_integer(), non_neg_integer()}. +status_ext(Pool) -> + gen_fsm:sync_send_all_state_event(Pool, status_ext). init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), @@ -124,7 +141,7 @@ init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), init(Rest, WorkerArgs, State#state{supervisor=Sup}); init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size=Size}); + init(Rest, WorkerArgs, State#state{size=Size, latched_size=Size}); init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> init(Rest, WorkerArgs, State#state{max_overflow=MaxOverflow}); init([_ | Rest], WorkerArgs, State) -> @@ -139,13 +156,30 @@ init([], _WorkerArgs, #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow {ok, StartState, State#state{workers=Workers}}. ready({checkin, Pid}, State) -> - Monitors = State#state.monitors, + #state{size = Size, + overflow = Overflow, + max_overflow = MaxOverflow, + latched_size = LatchedSize, + supervisor = Sup, + monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers}}; + {NewSize, NewOverflow, Workers} = + case Size + Overflow > LatchedSize + MaxOverflow of %% when we shrunk + true when Size > LatchedSize -> + ok = dismiss_worker(Sup, Pid), + {Size - 1, Overflow, State#state.workers}; + true -> + ok = dismiss_worker(Sup, Pid), + {Size, Overflow - 1, State#state.workers}; + false -> + {Size, Overflow, queue:in(Pid, State#state.workers)} + end, + {next_state, ready, State#state{workers = Workers, + size = NewSize, + overflow = NewOverflow}}; [] -> {next_state, ready, State} end; @@ -154,57 +188,86 @@ ready(_Event, State) -> ready({checkout, Block, Timeout}, {FromPid, _}=From, State) -> #state{supervisor = Sup, + size = Size, + latched_size = LatchedSize, workers = Workers, monitors = Monitors, max_overflow = MaxOverflow} = State, - case queue:out(Workers) of - {{value, Pid}, Left} -> - Ref = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, Ref}), - NextState = case queue:is_empty(Left) of - true when MaxOverflow < 1 -> full; - true -> overflow; - false -> ready - end, - {reply, Pid, NextState, State#state{workers=Left}}; - {empty, Empty} when MaxOverflow > 0 -> + if Size < LatchedSize -> %% we grew + %% we are here after a set_pool_size with Size > OldSize {Pid, Ref} = new_worker(Sup, FromPid), true = ets:insert(Monitors, {Pid, Ref}), - {reply, Pid, overflow, State#state{workers=Empty, overflow=1}}; - {empty, Empty} when Block =:= false -> - {reply, full, full, State#state{workers=Empty}}; - {empty, Empty} -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{workers=Empty, waiting=Waiting}} + {reply, Pid, ready, State#state{size = Size + 1}}; + el/=se -> + case queue:out(Workers) of + {{value, Pid}, Left} -> + Ref = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + NextState = case queue:is_empty(Left) of + true when MaxOverflow < 1 -> full; + true -> overflow; + false -> ready + end, + {reply, Pid, NextState, State#state{workers=Left}}; + {empty, Empty} when MaxOverflow > 0 -> + {Pid, Ref} = new_worker(Sup, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + {reply, Pid, overflow, State#state{workers=Empty, overflow=1}}; + {empty, Empty} when Block =:= false -> + {reply, full, full, State#state{workers=Empty}}; + {empty, Empty} -> + Waiting = add_waiting(From, Timeout, State#state.waiting), + {next_state, full, State#state{workers=Empty, waiting=Waiting}} + end end; ready(_Event, _From, State) -> {reply, ok, ready, State}. overflow({checkin, Pid}, #state{overflow=0}=State) -> - Monitors = State#state.monitors, + #state{monitors = Monitors, + size = Size, + supervisor = Sup, + latched_size = LatchedSize} = State, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - NextState = case State#state.size > 0 of + NextState = case Size > 0 of true -> ready; false -> overflow end, - Workers = queue:in(Pid, State#state.workers), - {next_state, NextState, State#state{overflow=0, workers=Workers}}; + Workers = + case Size > LatchedSize of + true -> + ok = dismiss_worker(Sup, Pid), + State#state.workers; + false -> + queue:in(Pid, State#state.workers) + end, + {next_state, NextState, State#state{workers=Workers}}; [] -> {next_state, overflow, State} end; overflow({checkin, Pid}, State) -> - #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State, + #state{supervisor = Sup, + monitors = Monitors, + overflow = Overflow, + size = Size, + latched_size = LatchedSize} = State, + {NextState, NewOverflow} = + if Size > LatchedSize -> + {full, Overflow}; + el/=se -> + {overflow, Overflow - 1} + end, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> ok = dismiss_worker(Sup, Pid), true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - {next_state, overflow, State#state{overflow=Overflow-1}}; + {next_state, NextState, State#state{overflow = NewOverflow}}; [] -> - {next_state, overflow, State} + {next_state, NextState, State#state{overflow = NewOverflow}} end; overflow(_Event, State) -> {next_state, overflow, State}. @@ -247,11 +310,30 @@ full({checkin, Pid}, State) -> full(_Event, State) -> {next_state, full, State}. -full({checkout, true, Timeout}, From, State) -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{waiting=Waiting}}; -full({checkout, false, _Timeout}, _From, State) -> - {reply, full, full, State}; +full({checkout, Block, Timeout}, {FromPid, _} = From, State) -> + #state{size = Size, + latched_size = LatchedSize, + max_overflow = MaxOverflow, + overflow = Overflow, + monitors = Monitors, + supervisor = Sup} = State, + if Size + Overflow < LatchedSize + MaxOverflow -> + {Pid, Ref} = new_worker(Sup, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + {NextState, NewOverflow, NewSize} = + if Size < LatchedSize -> + {full, Overflow, Size + 1}; + el/=se -> + {overflow, Overflow + 1, Size} + end, + {reply, Pid, NextState, State#state{size = NewSize, + overflow = NewOverflow}}; + Block == true -> + Waiting = add_waiting(From, Timeout, State#state.waiting), + {next_state, full, State#state{waiting=Waiting}}; + el/=se -> + {reply, full, full, State} + end; full(_Event, _From, State) -> {reply, ok, full, State}. @@ -262,6 +344,10 @@ handle_sync_event(status, _From, StateName, State) -> {reply, {StateName, queue:len(State#state.workers), State#state.overflow, ets:info(State#state.monitors, size)}, StateName, State}; +handle_sync_event(status_ext, _From, StateName, State) -> + {reply, {StateName, queue:len(State#state.workers), State#state.overflow, + ets:info(State#state.monitors, size), State#state.size, State#state.latched_size}, + StateName, State}; handle_sync_event(get_avail_workers, _From, StateName, State) -> Workers = State#state.workers, WorkerList = queue:to_list(Workers), @@ -273,6 +359,26 @@ handle_sync_event(get_all_workers, _From, StateName, State) -> handle_sync_event(get_all_monitors, _From, StateName, State) -> Monitors = ets:tab2list(State#state.monitors), {reply, Monitors, StateName, State}; +handle_sync_event(get_pool_size, _From, StateName, State) -> + {reply, {State#state.size, State#state.latched_size, State#state.max_overflow}, StateName, State}; +handle_sync_event({set_pool_size, NewSize}, _From, StateName, State) -> + handle_sync_event({set_pool_size, NewSize, State#state.max_overflow}, _From, StateName, State); +handle_sync_event({set_pool_size, NewSize, NewMaxOverflow}, _From, StateName, State) -> + %% minimize overflow + SizeDiff = NewSize - State#state.size, + MegaDiff = NewSize - (State#state.size + State#state.overflow), + {SizeCorrected, OverflowCorrected} = + if MegaDiff > 0 -> + {State#state.size + State#state.overflow, 0}; %% with some slack + SizeDiff > 0 -> + {State#state.size + SizeDiff, State#state.overflow - SizeDiff}; + el/=se -> + {State#state.size, State#state.overflow} + end, + {reply, ok, StateName, State#state{latched_size = NewSize, + max_overflow = NewMaxOverflow, + size = SizeCorrected, + overflow = OverflowCorrected}}; handle_sync_event(stop, _From, _StateName, State) -> Sup = State#state.supervisor, true = exit(Sup, shutdown), @@ -368,8 +474,11 @@ checkin_while_full(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, + workers = Workers, max_overflow = MaxOverflow, - overflow = Overflow} = State, + overflow = Overflow, + size = Size, + latched_size = LatchedSize} = State, case queue:out(Waiting) of {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} -> case wait_valid(StartTime, Timeout) of @@ -382,13 +491,29 @@ checkin_while_full(Pid, State) -> checkin_while_full(Pid, State#state{waiting=Left}) end; {empty, Empty} when MaxOverflow < 1 -> - Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers, - waiting=Empty}}; + if Size > LatchedSize -> + ok = dismiss_worker(Sup, Pid), + {next_state, full, State#state{waiting = Empty, + size = Size - 1}}; + el/=se -> + {next_state, ready, State#state{workers = queue:in(Pid, Workers), + waiting = Empty}} + end; {empty, Empty} -> - ok = dismiss_worker(Sup, Pid), - {next_state, overflow, State#state{waiting=Empty, - overflow=Overflow-1}} + {NextState, NewSize, NewOverflow, NewWorkers} = + if Size > LatchedSize -> + ok = dismiss_worker(Sup, Pid), + {full, Size - 1, Overflow, Workers}; + Overflow > 0 -> + ok = dismiss_worker(Sup, Pid), + {overflow, Size, Overflow - 1, Workers}; + el/=se -> + {ready, Size, 0, queue:in(Pid, Workers)} + end, + {next_state, NextState, State#state{waiting = Empty, + workers = NewWorkers, + size = NewSize, + overflow = NewOverflow}} end. handle_worker_exit(Pid, StateName, State) -> diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 6a19b66..764ad77 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -1,6 +1,7 @@ -module(poolboy_tests). -include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). -compile({nowarn_deprecated_function, [{gen_fsm, sync_send_all_state_event, 2}]}). @@ -21,6 +22,9 @@ pool_test_() -> error_logger:tty(true) end, [ + {<<"Pool size adjustments">>, + fun pool_resize/0 + }, {<<"Basic pool operations">>, fun pool_startup/0 }, @@ -369,6 +373,106 @@ checkin_after_exception_in_transaction() -> ?assertEqual(2, length(?sync(Pool, get_avail_workers))), ok = ?sync(Pool, stop). +pool_resize() -> + [ ok = pool_resize(SZ0, OF0, SZ1, OF1, Block) + || SZ0 <- lists:seq(1, 3), + OF0 <- lists:seq(0, 5), + SZ1 <- lists:seq(1, 5), + OF1 <- lists:seq(0, 3), + SZ0 < SZ1, + Block <- [false, true] ], + ok. + +pool_resize(Size0, Overflow0, Size1, Overflow1, Block) when Size1 > Size0 -> + io:format("\npool_resize ~b, ~b to ~b, ~b (Block: ~p)\n", [Size0, Overflow0, Size1, Overflow1, Block]), + {ok, Pool} = new_pool(Size0, Overflow0), + %% actual size, latched size, overflow: + ?assertEqual({Size0, Size0, Overflow0}, ?sync(Pool, get_pool_size)), + + AllInitialWorkers = [poolboy:checkout(Pool, Block) || _ <- lists:seq(1, Size0 + Overflow0)], + ?assertEqual(full, poolboy:checkout(Pool, false)), + ?assertEqual({full, _WorkerQueue = 0, Overflow0, _Monitors = Size0 + Overflow0, + Size0, _LatchedSize = Size0}, poolboy:status_ext(Pool)), + + %% grow + ok = ?sync(Pool, {set_pool_size, Size1, Overflow1}), + %% actual size can be corrected ('pre-grown') towards latched size + %% taking into account current overflow, so we don't match on it + ?assertMatch({_, Size1, Overflow1}, ?sync(Pool, get_pool_size)), + io:format("after growing, pool size is ~p and state is ~p\n", [?sync(Pool, get_pool_size), poolboy:status_ext(Pool)]), + + %% sup still has Size0 children but (Size1 - Size0) more checkouts should succeed + %% ?assertEqual({full, 0, Overflow0, Size0 + Overflow0, Size0, Size1}, poolboy:status_ext(Pool)): + %% no change until extra checkouts + ?assertEqual(Size0 + Overflow0, length(?sync(Pool, get_all_workers))), + + case (Size1 + Overflow1) - (Size0 + Overflow0) of + Diff when Diff > 0 -> + MoreSeq = lists:seq(1, Diff), + io:format("going to check out ~b extra workers\n", [length(MoreSeq)]), + ExtraWorkers = + [ begin + io:format("Checking out one, Status: ~p\n", [poolboy:status_ext(Pool)]), + W = poolboy:checkout(Pool, Block), + ?assert(is_pid(W)), + %% it doesn't seem right that checkouts are possible + %% even when status is 'full'; this is temporary and by + %% design, until number of children after resize reaches + %% latched size + ?assertEqual(Size0 + Overflow0 + N, length(?sync(Pool, get_all_workers))), + W + end || N <- MoreSeq ], + %% now we are properly full + %% when we are full and *not undersize*, blocking checkout will timeout, so + ?assertEqual(full, poolboy:checkout(Pool, false)), + %% io:format("After growing, Status: ~p\n", [poolboy:status_ext(Pool)]), + ?assertEqual({full, 0, Overflow1, Size1 + Overflow1, Size1, Size1}, poolboy:status_ext(Pool)), + + %% shrink + io:format("shrinking pool size back to ~b ~b\n", [Size0, Overflow0]), + ok = ?sync(Pool, {set_pool_size, Size0, Overflow0}), + ?assertEqual(0, length(?sync(Pool, get_avail_workers))), + ?assertEqual(Size1 + Overflow1, length(?sync(Pool, get_all_workers))), %% no change until checkins + + %% checkouts are of course not possible + ?assertEqual(full, poolboy:checkout(Pool, false)), + + %% checking in excess workers while full + io:format("now going to check in ~b extra workers\n", [length(ExtraWorkers)]), + _ = [ begin + ok = poolboy:checkin(Pool, W), + io:format(" Checked in one, Status: ~p\n", [poolboy:status_ext(Pool)]) + %% because we are over-overflow, worker is dismissed, not placed on queue + %%?assertEqual(0, length(?sync(Pool, get_avail_workers))), + %% -- except not always, again, depending on values of overflow vs max_overflow. + end || W <- ExtraWorkers ]; + _ -> + ok = ?sync(Pool, {set_pool_size, Size0, Overflow0}), + io:format("(already grown)\n", []) + end, + + %% back to normal + %% checking in non-excess workers + {WorkersOverflowing, WorkersRestRemaining} = lists:split(Overflow0, AllInitialWorkers), + io:format("now going to check in ~b workers to trim in overflow\n", [length(WorkersOverflowing)]), + _ = [ begin + ok = poolboy:checkin(Pool, W), + io:format(" Checked in one, Status: ~p\n", [poolboy:status_ext(Pool)]) + end || W <- WorkersOverflowing ], + + [poolboy:checkin(Pool, W) || W <- WorkersRestRemaining], + + %% make sure our arithmetics is correct (specifically, after all + %% checkins, exactly Size0 workers are on queue) + ?assertEqual({ready, Size0, 0, 0, Size0, Size0}, poolboy:status_ext(Pool)), + ok = ?sync(Pool, stop), + + %% this to avoid an already_started condition on the next + %% new_pool, occasional seen on blinding fast machines such as + %% those running github CI + timer:sleep(10), + ok. + pool_returns_status() -> {ok, Pool} = new_pool(2, 0), ?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)),