Request queue issue with riakc_pb_socket


Julian
Hi,

I'm using riakc_pb_socket to queue up several requests, relying on its built-in queueing to eventually service all of them. However, I have a scenario in which a couple of processes that queue a request never return from their call on the socket, even though a request issued afterwards gets through.

I have one process that quickly spawns 7 processes, each of which calls riakc_pb_socket:search. Six of these requests should quickly return 0 results; one returns 85,000 results. What I frequently (but not always) see is that 4 of the zero-result requests complete. Then the long request with all the results eventually completes (I set a high enough timeout value). After that, the other 2 requests never seem to get executed.

-record(state, {riak_conn,
       ...}).

handle_info({add_platform, PlatformBin}, State) ->
...
    erlang:spawn(?MODULE, populate_instance, [PlatformBin, State]),
...

populate_instance(InstanceBin, State) ->
    error_logger:info_msg(binary_to_list(InstanceBin) ++ " about to wait"),
    MapredResult = riakc_pb_socket:search(State#state.riak_conn,
                      InstanceBin,
                      "type:account",
                      [{map, {jsanon, <<"
function(v, keyData, arg) {
    var o = JSON.parse(v.values[0].data);
    var p = {...}; // I'm transforming the object
    return [JSON.stringify(p)]
}">> }, none, true }],
                      90000), % timeout
    error_logger:info_msg(binary_to_list(InstanceBin) ++ " done waiting"),

    ... process MapredResult ...


In the log I get 7 lines from populate_instance like this:
1c837b206ff37939892740206c8eb4f0f897b282 about to wait

Then I get 4 lines like this:
1c837b206ff37939892740206c8eb4f0f897b282 done waiting

Then it gets to the 5th one with a lot of results. Eventually:
a65796222ea26bb7b355bf140dbd5b72e81efee1 done waiting

Then nothing more. The remaining 2 processes never seem to return from riakc_pb_socket:search, so they never print "... done waiting".

Suppose I sneak in a request to add a platform:
list_to_pid("<0.120.0>") ! {add_platform, <<"aaa">>}.

I see in the log that it goes through:
aaa about to wait
aaa done waiting

...so it seems like my 2 processes will never finish.

Any ideas? Passing State to different processes seems like it should be fine, since the connection is just a pid, and the request queue lives in the socket process.
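The reasoning about sharing the connection pid matches the usual gen_server pattern: any process holding the pid can call it, and the server alone decides the order in which requests are served. As a hypothetical illustration (this is not riakc_pb_socket's actual implementation; the module `toy_queue_server`, its `work/2` request, and the timer-based stand-in for a slow backend are invented for the sketch), the server below accepts every request immediately, parks each caller in a queue, serves one request at a time, and answers each caller later with `gen_server:reply/2`. It assumes OTP 18 or later for `gen_server:stop/1`.

```erlang
-module(toy_queue_server).
-behaviour(gen_server).

-export([start_link/0, work/2, demo/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Hypothetical sketch of a shared "connection" process that queues requests.

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Blocks the caller until its request reaches the front of the queue
%% and the simulated work (Ms milliseconds) is finished.
work(Pid, Ms) ->
    gen_server:call(Pid, {work, Ms}, infinity).

init([]) ->
    {ok, {idle, queue:new()}}.

%% Never block in handle_call: enqueue the caller and start work if idle.
handle_call({work, Ms}, From, {Status, Q}) ->
    maybe_start({Status, queue:in({From, Ms}, Q)}).

handle_cast(_Msg, State) ->
    {noreply, State}.

%% The simulated request finished: answer its caller, then serve the next one.
handle_info({done, From, Ms}, {busy, Q}) ->
    gen_server:reply(From, {ok, Ms}),
    maybe_start({idle, Q});
handle_info(_Other, State) ->
    {noreply, State}.

maybe_start({busy, _Q} = State) ->
    {noreply, State};
maybe_start({idle, Q0}) ->
    case queue:out(Q0) of
        {empty, _} ->
            {noreply, {idle, Q0}};
        {{value, {From, Ms}}, Q1} ->
            %% Stand-in for a slow backend request: a delayed message to self.
            erlang:send_after(Ms, self(), {done, From, Ms}),
            {noreply, {busy, Q1}}
    end.

%% Seven workers share one pid; one of them asks for a long request.
demo() ->
    {ok, Pid} = start_link(),
    Parent = self(),
    Durations = [10, 10, 10, 10, 200, 10, 10],
    lists:foreach(
      fun(Ms) -> spawn(fun() -> Parent ! {toy_result, work(Pid, Ms)} end) end,
      Durations),
    Results = [receive {toy_result, R} -> R end || _ <- Durations],
    ok = gen_server:stop(Pid),
    lists:sort(Results).
```

Calling `toy_queue_server:demo()` returns all seven replies, sorted. In this toy design every queued caller is eventually answered as long as the server process stays alive.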

_______________________________________________
riak-users mailing list
[hidden email]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

Re: Request queue issue with riakc_pb_socket

bryan-basho
On Tue, Jul 24, 2012 at 8:48 PM, Julian <[hidden email]> wrote:
> I'm using riakc_pb_socket to queue up several requests and rely on the
> built-in queueing, which should hopefully eventually service all requests.
> However I have a scenario in which a couple processes that queue a request
> are never terminating the call on the socket...even though a request issued
> subsequently gets through.

Hi, Julian. I have one possible answer for you. Could you please try
wrapping your call to riakc_pb_socket:search/5 in a try-catch block?

Deep inside riakc_pb_socket, that call becomes a gen_server:call to
the socket server. If that call times out (or encounters a few other
errors), the calling process (your code) exits with an exception. A
timeout here should be rare, because the socket server should
normally be unblocked and ready to handle (enqueue) requests, but it
is possible, and other errors can trigger the same behavior.
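The failure mode described above can be reproduced without Riak. The hypothetical module below (the name `call_timeout_demo` and the `slow` request are invented for illustration) starts a gen_server that never answers `slow`, then calls it twice with a 100 ms timeout: once from a process started with `spawn_monitor/1`, where the timeout kills the caller and the reason only shows up in the `'DOWN'` message, and once inside `try ... catch`, where the same exit becomes an ordinary value. In my understanding, a process started with plain `spawn` that dies from an `exit`-class exception like this one produces no error report, which would fit the processes silently vanishing from the log; monitoring the workers, or starting them with `proc_lib:spawn/3` (which logs crash reports), is one way to check that hypothesis. Assumes OTP 18 or later.

```erlang
-module(call_timeout_demo).
-behaviour(gen_server).

-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

init([]) ->
    {ok, no_state}.

%% Never reply to 'slow', so every call to it hits the caller's timeout.
handle_call(slow, _From, State) ->
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

demo() ->
    {ok, Server} = gen_server:start(?MODULE, [], []),
    %% Unprotected caller: the timeout is an exit exception that kills it;
    %% only the monitor tells us why it disappeared.
    {Pid, Ref} = spawn_monitor(fun() -> gen_server:call(Server, slow, 100) end),
    Reason = receive {'DOWN', Ref, process, Pid, R} -> R end,
    %% Protected caller: try/catch turns the same exit into a value.
    Caught = try gen_server:call(Server, slow, 100)
             catch exit:{timeout, _} -> caught_timeout
             end,
    ok = gen_server:stop(Server),
    {element(1, Reason), Caught}.
```

`call_timeout_demo:demo()` returns `{timeout, caught_timeout}`; the first element is the tag of the unprotected caller's exit reason, `{timeout, {gen_server, call, [...]}}`.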

So, try setting things up as:

    populate_instance(InstanceBin, State) ->
        error_logger:info_msg(binary_to_list(InstanceBin) ++ " about to wait"),
        try riakc_pb_socket:search(...) of
            {ok, MapredResult} ->
                error_logger:info_msg(binary_to_list(InstanceBin) ++ " done waiting"),
                ...process MapredResult...;
            {error, _} ->
                error_logger:info_msg(binary_to_list(InstanceBin) ++ " encountered error")
        catch
            exit:_ ->
                error_logger:info_msg(binary_to_list(InstanceBin) ++ " failed call, retrying..."),
                populate_instance(InstanceBin, State)
        end.

(obviously needs some safety valves to prevent infinite retries, etc.,
but you get the idea)
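One way to add such a safety valve is to cap the number of attempts. The helper below is a hypothetical sketch (the module `bounded_retry` and the function `with_retries/2` are invented names): it runs a zero-argument fun, retries with a short linear back-off only when the fun exits, and after the last failed attempt returns `{error, {gave_up, Reason}}`, so a caller can handle it alongside the `{error, _}` clause for the search result.

```erlang
-module(bounded_retry).

-export([with_retries/2, demo/0]).

%% Run Fun up to MaxAttempts times, retrying only on exit-class exceptions
%% (such as a gen_server:call timeout). Returns Fun's result on success.
with_retries(Fun, MaxAttempts) when is_integer(MaxAttempts), MaxAttempts >= 1 ->
    attempt(Fun, 1, MaxAttempts).

attempt(Fun, N, Max) ->
    try Fun()
    catch
        exit:_ when N < Max ->
            %% Linear back-off: 10 ms, 20 ms, 30 ms, ...
            timer:sleep(10 * N),
            attempt(Fun, N + 1, Max);
        exit:Reason ->
            {error, {gave_up, Reason}}
    end.

%% A stand-in for the search call that exits twice, then succeeds.
demo() ->
    put(calls, 0),
    Flaky = fun() ->
                N = get(calls) + 1,
                put(calls, N),
                case N < 3 of
                    true -> exit({timeout, simulated});
                    false -> {ok, N}
                end
            end,
    First = with_retries(Flaky, 5),
    put(calls, 0),
    Second = with_retries(Flaky, 2),
    erase(calls),
    {First, Second}.
```

In `populate_instance/2` the fun would wrap the `riakc_pb_socket:search(...)` call. Keep in mind that a gen_server:call that times out on the caller's side does not withdraw the message already sent to the server, so the server may still process the original request after a retry has been queued; that is harmless for a read-only search but worth considering for writes.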

I consider this a little bit of a long shot, since your timeout is so
long, but it's the only way I've been able to recreate such a
situation so far.

-Bryan
