Merge pull request #1189 from basho/feature/stats-for-riak-pb-client
Add latency stats for riak pb client operations

Reviewed-by: kuenishi
borshop committed Jul 16, 2015
2 parents 3274aa3 + bfe6871 commit 71b0916
Showing 28 changed files with 472 additions and 318 deletions.
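The pattern running through the hunks below: direct riakc_pb_socket and riak_repl_pb_api calls are replaced by riak_cs_pbc wrappers that take an extra stats key such as [riakc, put_block], and the per-call-site riak_cs_stats:update_with_start/2 bookkeeping is dropped from the callers. A minimal sketch of such a wrapper, assuming it simply times the underlying call and reports the latency under the given key (riak_cs_pbc.erl itself is not among the hunks shown here, so the reporting details are an assumption):

    %% Hypothetical sketch only; the committed riak_cs_pbc:get/6 is not shown below.
    get(PbcPid, Bucket, Key, Options, Timeout, StatsKey) ->
        StartTime = os:timestamp(),
        Result = riakc_pb_socket:get(PbcPid, Bucket, Key, Options, Timeout),
        %% Reusing riak_cs_stats:update_with_start/2 (seen in the removed caller
        %% code below) is an assumption; the new stats API may differ.
        ok = riak_cs_stats:update_with_start(StatsKey, StartTime),
        Result.

Centralizing the timing this way means each call site differs only in the stats key it passes, which is what the block-server and access-archiver changes below do.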
dialyzer.ignore-warnings.ee (7 changes: 4 additions & 3 deletions)
@@ -1,7 +1,8 @@
# Errors
riak_cs_block_server.erl:312: The pattern Success = {'ok', _} can never match the type {'error',_}
riak_cs_block_server.erl:347: The pattern {'ok', RiakObject} can never match the type {'error',_}
riak_cs_pbc.erl:100: The pattern {'ok', ClusterID} can never match the type {'error',_}
riak_cs_block_server.erl:324: The pattern Success = {'ok', _} can never match the type {'error',_}
riak_cs_block_server.erl:361: The pattern {'ok', RiakObject} can never match the type {'error',_}
riak_cs_pbc.erl:58: The variable _ can never match since previous clauses completely covered the type 'pong'
riak_cs_pbc.erl:175: The pattern {'ok', ClusterID} can never match the type {'error',_}
# Warnings
Unknown functions:
app_helper:get_prop_or_env/3
priv/tools/repair_gc_bucket.erl (2 changes: 1 addition & 1 deletion)
@@ -166,7 +166,7 @@ process_gc_keys(Pbc, Options, Continuation, [GCKey | Keys]) ->
{error, term()}.
repair_manifests_for_gc_key(Pbc, Options, GCKey) ->
Timeout = riak_cs_config:get_gckey_timeout(),
case riak_cs_pbc:get_object(Pbc, ?GC_BUCKET, GCKey, Timeout) of
case riakc_pb_socket:get(Pbc, ?GC_BUCKET, GCKey, [], Timeout) of
{ok, GCObj} ->
FileSet = riak_cs_gc:decode_and_merge_siblings(
GCObj, twop_set:new()),
riak_test/tests/stats_test.erl (55 changes: 26 additions & 29 deletions)
@@ -46,10 +46,10 @@ confirm() ->
lager:info("Confirming stats"),
Stats1 = query_stats(UserConfig, rtcs:cs_port(hd(RiakNodes))),
_Stats2 = status_cmd(),
confirm_stat_count(Stats1, <<"service_get_buckets">>, 2),
confirm_stat_count(Stats1, <<"object_gets">>, 1),
confirm_stat_count(Stats1, <<"object_puts">>, 1),
confirm_stat_count(Stats1, <<"object_deletes">>, 1),
confirm_stat_count(Stats1, <<"service_get_out_one">>, 2),
confirm_stat_count(Stats1, <<"object_get_out_one">>, 1),
confirm_stat_count(Stats1, <<"object_put_out_one">>, 1),
confirm_stat_count(Stats1, <<"object_delete_out_one">>, 1),
rtcs:pass().

status_cmd() ->
Expand All @@ -72,35 +72,32 @@ query_stats(UserConfig, Port) ->

confirm_initial_stats(StatData) ->
%% Check for values for all meters to be 0 when system is initially started
?assertEqual(614, length(StatData)),
[?assert(proplists:is_defined(StatType, StatData))
|| StatType <- [<<"block_gets">>,
<<"block_puts">>,
<<"block_deletes">>,
<<"service_get_buckets">>,
<<"bucket_list_keys">>,
<<"bucket_creates">>,
<<"bucket_deletes">>,
<<"bucket_get_acl">>,
<<"bucket_put_acl">>,
<<"object_gets">>,
<<"object_puts">>,
<<"object_heads">>,
<<"object_deletes">>,
<<"object_get_acl">>,
<<"object_put_acl">>]],

Exceptions = [<<"request_pool_workers">>,
<<"request_pool_size">>,
<<"bucket_list_pool_workers">>],
ShouldBeZeros = lists:foldl(fun proplists:delete/2, StatData, Exceptions),
?assertEqual(1096, length(StatData)),
[begin
lager:debug("testing ~p:~p", [Name, Value]),
StatKey = list_to_binary(StatType ++ "_out_one"),
lager:debug("StatKey: ~p~n", [StatKey]),
?assert(proplists:is_defined(StatKey, StatData)),
Value = proplists:get_value(StatKey, StatData),
?assertEqual(0, Value)
end|| {Name, Value} <- ShouldBeZeros],
end || StatType <- ["service_get",
"list_objects_get",
"bucket_put",
"bucket_delete",
"bucket_acl_get",
"bucket_acl_put",
"object_get",
"object_put",
"object_head",
"object_delete",
"object_acl_get",
"object_acl_put",
"riakc_get_block_n_one",
"riakc_put_block",
"riakc_delete_block_constrained"
]],

lager:debug("~p", [proplists:get_value(<<"request_pool_workers">>, StatData)]),
?assertEqual(rtcs:request_pool_size()-1,
?assertEqual(rtcs:request_pool_size() - 1,
proplists:get_value(<<"request_pool_workers">>, StatData)),
?assertEqual(rtcs:bucket_list_pool_size(),
proplists:get_value(<<"bucket_list_pool_workers">>, StatData)).
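The updated assertions rely on a naming convention in which each operation exposes a stat named <operation>_out_one. A minimal helper equivalent to the check inside the comprehension above, assuming Stats is the proplist returned by query_stats/2 (the helper itself is illustrative, not part of the test file):

    %% Sketch of the per-operation zero check done by confirm_initial_stats/1.
    assert_zero_stat(StatType, Stats) ->
        StatKey = list_to_binary(StatType ++ "_out_one"),
        true = proplists:is_defined(StatKey, Stats),
        0 = proplists:get_value(StatKey, Stats).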
src/riak_cs_access.erl (7 changes: 4 additions & 3 deletions)
@@ -136,15 +136,16 @@ merge_stats(Stats, Acc) ->
%% which the keys are Riak CS node names. The value for each key is a
%% list of samples. Each sample is an orddict full of metrics.
-spec get_usage(riak_client(),
term(), %% TODO: riak_cs:user_key() type doesn't exist
term(), %% TODO: riak_cs:user_key() type doesn't exist
boolean(), %% Not used in this module
calendar:datetime(),
calendar:datetime()) ->
{Usage::orddict:orddict(), Errors::[{slice(), term()}]}.
get_usage(RcPid, User, _AdminAccess, Start, End) ->
{ok, Period} = archive_period(),
{Usage, Errors} = rts:find_samples(RcPid, ?ACCESS_BUCKET, User,
Start, End, Period),
RtsPuller = riak_cs_riak_client:rts_puller(
RcPid, ?ACCESS_BUCKET, User, [riakc, get_access]),
{Usage, Errors} = rts:find_samples(RtsPuller, Start, End, Period),
{group_by_node(Usage), Errors}.

group_by_node(Samples) ->
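rts:find_samples/4 now takes a puller obtained from riak_cs_riak_client:rts_puller/4 instead of a raw riak client, bucket, and user, so each time-slice fetch can be attributed to the [riakc, get_access] stats key. The diff only shows the call site; the sketch below is a guess at the shape of that puller, a closure over the master PBC connection and stats key, and its argument list is an assumption:

    %% Hypothetical sketch; not the committed riak_cs_riak_client:rts_puller/4.
    rts_puller(RcPid, Bucket, _User, StatsKey) ->
        {ok, MasterPbc} = riak_cs_riak_client:master_pbc(RcPid),
        fun(SliceKey, Timeout) ->
                %% In the real code the user presumably figures into the slice
                %% keys that rts:find_samples/4 generates and passes in here.
                riak_cs_pbc:get(MasterPbc, Bucket, SliceKey, [], Timeout, StatsKey)
        end.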
src/riak_cs_access_archiver.erl (3 changes: 2 additions & 1 deletion)
@@ -202,7 +202,8 @@ archive_user(User, RcPid, Table, Slice) ->

store(User, RcPid, Record, Slice) ->
{ok, MasterPbc} = riak_cs_riak_client:master_pbc(RcPid),
case catch riakc_pb_socket:put(MasterPbc, Record) of
Timeout = riak_cs_config:put_access_timeout(),
case catch riak_cs_pbc:put(MasterPbc, Record, Timeout, [riakc, put_access]) of
ok ->
ok = lager:debug("Archived access stats for ~s ~p",
[User, Slice]);
src/riak_cs_acl.erl (4 changes: 2 additions & 2 deletions)
@@ -310,8 +310,8 @@ owner_id(?ACL{owner=Owner}, _) ->
owner_id(#acl_v1{owner=OwnerData}, RcPid) ->
{Name, CanonicalId} = OwnerData,
case riak_cs_user:get_user_by_index(?ID_INDEX,
list_to_binary(CanonicalId),
RcPid) of
list_to_binary(CanonicalId),
RcPid) of
{ok, {Owner, _}} ->
Owner?RCS_USER.key_id;
{error, _} ->
src/riak_cs_block_server.erl (119 changes: 56 additions & 63 deletions)
@@ -194,19 +194,20 @@ handle_cast({put_block, ReplyPid, Bucket, Key, UUID, BlockNumber, Value, BCSum},
[Bucket, Key, UUID, BlockNumber, Error])
end,
%% TODO: Handle put failure here.
ok = do_put_block(FullBucket, FullKey, <<>>, Value, MD, RcPid, FailFun),
ok = do_put_block(FullBucket, FullKey, <<>>, Value, MD,
RcPid, [riakc, put_block], FailFun),
riak_cs_put_fsm:block_written(ReplyPid, BlockNumber),
dt_return(<<"put_block">>, [BlockNumber], [Bucket, Key]),
{noreply, State};
handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#state{riak_client=RcPid}) ->
dt_entry(<<"delete_block">>, [BlockNumber], [Bucket, Key]),
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber),
StartTime = os:timestamp(),
Timeout = riak_cs_config:get_block_timeout(),

%% do a get first to get the vclock (only do a head request though)
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}, head],
_ = case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, Timeout) of
GetOptions = [head | r_one_options()],
_ = case riak_cs_pbc:get(block_pbc(RcPid), FullBucket, FullKey,
GetOptions, Timeout, [riakc, head_block]) of
{ok, RiakObject} ->
ok = delete_block(RcPid, ReplyPid, RiakObject, {UUID, BlockNumber});
{error, notfound} ->
Expand All @@ -215,7 +216,6 @@ handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#sta
%% move on to the next block.
riak_cs_delete_fsm:block_deleted(ReplyPid, {ok, {UUID, BlockNumber}})
end,
ok = riak_cs_stats:update_with_start([block, delete], StartTime),
dt_return(<<"delete_block">>, [BlockNumber], [Bucket, Key]),
{noreply, State};
handle_cast(_Msg, State) ->
Expand All @@ -229,11 +229,11 @@ get_block(ReplyPid, Bucket, Key, ClusterId, BagId, UUID, BlockNumber, RcPid) ->

case riak_cs_utils:n_val_1_get_requests() of
true ->
do_get_block(ReplyPid, Bucket, Key, ClusterId, UseProxyGet, ProxyActive, UUID,
BlockNumber, RcPid);
do_get_block(ReplyPid, Bucket, Key, ClusterId, UseProxyGet, ProxyActive,
UUID, BlockNumber, RcPid);
false ->
normal_nval_block_get(ReplyPid, Bucket, Key, ClusterId,
UseProxyGet, UUID, BlockNumber, RcPid)
get_block_legacy(ReplyPid, Bucket, Key, ClusterId,
UseProxyGet, UUID, BlockNumber, RcPid)
end.

do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive,
@@ -264,21 +264,17 @@ do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive,
dt_entry(<<"get_block">>, [BlockNumber], [Bucket, Key]),
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber),

StartTime = os:timestamp(),
GetOptions1 = n_val_one_options(),
GetOptions2 = r_one_options(),

ProceedFun = fun(OkReply) ->
ok = riak_cs_stats:update_with_start([block, get, retry], StartTime),
ok = riak_cs_get_fsm:chunk(ReplyPid, {UUID, BlockNumber}, OkReply),
dt_return(<<"get_block">>, [BlockNumber], [Bucket, Key])
end,
RetryFun = fun(NewPause) ->
ok = riak_cs_stats:update_with_start([block, get, retry], StartTime),
do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet,
ProxyActive, UUID, BlockNumber, RcPid, MaxRetries, NewPause)
end,

Timeout = riak_cs_config:local_get_block_timeout(),
try_local_get(RcPid, FullBucket, FullKey, GetOptions1, GetOptions2,
Timeout, ProceedFun, RetryFun, ErrorReasons, UseProxyGet,
Expand All @@ -287,7 +283,8 @@ do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive,
try_local_get(RcPid, FullBucket, FullKey, GetOptions1, GetOptions2,
Timeout, ProceedFun, RetryFun, ErrorReasons, UseProxyGet,
ProxyActive, ClusterID) ->
case get_block_local(RcPid, FullBucket, FullKey, GetOptions1, Timeout) of
case get_block_local(RcPid, FullBucket, FullKey, GetOptions1, Timeout,
[riakc, get_block_n_one]) of
{ok, _} = Success ->
ProceedFun(Success);
{error, {insufficient_vnodes,_,need,_} = Reason} ->
Expand All @@ -311,18 +308,19 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2,
ProxyActive, ClusterID) ->

Timeout = riak_cs_config:get_block_timeout(),
case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, Timeout) of
case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, Timeout,
[riakc, get_block_n_all]) of
{ok, _} = Success ->
ProceedFun(Success);

{error, Why} when Why == disconnected;
Why == timeout ->
_ = lager:debug("get_block_local/5 failed: {error, ~p}", [Why]),
_ = lager:debug("get_block_local() failed: {error, ~p}", [Why]),
RetryFun([{local_quorum, Why}|ErrorReasons]);

{error, notfound} when UseProxyGet andalso ProxyActive->
case get_block_remote(RcPid, FullBucket, FullKey,
ClusterID, GetOptions2) of
case get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions2,
[riakc, get_block_remote]) of
{ok, _} = Success ->
ProceedFun(Success);
{error, Reason} ->
Expand All @@ -339,26 +337,27 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2,
RetryFun({failure, [{local_quorum, Other}|ErrorReasons]})
end.

get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout) ->
case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, Timeout) of
-spec get_block_local(riak_client(), binary(), binary(), list(),
timeout(), riak_cs_stats:key()) ->
{ok, binary()} | {error, term()}.
get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout, StatsKey) ->
case riak_cs_pbc:get(block_pbc(RcPid), FullBucket, FullKey,
GetOptions, Timeout, StatsKey) of
{ok, RiakObject} ->
resolve_block_object(RiakObject, RcPid);
%% %% Corrupted siblings hack: just add another....
%% [{MD,V}] = riakc_obj:get_contents(RiakObject),
%% RiakObject2 = setelement(5, RiakObject, [{MD, <<"foobar">>}, {MD, V}]),
%% resolve_block_object(RiakObject2, RcPid);
Else ->
Else
end.

-spec get_block_remote(riak_client(), binary(), binary(), binary(), get_options()) ->
-spec get_block_remote(riak_client(), binary(), binary(), binary(), get_options(),
riak_cs_stats:key()) ->
{ok, binary()} | {error, term()}.
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0) ->
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0, StatsKey) ->
%% replace get_block_timeout with proxy_get_block_timeout
GetOptions = proplists:delete(timeout, GetOptions0),
Timeout = riak_cs_config:proxy_get_block_timeout(),
case riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey,
ClusterID, GetOptions, Timeout) of
case riak_cs_pbc:repl_get(block_pbc(RcPid), FullBucket, FullKey,
ClusterID, GetOptions, Timeout, StatsKey) of
{ok, RiakObject} ->
resolve_block_object(RiakObject, RcPid);
Else ->
Expand All @@ -367,30 +366,24 @@ get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0) ->

%% @doc This is the 'legacy' block get, before we introduced the ability
%% to modify n-val per GET request.
normal_nval_block_get(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, UUID,
BlockNumber, RcPid) ->
dt_entry(<<"get_block">>, [BlockNumber], [Bucket, Key]),

get_block_legacy(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, UUID,
BlockNumber, RcPid) ->
dt_entry(<<"get_block_legacy">>, [BlockNumber], [Bucket, Key]),
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber),
StartTime = os:timestamp(),
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}],
Object = case UseProxyGet of
false ->
LocalTimeout = riak_cs_config:get_block_timeout(),
riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, LocalTimeout);
true ->
RemoteTimeout = riak_cs_config:proxy_get_block_timeout(),
riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, ClusterID, GetOptions, RemoteTimeout)
end,
ChunkValue = case Object of
{ok, RiakObject} ->
{ok, riakc_obj:get_value(RiakObject)};
{error, notfound}=NotFound ->
NotFound
end,
ok = riak_cs_stats:update_with_start([block, get], StartTime),
GetOptions = r_one_options(),
ChunkValue =
case UseProxyGet of
false ->
LocalTimeout = riak_cs_config:get_block_timeout(),
StatsKey = [riakc, get_block_legacy],
get_block_local(block_pbc(RcPid), FullBucket, FullKey, GetOptions,
LocalTimeout, StatsKey);
true ->
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions,
[riakc, get_block_legacy_remote])
end,
ok = riak_cs_get_fsm:chunk(ReplyPid, {UUID, BlockNumber}, ChunkValue),
dt_return(<<"get_block">>, [BlockNumber], [Bucket, Key]).
dt_return(<<"get_block_legacy">>, [BlockNumber], [Bucket, Key]).

delete_block(RcPid, ReplyPid, RiakObject, BlockId) ->
Result = constrained_delete(RcPid, RiakObject, BlockId),
Expand All @@ -400,14 +393,17 @@ delete_block(RcPid, ReplyPid, RiakObject, BlockId) ->

constrained_delete(RcPid, RiakObject, BlockId) ->
DeleteOptions = [{r, all}, {pr, all}, {w, all}, {pw, all}],
StatsKey = [riakc, delete_block_constrained],
Timeout = riak_cs_config:delete_block_timeout(),
format_delete_result(
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions, Timeout),
BlockId).
riak_cs_pbc:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions,
Timeout, StatsKey),
BlockId).

secondary_delete_check({error, {unsatisfied_constraint, _, _}}, RcPid, RiakObject) ->
Timeout = riak_cs_config:delete_block_timeout(),
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, [], Timeout);
StatsKey = [riakc, delete_block_secondary],
riak_cs_pbc:delete_obj(block_pbc(RcPid), RiakObject, [], Timeout, StatsKey);
secondary_delete_check({error, Reason} = E, _, _) ->
_ = lager:warning("Constrained block deletion failed. Reason: ~p", [Reason]),
E;
@@ -497,7 +493,7 @@ find_md_usermeta(MD) ->
{ok, binary()} | {error, notfound}.
resolve_block_object(RObj, RcPid) ->
{{MD, Value}, NeedRepair} =
riak_cs_utils:resolve_robj_siblings(riakc_obj:get_contents(RObj)),
riak_cs_utils:resolve_robj_siblings(riakc_obj:get_contents(RObj)),
_ = if NeedRepair andalso is_binary(Value) ->
RBucket = riakc_obj:bucket(RObj),
RKey = riakc_obj:key(RObj),
Expand All @@ -513,13 +509,12 @@ resolve_block_object(RObj, RcPid) ->
Bucket = proplists:get_value(<<?USERMETA_BUCKET>>, S3Info),
Key = proplists:get_value(<<?USERMETA_KEY>>, S3Info),
VClock = riakc_obj:vclock(RObj),
FailFun =
fun(Error) ->
_ = lager:error("Put S3 ~p ~p Riak ~p ~p failed: ~p\n",
[Bucket, Key, RBucket, RKey, Error])
FailFun = fun(Error) ->
_ = lager:error("Put S3 ~p ~p Riak ~p ~p failed: ~p\n",
[Bucket, Key, RBucket, RKey, Error])
end,
do_put_block(RBucket, RKey, VClock, Value, MD, RcPid,
FailFun);
[riakc, put_block_resolved], FailFun);
NeedRepair andalso not is_binary(Value) ->
_ = lager:error("All checksums fail: ~P\n", [RObj, 200]);
true ->
Expand All @@ -534,15 +529,13 @@ resolve_block_object(RObj, RcPid) ->
make_md_usermeta(Props) ->
dict:from_list([{?MD_USERMETA, Props}]).

do_put_block(FullBucket, FullKey, VClock, Value, MD, RcPid, FailFun) ->
do_put_block(FullBucket, FullKey, VClock, Value, MD, RcPid, StatsKey, FailFun) ->
RiakObject0 = riakc_obj:new(FullBucket, FullKey, Value),
RiakObject = riakc_obj:set_vclock(
riakc_obj:update_metadata(RiakObject0, MD), VClock),
Timeout = riak_cs_config:put_block_timeout(),
StartTime = os:timestamp(),
case riakc_pb_socket:put(block_pbc(RcPid), RiakObject, Timeout) of
case riak_cs_pbc:put(block_pbc(RcPid), RiakObject, Timeout, StatsKey) of
ok ->
ok = riak_cs_stats:update_with_start([block, put], StartTime),
ok;
Else ->
_ = FailFun(Else),
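Taken together, the block read path now tags each protocol-buffers call with its own stats key: the fast n_val=1 get as [riakc, get_block_n_one], the full-quorum fallback as [riakc, get_block_n_all], the cross-cluster proxy get as [riakc, get_block_remote], with head_block, put_block, put_block_resolved, the legacy get keys, and the two delete_block keys covering the remaining operations. A condensed sketch of that read cascade, omitting the retry bookkeeping, proxy-get gating, and sibling resolution that the real try_local_get and handle_local_notfound perform:

    %% Condensed, simplified sketch of the cascade spread across the hunks above.
    get_block_sketch(RcPid, FullBucket, FullKey, ClusterId) ->
        Pbc = block_pbc(RcPid),
        FastOpts = n_val_one_options(),
        QuorumOpts = r_one_options(),
        case riak_cs_pbc:get(Pbc, FullBucket, FullKey, FastOpts,
                             riak_cs_config:local_get_block_timeout(),
                             [riakc, get_block_n_one]) of
            {ok, Obj} ->
                {ok, Obj};
            {error, notfound} ->
                case riak_cs_pbc:get(Pbc, FullBucket, FullKey, QuorumOpts,
                                     riak_cs_config:get_block_timeout(),
                                     [riakc, get_block_n_all]) of
                    {ok, Obj2} ->
                        {ok, Obj2};
                    {error, notfound} ->
                        %% Only reached when proxy-get is enabled and active.
                        riak_cs_pbc:repl_get(Pbc, FullBucket, FullKey, ClusterId,
                                             QuorumOpts,
                                             riak_cs_config:proxy_get_block_timeout(),
                                             [riakc, get_block_remote])
                end
        end.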
