diff --git a/src/poolboy.erl b/src/poolboy.erl index 532362c..756ec7b 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -12,8 +12,9 @@ {gen_fsm, sync_send_all_state_event, 2}]}). -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, + get_pool_size/1, set_pool_size/2, set_pool_size/3, child_spec/2, child_spec/3, start/1, start/2, start_link/1, - start_link/2, stop/1, status/1]). + start_link/2, stop/1, status/1, status_ext/1]). -export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). @@ -39,6 +40,7 @@ waiting :: poolboy_queue(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), + latched_size = 5 :: non_neg_integer(), %% as resized; size to converge eventually with that overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer() }). @@ -70,6 +72,17 @@ transaction(Pool, Fun) -> ok = poolboy:checkin(Pool, Worker) end. +-spec get_pool_size(pid()) -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}. +get_pool_size(Pid) -> + gen_fsm:sync_send_all_state_event(Pid, get_pool_size). + +-spec set_pool_size(pid(), non_neg_integer()) -> ok. +set_pool_size(Pid, NewSize) -> + gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize}). +-spec set_pool_size(pid(), non_neg_integer(), non_neg_integer()) -> ok. +set_pool_size(Pid, NewSize, NewMaxOverflow) -> + gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize, NewMaxOverflow}). + -spec child_spec(Pool :: node(), PoolArgs :: proplists:proplist()) -> supervisor:child_spec(). child_spec(Pool, PoolArgs) -> @@ -113,6 +126,10 @@ stop(Pool) -> -spec status(Pool :: node()) -> {atom(), integer(), integer(), integer()}. status(Pool) -> gen_fsm:sync_send_all_state_event(Pool, status). +-spec status_ext(Pool :: node()) -> {atom(), integer(), integer(), integer(), + non_neg_integer(), non_neg_integer()}. +status_ext(Pool) -> + gen_fsm:sync_send_all_state_event(Pool, status_ext). init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), @@ -124,7 +141,7 @@ init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), init(Rest, WorkerArgs, State#state{supervisor=Sup}); init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size=Size}); + init(Rest, WorkerArgs, State#state{size=Size, latched_size=Size}); init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> init(Rest, WorkerArgs, State#state{max_overflow=MaxOverflow}); init([_ | Rest], WorkerArgs, State) -> @@ -139,13 +156,30 @@ init([], _WorkerArgs, #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow {ok, StartState, State#state{workers=Workers}}. ready({checkin, Pid}, State) -> - Monitors = State#state.monitors, + #state{size = Size, + overflow = Overflow, + max_overflow = MaxOverflow, + latched_size = LatchedSize, + supervisor = Sup, + monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers}}; + {NewSize, NewOverflow, Workers} = + case Size + Overflow > LatchedSize + MaxOverflow of %% when we shrunk + true when Size > LatchedSize -> + ok = dismiss_worker(Sup, Pid), + {Size - 1, Overflow, State#state.workers}; + true -> + ok = dismiss_worker(Sup, Pid), + {Size, Overflow - 1, State#state.workers}; + false -> + {Size, Overflow, queue:in(Pid, State#state.workers)} + end, + {next_state, ready, State#state{workers = Workers, + size = NewSize, + overflow = NewOverflow}}; [] -> {next_state, ready, State} end; @@ -154,57 +188,86 @@ ready(_Event, State) -> ready({checkout, Block, Timeout}, {FromPid, _}=From, State) -> #state{supervisor = Sup, + size = Size, + latched_size = LatchedSize, workers = Workers, monitors = Monitors, max_overflow = MaxOverflow} = State, - case queue:out(Workers) of - {{value, Pid}, Left} -> - Ref = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, Ref}), - NextState = case queue:is_empty(Left) of - true when MaxOverflow < 1 -> full; - true -> overflow; - false -> ready - end, - {reply, Pid, NextState, State#state{workers=Left}}; - {empty, Empty} when MaxOverflow > 0 -> + if Size < LatchedSize -> %% we grew + %% we are here after a set_pool_size with Size > OldSize {Pid, Ref} = new_worker(Sup, FromPid), true = ets:insert(Monitors, {Pid, Ref}), - {reply, Pid, overflow, State#state{workers=Empty, overflow=1}}; - {empty, Empty} when Block =:= false -> - {reply, full, full, State#state{workers=Empty}}; - {empty, Empty} -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{workers=Empty, waiting=Waiting}} + {reply, Pid, ready, State#state{size = Size + 1}}; + el/=se -> + case queue:out(Workers) of + {{value, Pid}, Left} -> + Ref = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + NextState = case queue:is_empty(Left) of + true when MaxOverflow < 1 -> full; + true -> overflow; + false -> ready + end, + {reply, Pid, NextState, State#state{workers=Left}}; + {empty, Empty} when MaxOverflow > 0 -> + {Pid, Ref} = new_worker(Sup, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + {reply, Pid, overflow, State#state{workers=Empty, overflow=1}}; + {empty, Empty} when Block =:= false -> + {reply, full, full, State#state{workers=Empty}}; + {empty, Empty} -> + Waiting = add_waiting(From, Timeout, State#state.waiting), + {next_state, full, State#state{workers=Empty, waiting=Waiting}} + end end; ready(_Event, _From, State) -> {reply, ok, ready, State}. overflow({checkin, Pid}, #state{overflow=0}=State) -> - Monitors = State#state.monitors, + #state{monitors = Monitors, + size = Size, + supervisor = Sup, + latched_size = LatchedSize} = State, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - NextState = case State#state.size > 0 of + NextState = case Size > 0 of true -> ready; false -> overflow end, - Workers = queue:in(Pid, State#state.workers), - {next_state, NextState, State#state{overflow=0, workers=Workers}}; + Workers = + case Size > LatchedSize of + true -> + ok = dismiss_worker(Sup, Pid), + State#state.workers; + false -> + queue:in(Pid, State#state.workers) + end, + {next_state, NextState, State#state{workers=Workers}}; [] -> {next_state, overflow, State} end; overflow({checkin, Pid}, State) -> - #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State, + #state{supervisor = Sup, + monitors = Monitors, + overflow = Overflow, + size = Size, + latched_size = LatchedSize} = State, + {NextState, NewOverflow} = + if Size > LatchedSize -> + {full, Overflow}; + el/=se -> + {overflow, Overflow - 1} + end, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> ok = dismiss_worker(Sup, Pid), true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - {next_state, overflow, State#state{overflow=Overflow-1}}; + {next_state, NextState, State#state{overflow = NewOverflow}}; [] -> - {next_state, overflow, State} + {next_state, NextState, State#state{overflow = NewOverflow}} end; overflow(_Event, State) -> {next_state, overflow, State}. @@ -247,11 +310,30 @@ full({checkin, Pid}, State) -> full(_Event, State) -> {next_state, full, State}. -full({checkout, true, Timeout}, From, State) -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{waiting=Waiting}}; -full({checkout, false, _Timeout}, _From, State) -> - {reply, full, full, State}; +full({checkout, Block, Timeout}, {FromPid, _} = From, State) -> + #state{size = Size, + latched_size = LatchedSize, + max_overflow = MaxOverflow, + overflow = Overflow, + monitors = Monitors, + supervisor = Sup} = State, + if Size + Overflow < LatchedSize + MaxOverflow -> + {Pid, Ref} = new_worker(Sup, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + {NextState, NewOverflow, NewSize} = + if Size < LatchedSize -> + {full, Overflow, Size + 1}; + el/=se -> + {overflow, Overflow + 1, Size} + end, + {reply, Pid, NextState, State#state{size = NewSize, + overflow = NewOverflow}}; + Block == true -> + Waiting = add_waiting(From, Timeout, State#state.waiting), + {next_state, full, State#state{waiting=Waiting}}; + el/=se -> + {reply, full, full, State} + end; full(_Event, _From, State) -> {reply, ok, full, State}. @@ -262,6 +344,10 @@ handle_sync_event(status, _From, StateName, State) -> {reply, {StateName, queue:len(State#state.workers), State#state.overflow, ets:info(State#state.monitors, size)}, StateName, State}; +handle_sync_event(status_ext, _From, StateName, State) -> + {reply, {StateName, queue:len(State#state.workers), State#state.overflow, + ets:info(State#state.monitors, size), State#state.size, State#state.latched_size}, + StateName, State}; handle_sync_event(get_avail_workers, _From, StateName, State) -> Workers = State#state.workers, WorkerList = queue:to_list(Workers), @@ -273,6 +359,26 @@ handle_sync_event(get_all_workers, _From, StateName, State) -> handle_sync_event(get_all_monitors, _From, StateName, State) -> Monitors = ets:tab2list(State#state.monitors), {reply, Monitors, StateName, State}; +handle_sync_event(get_pool_size, _From, StateName, State) -> + {reply, {State#state.size, State#state.latched_size, State#state.max_overflow}, StateName, State}; +handle_sync_event({set_pool_size, NewSize}, _From, StateName, State) -> + handle_sync_event({set_pool_size, NewSize, State#state.max_overflow}, _From, StateName, State); +handle_sync_event({set_pool_size, NewSize, NewMaxOverflow}, _From, StateName, State) -> + %% minimize overflow + SizeDiff = NewSize - State#state.size, + MegaDiff = NewSize - (State#state.size + State#state.overflow), + {SizeCorrected, OverflowCorrected} = + if MegaDiff > 0 -> + {State#state.size + State#state.overflow, 0}; %% with some slack + SizeDiff > 0 -> + {State#state.size + SizeDiff, State#state.overflow - SizeDiff}; + el/=se -> + {State#state.size, State#state.overflow} + end, + {reply, ok, StateName, State#state{latched_size = NewSize, + max_overflow = NewMaxOverflow, + size = SizeCorrected, + overflow = OverflowCorrected}}; handle_sync_event(stop, _From, _StateName, State) -> Sup = State#state.supervisor, true = exit(Sup, shutdown), @@ -368,8 +474,11 @@ checkin_while_full(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, + workers = Workers, max_overflow = MaxOverflow, - overflow = Overflow} = State, + overflow = Overflow, + size = Size, + latched_size = LatchedSize} = State, case queue:out(Waiting) of {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} -> case wait_valid(StartTime, Timeout) of @@ -382,13 +491,29 @@ checkin_while_full(Pid, State) -> checkin_while_full(Pid, State#state{waiting=Left}) end; {empty, Empty} when MaxOverflow < 1 -> - Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers, - waiting=Empty}}; + if Size > LatchedSize -> + ok = dismiss_worker(Sup, Pid), + {next_state, full, State#state{waiting = Empty, + size = Size - 1}}; + el/=se -> + {next_state, ready, State#state{workers = queue:in(Pid, Workers), + waiting = Empty}} + end; {empty, Empty} -> - ok = dismiss_worker(Sup, Pid), - {next_state, overflow, State#state{waiting=Empty, - overflow=Overflow-1}} + {NextState, NewSize, NewOverflow, NewWorkers} = + if Size > LatchedSize -> + ok = dismiss_worker(Sup, Pid), + {full, Size - 1, Overflow, Workers}; + Overflow > 0 -> + ok = dismiss_worker(Sup, Pid), + {overflow, Size, Overflow - 1, Workers}; + el/=se -> + {ready, Size, 0, queue:in(Pid, Workers)} + end, + {next_state, NextState, State#state{waiting = Empty, + workers = NewWorkers, + size = NewSize, + overflow = NewOverflow}} end. handle_worker_exit(Pid, StateName, State) -> diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 6a19b66..764ad77 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -1,6 +1,7 @@ -module(poolboy_tests). -include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). -compile({nowarn_deprecated_function, [{gen_fsm, sync_send_all_state_event, 2}]}). @@ -21,6 +22,9 @@ pool_test_() -> error_logger:tty(true) end, [ + {<<"Pool size adjustments">>, + fun pool_resize/0 + }, {<<"Basic pool operations">>, fun pool_startup/0 }, @@ -369,6 +373,106 @@ checkin_after_exception_in_transaction() -> ?assertEqual(2, length(?sync(Pool, get_avail_workers))), ok = ?sync(Pool, stop). +pool_resize() -> + [ ok = pool_resize(SZ0, OF0, SZ1, OF1, Block) + || SZ0 <- lists:seq(1, 3), + OF0 <- lists:seq(0, 5), + SZ1 <- lists:seq(1, 5), + OF1 <- lists:seq(0, 3), + SZ0 < SZ1, + Block <- [false, true] ], + ok. + +pool_resize(Size0, Overflow0, Size1, Overflow1, Block) when Size1 > Size0 -> + io:format("\npool_resize ~b, ~b to ~b, ~b (Block: ~p)\n", [Size0, Overflow0, Size1, Overflow1, Block]), + {ok, Pool} = new_pool(Size0, Overflow0), + %% actual size, latched size, overflow: + ?assertEqual({Size0, Size0, Overflow0}, ?sync(Pool, get_pool_size)), + + AllInitialWorkers = [poolboy:checkout(Pool, Block) || _ <- lists:seq(1, Size0 + Overflow0)], + ?assertEqual(full, poolboy:checkout(Pool, false)), + ?assertEqual({full, _WorkerQueue = 0, Overflow0, _Monitors = Size0 + Overflow0, + Size0, _LatchedSize = Size0}, poolboy:status_ext(Pool)), + + %% grow + ok = ?sync(Pool, {set_pool_size, Size1, Overflow1}), + %% actual size can be corrected ('pre-grown') towards latched size + %% taking into account current overflow, so we don't match on it + ?assertMatch({_, Size1, Overflow1}, ?sync(Pool, get_pool_size)), + io:format("after growing, pool size is ~p and state is ~p\n", [?sync(Pool, get_pool_size), poolboy:status_ext(Pool)]), + + %% sup still has Size0 children but (Size1 - Size0) more checkouts should succeed + %% ?assertEqual({full, 0, Overflow0, Size0 + Overflow0, Size0, Size1}, poolboy:status_ext(Pool)): + %% no change until extra checkouts + ?assertEqual(Size0 + Overflow0, length(?sync(Pool, get_all_workers))), + + case (Size1 + Overflow1) - (Size0 + Overflow0) of + Diff when Diff > 0 -> + MoreSeq = lists:seq(1, Diff), + io:format("going to check out ~b extra workers\n", [length(MoreSeq)]), + ExtraWorkers = + [ begin + io:format("Checking out one, Status: ~p\n", [poolboy:status_ext(Pool)]), + W = poolboy:checkout(Pool, Block), + ?assert(is_pid(W)), + %% it doesn't seem right that checkouts are possible + %% even when status is 'full'; this is temporary and by + %% design, until number of children after resize reaches + %% latched size + ?assertEqual(Size0 + Overflow0 + N, length(?sync(Pool, get_all_workers))), + W + end || N <- MoreSeq ], + %% now we are properly full + %% when we are full and *not undersize*, blocking checkout will timeout, so + ?assertEqual(full, poolboy:checkout(Pool, false)), + %% io:format("After growing, Status: ~p\n", [poolboy:status_ext(Pool)]), + ?assertEqual({full, 0, Overflow1, Size1 + Overflow1, Size1, Size1}, poolboy:status_ext(Pool)), + + %% shrink + io:format("shrinking pool size back to ~b ~b\n", [Size0, Overflow0]), + ok = ?sync(Pool, {set_pool_size, Size0, Overflow0}), + ?assertEqual(0, length(?sync(Pool, get_avail_workers))), + ?assertEqual(Size1 + Overflow1, length(?sync(Pool, get_all_workers))), %% no change until checkins + + %% checkouts are of course not possible + ?assertEqual(full, poolboy:checkout(Pool, false)), + + %% checking in excess workers while full + io:format("now going to check in ~b extra workers\n", [length(ExtraWorkers)]), + _ = [ begin + ok = poolboy:checkin(Pool, W), + io:format(" Checked in one, Status: ~p\n", [poolboy:status_ext(Pool)]) + %% because we are over-overflow, worker is dismissed, not placed on queue + %%?assertEqual(0, length(?sync(Pool, get_avail_workers))), + %% -- except not always, again, depending on values of overflow vs max_overflow. + end || W <- ExtraWorkers ]; + _ -> + ok = ?sync(Pool, {set_pool_size, Size0, Overflow0}), + io:format("(already grown)\n", []) + end, + + %% back to normal + %% checking in non-excess workers + {WorkersOverflowing, WorkersRestRemaining} = lists:split(Overflow0, AllInitialWorkers), + io:format("now going to check in ~b workers to trim in overflow\n", [length(WorkersOverflowing)]), + _ = [ begin + ok = poolboy:checkin(Pool, W), + io:format(" Checked in one, Status: ~p\n", [poolboy:status_ext(Pool)]) + end || W <- WorkersOverflowing ], + + [poolboy:checkin(Pool, W) || W <- WorkersRestRemaining], + + %% make sure our arithmetics is correct (specifically, after all + %% checkins, exactly Size0 workers are on queue) + ?assertEqual({ready, Size0, 0, 0, Size0, Size0}, poolboy:status_ext(Pool)), + ok = ?sync(Pool, stop), + + %% this to avoid an already_started condition on the next + %% new_pool, occasional seen on blinding fast machines such as + %% those running github CI + timer:sleep(10), + ok. + pool_returns_status() -> {ok, Pool} = new_pool(2, 0), ?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)),