"consistent" map/reduce


"consistent" map/reduce

Kresten Krab Thorup
Hi,

For Riak Mobile, I occasionally need to use an M/R job to scan all key/values in a bucket and compute a content-hash for each object in the bucket.  This works fine, but ...

I'd like to be able to do a "consistent map/reduce" job i.e., with "R=2 semantics" for an "N=3 bucket".  Maybe other people have the same need, but I can't see if this is possible ... perhaps with the new riak_pipe infrastructure?

This is my idea:

The map function yields {Key, [{VectorClock,1,Hash}]} for each replica, but needs to run on *all* replicas of objects in a given Bucket.   Hash is the real value I'm interested in, i.e., the content-hash for the object; but it could be some other "map" function output.

Then, the reduce phase needs to "merge" a list of {VectorClock,N,Hash} tuples by considering the VectorClocks to determine if results are in "conflict", or if one is before/after the other.  N is reduced to the sum of the counts of all elements with an equal Hash value.

For each output of the reduce phase I'll then have, for each key, a list of {VC,N,Hash}.  If one of those N values is >= quorum, then I have a consistent output value (Hash).
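
To make that concrete, here is a minimal, untested sketch of the merge and the quorum check.  The module and function names are made up for illustration, and it leans on riak_core's vclock:merge/1; for brevity it only groups by Hash and skips pruning replicas whose vclocks are strictly dominated:

    -module(consistent_mr_sketch).
    -export([merge/1, consistent_hash/2]).

    %% Merge a list of {VectorClock, N, Hash} observations for one key:
    %% sum the counts of entries with the same Hash, and merge their vclocks.
    merge(Entries) ->
        ByHash = lists:foldl(
                   fun({VC, N, Hash}, Acc) ->
                           dict:update(Hash,
                                       fun({VCs, Count}) -> {[VC | VCs], Count + N} end,
                                       {[VC], N},
                                       Acc)
                   end, dict:new(), Entries),
        [ {vclock:merge(VCs), Count, Hash}
          || {Hash, {VCs, Count}} <- dict:to_list(ByHash) ].

    %% Quorum check: with R=2 for an N=3 bucket, accept the first Hash
    %% that at least R replicas agree on.
    consistent_hash(Merged, R) ->
        case [Hash || {_VC, Count, Hash} <- Merged, Count >= R] of
            [Hash | _] -> {ok, Hash};
            []         -> not_consistent
        end.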

Questions:

- How can I have an M/R job run on *all* vnodes?  Not just for objects that are owned by a primary?

- The M/R "input" is essentially  listkeys(Bucket)  ... can this be done using "async keylisting", so that the operation does not hold up the vnode while listing?

If someone can sketch a solution, I'd be happy to go hacking on it ...  

Kresten



Mobile: + 45 2343 4626 | Skype: krestenkrabthorup | Twitter: @drkrab
Trifork A/S  |  Margrethepladsen 4  | DK- 8000 Aarhus C |  Phone : +45 8732 8787  |  www.trifork.com
 



Re: "consistent" map/reduce

bryan-basho
Administrator
On Mon, Nov 21, 2011 at 2:46 PM, Kresten Krab Thorup <[hidden email]> wrote:
> I'd like to be able to do a "consistent map/reduce" job i.e., with "R=2 semantics" for an "N=3 bucket".  Maybe other people have the same need, but I can't see if this is possible ... perhaps with the new riak_pipe infrastructure?

Hi, Kresten.  Indeed, this same topic came up in an independent
conversation last week.  I think there are a few ways to attack it.
Let's start with yours:

> The map function yields {Key, [{VectorClock,1,Hash}]} for each replica, but needs to run on *all* replicas of objects in a given Bucket.   Hash is the real value I'm interested in i.e., the content-hash for the object; but it could be some other "map" function output.
>
> Then, the reduce phase needs to "merge" a list of {VectorClock,N,Hash} tuples, by considering the VectorClocks to determine if results are in "conflict", or if one is before/after the other.  N is reduced to the sum of all elements with equal Hash value.

I like many of the ideas in this approach.  It has a nice distributed
data-provenance feel to it.  I think the danger lies in the work that
the reduce phase would have to do.  With distributed parallel
keylisting, there's no way to guarantee the order that the map results
arrive at the reduce processor.  This means that the reduce state may
become quite large tracking all of the key/version pairs
produced-but-not-yet-satisfying-R.  Maybe this is manageable, though,
so I'll also try to answer your questions:

> - How can I have a M/R job run on *all* vnodes?  Not just for objects that are owned by a primary?

The only way to do this right now is to use Riak Pipe directly.  Set up
a pipe with your "map" and "reduce" fittings, then send inputs to it
such that one input hashes to each vnode.  Using riak_pipe_qcover_fsm
with N=1 might ease this process.

> - The M/R "input" is essentially  listkeys(Bucket)  ... can this be done using "async keylisting", so that the operation does not hold up the vnode while listing?

Yes, absolutely.  The riak_kv_pipe_listkeys module does just this, and
is also an example of using riak_pipe_qcover_fsm.

These two questions and answers lead to the basic pipe layout of:

[
 {module= riak_kv_pipe_listkeys}, %% setup with qcover N=1

 {module= riak_kv_pipe_get,
  chashfun= follow},              %% each vnode processes keys it produces

 {module= riak_kv_mrc_map,
  chashfun= follow},

 {module= riak_kv_w_reduce,
  chashfun= ConstantOrCustom}    %% see below
]
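
For what it's worth, here is a rough, untested sketch of how that layout might be written as actual fitting specs.  The record fields are from riak_pipe.hrl, but the map/reduce 'arg' values are left as parameters because the exact shapes riak_kv_mrc_map and riak_kv_w_reduce expect should be checked against their sources:

    -include_lib("riak_pipe/include/riak_pipe.hrl").

    consistent_mr_pipe(MapArg, ReduceArg) ->
        Spec =
            [#fitting_spec{name = listkeys,
                           module = riak_kv_pipe_listkeys},
             #fitting_spec{name = get,
                           module = riak_kv_pipe_get,
                           chashfun = follow},
             #fitting_spec{name = map,
                           module = riak_kv_mrc_map,
                           arg = MapArg,
                           chashfun = follow},
             #fitting_spec{name = reduce,
                           module = riak_kv_w_reduce,
                           arg = ReduceArg,
                           %% constant hash: every result goes to one reducer
                           chashfun = fun(_) -> chash:key_of(reduce) end}],
        riak_pipe:exec(Spec, []).

You would feed the listkeys head via the riak_pipe_qcover machinery mentioned above rather than riak_pipe:queue_work/2, and read results off the sink with riak_pipe:collect_results/1.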

Riak KV reduce fittings are normally set up with a constant chashfun,
such that *all* results are processed in one place.  To help alleviate
the reduce-state-size problem, I might suggest using a chashfun that
spreads results, yet makes sure all results for each key end up at the
same reducer (most likely, hash the result's key, just as you would
for determining its KV preflist).
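
For example (untested, and assuming the map output arrives keyed as {{Bucket, Key}, Result}), such a spreading chashfun could be as simple as:

    ReduceChash = fun({{Bucket, Key}, _Result}) ->
                          riak_core_util:chash_key({Bucket, Key})
                  end.

used in place of the constant chashfun on the reduce fitting.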

Note also that the "get" fitting has a 'follow' chashfun.  This is
also different from normal MR usage, since we would normally set N=3
(or whatever the bucket has set) for the qcover-ing listkeys fitting.
The normal setting ensures that each key is produced only once, but
N=1 will produce the same key multiple times for buckets where N>1.
You want each result processed locally ('follow') to get the
possibly-different vclock/hash stored at that vnode.

It may also be possible to take a completely different approach.  A
simple modification of riak_kv_pipe_get could allow it to attempt to
read all N replicas, perhaps even by simply starting a
riak_kv_get_fsm.  In this case, all of the merging of vclocks would
happen before the mapping instead of after.  But, it would also miss
keys that were not fully replicated, since you'd likely want to
maintain N=3 for the keylisting qcover operation.  I also haven't put
as much thought into this path, so there may be other demons lurking.

> If someone can sketch a solution, I'd be happy to go hacking on it ...

Hopefully that's enough sketching to at least generate a second round
of questions.  ;)  I'd be very interested in hearing how it goes.
Please fire back with anything that needs more explanation.

-Bryan


Re: "consistent" map/reduce

Kresten Krab Thorup
Super. Thanks. I'll have to play around with it a bit and get back to you.

Kresten



Re: "consistent" map/reduce

Kresten Krab Thorup
In reply to this post by bryan-basho
Hi Bryan,

now I've implemented the bulk of this, and obviously have some follow-up questions:

1. How do I create the initial inputs? i.e., the list of all {Index, Node} pairs that go into the riak_kv_pipe_listkeys fitting.  Does this fitting need a special chashfun to send it to the right vnode?

2. Given such a pipeline-based thingie installed as .beam files with Riak, is there a way to "invoke" it via the HTTP M/R API?   It would be great if I don't have to poke a new hole through tcp/ip to exploit pipe.

3. Does riak_pipe run a fitting instance per node, or per vnode?

FYI, ... I'm considering doing a disk-based version of riak_pipe_w_reduce, which keeps the intermediate results in a local K/V store, in order to support large keysets.    This could just be a bitcask w/merge disabled.   Re-implementing the reducer would also allow us to evaluate the N >= R condition in the reducer, and emit results as early as possible.
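
Roughly what I have in mind for the storage side, as an untested fragment (Ref would come from bitcask:open(Dir, [read_write]); merge/1 stands for the vclock/hash merge described earlier in the thread):

    %% Fold a new {VC, N, Hash} observation for {Bucket, Key} into the
    %% intermediates kept in a local bitcask instead of the worker state.
    accumulate(Ref, Bucket, Key, NewEntry) ->
        BKey = term_to_binary({Bucket, Key}),
        Old = case bitcask:get(Ref, BKey) of
                  {ok, Bin} -> binary_to_term(Bin);
                  not_found -> []
              end,
        Merged = merge([NewEntry | Old]),
        ok = bitcask:put(Ref, BKey, term_to_binary(Merged)),
        Merged.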


Kresten

Mobile: + 45 2343 4626 | Skype: krestenkrabthorup | Twitter: @drkrab
Trifork A/S  |  Margrethepladsen 4  | DK- 8000 Aarhus C |  Phone : +45 8732 8787  |  www.trifork.com
 



Re: "consistent" map/reduce

bryan-basho
Administrator
On Wed, Nov 30, 2011 at 8:36 AM, Kresten Krab Thorup <[hidden email]> wrote:
> 1. How do I create the initial inputs? i.e., the list of all {Index, Node} pairs that go into the riak_kv_pipe_listkeys fitting.  Does this fitting need a special chashfun to send it to the right vnode?

The easiest way to get the "go" message to all vnodes is to use the
riak_pipe_qcover_* modules, as is done here:
https://github.com/basho/riak_kv/blob/master/src/riak_kv_pipe_listkeys.erl#L135

Most of your arguments will be the same as is seen there:

    [{raw,      %% used send (!) for done, not gen_*/luke/etc. calls
      ReqId,    %% a unique integer for the request
      self()},  %% send done and errors to this process
     [LKP,      %% the pipe with listkeys as its head fitting
      Bucket,   %% the name of the bucket to list
      NVal]]    %% the N to use in coverage calculation, 1 in your case

It would be difficult to do the same with a special chashfun, since
the input that listkeys expects does not contain anything naming the
vnode for your chashfun to work with (every vnode gets the same
input).
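
Put together, kicking off the covering listkeys looks roughly like this.  Untested, and the supervisor entry point here is written from memory of riak_kv_pipe_listkeys, so please verify it against the linked source:

    %% LKP is the #pipe{} whose head fitting is riak_kv_pipe_listkeys
    ReqId = erlang:phash2({self(), os:timestamp()}),  %% any unique integer
    {ok, _Fsm} = riak_pipe_qcover_sup:start_qcover_fsm(
                   [{raw, ReqId, self()},
                    [LKP, Bucket, 1]]).               %% NVal = 1 in your case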

> 2. Given such a pipeline-based thingie installed as .beam files with Riak, is there a way to "invoke" it via the HTTP M/R API?   It would be great if I don't have to poke a new hole through tcp/ip to exploit pipe.

Sadly, no, there is not a way to use this special setup over the HTTP
MR API at this time.  I have been working on some designs for external
pipe control, but they are not yet ready for implementation.  So, yes,
you'll have to poke a new hole through for now.

> 3. Does riak_pipe run a fitting instance per node, or per vnode?

Pipe runs a fitting worker per vnode.  If you have ideas for how to
make that clearer in riak_pipe's README.org, please pass them along!

> FYI, ... I'm considering doing a disk-based version of riak_pipe_w_reduce, which keeps the intermediate results in a local K/V store, in order to support large keysets.    This could just be a bitcask w/merge disabled.   Re-implementing the reducer would also allow us to evaluate the N >= R condition in the reducer, and emit results as early as possible.

Sounds awesome and right on track, I must say.
-Bryan


Re: "consistent" map/reduce

bryan-basho
Administrator
Ew.  Major apologies for the formatting in my last response.  I've no
idea what happened.  Let me know if it's unreadable, and I'll try
sending again.

-Bryan


Re: "consistent" map/reduce

Kresten Krab Thorup
Thanks for your answers.

Now, it seems that if the items passing between the map and reduce phases are many and small, then the system could benefit from a "chunking" fitting that collects items and sends them off in a list to the next fitting after receiving X # of items, or after some timeout (say, Y ms); i.e., basically Nagle's algorithm.  Otherwise, we may get burned pretty seriously by the "sync send" that happens between fittings!?  (A rough sketch of such a fitting follows the layout below.)

So ...

[
 {list_keys},
 {riak_kv_get,        chashfun= follow},
 {riak_kv_mrc_map,    chashfun= follow},
 {nagles_send,        chashfun= follow},

 {nagles_receive,     chashfun= riak_pipe_w_reduce:chashfun},
 {riak_pipe_w_reduce, chashfun= follow}
]
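
Here is the kind of thing I mean by nagles_send: an untested, count-only sketch (the timeout part is left out), with the callback and send_output shapes copied from the existing riak_pipe fittings, so they should be double-checked:

    -module(nagles_send).
    -behaviour(riak_pipe_vnode_worker).
    -export([init/2, process/3, done/1]).

    -define(CHUNK, 100).  %% X: flush after this many items

    -record(state, {buf = [] :: list(), n = 0 :: non_neg_integer(), p, fd}).

    init(Partition, FittingDetails) ->
        {ok, #state{p = Partition, fd = FittingDetails}}.

    %% Buffer inputs; emit the buffer as one list every ?CHUNK items.
    process(Input, _Last, #state{buf = Buf, n = N} = S) when N + 1 >= ?CHUNK ->
        ok = riak_pipe_vnode_worker:send_output(
               lists:reverse([Input | Buf]), S#state.p, S#state.fd),
        {ok, S#state{buf = [], n = 0}};
    process(Input, _Last, #state{buf = Buf, n = N} = S) ->
        {ok, S#state{buf = [Input | Buf], n = N + 1}}.

    %% Flush whatever is left when the upstream fitting finishes.
    done(#state{buf = []}) ->
        ok;
    done(#state{buf = Buf} = S) ->
        ok = riak_pipe_vnode_worker:send_output(
               lists:reverse(Buf), S#state.p, S#state.fd),
        ok.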



   




Re: "consistent" map/reduce

bryan-basho
Administrator
On Wed, Nov 30, 2011 at 4:56 PM, Kresten Krab Thorup <[hidden email]> wrote:
> Now, it seems that if the items passing between the map and reduce phases are many and small, then the system could benefit from a "chunking" fitting that collects items and sends them off in a list to the next fitting after receiving X # of items, or after some timeout (say, Y ms); i.e., basically Nagle's algorithm.  Otherwise, we may get burned pretty seriously by the "sync send" that happens between fittings!?

Indeed, a synchronous send for each object could be a real drag.  A
chunking fitting may help, if it can alleviate any contention, though
you'll still see a sync send per object inbound to that fitting.  The
worker behavior doesn't support timeout yet, so it will have to be
based on number of items received for now.

I have two outstanding todos that may also help.  The first is allowing
a worker to request all inputs in its queue, instead of just the next
one, from its vnode.  The second is a bulk-sending request, where a
fitting could say "send these 10 outputs" but a vnode might reply back
"I only queued the first 5" (similar to the common io pattern "write N
bytes" => "N-M bytes written").  They each have a couple of tricky
corner cases, and need benchmarking anyway, so I'm not sure when
they'll make mainline yet.
-Bryan
