Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 166 additions & 41 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
{gen_fsm, sync_send_all_state_event, 2}]}).

-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
get_pool_size/1, set_pool_size/2, set_pool_size/3,
child_spec/2, child_spec/3, start/1, start/2, start_link/1,
start_link/2, stop/1, status/1]).
start_link/2, stop/1, status/1, status_ext/1]).
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
handle_event/3, handle_sync_event/4, handle_info/3, terminate/3,
code_change/4]).
Expand All @@ -39,6 +40,7 @@
waiting :: poolboy_queue(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
latched_size = 5 :: non_neg_integer(), %% as resized; size to converge eventually with that
overflow = 0 :: non_neg_integer(),
max_overflow = 10 :: non_neg_integer()
}).
Expand Down Expand Up @@ -70,6 +72,17 @@ transaction(Pool, Fun) ->
ok = poolboy:checkin(Pool, Worker)
end.

-spec get_pool_size(pid()) -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
get_pool_size(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, get_pool_size).

-spec set_pool_size(pid(), non_neg_integer()) -> ok.
set_pool_size(Pid, NewSize) ->
gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize}).
-spec set_pool_size(pid(), non_neg_integer(), non_neg_integer()) -> ok.
set_pool_size(Pid, NewSize, NewMaxOverflow) ->
gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize, NewMaxOverflow}).

-spec child_spec(Pool :: node(), PoolArgs :: proplists:proplist())
-> supervisor:child_spec().
child_spec(Pool, PoolArgs) ->
Expand Down Expand Up @@ -113,6 +126,10 @@ stop(Pool) ->
-spec status(Pool :: node()) -> {atom(), integer(), integer(), integer()}.
status(Pool) ->
gen_fsm:sync_send_all_state_event(Pool, status).
-spec status_ext(Pool :: node()) -> {atom(), integer(), integer(), integer(),
non_neg_integer(), non_neg_integer()}.
status_ext(Pool) ->
gen_fsm:sync_send_all_state_event(Pool, status_ext).

