Skip to content

Commit

Permalink
Mas i1801 monitorworkerq (#979)
Browse files Browse the repository at this point in the history
The worker pools are now monitored to collate:

- queue_time - the time a piece of work spends queueing before being allocated a worker;

- work_time - the time spent between the worker being checked out and checked in again (which presumably equates to the time spent doing the work, less any time spent on the worker_pool server message queue).

As part of this change, some erroneous tabs were removed from the worker_pool code; these tabs were skewing the code layout when viewed on GitHub.
  • Loading branch information
martinsumner authored Nov 9, 2021
1 parent 12cccff commit 7e0aa31
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 114 deletions.
33 changes: 15 additions & 18 deletions src/riak_core_node_worker_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
% Allows you to set up a DSCP-style set of pools (assuming the
% vnode_worker_pool counts as ef. Otherwise can just have a
% single node_worker_pool
:: be_pool|af1_pool|af2_pool|af3_pool|af4_pool|node_worker_pool.
:: be_pool|af1_pool|af2_pool|af3_pool|af4_pool|node_worker_pool.

-export_type([worker_pool/0]).

Expand Down Expand Up @@ -63,26 +63,23 @@ dscp_pools() ->
[af1(), af2(), af3(), af4(), be()].

-spec start_link(atom(), pos_integer(), list(), list(), worker_pool())
-> {ok, pid()}.
-> {ok, pid()}.
%% @doc
%% Start a worker pool, and register under the name PoolType, which should be
%% a recognised name from type worker_pool()
start_link(WorkerMod, PoolSize, WorkerArgs, WorkerProps, PoolType)
when PoolType == be_pool;
PoolType == af1_pool;
PoolType == af2_pool;
PoolType == af3_pool;
PoolType == af4_pool;
PoolType == node_worker_pool ->
when PoolType == be_pool;
PoolType == af1_pool;
PoolType == af2_pool;
PoolType == af3_pool;
PoolType == af4_pool;
PoolType == node_worker_pool ->
{ok, Pid} =
riak_core_worker_pool:start_link([WorkerMod,
PoolSize,
WorkerArgs,
WorkerProps],
?MODULE),
riak_core_worker_pool:start_link(
[WorkerMod, PoolSize, WorkerArgs, WorkerProps],
?MODULE,
PoolType),
register(PoolType, Pid),
lager:info("Registered worker pool of type ~w and size ~w",
[PoolType, PoolSize]),
{ok, Pid}.

do_init([WorkerMod, PoolSize, WorkerArgs, WorkerProps]) ->
Expand All @@ -108,10 +105,10 @@ stop(Pid, Reason) ->

%% wait for all the workers to finish any current work
shutdown_pool(Pid, Wait) ->
riak_core_worker_pool:shutdown_pool(Pid, Wait).
riak_core_worker_pool:shutdown_pool(Pid, Wait).

reply(From, Msg) ->
riak_core_vnode:reply(From, Msg).
riak_core_vnode:reply(From, Msg).

do_work(Pid, Work, From) ->
riak_core_vnode_worker:handle_work(Pid, Work, From).
riak_core_vnode_worker:handle_work(Pid, Work, From).
16 changes: 8 additions & 8 deletions src/riak_core_node_worker_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

%% Helper macro for declaring children of supervisor
-define(CHILD(I, PoolType, Args, Type, Timeout),
{PoolType,
{I, start_link, Args},
permanent, Timeout, Type, [I]}).
{PoolType,
{I, start_link, Args},
permanent, Timeout, Type, [I]}).
-define(CHILD(I, PoolType, Args, Type),
?CHILD(I, PoolType, Args, Type, 5000)).
?CHILD(I, PoolType, Args, Type, 5000)).

-type worker_pool() :: riak_core_node_worker_pool:worker_pool().

Expand All @@ -53,8 +53,8 @@ start_pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
end.

pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
?CHILD(riak_core_node_worker_pool,
QueueType,
[WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType],
worker).
?CHILD(riak_core_node_worker_pool,
QueueType,
[WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType],
worker).

42 changes: 33 additions & 9 deletions src/riak_core_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ handle_call(_Req, _From, State) ->
handle_cast({update, {worker_pool, vnode_pool}}, State) ->
exometer_update([prefix(), ?APP, vnode, worker_pool], 1),
{noreply, State};
handle_cast({update, {worker_pool, queue_time, Pool, QueueTime}}, State) ->
exometer_update([prefix(), ?APP, worker_pool_queuetime, Pool], QueueTime),
{noreply, State};
handle_cast({update, {worker_pool, work_time, Pool, WorkTime}}, State) ->
exometer_update([prefix(), ?APP, worker_pool_worktime, Pool], WorkTime),
{noreply, State};
handle_cast({update, {worker_pool, Pool}}, State) ->
exometer_update([prefix(), ?APP, node, worker_pool, Pool], 1),
{noreply, State};
Expand Down Expand Up @@ -200,12 +206,27 @@ stats() ->
{last, rebalance_delay_last}]} | nwp_stats()].

