134 lines
4.1 KiB
Perl
134 lines
4.1 KiB
Perl
|
|
#!/usr/bin/perl
|
||
|
|
#
|
||
|
|
# DW::Task::ESN::FiredEvent
|
||
|
|
#
|
||
|
|
# ESN worker that kicks off the process, called whenever an event has fired and
|
||
|
|
# enables us to do initial subscription processing.
|
||
|
|
#
|
||
|
|
# Authors:
|
||
|
|
# Mark Smith <mark@dreamwidth.org>
|
||
|
|
#
|
||
|
|
# Copyright (c) 2019 by Dreamwidth Studios, LLC.
|
||
|
|
#
|
||
|
|
# This program is free software; you may redistribute it and/or modify it under
|
||
|
|
# the same terms as Perl itself. For a copy of the license, please reference
|
||
|
|
# 'perldoc perlartistic' or 'perldoc perlgpl'.
|
||
|
|
#
|
||
|
|
|
||
|
|
package DW::Task::ESN::FiredEvent;
|
||
|
|
|
||
|
|
use strict;
|
||
|
|
use v5.10;
|
||
|
|
use Log::Log4perl;
|
||
|
|
my $log = Log::Log4perl->get_logger(__PACKAGE__);
|
||
|
|
|
||
|
|
use DW::Task::ESN::FindSubsByCluster;
|
||
|
|
use DW::TaskQueue;
|
||
|
|
use LJ::ESN;
|
||
|
|
use LJ::Event;
|
||
|
|
|
||
|
|
use base 'DW::Task';
|
||
|
|
|
||
|
|
sub work {
|
||
|
|
my $self = $_[0];
|
||
|
|
my $a = $self->args;
|
||
|
|
|
||
|
|
my $incr = sub {
|
||
|
|
my ( $phase, $incr, $tags ) = @_;
|
||
|
|
push @{ $tags ||= [] }, "etypeid:$a->[0]";
|
||
|
|
DW::Stats::increment( 'dw.esn.firedevent.' . $phase, $incr // 1, $tags );
|
||
|
|
};
|
||
|
|
|
||
|
|
$incr->('started');
|
||
|
|
|
||
|
|
my $evt = eval { LJ::Event->new_from_raw_params(@$a) };
|
||
|
|
unless ($evt) {
|
||
|
|
$log->error( 'Failed to load event from raw params: ', join( ', ', @$a ) );
|
||
|
|
$incr->( 'failed', 1, ['err:LoadEvent'] );
|
||
|
|
return DW::Task::FAILED;
|
||
|
|
}
|
||
|
|
|
||
|
|
$evt->configure_logger;
|
||
|
|
|
||
|
|
$log->debug( 'Processing event from raw params: ', join( ', ', @$a ) );
|
||
|
|
|
||
|
|
# step 1: see if we can split this into a bunch of ProcessSub directly.
|
||
|
|
# we can only do this if A) all clusters are up, and B) subs is reasonably
|
||
|
|
# small. say, under 5,000.
|
||
|
|
my $split_per_cluster = 0; # bool: died or hit limit, split into per-cluster jobs
|
||
|
|
my @subs;
|
||
|
|
foreach my $cid (@LJ::CLUSTERS) {
|
||
|
|
my @more_subs = eval {
|
||
|
|
$evt->subscriptions(
|
||
|
|
cluster => $cid,
|
||
|
|
limit => $LJ::ESN::MAX_FILTER_SET - @subs + 1
|
||
|
|
);
|
||
|
|
};
|
||
|
|
if ($@) {
|
||
|
|
$log->debug( 'Failed scanning for subscriptions from cluster: ', $cid );
|
||
|
|
|
||
|
|
# if there were errors (say, the cluster is down), abort!
|
||
|
|
# that is, abort the fast path and we'll resort to
|
||
|
|
# per-cluster scanning
|
||
|
|
$split_per_cluster = "some_error";
|
||
|
|
last;
|
||
|
|
}
|
||
|
|
|
||
|
|
$log->debug(
|
||
|
|
sprintf( 'Found %d subscriptions from cluster %d.', scalar(@more_subs), $cid ) );
|
||
|
|
|
||
|
|
push @subs, @more_subs;
|
||
|
|
if ( @subs > $LJ::ESN::MAX_FILTER_SET ) {
|
||
|
|
$split_per_cluster = "hit_max";
|
||
|
|
last;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
# If there are no subscriptions and we didn't hit an edge case, exit now
|
||
|
|
unless ( @subs || $split_per_cluster ) {
|
||
|
|
$log->debug('No subscriptions found for event.');
|
||
|
|
$incr->( 'completed', 1, ['err:NoSubsNoSplit'] );
|
||
|
|
return DW::Task::COMPLETED;
|
||
|
|
}
|
||
|
|
|
||
|
|
# this is the slow/safe/on-error/lots-of-subscribers path
|
||
|
|
my @subjobs;
|
||
|
|
if ($split_per_cluster) {
|
||
|
|
my $params = $evt->raw_params;
|
||
|
|
foreach my $cid (@LJ::CLUSTERS) {
|
||
|
|
push @subjobs, DW::Task::ESN::FindSubsByCluster->new( $cid, $params );
|
||
|
|
}
|
||
|
|
$log->debug(
|
||
|
|
sprintf(
|
||
|
|
'Slow path: exploding job into %d cluster scan jobs because: %s',
|
||
|
|
scalar(@subjobs), $split_per_cluster
|
||
|
|
)
|
||
|
|
);
|
||
|
|
}
|
||
|
|
else {
|
||
|
|
# the fast path, filter those max 5,000 subscriptions down to ones that match,
|
||
|
|
# then split right into processing those notification methods
|
||
|
|
@subjobs = LJ::ESN->tasks_of_unique_matching_subs( $evt, @subs );
|
||
|
|
$log->debug(
|
||
|
|
sprintf( 'Fast path: exploding job into %d processing jobs.', scalar(@subjobs) ) );
|
||
|
|
}
|
||
|
|
|
||
|
|
# And if those subscriptions didn't turn into actual jobs, nothing to do
|
||
|
|
unless (@subjobs) {
|
||
|
|
$log->debug('No notification jobs found for subscriptions.');
|
||
|
|
$incr->( 'completed', 1, [ 'err:NoSubsWithSplit', 'split:' . $split_per_cluster ] );
|
||
|
|
return DW::Task::COMPLETED;
|
||
|
|
}
|
||
|
|
|
||
|
|
unless ( DW::TaskQueue->send(@subjobs) ) {
|
||
|
|
$incr->( 'completed', 1, ['err:FailedSend'] );
|
||
|
|
return DW::Task::FAILED;
|
||
|
|
}
|
||
|
|
|
||
|
|
$incr->( 'completed', 1, ['err:None'] );
|
||
|
|
return DW::Task::COMPLETED;
|
||
|
|
}
|
||
|
|
|
||
|
|
1;
|
||
|
|
|