init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Expand All @@ -124,7 +141,7 @@ init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
init(Rest, WorkerArgs, State#state{supervisor=Sup});
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
init(Rest, WorkerArgs, State#state{size=Size});
init(Rest, WorkerArgs, State#state{size=Size, latched_size=Size});
init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) ->
init(Rest, WorkerArgs, State#state{max_overflow=MaxOverflow});
init([_ | Rest], WorkerArgs, State) ->
Expand All @@ -139,13 +156,30 @@ init([], _WorkerArgs, #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow
{ok, StartState, State#state{workers=Workers}}.

ready({checkin, Pid}, State) ->
Monitors = State#state.monitors,
#state{size = Size,
overflow = Overflow,
max_overflow = MaxOverflow,
latched_size = LatchedSize,
supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
true = erlang:demonitor(Ref),
true = ets:delete(Monitors, Pid),
Workers = queue:in(Pid, State#state.workers),
{next_state, ready, State#state{workers=Workers}};
{NewSize, NewOverflow, Workers} =
case Size + Overflow > LatchedSize + MaxOverflow of %% when we shrunk
true when Size > LatchedSize ->
ok = dismiss_worker(Sup, Pid),
{Size - 1, Overflow, State#state.workers};
true ->
ok = dismiss_worker(Sup, Pid),
{Size, Overflow - 1, State#state.workers};
false ->
{Size, Overflow, queue:in(Pid, State#state.workers)}
end,
{next_state, ready, State#state{workers = Workers,
size = NewSize,
overflow = NewOverflow}};
[] ->
{next_state, ready, State}
end;
Expand All @@ -154,57 +188,86 @@ ready(_Event, State) ->

ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
#state{supervisor = Sup,
size = Size,
latched_size = LatchedSize,
workers = Workers,
monitors = Monitors,
max_overflow = MaxOverflow} = State,
case queue:out(Workers) of
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
NextState = case queue:is_empty(Left) of
true when MaxOverflow < 1 -> full;
true -> overflow;
false -> ready
end,
{reply, Pid, NextState, State#state{workers=Left}};
{empty, Empty} when MaxOverflow > 0 ->
if Size < LatchedSize -> %% we grew
%% we are here after a set_pool_size with Size > OldSize
{Pid, Ref} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
{reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
{empty, Empty} when Block =:= false ->
{reply, full, full, State#state{workers=Empty}};
{empty, Empty} ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{workers=Empty, waiting=Waiting}}
{reply, Pid, ready, State#state{size = Size + 1}};
el/=se ->
case queue:out(Workers) of
{{value, Pid}, Left} ->
Ref = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
NextState = case queue:is_empty(Left) of
true when MaxOverflow < 1 -> full;
true -> overflow;
false -> ready
end,
{reply, Pid, NextState, State#state{workers=Left}};
{empty, Empty} when MaxOverflow > 0 ->
{Pid, Ref} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
{reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
{empty, Empty} when Block =:= false ->
{reply, full, full, State#state{workers=Empty}};
{empty, Empty} ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{workers=Empty, waiting=Waiting}}
end
end;
ready(_Event, _From, State) ->
{reply, ok, ready, State}.

overflow({checkin, Pid}, #state{overflow=0}=State) ->
Monitors = State#state.monitors,
#state{monitors = Monitors,
size = Size,
supervisor = Sup,
latched_size = LatchedSize} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
true = erlang:demonitor(Ref),
true = ets:delete(Monitors, Pid),
NextState = case State#state.size > 0 of
NextState = case Size > 0 of
true -> ready;
false -> overflow
end,
Workers = queue:in(Pid, State#state.workers),
{next_state, NextState, State#state{overflow=0, workers=Workers}};
Workers =
case Size > LatchedSize of
true ->
ok = dismiss_worker(Sup, Pid),
State#state.workers;
false ->
queue:in(Pid, State#state.workers)
end,
{next_state, NextState, State#state{workers=Workers}};
[] ->
{next_state, overflow, State}
end;
overflow({checkin, Pid}, State) ->
#state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State,
#state{supervisor = Sup,
monitors = Monitors,
overflow = Overflow,
size = Size,
latched_size = LatchedSize} = State,
{NextState, NewOverflow} =
if Size > LatchedSize ->
{full, Overflow};
el/=se ->
{overflow, Overflow - 1}
end,
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
ok = dismiss_worker(Sup, Pid),
true = erlang:demonitor(Ref),
true = ets:delete(Monitors, Pid),
{next_state, overflow, State#state{overflow=Overflow-1}};
{next_state, NextState, State#state{overflow = NewOverflow}};
[] ->
{next_state, overflow, State}
{next_state, NextState, State#state{overflow = NewOverflow}}
end;
overflow(_Event, State) ->
{next_state, overflow, State}.
Expand Down Expand Up @@ -247,11 +310,30 @@ full({checkin, Pid}, State) ->
full(_Event, State) ->
{next_state, full, State}.

full({checkout, true, Timeout}, From, State) ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{waiting=Waiting}};
full({checkout, false, _Timeout}, _From, State) ->
{reply, full, full, State};
full({checkout, Block, Timeout}, {FromPid, _} = From, State) ->
#state{size = Size,
latched_size = LatchedSize,
max_overflow = MaxOverflow,
overflow = Overflow,
monitors = Monitors,
supervisor = Sup} = State,
if Size + Overflow < LatchedSize + MaxOverflow ->
{Pid, Ref} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
{NextState, NewOverflow, NewSize} =
if Size < LatchedSize ->
{full, Overflow, Size + 1};
el/=se ->
{overflow, Overflow + 1, Size}
end,
{reply, Pid, NextState, State#state{size = NewSize,
overflow = NewOverflow}};
Block == true ->
Waiting = add_waiting(From, Timeout, State#state.waiting),
{next_state, full, State#state{waiting=Waiting}};
el/=se ->
{reply, full, full, State}
end;
full(_Event, _From, State) ->
{reply, ok, full, State}.

Expand All @@ -262,6 +344,10 @@ handle_sync_event(status, _From, StateName, State) ->
{reply, {StateName, queue:len(State#state.workers), State#state.overflow,
ets:info(State#state.monitors, size)},
StateName, State};
handle_sync_event(status_ext, _From, StateName, State) ->
{reply, {StateName, queue:len(State#state.workers), State#state.overflow,
ets:info(State#state.monitors, size), State#state.size, State#state.latched_size},
StateName, State};
handle_sync_event(get_avail_workers, _From, StateName, State) ->
Workers = State#state.workers,
WorkerList = queue:to_list(Workers),
Expand All @@ -273,6 +359,26 @@ handle_sync_event(get_all_workers, _From, StateName, State) ->
handle_sync_event(get_all_monitors, _From, StateName, State) ->
Monitors = ets:tab2list(State#state.monitors),
{reply, Monitors, StateName, State};
handle_sync_event(get_pool_size, _From, StateName, State) ->
{reply, {State#state.size, State#state.latched_size, State#state.max_overflow}, StateName, State};
handle_sync_event({set_pool_size, NewSize}, _From, StateName, State) ->
handle_sync_event({set_pool_size, NewSize, State#state.max_overflow}, _From, StateName, State);
handle_sync_event({set_pool_size, NewSize, NewMaxOverflow}, _From, StateName, State) ->
%% minimize overflow
SizeDiff = NewSize - State#state.size,
MegaDiff = NewSize - (State#state.size + State#state.overflow),
{SizeCorrected, OverflowCorrected} =
if MegaDiff > 0 ->
{State#state.size + State#state.overflow, 0}; %% with some slack
SizeDiff > 0 ->
{State#state.size + SizeDiff, State#state.overflow - SizeDiff};
el/=se ->
{State#state.size, State#state.overflow}
end,
{reply, ok, StateName, State#state{latched_size = NewSize,
max_overflow = NewMaxOverflow,
size = SizeCorrected,
overflow = OverflowCorrected}};
handle_sync_event(stop, _From, _StateName, State) ->
Sup = State#state.supervisor,
true = exit(Sup, shutdown),
Expand Down Expand Up @@ -368,8 +474,11 @@ checkin_while_full(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
workers = Workers,
max_overflow = MaxOverflow,
overflow = Overflow} = State,
overflow = Overflow,
size = Size,
latched_size = LatchedSize} = State,
case queue:out(Waiting) of
{{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
case wait_valid(StartTime, Timeout) of
Expand All @@ -382,13 +491,29 @@ checkin_while_full(Pid, State) ->
checkin_while_full(Pid, State#state{waiting=Left})
end;
{empty, Empty} when MaxOverflow < 1 ->
Workers = queue:in(Pid, State#state.workers),
{next_state, ready, State#state{workers=Workers,
waiting=Empty}};
if Size > LatchedSize ->
ok = dismiss_worker(Sup, Pid),
{next_state, full, State#state{waiting = Empty,
size = Size - 1}};
el/=se ->
{next_state, ready, State#state{workers = queue:in(Pid, Workers),
waiting = Empty}}
end;
{empty, Empty} ->
ok = dismiss_worker(Sup, Pid),
{next_state, overflow, State#state{waiting=Empty,
overflow=Overflow-1}}
{NextState, NewSize, NewOverflow, NewWorkers} =
if Size > LatchedSize ->
ok = dismiss_worker(Sup, Pid),
{full, Size - 1, Overflow, Workers};
Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
{overflow, Size, Overflow - 1, Workers};
el/=se ->
{ready, Size, 0, queue:in(Pid, Workers)}
end,
{next_state, NextState, State#state{waiting = Empty,
workers = NewWorkers,
size = NewSize,
overflow = NewOverflow}}
end.

handle_worker_exit(Pid, StateName, State) ->
Expand Down
Loading