306 lines
9.2 KiB
Perl
306 lines
9.2 KiB
Perl
#!/usr/bin/perl
|
|
#
|
|
# DW::TaskQueue
|
|
#
|
|
# Library for queueing and executing jobs.
|
|
#
|
|
# Authors:
|
|
# Mark Smith <mark@dreamwidth.org>
|
|
#
|
|
# Copyright (c) 2019-2020 by Dreamwidth Studios, LLC.
|
|
#
|
|
# This program is free software; you may redistribute it and/or modify it under
|
|
# the same terms as Perl itself. For a copy of the license, please reference
|
|
# 'perldoc perlartistic' or 'perldoc perlgpl'.
|
|
#
|
|
|
|
package DW::TaskQueue;

use strict;
use v5.10;

use Log::Log4perl;
use DW::TaskQueue::Dedup;
use DW::TaskQueue::SQS;
use DW::TaskQueue::LocalDisk;

# Logger shared by every function in this package.
my $log = Log::Log4perl->get_logger(__PACKAGE__);

# Queue backend object; built lazily and cached by get().
my $_queue;
|
|
|
|
# Return the task queue backend object, building and caching it on first use.
#
# Backend selection:
#   - DW::TaskQueue::SQS, initialised with %LJ::SQS, when %LJ::SQS has an
#     'endpoint' key;
#   - otherwise DW::TaskQueue::LocalDisk, but ONLY when $LJ::IS_DEV_SERVER is
#     true (the local-disk queue is not allowed in production).
# If neither applies this croaks through the package logger, and nothing is
# cached, so a misconfigured production host fails loudly on every call
# instead of silently falling back to the local-disk queue.
#
# The invocant ($_[0]) is accepted for method-call syntax but not used.
sub get {
    my $class = $_[0];

    return $_queue if defined $_queue;

    # Determine what kind of queue object to build, depending on if we're
    # running locally or not.
    if ( exists $LJ::SQS{endpoint} ) {
        return $_queue = DW::TaskQueue::SQS->init(%LJ::SQS);
    }

    # If we're a dev server, allow the local mode (not allowed in production,
    # it's really crappy).
    if ($LJ::IS_DEV_SERVER) {
        return $_queue = DW::TaskQueue::LocalDisk->init();
    }

    $log->logcroak('Unable to instantiate any DW::TaskQueue modules.');
}
|
|
|
|
#sub get {
|
|
#$_queue = DW::TaskQueue::LocalDisk->init();
|
|
#my $class = $_[0];
|
|
|
|
#return $_queue if defined $_queue;
|
|
|
|
|
|
# Determine what kind of queue object to build, depending on if we're
|
|
# running locally or not
|
|
#$_queue = DW::TaskQueue::LocalDisk->init();
|
|
|
|
# Determine what kind of queue object to build, depending on if we're
|
|
# running locally or not
|
|
#if ( exists $LJ::SQS{region} ) {
|
|
# return $_queue = DW::TaskQueue::SQS->init(%LJ::SQS);
|
|
#}
|
|
# if ($LJ::IS_DEV_SERVER = 0) {
|
|
# return $_queue = DW::TaskQueue::LocalDisk->init();
|
|
# }
|
|
|
|
# $log->logcroak('Unable to instantiate any DW::TaskQueue modules.');
|
|
#}
|
|
|
|
# Enqueue work on the active queue backend.
#
# Every argument after the invocant is copied and handed unchanged to the
# backend's send(); its return value is passed straight back to the caller.
sub send {
    my ( $invocant, @forwarded ) = @_;
    my $backend = $invocant->get;
    return $backend->send(@forwarded);
}
|
|
|
|
# Pull messages from the active queue backend.
#
# Every argument after the invocant is copied and handed unchanged to the
# backend's receive(); its return value is passed straight back to the caller.
sub receive {
    my ( $invocant, @forwarded ) = @_;
    my $backend = $invocant->get;
    return $backend->receive(@forwarded);
}
|
|
|
|
# Acknowledge finished messages on the active queue backend.
#
# Every argument after the invocant is copied and handed unchanged to the
# backend's completed(); its return value is passed straight back to the caller.
sub completed {
    my ( $invocant, @forwarded ) = @_;
    my $backend = $invocant->get;
    return $backend->completed(@forwarded);
}
|
|
|
|
# Route a list of jobs/tasks to the queueing system that handles each one.
#
# This is a shim that inspects each item:
#   - TheSchwartz::Job objects are inserted via LJ::theschwartz(); if no
#     client is available they are logged and dropped.
#   - DW::Task objects go to the TaskQueue backend. When the task has a
#     uniqkey, DW::TaskQueue::Dedup->claim_unique is tried first (TTL from
#     dedup_ttl, defaulting to 3600) and the task is skipped if the claim fails.
#   - LJ::Event objects are converted with fire_task() and the resulting
#     task(s) go to the TaskQueue backend.
# Anything else -- including undef and unblessed values -- is logged and
# dropped.
#
# Callable as a class method (DW::TaskQueue->dispatch) or on a queue object.
#
# Returns undef when called with no tasks. Otherwise returns a value that is
# true only if every insert_jobs()/send() call made here returned true.
sub dispatch {
    my ( $self, @tasks ) = @_;
    return undef unless @tasks;

    $self = $self->get unless ref $self;

    require Scalar::Util;

    my ( @schwartz_jobs, @tsq_tasks );
    foreach my $task (@tasks) {

        # ->isa dies on undef/unblessed references and is a class-method call
        # on a plain string, so only real objects are classified below.
        if ( !Scalar::Util::blessed($task) ) {
            $log->error( 'Unknown job/task type, dropping: ' . ref($task) );
        }
        elsif ( $task->isa('TheSchwartz::Job') ) {
            push @schwartz_jobs, $task;
        }
        elsif ( $task->isa('DW::Task') ) {

            # Check dedup before enqueuing
            if ( my $uniqkey = $task->uniqkey ) {
                my $queue_name = ref $task;
                my $ttl        = $task->dedup_ttl || 3600;
                unless ( DW::TaskQueue::Dedup->claim_unique( $queue_name, $uniqkey, $ttl ) ) {
                    $log->debug( 'Skipping duplicate task: ' . ref($task) . " key=$uniqkey" );
                    next;
                }
            }
            push @tsq_tasks, $task;
        }
        elsif ( $task->isa('LJ::Event') ) {

            # Ignore undef results so they are not sent as a batch of type ''.
            push @tsq_tasks, grep { defined $_ } $task->fire_task;
        }
        else {
            $log->error( 'Unknown job/task type, dropping: ' . ref($task) );
        }
    }

    my $rv = 1;

    # Dispatch to Schwartz
    if (@schwartz_jobs) {
        if ( my $sclient = LJ::theschwartz() ) {
            $log->debug( 'Inserting ' . scalar(@schwartz_jobs) . ' jobs into TheSchwartz.' );
            my $inserted = $sclient->insert_jobs(@schwartz_jobs);
            $rv &&= $inserted;
        }
        else {
            $log->warn( 'Failed to retrieve TheSchwartz client, dropping '
                    . scalar(@schwartz_jobs)
                    . ' jobs.' );
        }
    }

    # Dispatch to TaskQueue, grouping by task type since send() routes
    # the entire batch to the queue of the first task.
    if (@tsq_tasks) {
        my %by_type;
        push @{ $by_type{ ref $_ } }, $_ for @tsq_tasks;

        # Sorted so batches are sent (and logged) in a deterministic order.
        for my $type ( sort keys %by_type ) {
            my $batch = $by_type{$type};
            $log->debug( 'Inserting ' . scalar(@$batch) . " $type tasks into TaskQueue." );

            # Always call send(): writing `$rv &&= $self->send(...)` would skip
            # every remaining batch once any earlier insert/send had failed.
            my $sent = $self->send(@$batch);
            $rv &&= $sent;
        }
    }

    # Returns the "worse" of the return values. If any are falsey, we will
    # return a false value.
    return $rv;
}
|
|
|
|
# Run a worker loop that receives and executes messages for one task class.
#
# $class is the task class to work on; it is loaded with `use` before the
# loop starts (croaks if the name is malformed or the load fails).
#
# %opts (all optional):
#   message_timeout_secs - per-message alarm() timeout; 0 (the default)
#                          disables it. On timeout the worker logs an error,
#                          acknowledges messages already completed in the
#                          current batch, and returns.
#   exit_after_secs      - return once more than this many seconds have
#                          elapsed (checked before each receive).
#   exit_after_messages  - return once this many messages have been worked
#                          (checked before each receive).
#   max_retries          - a failing message whose receive_count has reached
#                          this value is marked complete instead of retried.
#
# Each receive() asks for up to 10 messages. Handles of completed messages
# are acknowledged with completed(); failed ones are left for redelivery.
# Dedup keys (uniqkey) are released after every worked message. Exceptions
# thrown by a task's work() are re-raised.
#
# Callable as a class method or on a queue object.
sub start_work {
    my ( $self, $class, %opts ) = @_;

    $opts{message_timeout_secs} ||= 0;

    $self = $self->get unless ref $self;

    # $class is interpolated into a string eval below, so only accept text
    # shaped like a Perl package name.
    $log->logcroak( 'Invalid task class name: ' . ( $class // 'undef' ) )
        unless defined $class && $class =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/;
    eval "use $class;";
    $log->logcroak("Failed to load task class $class: $@") if $@;

    # Timings are logged with millisecond precision (%0.3f). Core time() only
    # has whole-second resolution, so use the core Time::HiRes module instead.
    require Time::HiRes;

    my $start_time    = Time::HiRes::time();
    my $messages_done = 0;

    $log->info( sprintf( '[%s %0.3fs] Worker starting', $class, 0.0 ) );

    while (1) {
        my $recv_start_time = Time::HiRes::time();
        if ( $opts{exit_after_secs} && ( $recv_start_time - $start_time > $opts{exit_after_secs} ) )
        {
            $log->info(
                sprintf(
                    '[%s] Exiting after %d seconds of work, as requested.',
                    $class, $opts{exit_after_secs}
                )
            );
            return;
        }

        if ( $opts{exit_after_messages} && ( $messages_done >= $opts{exit_after_messages} ) ) {
            $log->info(
                sprintf(
                    '[%s] Exiting after %d messages done, as requested.',
                    $class, $opts{exit_after_messages}
                )
            );
            return;
        }

        my $messages  = $self->receive( $class, 10 );
        my $recv_time = Time::HiRes::time() - $recv_start_time;

        unless ( @{ $messages || [] } ) {
            $log->debug( sprintf( '[%s %0.3fs] Receive finished, empty', $class, $recv_time ) );
            next;
        }

        $log->debug(
            sprintf(
                '[%s %0.3fs] Receive finished, %d messages',
                $class, $recv_time, scalar(@$messages)
            )
        );

        my ( @completed, @failed );
        my ( $work_start_time, $work_end_time );
        foreach my $message_pair (@$messages) {
            my ( $handle, $message ) = @$message_pair;

            # Record earliest start time of any message. Check definedness
            # first so undef is never compared numerically.
            my $local_start_time = Time::HiRes::time();
            $work_start_time = $local_start_time
                if !defined $work_start_time || $local_start_time < $work_start_time;

            my ( $res, $abort );
            eval {
                local $SIG{ALRM} = sub {
                    $log->error(
                        sprintf(
                            '[%s] Operation timed out after %d seconds. Exiting worker. Message: %s',
                            $class, $opts{message_timeout_secs}, $handle
                        )
                    );
                    $abort = 1;

                    # Die so the alarm actually interrupts work(); a handler
                    # that merely returns lets the stuck call keep running.
                    die "DW::TaskQueue: message timeout\n";
                };
                alarm $opts{message_timeout_secs};
                $res = $message->work($handle);
                alarm 0;
            };
            alarm 0;
            my $err = $@;

            # Clear out MDC so we don't continue to log with whatever the worker
            # might have put into context (also before re-raising below).
            Log::Log4perl::MDC->remove;

            if ($abort) {
                # Acknowledge messages that already finished in this batch;
                # otherwise they would be redelivered and run a second time.
                $self->completed( $class, @completed ) if @completed;
                return;
            }
            die $err if $err;    # Reraise if the work call died.

            $messages_done++;

            # Record latest end time of any message.
            my $local_end_time = Time::HiRes::time();
            $work_end_time = $local_end_time
                if !defined $work_end_time || $local_end_time > $work_end_time;

            if ( $res == DW::Task::COMPLETED ) {
                push @completed, $handle;
            }
            elsif ( $opts{max_retries} && $message->receive_count >= $opts{max_retries} ) {

                # If we've exceeded max retries, give up and mark complete
                # instead of letting SQS send it to the DLQ.
                $log->warn(
                    sprintf(
                        '[%s] Message "%s" failed after %d attempts, giving up',
                        $class, $handle, $message->receive_count
                    )
                );
                push @completed, $handle;
            }
            else {
                $log->warn( sprintf( '[%s] Message "%s" failed', $class, $handle ) );
                push @failed, $handle;
            }

            # Release the dedup key whatever the outcome: on success the task
            # is done, and on failure it can be re-dispatched by the next
            # scheduler run.
            if ( my $uniqkey = $message->uniqkey ) {
                DW::TaskQueue::Dedup->release_unique( ref($message), $uniqkey );
            }
        }

        $log->debug(
            sprintf(
                '[%s %0.3fs] Processed %d messages (%d failed)',
                $class, $work_end_time - $work_start_time,
                scalar(@$messages), scalar(@failed)
            )
        );
        next unless @completed;

        my $complete_start_time = Time::HiRes::time();
        $self->completed( $class, @completed );
        my $complete_time = Time::HiRes::time() - $complete_start_time;

        $log->debug(
            sprintf(
                '[%s %0.3fs] Marked %d messages complete',
                $class, $complete_time, scalar(@completed)
            )
        );
    }
}
|
|
|
|
1;    # Perl modules must end by evaluating to a true value.
|