Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

gunin
We have a 6-node Riak cluster. I wrote a simple custom Erlang application for a custom MapReduce job.

We start the MapReduce job using the riak_kv_mrc_pipe module, for example:

Query = [{map, {modfun,Mod,MapFun},[do_prereduce,{from,1}], false},{reduce, {modfun,Mod,ReduceFun},[{reduce_phase_batch_size, 1000}], true}],
riak_kv_mrc_pipe:mapred({index,Bucket,Field,From,To},Query,Timeout).

But if one of the nodes is down for a long time, the response is unpredictable: sometimes it returns {ok,GoodResultList}, but sometimes {ok,[]} (an empty list).
We traced riak_kv and riak_pipe and found two problems:
1. The fitting_spec created in riak_kv_pipe_index or in riak_kv_pipe_listkeys always has an nval of 1.
2. The actual error occurs in riak_pipe_vnode:remaining_preflist, which returns an empty preflist for some hashes (#fitting_spec.nval is 1). It uses the riak_core_apl:get_primary_apl function.
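To illustrate point 2, here is a minimal sketch (not Riak source; SomeKey is a placeholder, and exact riak_core_apl arities may vary between Riak versions) of why an nval of 1 combined with a primaries-only lookup can come back empty while the data itself is still reachable:

        %% Sketch: get_primary_apl/3 only returns primary vnodes whose
        %% nodes are currently up. With nval = 1 there is exactly one
        %% candidate partition, so if its primary lives on the down node
        %% the result is [] and the fitting has nowhere to send output.
        DocIdx = chash:key_of(SomeKey),  %% SomeKey is a placeholder
        Primary = riak_core_apl:get_primary_apl(DocIdx, 1, riak_pipe),
        %% Fallback-aware lookup: substitutes a fallback vnode on a live
        %% node, so the list stays non-empty while any node is up.
        WithFallback = riak_core_apl:get_apl(DocIdx, 1, riak_pipe).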

But if we use old-style MapReduce, for example:

        {ok,C} = riak:local_client(),
        Me = self(),
        Query = [{map, {modfun,Mod,MapFun},[do_prereduce,{from,1}], false},
                 {reduce, {modfun,Mod,ReduceFun},[{reduce_phase_batch_size, 1000}], true}],
        {ok, {ReqId,FlowPid}} = C:mapred_stream(Query,Me,Timeout),
        {ok,_} = riak_kv_index_fsm_sup:start_index_fsm(zont_riak_connection:riak_node(),
                     [{raw, ReqId,FlowPid}, [Bucket, none,{range,Field,From,To},Timeout,mapred]]),
        luke_flow:collect_output(ReqId, Timeout).

The query executes well. But the problem is that do_prereduce and {reduce_phase_batch_size, 1000} are ignored, which is why execution is slow.


Can you make a recommendation? Maybe in riak_pipe_vnode:remaining_preflist we need to use riak_core_apl:get_apl_ann, or set #fitting_spec.nval to the n_val from our bucket properties?

_______________________________________________
riak-users mailing list
[hidden email]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

John Daily
Riak's MapReduce functionality cannot survive a node failure. If a vnode involved with a query fails while actively processing the request, the entire query will have to be re-run. The failed query should be automatically terminated, but you'll have to re-run the query yourself.

If you create queries using Riak Pipe (the technology layer beneath MapReduce), it is possible to create queries that can survive a vnode failure, but that is not a trivial exercise.

Regarding the empty result set you're seeing: one possibility is that a vnode has failed recently and has come back online without data. MapReduce will not currently trigger a read repair, but that problem should be resolved with the forthcoming Riak 1.3 release.

-John Daily
Technical Evangelist
Basho

On Jan 30, 2013, at 8:05 AM, [hidden email] wrote:


Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

gunin
Sorry John, you misunderstood my question.
1. One node (I mean a physical (Erlang) node) in the cluster is down.
2. It was down before I started the job, while the job ran, and afterward. We powered this node off; it is under repair. But we have not removed this node from the cluster.
3. All the data that must be processed is available, on primary and fallback vnodes started on the other physical (Erlang) nodes. We don't need read repair for this data. (As you can see, old-style MapReduce works fine; it uses the luke module.)
4. If something fails I want to receive an error, but actually the last reduce phase is called with an empty list, and after that returns an empty list. For example, 4 times out of 5 I receive a good result that contains all the data,
but 1 time out of 5 I receive an empty result.
5. I don't see any vnode failures during the MapReduce task.

Thank you.

PS. I will prepare a simple test script later.

----- Original Message -----
From: "John Daily" <[hidden email]>
To: [hidden email]
Cc: [hidden email]
Sent: Thursday, January 31, 2013, 1:58:38
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.


Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

bryan-basho
Administrator
On Thu, Jan 31, 2013 at 6:07 AM,  <[hidden email]> wrote:
> Sorry John, you misunderstood my question.
> 1. One node (I mean a physical (Erlang) node) in the cluster is down.
> 2. It was down before I started the job, while the job ran, and afterward. We powered this node off; it is under repair. But we have not removed this node from the cluster.

Aha. Thank you for the clarification. Sorry for pushing John in the
wrong direction. Your new description leads me to think that the
problem is likely in the reduce phase (where we do, yes, use an nval
of 1, but also a constant hash that doesn't account for node
liveness).

As yet, I've been unable to reproduce exactly what you're seeing,
though. I always get an error instead of an empty result. Answers to
some of these questions may help me:

1. What version of Riak are you running?
2. How many nodes do you have in the cluster?
3. About how many keys are in this bucket?
4. About how many keys do you expect to match the index query?

Thanks,
Bryan


Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

gunin
Thank you for your response.
1. Riak 1.2. I cloned it from the GitHub master branch some time ago.
2. 6 nodes in our test environment.
3. More than 100 million (100 thousand per day).
4. Near 100 thousand (indexed by date).

I have prepared a simple test module and a test scenario. It should help you to help me :)

1. Generate four Riak nodes.
[xx riak]$ make devrel
2. Start all nodes and join them to the cluster.
[xx dev]$ dev1/bin/riak-admin ringready
TRUE All nodes agree on the ring ['dev1@127.0.0.1','dev2@127.0.0.1',
                                  'dev3@127.0.0.1','dev4@127.0.0.1']

3. Kill the fourth node.
[xx dev]$ dev4/bin/riak stop

[xx dev]$ dev1/bin/riak-admin ringready
FALSE ['dev4@127.0.0.1'] down.  All nodes need to be up to check.

4. Compile my test module and include it in the code path on all started nodes (I included it in riak_kv before compiling Riak).

5. Attach to the Erlang console on node dev1.
[xx dev]$ dev1/bin/riak attach

6. Test

Generate the test dataset (50 objects).
(dev1@127.0.0.1)1> mapred_test:make_data(50).
ok

Check that all the data is available.
(dev1@127.0.0.1)7> mapred_test:check_data(50).
{not_available,0}
Ok. All data saved.
Check that all the data is available, 50 times.
(dev1@127.0.0.1)8> mapred_test:check_data_n(50,50).
ok
Ok. All data really saved.

Try counting the objects with this MapReduce.
(dev1@127.0.0.1)23> mapred_test:pipe_mapreduce_check(50).
{not_available,0}
Ok.
....Repeat 7 times.
Attempt #8:
(dev1@127.0.0.1)23> mapred_test:pipe_mapreduce_check(50).
{not_available,50}
!!!Fail!!! MapReduce actually returned {ok,[]}.

Try to generate some statistics. Repeat the MapReduce task 50 times.
(dev1@127.0.0.1)29> mapred_test:pipe_mapreduce_check_n(50,50).
ok count 39
failed count 11.

Roughly 1 in 5 tasks failed.

Code of the mapred_test module:

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mapred_test).


-export([map/3,
                 reduce/2,
                 make_data/1,
                 check_data_n/2,
                 check_data/1,
                 pipe_mapreduce_check/1,
                 pipe_mapreduce_check_n/2]).

%% Map for the counter.
map({error,notfound},_,_)->
        [];
map(_,_,_)->
        [1].

%% Simple sum.
reduce(L,_)->
        [lists:foldl(fun(I,Acc)->I+Acc end,0,L)].

%% Generate the data set for the test.
make_data(Count)->
        {ok,Conn} = riak:local_client(),
        make_data_loop(Count,Conn).
make_data_loop(0,_)->
        ok;
make_data_loop(Count,Conn)->
        Key = list_to_binary(integer_to_list(Count)),
        RObj = riak_object:new(<<"mapred_test">>,Key,1),
        case Conn:put(RObj) of
                ok->
                        make_data_loop(Count-1,Conn);
                Else->
                        {error,Else}
        end.

       
check_data(Count)->
        {ok,Conn} = riak:local_client(),
        {not_available,check_data_loop(Count,Conn,0)}.

check_data_loop(0,_Conn,Acc)->
        Acc;
check_data_loop(Count,Conn,Acc)->
        Key = list_to_binary(integer_to_list(Count)),
        case Conn:get(<<"mapred_test">>,Key) of
                {ok,_}->
                        check_data_loop(Count-1,Conn,Acc);
                _->
                        check_data_loop(Count-1,Conn,Acc+1)
        end.

check_data_n(_Count,0)->
        ok;
check_data_n(Count,N)->
        case check_data(Count) of
                {not_available,0}->
                        check_data_n(Count,N-1);
                Else->
                        Else
        end.

pipe_mapreduce_check(Count)->
        Query = [{map, {modfun,?MODULE,map},[do_prereduce,none],false},
                         {reduce, {modfun,?MODULE,reduce},[{reduce_phase_batch_size, 1000}], true}],
        case riak_kv_mrc_pipe:mapred(<<"mapred_test">>,Query,60000) of
                {ok,[I]} when is_integer(I)->
                        {not_available,Count-I};
                {ok,[]}->
                        {not_available,Count};
                Else->
                        Else
        end.

pipe_mapreduce_check_n(Count,N)->
        {Good,Bad}=pipe_mapreduce_check_n(Count,N,{0,0}),
        io:format("ok count ~p ~nfailed count ~p~n",[Good,Bad]).

pipe_mapreduce_check_n(_Count,0,Acc)->
        Acc;
pipe_mapreduce_check_n(Count,N,{Good,Bad})->
        case pipe_mapreduce_check(Count) of
                {not_available,0}->
                        pipe_mapreduce_check_n(Count,N-1,{Good+1,Bad});
                {not_available,_}->
                        pipe_mapreduce_check_n(Count,N-1,{Good,Bad+1});
                Else->
                        {0,{runtime_error,Else}}
        end.


%%%%%%%%%%%%%%%%%%%%%%

Thanks,
Alexander Gunin.

----- Original Message -----
From: "Bryan Fink" <[hidden email]>
To: [hidden email]
Cc: "John Daily" <[hidden email]>, "Riak-Users" <[hidden email]>
Sent: Thursday, January 31, 2013, 17:03:09
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.


Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

gunin
Hello Bryan.
I have found the problem.

The problem is in the reduce phase.

1. See the riak_kv_mrc_pipe:mr2pipe_phases implementation. It converts a MapReduce job spec into a riak_pipe spec.
In this function a ConstHashCookie is created as Now = now(), and it is used as the chashfun value for the fitting in the reduce phase.
This generated value is actually used in the riak_kv_w_reduce:done function, which takes the pre-reduced (not yet fully reduced) data and sends it to the output.
But the output vnode in that case is the preflist for ConstHashCookie, i.e. some random value, and the n_val for this phase is always 1, which is why the calculated preflist for this phase is sometimes empty.
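For context, the routing described above amounts to the following (a hedged sketch reconstructed from this thread, not the actual riak_kv_mrc_pipe source):

        %% Sketch: every reduce input is routed by one constant cookie,
        %% chosen once when the pipe is built.
        ConstHashCookie = now(),
        Hash = chash:key_of(ConstHashCookie),  %% one fixed ring position
        %% With #fitting_spec.nval = 1, only the single primary owner of
        %% Hash is eligible. If that owner's node happens to be down, the
        %% preflist is empty and the phase's output is silently lost.
        Preflist = riak_core_apl:get_primary_apl(Hash, 1, riak_pipe).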

Do you have any suggestions for how we can fix it?

Thanks,
Alexander Gunin.

----- Original Message -----
From: [hidden email]
To: "Bryan Fink" <[hidden email]>
Cc: "Riak-Users" <[hidden email]>
Sent: Thursday, January 31, 2013, 17:34:34
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.


Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

gunin
I have tried to fix this bug.
1. Add a function for generating the ConstHashCookie in riak_kv_mrc_pipe:
const_hash_cookie()->
        Random = chash:key_of(now()),
        {ok, Ring} = riak_core_ring_manager:get_my_ring(),
        NodeCount = length(riak_core_ring:all_members(Ring)),
        case riak_core_apl:get_primary_apl(Random,NodeCount,riak_pipe) of
                [{{Partition, _Node},_}|_]->
                        riak_pipe_vnode:hash_for_partition(Partition);
                _->
                        Random
        end.
It takes into account which nodes are up.
2. Remove Hash = chash:key_of(ConstHashCookie) in the reduce2pipe function.

After that everything works fine.

Maybe I should write const_hash_cookie more carefully and push these changes to GitHub?

----- Original Message -----
From: [hidden email]
To: "Bryan Fink" <[hidden email]>
Cc: "Riak-Users" <[hidden email]>
Sent: Thursday, January 31, 2013, 21:51:21
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.


Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

bryan-basho
Administrator
On Thu, Jan 31, 2013 at 1:46 PM,  <[hidden email]> wrote:
> I have tried to fix this bug.
> 1. Add a function for generating the ConstHashCookie in riak_kv_mrc_pipe
> ...
> It takes into account which nodes are up.
> 2. Remove Hash = chash:key_of(ConstHashCookie) in the reduce2pipe function.
>
> After that everything works fine.

Excellent! That was exactly the hunch I was hinting at. I wasn't using
the prereduce phase, and that is why I couldn't reproduce exactly what
you were seeing.

I like your solution. Please do submit a pull request for it. Thank
you for digging!

-Bryan
