Map Reduce and long queries -

David Montgomery
Hi,

Below is my code for running a MapReduce job in Python. I have a six-node
cluster, 2 cores each with 4 GB of RAM. There is no load, I have about
3 million keys, and I am using LevelDB with Riak 1.2. Running the code below
takes a terribly long time: it never finishes, and I don't even know how to
check whether it is still running, other than that the Python script has not
timed out. The number of executed mappers in stats is flat-lined when I look
at it in Graphite. On test queries the code below works.

So, how do I debug what is going on?


import riak
from riak import key_filter   # key-filter helpers in the old Python client


def main():
    # riak_host, bucket and filter_map are defined elsewhere in the script
    client = riak.RiakClient(host=riak_host, port=8087,
                             transport_class=riak.transports.pbc.RiakPbcTransport)
    query = client.add(bucket)
    filters = (key_filter.tokenize(":", filter_map['date'])
               + key_filter.starts_with('201210'))
               # & key_filter.tokenize(":", filter_map['country']).eq("US")
               # & key_filter.tokenize(":", filter_map['campaign_id']).eq("t1")
    query.add_key_filters(filters)

    query.map('''
    function(value, keyData, arg) {
        var data = Riak.mapValuesJson(value)[0];

        if(data['adx']=='gdn'){
            var alt_key = data['hw'];
            var obj = {};
            obj[alt_key] = 1;
            return [ obj ];
        }else{
           return [];
        }


    }''')


    query.reduce('''
    function(values, arg){
        return [ values.reduce( function(acc, item) {
            for (var state in item) {
                if (acc[state])
                    acc[state] += item[state];
                else
                    acc[state] = item[state];
            }
            return acc;
        })];
    }
    ''')

    for result in query.run(timeout=300000):
        print result

Re: Map Reduce and long queries -

Adam Lindsay
Hi David,

The advice everywhere is to avoid key filters: they effectively do a whole-bucket key listing, and that starts to get seriously slow past 100k items. Since you say test queries work, I'll presume you've debugged your map and reduce on queries where you manually add a set of keys (right?).
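
In case it's useful, here is a minimal sketch of that kind of spot check, assuming the same old Python client API as in your script ('some-key-1' and 'some-key-2' are hypothetical keys):

# Sketch only: run the same phases over a couple of manually added keys.
# Assumes riak_host, bucket and the map/reduce sources from the original script.
def debug_phases():
    client = riak.RiakClient(host=riak_host, port=8087,
                             transport_class=riak.transports.pbc.RiakPbcTransport)
    query = client.add(bucket, 'some-key-1')   # add individual bucket/key inputs
    query.add(bucket, 'some-key-2')
    query.map('''function(value, keyData, arg) { ... }''')   # same map phase as above
    query.reduce('''function(values, arg) { ... }''')        # same reduce phase as above
    for result in query.run(timeout=60000):
        print result

If that returns what you expect, the phases themselves are fine and the problem is the key-filter input.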

Since you're on LevelDB, you can use secondary indexes ("2i") to drive these queries.

I don't have access to your filter_map, so I can't see exactly how you construct your keys, but if you have 2i turned on you get the first key field "for free" via the special $key index.

Let's say, hypothetically, that your keys are constructed as:
 keyprefix:<date>:<country>:<campaign_id>

Well, you can then rewrite the query input as:

def main():
    client = riak.RiakClient(host=riak_host,
        port=8087,transport_class=riak.transports.pbc.RiakPbcTransport)
    query = client.index(
                    bucket, 
                    '$key', 
                    'keyprefix:201210', 
                    'keyprefix:201210~')
    query.map('''function(value, keyData, arg) { ... }''')
    …


That's fine as far as it goes, but it doesn't solve the problem of querying country or campaign id, right?

As a temporary measure, I'd suggest sticking with your key filters, cranking the timeout up to something on the order of hours (the 300000 ms in your script is only five minutes), and letting it run for however long it takes.
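
For example, just the final call with an arbitrarily chosen four-hour timeout (the value is in milliseconds):

# Sketch: same query object as above, with an hours-scale timeout.
four_hours_ms = 4 * 60 * 60 * 1000   # 14,400,000 ms
for result in query.run(timeout=four_hours_ms):
    print result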


If those queries do give good results, I'd suggest going ahead and re-indexing your existing entries with 'country_bin' and 'campaign_bin' secondary indexes (a rough sketch follows). It's a matter of personal style whether you treat dates as int or bin.
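
A rough sketch of that re-indexing plus a follow-up query, again assuming the old Python client (add_index/store on fetched objects) and the hypothetical key layout above; keys_to_reindex stands in for however you enumerate your existing keys:

# Sketch only: add 2i entries to existing objects, then query on them.
# Assumes keys shaped like keyprefix:<date>:<country>:<campaign_id>;
# keys_to_reindex is a placeholder, not something in the original script.
def reindex_and_query(keys_to_reindex):
    client = riak.RiakClient(host=riak_host, port=8087,
                             transport_class=riak.transports.pbc.RiakPbcTransport)
    b = client.bucket(bucket)
    for key in keys_to_reindex:
        obj = b.get(key)
        prefix, date, country, campaign_id = key.split(':')
        obj.add_index('country_bin', country)
        obj.add_index('campaign_bin', campaign_id)
        obj.store()

    # 2i query on the new index, feeding the same map/reduce phases as before
    # (assuming the client accepts a single value here for an exact match).
    query = client.index(bucket, 'country_bin', 'US')
    query.map('''function(value, keyData, arg) { ... }''')
    for result in query.run(timeout=300000):
        print result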

There are lots of tricks and further discussion on how best to get at every corner of your data, but how does this strike you so far?
-- 
Adam Lindsay

Re: Map Reduce and long queries -

Olav Frengstad
> The word everywhere is to avoid key filters. It effectively does a whole-bucket key-listing, and that starts to get seriously slow out past 100k items. Since you say test queries work I'll presume you've debugged your map and reduce on some queries where you manually add a set of keys. (Right?)

Just as a note: with the Erlang PB client you can use key filters for 2i queries if you include the riak_kv_mapred_filters module in your client code path.

➜  riak-erlang-client git:(master) ✗ erl -pa ebin -pa deps/*/ebin -pa
~/src/riak/deps/riak_kv/ebin
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:2:2]
[async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.9.1  (abort with ^G)
1> O1 = riakc_obj:new(<<"test">>, <<"abc/def/1">>, []),
1> O2 = riakc_obj:new(<<"test">>, <<"abc/def/2">>, []),
1> O3 = riakc_obj:new(<<"test">>, <<"hij/klm/1">>, []),
1> {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
1> riakc_pb_socket:put(Pid, O1),riakc_pb_socket:put(Pid, O2),
riakc_pb_socket:put(Pid, O3).
ok
2> Index = {index, <<"test">>, <<"$key">>, <<0>>, <<255>>},
2> {ok, Filter} = riak_kv_mapred_filters:build_filter([[<<"ends_with">>,"1"]]),
2> MR = [
2>   { reduce
2>   , {qfun, fun(X, F) -> lists:filter(fun({A, B}) -> F(B) end, X) end}
2>   , riak_kv_mapred_filters:compose(Filter)
2>   , true}],
2> riakc_pb_socket:mapred(Pid, Index, MR).
{ok,[{0,
      [{<<"test">>,<<"hij/klm/1">>},
       {<<"test">>,<<"abc/def/1">>}]}]}

Olav

Re: Map Reduce and long queries -

Bryan Fink
On Mon, Oct 15, 2012 at 4:13 AM, Olav Frengstad <[hidden email]> wrote:
> Just as a note, using the Erlang pb client you can use the key filters
> for 2i queries if you include the riak_kv_mapred_filters module in
> your client code path.

> 2> Index = {index, <<"test">>, <<"$key">>, <<0>>, <<255>>},
> 2> {ok, Filter} = riak_kv_mapred_filters:build_filter([[<<"ends_with">>,"1"]]),
> 2> MR = [
> 2>   { reduce
> 2>   , {qfun, fun(X, F) -> lists:filter(fun({A, B}) -> F(B) end, X) end}
> 2>   , riak_kv_mapred_filters:compose(Filter)
> 2>   , true}],
> 2> riakc_pb_socket:mapred(Pid, Index, MR).

It's a bit unfortunate that qfun is so useful, because it's also so
fragile. If your client node is not using exactly the same module
version as every node in your Riak cluster, this will fail with
undefined function errors. Absolutely, go ahead and use qfun to learn
with, but keep an eye out for that kind of surprise. Avoid qfun in
production if you can, or your upgrade process will become more
complex and/or prone to failure.

Yes, http://docs.basho.com/riak/latest/references/appendices/MapReduce-Implementation/
should be improved to give this warning as well. I've added an issue
to our new docs repo to track this improvement:
https://github.com/basho/basho_docs/issues/13

-Bryan

Re: Map Reduce and long queries -

Olav Frengstad
I use ``make_local_fun`` to send the funs across the cluster:
https://gist.github.com/510070

In my opinion you should be able to send Erlang funs as strings and have
erl_eval parse them. Unfortunately, that also allows execution of arbitrary
Erlang code, so security-wise people might not want it; maybe there is a way
to create a sandbox, or to expose only certain modules?

Olav

--
Kind regards,
Olav Frengstad

Systems developer // FWT
+47 920 42 090
