mourningdove/bin/worker/import-scheduler

168 lines
5.2 KiB
Text
Raw Permalink Normal View History

2026-05-24 01:03:05 +00:00
#!/usr/bin/perl
#
# import-scheduler
#
# This module is responsible for scheduling import jobs. This takes care of
# the hairy logic and dependencies so we don't have to do that in TheSchwartz.
#
# Authors:
# Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2009 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#
use strict;
BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}
use Time::HiRes qw/ gettimeofday tv_interval /;
use Getopt::Long;
$| = 1; # Line buffered.
my $DEBUG = 0;
sub _log {
my $txt = "[$$] " . sprintf( shift() . "\n", @_ );
if ( $DEBUG ) {
print $txt;
} else {
open FILE, ">>$ENV{LJHOME}/logs/import-scheduler.log" or die;
print FILE $txt;
close FILE;
}
return undef;
}
sub worker_helper {
eval { worker(); };
if ( my $msg = $@ ) {
$msg =~ s/\r?\n/ /g;
_log( "worker died: %s", $msg );
exit 1;
}
}
sub worker {
LJ::start_request();
my $dbh = LJ::get_db_writer()
or die "no global master database handle available";
my ( $userid, $impid ) = $dbh->selectrow_array(
q{SELECT userid, import_data_id FROM import_items WHERE status = 'ready'
ORDER BY priority DESC
LIMIT 1}
);
die $dbh->errstr . "\n" if $dbh->err;
return _log( "nothing to do" )
unless $userid;
my $u = LJ::load_userid( $userid )
or die "userid $userid doesn't exist\n";
_log( 'user %s(%d) has stuff to import', $u->user, $u->id );
# now load up and ensure this is their most recent import group
my $maxid = $dbh->selectrow_array(
q{SELECT MAX(import_data_id) FROM import_items WHERE userid = ?},
undef, $userid
);
if ( $maxid > $impid ) {
_log( 'user %s(%d) had stuff to import but it was aborted', $u->user, $u->id );
$dbh->do( q{UPDATE import_items SET status = 'aborted'
WHERE userid = ? AND import_data_id = ? AND status = 'ready'},
undef, $userid, $impid );
return;
}
my $statusrows = $dbh->selectall_arrayref(
q{SELECT item, status, created, import_data_id, priority
FROM import_items WHERE userid=?},
undef, $u->id
);
die $dbh->errstr . "\n" if $dbh->err;
die "user had no status rows!\n" unless @{ $statusrows || [] };
my @status = (
map {
{
item => $_->[0],
status => $_->[1],
created => $_->[2],
impid => $_->[3],
priority => $_->[4]
}
} @$statusrows
);
# FIXME: handle the case where the user already has an import_job going and it's
# different from the current import_data_id that we are trying to import for.
# if we don't handle this case, we're going to get pretty confused about what
# to update later...
# now find the ready jobs, dispatch them
foreach my $item ( @status ) {
next if $item->{status} ne 'ready';
_log( 'scheduling %d:%s', $item->{impid}, $item->{item} );
my $class = {
lj_bio => 'DW::Worker::ContentImporter::LiveJournal::Bio',
lj_tags => 'DW::Worker::ContentImporter::LiveJournal::Tags',
lj_entries => 'DW::Worker::ContentImporter::LiveJournal::Entries',
lj_comments => 'DW::Worker::ContentImporter::LiveJournal::Comments',
lj_userpics => 'DW::Worker::ContentImporter::LiveJournal::Userpics',
lj_friends => 'DW::Worker::ContentImporter::LiveJournal::Friends',
lj_friendgroups => 'DW::Worker::ContentImporter::LiveJournal::FriendGroups',
lj_verify => 'DW::Worker::ContentImporter::LiveJournal::Verify',
}->{ $item->{item} } || die 'unknown item ';
my $sh = LJ::theschwartz()
or die "no schwartz client\n";
my $job = TheSchwartz::Job->new(
funcname => $class,
uniqkey => join( '-', ( $item->{item}, $u->id ) ),
arg => {
userid => $u->id,
import_data_id => $item->{impid},
}
) or die "can't create job\n";
my $h = $sh->insert( $job );
unless ( $h ) {
# best guess is that this is a dupe, so abort it
_log( 'looks like they already have %s queued, aborting this one', $item->{item} );
$dbh->do( q{UPDATE import_items SET status = 'aborted'
WHERE userid = ? AND import_data_id = ? AND item = ? AND status = 'ready'},
undef, $u->id, $item->{impid}, $item->{item} );
return;
}
$u->set_prop( import_job => $item->{impid} );
$dbh->do( "UPDATE import_items SET status = 'queued', last_touch = UNIX_TIMESTAMP() " .
"WHERE userid = ? AND item = ? AND import_data_id = ?",
undef, $u->id, $item->{item}, $item->{impid} );
die $dbh->errstr . "\n" if $dbh->err;
}
}
# run the job in a loop
while ( 1 ) {
my $once = 0;
GetOptions( 'verbose' => \$DEBUG, 'once' => \$once );
worker_helper();
last if $once;
sleep 1;
}