erlang - Riak Commit Hook CRDT 增量计数器

标签 erlang riak

我有一个提交钩子(Hook),我想更新计数器。

目前我有:

-module(bucket_counter).
-export([precommit/1]).


-record(crdt_op, {mod, op, ctx}).
-define(CRDT_OP, #crdt_op).

-define(V1_COUNTER_TYPE, riak_kv_pncounter).

precommit(RObj) ->
    {ok, Client} = riak:local_client(),
    Bucket = {<<"counters">>, <<"update_counts">>},
    Id = <<"all">>,
    Counter = case Client:get(Bucket, Id) of
        {error, notfound} -> riak_object:new(Bucket, Id, 0);
        {ok, Obj} -> Obj
    end,
    io:format("Counter ~p~n", [Counter]),

    {{Context,_},_} = riak_kv_crdt:value(Counter, ?V1_COUNTER_TYPE),

    Op = ?CRDT_OP{mod=?V1_COUNTER_TYPE, op={increment, 1}, ctx=Context},
    Counter2 = riak_kv_crdt:update(Counter, <<"1234">>, Op),

    io:format("Gen ~p~n", [Counter2]),
    %Counter3 = riak_kv_crdt:merge(Counter2),
    Client:put(Counter2),
    RObj.

这会产生错误:

    @riak_kv_put_fsm:decode_precommit:887 Problem invoking pre-commit hook bucket_counter:precommit -> error:function_clause
[{riak_kv_crdt,update_crdt,[[{riak_dt_pncounter,{{dict,5,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[[<<"dot">>|{<<252,97,47,244,1,104,136,26>>,{1,63622971158}}]],[],[],[],[],[],[[<<"content-type">>,97,112,112,108,105,99,97,116,105,111,110,47,114,105,97,107,95,99,111,117,110,116,101,114],[<<"X-Riak-VTag">>,53,87,81,75,79,113,110,77,80,111,117,49,71,101,120,122,52,67,89,115,66,122]],[[<<"index">>]],[],[[<<"X-Riak-Last-Modified">>|{1455,751958,325156}]],[],[]}}},{crdt,riak_dt_pncounter,"application/riak_counter",[{<<252,97,47,244,1,104,136,26>>,1,0}]}}}],<<"1234">>,{crdt_op,riak_kv_pncounter,{increment,1},<<>>}],[{file,"src/riak_kv_crdt.erl"},{line,234}]},{riak_kv_crdt,update,3,[{file,"src/riak_kv_crdt.erl"},{line,61}]},{bucket_counter,precommit,1,[{file,"bucket_counter.erl"},{line,24}]},{riak_kv_put_fsm,invoke_hook,4,[{file,"src/riak_kv_put_fsm.erl"},{line,853}]},{riak_kv_put_fsm,precommit,2,[{file,"src/riak_kv_put_fsm.erl"},{line,503}]},{gen_fsm,handle_msg,7,[{file,"gen_fsm.erl"},{line,505}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]

查看 riak_kv 源代码,看起来合并仅适用于具有上下文的 map 和集合:

https://github.com/basho/riak_kv/blob/master/src/riak_kv_crdt.erl#L246

我尝试删除上下文,但由于 sibling 的原因,它崩溃了:

riak_index:parse_object_hook:116 Siblings not allowed: {r_object,{<<"counters">>,<<"update_counts">>},<<"all">>,[{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,49,67,49,85,69,121,66,118,111,84,56,85,118,81,104,104,106,74,101,98,49,71]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,750950,430934}]],[],[]}}},<<69,2,0,0,0,17,114,105,97,107,95,100,116,95,112,110,99,111,117,110,116,101,114,71,2,131,108,0,0,0,1,104,3,109,0,0,0,8,252,97,47,244,156,20,105,128,97,1,97,0,106>>},{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,49,67,49,85,69,121,66,118,111,84,56,85,118,81,104,104,106,74,101,98,49,71]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,750950,430934}]],[],[]}}},<<69,1,71,1,0,0,0,22,70,1,131,108,0,0,0,1,104,2,109,0,0,0,4,49,50,51,52,97,1,106,0,0,0,4,70,1,131,106>>}],[{<<252,97,47,244,156,20,105,128>>,{1,63622970068}}],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[clean|true]],[]}}},undefined}