nwp_stats() ->
[ {[vnode, worker_pool], counter, [], [{value, vnode_worker_pool_total}]},
{[node, worker_pool, unregistered], counter, [], [{value, node_worker_pool_unregistered_total}]} |
[nwp_stat(Pool) || Pool <- riak_core_node_worker_pool:pools()]].
PoolNames = [vnode_pool, unregistered] ++ riak_core_node_worker_pool:pools(),

[nwp_stat(Pool) || Pool <- PoolNames] ++

[nwpqt_stat(Pool) || Pool <- PoolNames] ++

[nwpwt_stat(Pool) || Pool <- PoolNames].

nwp_stat(Pool) ->
{[node, worker_pool, Pool], counter, [], [{value, nwp_name_atom(Pool)}]}.
{[node, worker_pool, Pool], counter, [],
[{value, nwp_name_atom(Pool, <<"_total">>)}]}.

nwpqt_stat(Pool) ->
{[worker_pool_queuetime, Pool], histogram, [],
[{mean , nwp_name_atom(Pool, <<"_queuetime_mean">>)},
{max , nwp_name_atom(Pool, <<"_queuetime_100">>)}]}.

nwpwt_stat(Pool) ->
{[worker_pool_worktime, Pool], histogram, [],
[{mean , nwp_name_atom(Pool, <<"_worktime_mean">>)},
{max , nwp_name_atom(Pool, <<"_worktime_100">>)}]}.

system_stats() ->
[
Expand Down Expand Up @@ -273,16 +294,19 @@ vnodeq_aggregate(Service, MQLs0) ->
vnodeq_atom(Service, Desc) ->
binary_to_atom(<<(atom_to_binary(Service, latin1))/binary, Desc/binary>>, latin1).

nwp_name_atom(Atom) ->
binary_to_atom(<< <<"node_worker_pool_">>/binary,
(atom_to_binary(Atom, latin1))/binary,
<<"_total">>/binary>>, latin1).
-spec nwp_name_atom(atom(), binary()) -> atom().
nwp_name_atom(QueueName, StatName) ->
binary_to_atom(<< <<"worker_">>/binary,
(atom_to_binary(QueueName, latin1))/binary,
StatName/binary >>,
latin1).


-ifdef(TEST).

nwp_name_to_atom_test() ->
?assertEqual(node_worker_pool_af1_pool_total, nwp_name_atom(af1_pool)).
?assertEqual(worker_af1_pool_total, nwp_name_atom(af1_pool, <<"_total">>)).


%% Check vnodeq aggregation function
vnodeq_aggregate_empty_test() ->
Expand Down
32 changes: 14 additions & 18 deletions src/riak_core_vnode_worker_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,32 @@
%% API
-export([start_link/5, stop/2, shutdown_pool/2, handle_work/3]).

start_link(WorkerMod,
PoolSize, VNodeIndex,
WorkerArgs, WorkerProps) ->
riak_core_worker_pool:start_link([WorkerMod,
PoolSize,
VNodeIndex,
WorkerArgs,
WorkerProps],
?MODULE).
start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps) ->
riak_core_worker_pool:start_link(
[WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps],
?MODULE,
vnode_pool).

handle_work(Pid, Work, From) ->
riak_core_stat:update({worker_pool, vnode_pool}),
riak_core_worker_pool:handle_work(Pid, Work, From).
riak_core_worker_pool:handle_work(Pid, Work, From).

stop(Pid, Reason) ->
riak_core_worker_pool:stop(Pid, Reason).

%% wait for all the workers to finish any current work
shutdown_pool(Pid, Wait) ->
riak_core_worker_pool:shutdown_pool(Pid, Wait).
riak_core_worker_pool:shutdown_pool(Pid, Wait).

reply(From, Msg) ->
riak_core_vnode:reply(From, Msg).
riak_core_vnode:reply(From, Msg).

do_init([WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps]) ->
poolboy:start_link([{worker_module, riak_core_vnode_worker},
{worker_args,
[VNodeIndex, WorkerArgs, WorkerProps, self()]},
{worker_callback_mod, WorkerMod},
{size, PoolSize}, {max_overflow, 0}]).
poolboy:start_link([{worker_module, riak_core_vnode_worker},
{worker_args,
[VNodeIndex, WorkerArgs, WorkerProps, self()]},
{worker_callback_mod, WorkerMod},
{size, PoolSize}, {max_overflow, 0}]).

do_work(Pid, Work, From) ->
riak_core_vnode_worker:handle_work(Pid, Work, From).
riak_core_vnode_worker:handle_work(Pid, Work, From).
Loading

0 comments on commit 7e0aa31

Please sign in to comment.