168 lines
5.2 KiB
Text
168 lines
5.2 KiB
Text
|
|
#!/usr/bin/perl
|
||
|
|
#
|
||
|
|
# import-scheduler
|
||
|
|
#
|
||
|
|
# This module is responsible for scheduling import jobs. This takes care of
|
||
|
|
# the hairy logic and dependencies so we don't have to do that in TheSchwartz.
|
||
|
|
#
|
||
|
|
# Authors:
|
||
|
|
# Mark Smith <mark@dreamwidth.org>
|
||
|
|
#
|
||
|
|
# Copyright (c) 2009 by Dreamwidth Studios, LLC.
|
||
|
|
#
|
||
|
|
# This program is free software; you may redistribute it and/or modify it under
|
||
|
|
# the same terms as Perl itself. For a copy of the license, please reference
|
||
|
|
# 'perldoc perlartistic' or 'perldoc perlgpl'.
|
||
|
|
#
|
||
|
|
|
||
|
|
use strict;
|
||
|
|
BEGIN {
|
||
|
|
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
|
||
|
|
}
|
||
|
|
|
||
|
|
use Time::HiRes qw/ gettimeofday tv_interval /;
|
||
|
|
use Getopt::Long;
|
||
|
|
|
||
|
|
$| = 1; # Autoflush: flush the currently selected output handle after every write.
|
||
|
|
# Debug flag, set by the 'verbose' command-line option. When true, _log prints
# its messages instead of appending them to the import-scheduler log file.
my $DEBUG = 0;
|
||
|
|
# Emit one formatted log line, prefixed with the current process id.
#
# Arguments: a sprintf-style format string followed by the values it consumes.
# A newline is appended to the format automatically.
#
# When $DEBUG is true the line is printed to the currently selected output
# handle; otherwise it is appended to $LJHOME/logs/import-scheduler.log.
#
# Dies if the log file cannot be opened. A failed write or close only warns,
# so a logging hiccup does not take the scheduler down.
#
# Always returns undef, which lets callers write "return _log( ... )".
sub _log {
    my ( $format, @args ) = @_;
    my $txt = "[$$] " . sprintf( $format . "\n", @args );

    if ( $DEBUG ) {
        print $txt;
    } else {
        my $logfile = "$ENV{LJHOME}/logs/import-scheduler.log";

        # three-arg open with a lexical handle: the path can never be parsed
        # as part of the mode, and we don't clobber a global FILE handle
        open my $fh, '>>', $logfile
            or die "can't open $logfile for append: $!\n";

        print {$fh} $txt
            or warn "can't write to $logfile: $!\n";

        # buffered write errors only surface when the handle is closed
        close $fh
            or warn "can't close $logfile: $!\n";
    }

    return undef;
}
|
||
|
|
|
||
|
|
# Run one pass of worker() with exception trapping. If worker() dies, the
# error text is flattened onto a single line, logged through _log, and the
# process exits with status 1. If it succeeds, control returns to the caller.
sub worker_helper {
    eval { worker() };

    # copy $@ right away, before anything else can overwrite it
    my $err = $@;
    return unless $err;

    # collapse line breaks so the failure occupies exactly one log line
    $err =~ s/\r?\n/ /g;
    _log( "worker died: %s", $err );
    exit 1;
}
|
||
|
|
|
||
|
|
# Perform a single scheduling pass.
#
# Steps:
#   1. Pick the highest-priority import_items row whose status is 'ready'.
#      If there is none, log "nothing to do" and return.
#   2. If that user has a newer import group (a larger import_data_id), mark
#      the older group's 'ready' rows 'aborted' and return.
#   3. Otherwise walk every import_items row for the user and, for each one
#      that is 'ready', insert a TheSchwartz job for the matching importer
#      class, record the import_data_id in the user's import_job prop, and
#      mark the row 'queued'.
#
# Takes no arguments. Dies on any database error, on an unknown item type,
# or when a required handle (database writer, TheSchwartz client) is missing.
# The caller is expected to trap these.
sub worker {
    LJ::start_request();

    my $dbh = LJ::get_db_writer()
        or die "no global master database handle available";

    my ( $userid, $impid ) = $dbh->selectrow_array(
        q{SELECT userid, import_data_id FROM import_items WHERE status = 'ready'
          ORDER BY priority DESC
          LIMIT 1}
    );
    die $dbh->errstr . "\n" if $dbh->err;

    return _log( "nothing to do" )
        unless $userid;

    my $u = LJ::load_userid( $userid )
        or die "userid $userid doesn't exist\n";
    _log( 'user %s(%d) has stuff to import', $u->user, $u->id );

    # now load up and ensure this is their most recent import group
    my $maxid = $dbh->selectrow_array(
        q{SELECT MAX(import_data_id) FROM import_items WHERE userid = ?},
        undef, $userid
    );
    die $dbh->errstr . "\n" if $dbh->err;

    if ( $maxid > $impid ) {
        _log( 'user %s(%d) had stuff to import but it was aborted', $u->user, $u->id );
        $dbh->do( q{UPDATE import_items SET status = 'aborted'
                    WHERE userid = ? AND import_data_id = ? AND status = 'ready'},
                  undef, $userid, $impid );

        # without this check a failed update would leave the stale row
        # 'ready' and it would be picked up again on the next pass
        die $dbh->errstr . "\n" if $dbh->err;
        return;
    }

    my $statusrows = $dbh->selectall_arrayref(
        q{SELECT item, status, created, import_data_id, priority
          FROM import_items WHERE userid=?},
        undef, $u->id
    );
    die $dbh->errstr . "\n" if $dbh->err;
    die "user had no status rows!\n" unless @{ $statusrows || [] };

    # turn the positional rows into hashes keyed by column meaning
    my @status = (
        map {
            {
                item     => $_->[0],
                status   => $_->[1],
                created  => $_->[2],
                impid    => $_->[3],
                priority => $_->[4]
            }
        } @$statusrows
    );

    # FIXME: handle the case where the user already has an import_job going and it's
    # different from the current import_data_id that we are trying to import for.
    # if we don't handle this case, we're going to get pretty confused about what
    # to update later...

    # item type => TheSchwartz worker class; built once rather than per item
    my %class_for = (
        lj_bio          => 'DW::Worker::ContentImporter::LiveJournal::Bio',
        lj_tags         => 'DW::Worker::ContentImporter::LiveJournal::Tags',
        lj_entries      => 'DW::Worker::ContentImporter::LiveJournal::Entries',
        lj_comments     => 'DW::Worker::ContentImporter::LiveJournal::Comments',
        lj_userpics     => 'DW::Worker::ContentImporter::LiveJournal::Userpics',
        lj_friends      => 'DW::Worker::ContentImporter::LiveJournal::Friends',
        lj_friendgroups => 'DW::Worker::ContentImporter::LiveJournal::FriendGroups',
        lj_verify       => 'DW::Worker::ContentImporter::LiveJournal::Verify',
    );

    # now find the ready jobs, dispatch them
    foreach my $item ( @status ) {

        next if $item->{status} ne 'ready';

        _log( 'scheduling %d:%s', $item->{impid}, $item->{item} );

        # name the offending item so the failure can be diagnosed from the log
        my $class = $class_for{ $item->{item} }
            or die "unknown item $item->{item}\n";

        my $sh = LJ::theschwartz()
            or die "no schwartz client\n";

        my $job = TheSchwartz::Job->new(
            funcname => $class,
            uniqkey  => join( '-', ( $item->{item}, $u->id ) ),
            arg      => {
                userid         => $u->id,
                import_data_id => $item->{impid},
            }
        ) or die "can't create job\n";

        my $h = $sh->insert( $job );
        unless ( $h ) {
            # best guess is that this is a dupe, so abort it
            _log( 'looks like they already have %s queued, aborting this one', $item->{item} );
            $dbh->do( q{UPDATE import_items SET status = 'aborted'
                        WHERE userid = ? AND import_data_id = ? AND item = ? AND status = 'ready'},
                      undef, $u->id, $item->{impid}, $item->{item} );
            die $dbh->errstr . "\n" if $dbh->err;

            # ends this pass entirely; items not yet reached stay 'ready'
            return;
        }

        $u->set_prop( import_job => $item->{impid} );

        $dbh->do( "UPDATE import_items SET status = 'queued', last_touch = UNIX_TIMESTAMP() " .
                  "WHERE userid = ? AND item = ? AND import_data_id = ?",
                  undef, $u->id, $item->{item}, $item->{impid} );
        die $dbh->errstr . "\n" if $dbh->err;
    }
}
|
||
|
|
|
||
|
|
# Parse the command line once, up front. GetOptions consumes @ARGV on its
# first call, so repeating it on every pass does nothing useful.
#   verbose - sets $DEBUG, so _log prints instead of appending to the log file
#   once    - perform a single scheduling pass and then exit
my $once = 0;
GetOptions( 'verbose' => \$DEBUG, 'once' => \$once );

# run the job in a loop, pausing one second between passes
while ( 1 ) {
    worker_helper();
    last if $once;

    sleep 1;
}
|