更新 map 给我一个同级错误:

-module(bucket_counter).
-export([precommit/1]).


-record(crdt_op, {mod, op, ctx}).
-define(CRDT_OP, #crdt_op).

-define(V1_COUNTER_TYPE, riak_kv_pncounter).


precommit(RObj) ->
    {ok, Client} = riak:local_client(),
    Bucket = {<<"counters">>, <<"update_counts">>},
    Id = <<"all">>,
    {Counter, Context} = case Client:get(Bucket, Id) of
        {error, notfound} -> 
            Obj = riak_object:new(Bucket, Id, 0),
            {Obj, undefined};
        {ok, Obj} ->
            {{CountContext,_},_} = riak_kv_crdt:value(Obj, riak_dt_map),
            {Obj, CountContext}
    end,
    io:format("Counter ~p~n", [Counter]),

    Op = ?CRDT_OP{mod=riak_dt_map, op={update,[{update,{<<"count">>,riak_dt_emcntr},increment}]}, ctx=Context},
    Counter2 = riak_kv_crdt:update(Counter, <<"1234">>, Op),

    io:format("Gen ~p~n", [Counter2]),
    %Counter3 = riak_kv_crdt:merge(Counter2),
    Client:put(Counter2),
    RObj.



riak_index:parse_object_hook:116 Siblings not allowed: {r_object,{<<"counters">>,<<"update_counts">>},<<"all">>,[{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},<<69,2,0,0,0,11,114,105,97,107,95,100,116,95,109,97,112,77,1,131,80,0,0,0,132,120,1,203,96,206,97,96,96,96,204,96,202,5,82,44,134,70,198,38,137,140,89,80,33,136,32,107,114,126,105,94,73,10,3,95,81,102,98,118,124,74,73,124,106,110,114,94,73,81,6,19,138,42,168,86,184,40,22,227,224,66,25,204,137,140,64,200,144,149,149,193,148,5,4,0,198,254,26,118>>},{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},0}],[],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[clean|true]],[]}}},undefined}
2016-02-18 00:48:25.966 [debug] <0.2105.0>@riak_kv_put_fsm:decode_precommit:880 Pre-commit hook riak_index:parse_object_hook failed with reason {siblings_not_allowed,{r_object,{<<"counters">>,<<"update_counts">>},<<"all">>,[{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},<<69,2,0,0,0,11,114,105,97,107,95,100,116,95,109,97,112,77,1,131,80,0,0,0,132,120,1,203,96,206,97,96,96,96,204,96,202,5,82,44,134,70,198,38,137,140,89,80,33,136,32,107,114,126,105,94,73,10,3,95,81,102,98,118,124,74,73,124,106,110,114,94,73,81,6,19,138,42,168,86,184,40,22,227,224,66,25,204,137,140,64,200,144,149,149,193,148,5,4,0,198,254,26,118>>},{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},0}],[],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[clean|true]],[]}}},undefined}}

最佳答案

我已经弄清楚了,我要脱离 1.4 文档,而需要执行 riak_kv_crdt:new(Bucket, Id, riak_dt_map)

关于erlang - Riak Commit Hook CRDT 增量计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35470449/

相关文章:

erlang - 如何在面向文档的系统中处理经过身份验证的用户对资源的访问?

Erlang ets 插入多个表

sorting - 没有累加器可以写这个吗?

python - Riak - 用于将键值数据加载到 Riak 存储桶中的 Python 脚本

F# WebRequest 抛出 WebException ("protocol error") 而不是 404 状态代码

macos - riak os x 安装

埃尔兰;在通用测试开始时启动应用程序

erlang - 降级 Erlang/OTP 后加载/重新编译 Rebar 模块

ruby - 截断 Riak 数据库

riak - 如何更改 Riak 中现有存储桶类型的存储后端?