runaway memory leak with LWP and Fork()ing on Windows
on 02.11.2007 06:30:20 by bulk88
I am trying to fetch and process web pages asynchronously (non-blocking)
from a standalone program (not a server script).
My idea is to use Perl, LWP, and the simple fork() that Perl provides on
Win32 (Windows XP in my case). The child threads do not need to report
anything back to the parent. The parent fires off a batch of children,
wait()s for them to complete, then fires off more children in a loop.
My code has a severe memory leak. I have isolated it in the code
below.
If this program is modified to not fork (see the comments; change
'fork()' to '0' and 'exit(0);' to '#exit(0);'), it takes
only 8 MB of RAM and 7 MB of VM through its whole lifetime, which is 500
iterations in this example. With forking on, it balloons to
290 MB of RAM and 470 MB of VM by the time it ends at 500 iterations.
Making the loop infinite uses up all memory and
eventually causes a crash. I want to run an infinite loop, or a
user-specified count in the thousands of iterations. I believe I wait()
an adequate number of times. In my research, if I don't wait(), the
program silently ends with no errors after 64 un-wait()ed child
threads, due to limitations of the fork() implementation in
ActivePerl/Win32 Perl, but I am fine with the 64 limit.
My forking code seems to work fine by itself. To see this, replace the
doGet sub with the mySleep sub, which simulates the latency of the
web page fetches. With mySleep the program uses between 13 and 38 MB of
RAM, never exceeding 40 MB. It approaches 40 MB right before the wait()
cleanup that happens every 60 iterations, then falls to 13 MB and SLOWLY
climbs back up again. With the LWP code, the memory NEVER goes down after
a 60-iteration wait() cleanup. undef()ing $ua and $response in the child
does not help the memory leak at all. I think there might be a garbage
collection problem, but I don't know.
Keywords: win32, LWP, UserAgent, fork, windows, threads, ithreads
I assume there is a problem with LWP somehow. I would rather not use a
real threading library. It seems unnecessary and not KISS. Can someone
help?
#!/usr/bin/perl -w
use strict;
use warnings;
use LWP::UserAgent;
use Time::HiRes qw( sleep );
use String::Escape qw( printable );
sub doGet();
for (my $i = 0; $i < 500; $i++)
{
my $pid;
$pid = fork(); # $pid = 0; # for non-fork design
die "fork failed: $!" unless defined $pid; # fork() returns undef on failure
if ($pid == 0)
{
doGet ();
#mySleep(); # replace doGet with mySleep to see forking code working right
print "child $i\n";
exit(0); # comment out to switch to non-fork design
}
else {
print "parent $i $pid \n";
sleep(.1);
if ($i % 60 == 0) #cleanup thread IDs to not run into 64 limit on windows fork
{ foreach(1..60) {wait();}};
}
}
#end of script/pause to check memory usage, hopefully will cause a pause on non-win32
$^O eq 'MSWin32' ? system('pause') : system('read -n 1 -p "Press Any Key to Continue..."');
#LWP function
sub doGet()
{
my $ua = LWP::UserAgent->new;
#url can be anything, google was used just as an example
my $response = $ua->get('http://www.google.com/images/firefox/op_icon.png');
if ($response->is_success) {
print substr(printable( $response->content), 0, 30)."\n";
}
else {
die $response->status_line;
}
#undef($ua);
#undef($response);
}
sub mySleep()
{
sleep(rand(5));
}
__END__
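For comparison, the batch-of-60 wait() cleanup in the script above could also be written as a rolling cap: whenever a limit is reached, the parent reaps one child before forking the next, so the number of un-wait()ed children never gets near 64. This is only a sketch of the bookkeeping, not a fix for the LWP leak. The name run_bounded and its parameters are hypothetical, and the job callback is a stand-in for doGet.

```perl
use strict;
use warnings;

# Hypothetical helper (not from the post): run $total jobs in forked
# children, keeping at most $max_kids of them un-reaped at any time.
# Returns the number of children that were reaped.
sub run_bounded {
    my ($total, $max_kids, $job) = @_;
    my %running;      # pid => job number, for children not yet reaped
    my $reaped = 0;

    for my $i (1 .. $total) {
        # At the cap: block until some child exits before forking again.
        if (keys(%running) >= $max_kids) {
            my $done = wait();
            $reaped++ if defined delete $running{$done};
        }

        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            $job->($i);   # stand-in for doGet()
            exit 0;
        }
        $running{$pid} = $i;
    }

    # Reap whatever is still running after the last fork.
    while (%running) {
        my $done = wait();
        last if $done == -1;   # no children left to wait for
        $reaped++ if defined delete $running{$done};
    }
    return $reaped;
}
```

A call such as run_bounded(500, 32, sub { doGet() }) would keep at most 32 children outstanding. Whether that changes the memory growth seen with LWP is not something this sketch can answer.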
Here is my perl -V:
Summary of my perl5 (revision 5 version 8 subversion 8) configuration:
Platform:
osname=MSWin32, osvers=5.00, archname=MSWin32-x86-multi-thread
uname=''
config_args='undef'
hint=recommended, useposix=true, d_sigaction=undef
usethreads=define use5005threads=undef useithreads=define
usemultiplicity=define
useperlio=define d_sfio=undef uselargefiles=define usesocks=undef
use64bitint=undef use64bitall=undef uselongdouble=undef
usemymalloc=n, bincompat5005=undef
Compiler:
cc='cl', ccflags ='-nologo -GF -W3 -MD -Zi -DNDEBUG -O1 -DWIN32
-D_CONSOLE -DNO_STRICT -DHAVE_DES_FCRYPT -DNO_HASH_SEED
-DUSE_SITECUSTOMIZE -DPRIVLIB_LAST_IN_INC -DPERL_IMPLICIT_CONTEXT
-DPERL_IMPLICIT_SYS -DUSE_PERLIO -DPERL_MSVCRT_READFIX',
optimize='-MD -Zi -DNDEBUG -O1',
cppflags='-DWIN32'
ccversion='12.00.8804', gccversion='', gccosandvers=''
intsize=4, longsize=4, ptrsize=4, doublesize=8, byteorder=1234
d_longlong=undef, longlongsize=8, d_longdbl=define, longdblsize=8
ivtype='long', ivsize=4, nvtype='double', nvsize=8,
Off_t='__int64', lseeksize=8
alignbytes=8, prototype=define
Linker and Libraries:
ld='link', ldflags ='-nologo -nodefaultlib -debug -opt:ref,icf
-libpath:"C:\Perl\lib\CORE" -machine:x86'
libpth=\lib
libs= oldnames.lib kernel32.lib user32.lib gdi32.lib winspool.lib
comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib
netapi32.lib uuid.lib ws2_32.lib mpr.lib winmm.lib version.lib
odbc32.lib odbccp32.lib msvcrt.lib
perllibs= oldnames.lib kernel32.lib user32.lib gdi32.lib winspool.lib
comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib
netapi32.lib uuid.lib ws2_32.lib mpr.lib winmm.lib version.lib
odbc32.lib odbccp32.lib msvcrt.lib
libc=msvcrt.lib, so=dll, useshrplib=true, libperl=perl58.lib
gnulibc_version=''
Dynamic Linking:
dlsrc=dl_win32.xs, dlext=dll, d_dlsymun=undef, ccdlflags=' '
cccdlflags=' ', lddlflags='-dll -nologo -nodefaultlib -debug
-opt:ref,icf -libpath:"C:\Perl\lib\CORE" -machine:x86'
Characteristics of this binary (from libperl):
Compile-time options: MULTIPLICITY PERL_IMPLICIT_CONTEXT
PERL_IMPLICIT_SYS PERL_MALLOC_WRAP
PL_OP_SLAB_ALLOC USE_ITHREADS USE_LARGE_FILES
USE_PERLIO USE_SITECUSTOMIZE
Locally applied patches:
ActivePerl Build 822 [280952]
Iin_load_module moved for compatibility with build 806
PerlEx support in CGI::Carp
Less verbose ExtUtils::Install and Pod::Find
Patch for CAN-2005-0448 from Debian with modifications
Rearrange @INC so that 'site' is searched before 'perl'
Partly reverted 24733 to preserve binary compatibility
MAINT31223 plus additional changes
31490 Problem bootstraping Win32CORE
31324 Fix DynaLoader::dl_findfile() to locate .so files again
31214 Win32::GetLastError fails when first called
31211 Restore Windows NT support
31188 Problem killing a pseudo-forked child on Win32
29732 ANSIfy the PATH environment variable on Windows
27527,29868 win32_async_check() can loop indefinitely
26970 Make Passive mode the default for Net::FTP
26379 Fix alarm() for Windows 2003
24699 ICMP_UNREACHABLE handling in Net::Ping
Built under MSWin32
Compiled at Jul 31 2007 19:34:48
@INC:
C:/Perl/site/lib
C:/Perl/lib
.
Re: runaway memory leak with LWP and Fork()ing on Windows
on 02.11.2007 15:46:11 by Petr Vileta
bulk88@hotmail.com wrote:
> I am trying to non-blocking/asynchronously fetch/work with websites as
> a standalone program (not a server script).
[...]
> I assume there is a problem with LWP somehow. I would rather not use a
> real threading library. It seems unnecessary and not KISS. Can someone
> help?
>
Fork is a bad idea on the Win platform ;-) Go to CPAN and look for
Win32::ProcFarm.
I did something with multiple LWP reads too, and this module solved my
problems excellently.
--
Petr Vileta, Czech republic
(My server rejects all messages from Yahoo and Hotmail. Send me your mail
from another non-spammer site please.)
Re: runaway memory leak with LWP and Fork()ing on Windows
on 02.11.2007 16:00:24 by sheinrich
Without delving too deep into your problem, I think you could give
my attached module PreforkAgent.pm a try.
It was made for very much the same task: stress testing a website, in
fact.
You could probably use the module in your script without any
adaptations. Just follow the POD instructions on how to set up your
callback routines.
In contrast to your current concept, the specified number of children
is created only once, before the actual work starts, and each returning
child is immediately given the next task as long as there are any
left.
Under Linux the module works like a charm with up to 500 children.
Because the overhead of the forks is all paid beforehand, it's
possible to create a heavy load on a target web (or any other) server
with only moderate local means.
If you want to put a cap on the load, you will need to do so in your
wrapper script.
Because I never tried it on Windows, I'd be delighted to hear how you
fare.
Cheers, Steffen
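The prefork idea described above can be shown in miniature with a single worker: fork once, then feed the child one job at a time over a pair of pipes. This is a sketch only, not an excerpt from PreforkAgent. The name run_worker is hypothetical, and messages are newline-delimited for brevity, whereas the module below uses "\x1F" as its terminator.

```perl
use strict;
use warnings;
use IO::Handle;   # provides autoflush() on lexical file handles

# Hypothetical helper: start one worker child, hand it @jobs one at a
# time, and return the worker's answers in order. $work is the code the
# child runs for each job; its result must not contain a newline.
sub run_worker {
    my ($work, @jobs) = @_;

    pipe(my $from_child,  my $to_parent) or die "pipe: $!";  # parent <- child
    pipe(my $from_parent, my $to_child)  or die "pipe: $!";  # child <- parent
    $_->autoflush(1) for $to_parent, $to_child;

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: close the parent's ends, then serve jobs until told to quit.
        close $from_child;
        close $to_child;
        while (defined(my $job = <$from_parent>)) {
            chomp $job;
            last if $job eq 'QUIT';
            print {$to_parent} $work->($job), "\n";
        }
        exit 0;
    }

    # Parent: close the child's ends, then send each job and read its answer.
    close $to_parent;
    close $from_parent;
    my @answers;
    for my $job (@jobs) {
        print {$to_child} "$job\n";
        chomp(my $answer = <$from_child>);
        push @answers, $answer;
    }
    print {$to_child} "QUIT\n";
    close $to_child;
    waitpid($pid, 0);
    close $from_child;
    return @answers;
}
```

With several workers, the parent would need something like IO::Select to see which child has answered, which is the part the module handles.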
#
# PreforkAgent
#
# Allows execution of many jobs in parallel.
# All parent / child communication is implemented with pipes.
# Only the signal INT is caught by the parent for cleanup purposes.
#
# Steffen Heinrich - Jun 2007
#
package PreforkAgent;
use strict;
our $VERSION = '0.03'; # package variable, so it is visible as $PreforkAgent::VERSION
#################################################
# libs and class vars
use IO::Select;
my $EOF_MSG_SEQ = "\x1F"; # ASCII control character US (Unit Separator)
############################
# constructor
sub new {
my $class = shift;
my $me = bless {
debug_out => 0,
parent => $$,
listener => IO::Select->new(),
kids_to_spawn => 0,
kids => 0,
living => {},
pids => {},
jobs => {},
child_prepare => sub {1}
}, $class;
$me
}
############################
# methods
sub register {
# registers one or more callback routines with the agent
my $self = shift() or return;
my %subs = @_ or return;
my @errors;
while (my ($s, $c) = each %subs) {
if ($s !~ /^(child_prepare|fetch_next_task|process_job|process_response)$/) {
push @errors, "'$s' is not a known sub";
} elsif (defined($c) && ref($c) && ref($c) =~ /CODE/) {
$self->{$s} = $c;
} else {
push @errors, "'$s' does not reference a sub";
}
}
die join("\n", @errors)."\n" if @errors;
}
sub spawn {
# creates a given number of children
# and opens bidirectional pipes for each
my $self = shift() or return;
my $kids_to_spawn = shift() or return;
$self->{kids_to_spawn} = $kids_to_spawn;
my $process_job = $self->{process_job}
or die "process_job() must have been registered with PreforkAgent before a call to spawn()!\n";
my $sel = $self->{listener};
# prevent zombies since we won't wait()
$SIG{CHLD} = 'IGNORE';
# fork loop
for my $child (1..$kids_to_spawn) {
my $whdl = 'W'.$child;
my $rhdl = 'R'.$child;
{ no strict 'refs';
# open bidirect comm
pipe $rhdl, WH or die "pipe1: $!"; # parent <- child
pipe RH, $whdl or die "pipe2: $!"; # child <- parent
# register the read handle with the ones to listen to
$sel->add(\*$rhdl);
}
# save write handle connected with readhandle
$self->{living}{$child} = $whdl;
select((select(WH), $| = 1)[0]); # autoflush
select((select($whdl), $| = 1)[0]); # autoflush
my $pid;
unless ($pid = fork()) {
# Child process
# closes unnecessary handles
close $rhdl;
close $whdl;
# execute individual initialization
my $init = $self->{child_prepare};
defined(&$init($child)) or die;
# creates a new listener
$sel = IO::Select->new;
# registers the one handle to listen to
$sel->add(\*RH);
# signals readiness
_write_into_pipe(\*WH, 'READY');
while ($sel->can_read) {
my $job = _read_from_pipe(\*RH);
if ($job eq 'QUIT') {
last;
} else {
# do something
my $answer = &$process_job($job, $child);
_write_into_pipe(\*WH, $answer);
}
}
# child is done, unload and quit
$sel->remove(\*RH);
close WH;
close RH;
exit 0;
}
# Parent process closes unnecessary handles
close WH;
close RH;
# and registers child
$self->{pids}{$child} = $pid;
# parent catches SIGINT
$SIG{INT} = sub {$self->cleanup()}
unless $self->{kids}++;
} # loop to start others
($self->{kids} == $self->{kids_to_spawn})
or die "Could only spawn $self->{kids} kids of $self->{kids_to_spawn}: $!";
$self->{kids}
} # end of spawn()
sub assign {
# Sends out jobs to any child which is ready to listen
# and collects any responses, which are then reported to the registered callback.
# As long as there are more jobs to do, they are immediately assigned to returning children.
my $self = shift() or return;
my $fetch_next_task = $self->{fetch_next_task};
my $process_response = $self->{process_response};
($fetch_next_task && $process_response)
or die "fetch_next_task() and process_response() must have been registered with PreforkAgent before a call to assign()!\n";
$self->{kids} > 0
or die "You need to call spawn(kids) before assigning jobs!\n";
my $sel = $self->{listener};
# work loop
while ($self->{kids}) {
my $not_finished = 1;
while (my @ready = $sel->can_read) {
foreach my $rhdl (@ready) {
my $child = '';
{ no strict 'refs';
*{$rhdl} =~ /^(.+::)?R(\d+)$/
and $child = $2;
}
my $whdl = $self->{living}{$child};
my $response = _read_from_pipe($rhdl);
unless ($response eq 'READY') {
&$process_response($response, $child);
}
# assign next task
my $task = undef;
$not_finished = $not_finished && defined($task = &$fetch_next_task($child));
if ($child && $whdl && $not_finished) {
_write_into_pipe($whdl, $task);
$self->{jobs}{$child}++ if $self->{debug_out};
} else {
# tell child to exit
_write_into_pipe($whdl, 'QUIT');
# unregister child
$sel->remove($rhdl);
delete $self->{living}{$child};
# close handles to child
close $rhdl;
close $whdl;
$self->{kids}--;
}
}
}
}
# since all children exited and we set SIGCHLD = IGNORE
# we don't have to wait()
if ($self->{debug_out}) {
my $job_cnt = $self->{jobs};
foreach my $child (sort {$job_cnt->{$b} <=> $job_cnt->{$a}} keys %$job_cnt) {
printf "%4s: %5d\n", $child, $job_cnt->{$child};
}
}
$self->cleanup();
} # end of assign()
############################
# subroutines
sub _read_from_pipe {
my ($fh) = @_;
my $blksize = (stat $fh)[11] || 16384;
my $offset = 0;
my $buf = '';
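while (1) {
my $len = sysread($fh, $buf, $blksize, $offset);
if (!defined $len) {
# undef means an error; retry if a signal merely interrupted the read
next if $! =~ /^Interrupted/;
die "System Read Error: $!\n";
}
last if $len == 0; # EOF: the other end closed the pipe
$offset += $len;
last if $buf =~ s/$EOF_MSG_SEQ$//o;
}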
$buf
}
sub _write_into_pipe {
my ($fh, $msg) = @_;
$msg .= $EOF_MSG_SEQ;
my $length = length($msg);
my $blksize = (stat $fh)[11] || 16384;
my $offset = 0;
while ($length) {
my $len = syswrite($fh, $msg, $blksize, $offset);
die "System Write Error: $!\n"
unless defined $len;
$length -= $len;
$offset += $len;
}
$offset
}
sub cleanup {
my $self = shift() or return;
# only for parent
return unless $self->{parent} == $$;
# print "\$kids = $self->{kids}\n";
# print "\%living count = ", scalar(keys %{$self->{living}}), "\n";
# print "select bitmap = '", defined $self->{listener}->bits()? (unpack 'b*', $self->{listener}->bits()):'', "'\n";
while (my ($kid, $pid) = each %{$self->{pids}}) {
my $ps = `ps $pid`;
if ($ps =~ /$0\b/so) {
print STDERR "killing $pid\n";
`kill $pid`;
}
delete $self->{pids}{$kid};
}
}
1
__END__
=pod
=head1 NAME
PreforkAgent - A dispatch wrapper for simultaneous tasks.
=head1 PURPOSE
Any large number of similar tasks that have to be run in parallel with
the utmost throughput.
First, a given number of children is spawned. Then each of them is
handed the next task from a common queue in succession as they return
with a response.
=head1 SYNOPSIS
use PreforkAgent;
my $pfa = PreforkAgent->new or die;
$pfa->register(
child_prepare => \&individual_init, # this sub is optional
fetch_next_task => \&next_job,
process_job => \&dispatch,
process_response => \&collect_response
);
my $GLOBAL_VAR = "fancy value";
my $kid_count = $pfa->spawn(5) or die;
$pfa->assign();
exit;
sub individual_init {
# child will die if false is returned
# this sub is optional
my $child_id = shift() or return;
# Child context:
# can read $GLOBAL_VAR at time of spawn(), but not change
# any initialization to be done by each child goes here
...
return $success;
}
sub next_job {
# must return a string, which will be subsequently passed to dispatch()
# and MUST return undef, if finished.
# Enables the main program to tie a certain job to a specific child's response returned in collect_response().
my $child_id = shift;
# Parent context:
# can read AND write $GLOBAL_VAR
...
return $str_job;
}
sub dispatch {
# defines the parallel task for the children
# processes the job and returns a serialized response
my $str_job = shift() or return;
my $child_id = shift() or return;
# Child context:
# can read $GLOBAL_VAR at time of spawn(), but not change
...
return $str_response;
}
sub collect_response {
# allows the main program to evaluate any of the child's responses
my $response = shift() or return;
my $child_id = shift() or return;
# Parent context:
# can read AND write $GLOBAL_VAR
...
}
=head1 SEE ALSO
ParallelUserAgent by Marc Langheinrich
=head1 VERSION
This document describes version 0.03.
=head1 LICENSE
Copyright (C) 2007, Steffen Heinrich. All rights reserved.
This module is free software;
you can redistribute it and/or modify it under the same terms as Perl
itself.
=cut
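The SYNOPSIS above leaves the callback bodies elided. As a rough illustration, they might look like the sketch below, assuming the jobs are URLs taken from a simple queue. The URLs, @queue, %results and results() are hypothetical, and dispatch() only measures the string where a real script would call LWP. Responses are serialized as a tab-separated string because the module passes plain strings over its pipes. They should not contain "\x1F", which the module uses as its message terminator.

```perl
use strict;
use warnings;

# Hypothetical job list and result store, both living in the parent.
my @queue = map { "http://www.example.com/$_" } qw(a b c);
my %results;

# fetch_next_task: hand out the next job, or undef once the queue is empty.
sub next_job {
    my $child_id = shift;
    return shift @queue;
}

# process_job: runs in the child. A real script would fetch $str_job with
# LWP here; this stand-in just reports the length of the job string.
sub dispatch {
    my ($str_job, $child_id) = @_;
    return join "\t", $str_job, length $str_job;
}

# process_response: runs in the parent, so it may update %results.
sub collect_response {
    my ($response, $child_id) = @_;
    my ($job, $value) = split /\t/, $response;
    $results{$job} = $value;
    return;
}

# Accessor so that callers can inspect what was collected.
sub results { return \%results }
```

These subs would be passed to register() as fetch_next_task, process_job and process_response, exactly as in the SYNOPSIS, before calling spawn() and assign().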
Re: runaway memory leak with LWP and Fork()ing on Windows
on 02.11.2007 16:20:59 by glex_no-spam
bulk88@hotmail.com wrote:
> I am trying to non-blocking/asynchronously fetch/work with websites as
> a standalone program (not a server script).
>
> My idea is to use Perl, LWP, and simple perl-provided forking on Win32
> (Windows XP in my case). Child threads do not need to talk back
> anything to the parent. Parent fires a bunch of childs off, and
> wait()s for them to complete, then fires off more children in a loop.
> My code has a severe memory leak. I have isolated it into the code
> below.
[...]
> Keywords: ...
No need for you to put 'Keywords' in your post.
>
> I assume there is a problem with LWP somehow. I would rather not use a
> real threading library. It seems unnecessary and not KISS. Can someone
> help?
You could look at LWP::Parallel::UserAgent
And a helpful article using it, by Randal:
http://www.stonehenge.com/merlyn/WebTechniques/col27.html
Re: runaway memory leak with LWP and Fork()ing on Windows
on 04.11.2007 17:39:08 by Mark Clements
sheinrich@my-deja.com wrote:
> Without delving too deep into your problem I think you could give a
> try to my attached module PreforkAgent.pm.
> It was made for very much the same task - stress testing a website in
> fact.
>
> Because I never tried it in Windows I'd be delighted to hear how you
> fare.
You might like to consider packaging it up and releasing it on CPAN.
This would make it much more likely that you will receive feedback :)
Mark
Re: runaway memory leak with LWP and Fork()ing on Windows
on 05.11.2007 12:05:30 by sheinrich
>
>
>
> You might like to consider packaging it up and releasing it on CPAN.
> This would make it much more likely that you will receive feedback :)
>
> Mark
You are absolutely right.
But then I will have to bother with CPAN packaging and submission,
which is something I never felt I had the time for. ;-)
Cheers, Steffen