214 lines
5.5 KiB
Perl
214 lines
5.5 KiB
Perl
|
|
# This code was forked from the LiveJournal project owned and operated
|
||
|
|
# by Live Journal, Inc. The code has been modified and expanded by
|
||
|
|
# Dreamwidth Studios, LLC. These files were originally licensed under
|
||
|
|
# the terms of the license supplied by Live Journal, Inc, which can
|
||
|
|
# currently be found at:
|
||
|
|
#
|
||
|
|
# http://code.livejournal.org/trac/livejournal/browser/trunk/LICENSE-LiveJournal.txt
|
||
|
|
#
|
||
|
|
# In accordance with the original license, this code and all its
|
||
|
|
# modifications are provided under the GNU General Public License.
|
||
|
|
# A copy of that license can be found in the LICENSE file included as
|
||
|
|
# part of this distribution.
|
||
|
|
|
||
|
|
package LJ::Worker::Gearman;
|
||
|
|
use strict;
|
||
|
|
use Gearman::Worker;
|
||
|
|
use base "LJ::Worker", "Exporter";
|
||
|
|
use LJ::WorkerResultStorage;
|
||
|
|
|
||
|
|
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
|
||
|
|
use Getopt::Long;
|
||
|
|
use IO::Socket::INET ();
|
||
|
|
use Carp qw(croak);
|
||
|
|
|
||
|
|
my $quit_flag = 0;
|
||
|
|
$SIG{TERM} = sub {
|
||
|
|
$quit_flag = 1;
|
||
|
|
};
|
||
|
|
|
||
|
|
my $opt_verbose;
|
||
|
|
die "Unknown options"
|
||
|
|
unless GetOptions( "verbose|v" => \$opt_verbose );
|
||
|
|
|
||
|
|
our @EXPORT = qw(gearman_decl gearman_work gearman_set_idle_handler gearman_set_requester_id);
|
||
|
|
|
||
|
|
my $worker = Gearman::Worker->new;
|
||
|
|
my $idle_handler;
|
||
|
|
my $requester_id; # userid, who requested job, optional
|
||
|
|
|
||
|
|
sub gearman_set_requester_id { $requester_id = $_[0]; }
|
||
|
|
|
||
|
|
sub gearman_decl {
|
||
|
|
my $name = shift;
|
||
|
|
my ( $subref, $timeout );
|
||
|
|
|
||
|
|
if ( ref $_[0] eq 'CODE' ) {
|
||
|
|
$subref = shift;
|
||
|
|
}
|
||
|
|
else {
|
||
|
|
$timeout = shift;
|
||
|
|
$subref = shift;
|
||
|
|
}
|
||
|
|
|
||
|
|
$subref = wrapped_verbose( $name, $subref ) if $opt_verbose;
|
||
|
|
|
||
|
|
if ( defined $timeout ) {
|
||
|
|
$worker->register_function( $name => $timeout => $subref );
|
||
|
|
}
|
||
|
|
else {
|
||
|
|
$worker->register_function( $name => $subref );
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
# set idle handler
|
||
|
|
sub gearman_set_idle_handler {
|
||
|
|
my $cb = shift;
|
||
|
|
return unless ref $cb eq 'CODE';
|
||
|
|
$idle_handler = $cb;
|
||
|
|
}
|
||
|
|
|
||
|
|
sub gearman_work {
|
||
|
|
my %opts = @_;
|
||
|
|
my $save_result = delete $opts{save_result} || 0;
|
||
|
|
|
||
|
|
croak "unknown opts passed to gearman_work: " . join( ', ', keys %opts )
|
||
|
|
if keys %opts;
|
||
|
|
|
||
|
|
if ($LJ::IS_DEV_SERVER) {
|
||
|
|
die "DEVSERVER help: No gearmand servers listed in \@LJ::GEARMAN_SERVERS.\n"
|
||
|
|
unless @LJ::GEARMAN_SERVERS;
|
||
|
|
IO::Socket::INET->new( PeerAddr => $LJ::GEARMAN_SERVERS[0] )
|
||
|
|
or die
|
||
|
|
"First gearmand server in \@LJ::GEARMAN_SERVERS ($LJ::GEARMAN_SERVERS[0]) isn't responding.\n";
|
||
|
|
}
|
||
|
|
|
||
|
|
LJ::Worker->setup_mother();
|
||
|
|
|
||
|
|
# save the results of this worker
|
||
|
|
my $storage;
|
||
|
|
|
||
|
|
my $last_death_check = time();
|
||
|
|
|
||
|
|
my $periodic_checks = sub {
|
||
|
|
LJ::Worker->check_limits();
|
||
|
|
|
||
|
|
# check to see if we should die
|
||
|
|
my $now = time();
|
||
|
|
if ( $now != $last_death_check ) {
|
||
|
|
$last_death_check = $now;
|
||
|
|
if ( -e "$LJ::VAR/$$.please_die" ) {
|
||
|
|
unlink "$LJ::VAR/$$.please_die";
|
||
|
|
|
||
|
|
# Using exit here since "die" would be caught by an eval block
|
||
|
|
exit 1;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
$worker->job_servers(@LJ::GEARMAN_SERVERS)
|
||
|
|
; # TODO: don't do this everytime, only when config changes?
|
||
|
|
|
||
|
|
exit 0 if $quit_flag;
|
||
|
|
};
|
||
|
|
|
||
|
|
my $start_cb = sub {
|
||
|
|
my $handle = shift;
|
||
|
|
|
||
|
|
LJ::start_request();
|
||
|
|
undef $requester_id;
|
||
|
|
|
||
|
|
# save to db that we are starting the job
|
||
|
|
if ($save_result) {
|
||
|
|
$storage = LJ::WorkerResultStorage->new( handle => $handle );
|
||
|
|
$storage->init_job;
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
my $end_work = sub {
|
||
|
|
LJ::end_request();
|
||
|
|
$periodic_checks->();
|
||
|
|
};
|
||
|
|
|
||
|
|
# create callbacks to save job status
|
||
|
|
my $complete_cb = sub {
|
||
|
|
$end_work->();
|
||
|
|
my ( $handle, $res ) = @_;
|
||
|
|
$res ||= '';
|
||
|
|
|
||
|
|
if ( $save_result && $storage ) {
|
||
|
|
my %row = (
|
||
|
|
result => $res,
|
||
|
|
status => 'success',
|
||
|
|
end_time => 1
|
||
|
|
);
|
||
|
|
$row{userid} = $requester_id if defined $requester_id;
|
||
|
|
$storage->save_status(%row);
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
my $fail_cb = sub {
|
||
|
|
$end_work->();
|
||
|
|
my ( $handle, $err ) = @_;
|
||
|
|
$err ||= '';
|
||
|
|
|
||
|
|
if ( $save_result && $storage ) {
|
||
|
|
my %row = (
|
||
|
|
result => $err,
|
||
|
|
status => 'error',
|
||
|
|
end_time => 1
|
||
|
|
);
|
||
|
|
$row{userid} = $requester_id if defined $requester_id;
|
||
|
|
$storage->save_status(%row);
|
||
|
|
}
|
||
|
|
|
||
|
|
};
|
||
|
|
|
||
|
|
while (1) {
|
||
|
|
$periodic_checks->();
|
||
|
|
warn "waiting for work...\n" if $opt_verbose;
|
||
|
|
|
||
|
|
# do the actual work
|
||
|
|
eval {
|
||
|
|
$worker->work(
|
||
|
|
stop_if => sub { $_[0] },
|
||
|
|
on_complete => $complete_cb,
|
||
|
|
on_fail => $fail_cb,
|
||
|
|
on_start => $start_cb,
|
||
|
|
);
|
||
|
|
};
|
||
|
|
warn $@ if $@;
|
||
|
|
|
||
|
|
if ($idle_handler) {
|
||
|
|
eval {
|
||
|
|
LJ::start_request();
|
||
|
|
$idle_handler->();
|
||
|
|
LJ::end_request();
|
||
|
|
};
|
||
|
|
warn $@ if $@;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
# --------------
|
||
|
|
|
||
|
|
sub wrapped_verbose {
|
||
|
|
my ( $name, $subref ) = @_;
|
||
|
|
return sub {
|
||
|
|
warn " executing '$name'...\n";
|
||
|
|
my $ans = eval { $subref->(@_) };
|
||
|
|
if ($@) {
|
||
|
|
warn " -> ERR: $@\n";
|
||
|
|
die $@; # re-throw
|
||
|
|
}
|
||
|
|
elsif ( !ref $ans && $ans !~ /^[\0\x7f-\xff]/ ) {
|
||
|
|
my $cleanans = $ans;
|
||
|
|
$cleanans =~ s/[^[:print:]]+//g;
|
||
|
|
$cleanans = substr( $cleanans, 0, 1024 ) . "..." if length $cleanans > 1024;
|
||
|
|
warn " -> answer: $cleanans\n";
|
||
|
|
}
|
||
|
|
return $ans;
|
||
|
|
};
|
||
|
|
}
|
||
|
|
|
||
|
|
1;
|