[RFC] Thread::MappedQueue - threadsafe queue with mapped responses

[RFC] Thread::MappedQueue - threadsafe queue with mapped responses

am 09.07.2005 01:47:18 von Dean Arnold

Concept for a new Thread module, looking for feedback,
or if I've overlooked an existing module that already does
this, or if there's a better name for it...I'm trying to
implement a sortof RPC mechanism to wrap DBI so DBI handles
can be passed between threads, and needed something like this.

TIA,
Dean Arnold
Presicient Corp.

=head1 NAME

Thread::MappedQueue - thread-safe queues with identifiable elements

=head1 SYNOPSIS

use Thread::MappedQueue;
my $q = new Thread::MappedQueue;
#
# enqueue elements, returning a unique queue ID
# (used in the client)
#
my $id = $q->enqueue("foo", "bar");
#
# dequeue next available element (used in the server),
# waiting indefinitely for an element to be made available
# returns shared arrayref, first element is unique ID
#
my $foo = $q->dequeue;
#
# dequeue next available element (used in the server),
# returns undef if no element immediately available
# otherwise, returns shared arrayref, first element is unique ID
#
my $foo = $q->dequeue_nb;
#
# dequeue next available element (used in the server),
# returns undef if no element available within $timeout
# seconds; otherwise, returns shared arrayref, first
# element is unique ID
#
my $foo = $q->dequeue_until($timeout);
#
# returns number of items still in queue
#
my $left = $q->pending;
#
# maps a response for the
# queued element identified by $id;
#
$q->respond($id, @list);
#
# tests for a response to the queued
# element identified by $id; returns undef if
# not yet available, else returns the queue object
#
my $result = $q->ready($id);
#
# wait for and return the response for the
# specified unique identifier
# (dequeue_response is alias)
#
my $result = $q->wait($id);
my $result = $q->dequeue_response($id);
#
# waits up to $timeout seconds for a response to
# the queued element identified by $id; returns undef if
# not available within $timeout, else returns the queue object
#
my $result = $q->wait_until($id, $timeout);
#
# wait for a response to the queued
# elements listed in @ids, returning a hashref of
# the first available response(s), keyed by id
#
my $result = $q->wait_any(@ids);
#
# wait upto $timeout seconds for a response to
# the queued elements listed in @ids, returning
# a hashref of the first available response(s), keyed by id
# Returns undef if none available in $timeout seconds
#
my $result = $q->wait_any_until($timeout, @ids);
#
# wait for responses to all the queued
# elements listed in @ids, returning a hashref of
# the response(s), keyed by id
#
my $result = $q->wait_all(@ids);
#
# wait upto $timeout seconds for responses to
# all the queued elements listed in @ids, returning
# a hashref of the response(s), keyed by id
# Returns undef if all responses not recv'd
# in $timeout seconds
#
my $result = $q->wait_all_until($timeout, @ids);

=head1 DESCRIPTION

A mapped queue, similar to L, except that as elements
are queued, they are assigned unique identifiers, which are used
to identify responses returned from the dequeueing thread. This
class provides a simple RPC-like mechanism between multiple client
and server threads, so that a single server thread can safely
multiplex requests from multiple client threads.

C objects encapsulate

=over 4

=item a shared array, used as the queue (same as L)

=item a shared scalar, used to provide unique identifier sequence
numbers

=item a shared hash, I the mapping hash, used to return responses
to enqueued elements, using the generated uniqiue identifier as the hash key

=back

A normal processing sequence for Thread::MappedQueue might be:

#
# Thread A (the client):
#
...marshal parameters for a coroutine...
my $id = $q->enqueue('function_name', \@paramlist);
my $results = $q->dequeue_response($id);
...process $results...
#
# Thread B (the server):
#
while (1) {
my $call = $q->dequeue;
my ($id, $func, @params) = @$call;
$q->respond($id, $self->$func(@params));
}

=head1 FUNCTIONS AND METHODS

=over 8

=item new

The C function creates a new empty queue,
and associated mapping hash.

=item enqueue LIST

Creates a shared array, pushes a unique
identifier onto the shared array, then pushes the LIST onto the array,
then pushes the shared arrayref onto the queue.
The queue will grow as needed to accommodate the list.

=item dequeue

Waits indefinitely for an element to become available
in the queue, then removes and returns it.

=item dequeue_nb

The C method is identical to C(),
except it will return undef immediately if there are no
elements currently in the queue.

=item dequeue_until

Identical to C(), except it accepts a C<$timeout>
parameter specifying a duration (in seconds) to wait for
an available element. If no element is
available within the $timeout, it returns undef.

=item pending

Returns the number of items still in the queue.

=item respond($id [, LIST ])

Creates a new element in the mapping hash, keyed by C<$id>,
with a value set to a shared arrayref containing LIST.

=item ready($id)

Tests for a response to a uniquely identified
previously C'd LIST. Returns undef if no
response is available, otherwise returns the
Thread::MappedQueue object.

=item wait($id) I dequeue_response($id)

Waits indefinitely for a response to a uniquely identified
previously C'd LIST. Returns the returned result.

=item wait_until($id, $timeout)

Waits up to $timeout seconds for a response to
to a uniquely identified previously C'd LIST.
Returns undef if no response is available in the specified
$timeout duration, otherwise, returns the result.

=item wait_any(@ids)

Wait indefinitely for a response to any of the
previously C'd elements specified in the
the supplied C<@ids>. Returns a hashref of available
responses keyed by their identifiers

=item wait_any_until($timeout, @ids)

Wait upto $timeout seconds for a response to any of the
previously C'd elements specified in the
the supplied C<@ids>. Returns a hashref of available
responses keyed by their identifiers, or undef if none
available within $timeout seconds.

=item wait_all(@ids)

Wait indefinitely for a response to all the
previously C'd elements specified in
the supplied C<@ids>. Returns a hashref of
responses keyed by their identifiers.

=item wait_all_until($timeout, @ids)

Wait upto $timeout seconds for a response to all the
previously C'd elements specified in
the supplied C<@ids>. Returns a hashref of
responses keyed by their identifiers, or undef if all
responses are not available within $timeout seconds.

=back

=head1 SEE ALSO

L, L, L

=head1 AUTHOR, COPYRIGHT, & LICENSE

Dean Arnold, Presicient Corp. L

Copyright(C) 2005, Presicient Corp., USA

Permission is granted to use this software under the same terms
as Perl itself. Refer to the Perl Artistic License for details.

=cut