mourningdove/cgi-bin/DW/Worker/ContentImporter.pm

318 lines
8.3 KiB
Perl
Raw Permalink Normal View History

2026-05-24 01:03:05 +00:00
#!/usr/bin/perl
#
# DW::Worker::ContentImporter
#
# Generic helper functions for Content Importers
#
# Authors:
# Andrea Nall <anall@andreanall.com>
#
# Copyright (c) 2009 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#
package DW::Worker::ContentImporter;
=head1 NAME
DW::Worker::ContentImporter - Generic helper functions for Content Importers
=cut
use strict;
use v5.10;
use Log::Log4perl;
my $log = Log::Log4perl->get_logger(__PACKAGE__);
use Time::HiRes qw/ sleep time /;
use Carp qw/ croak confess /;
use Storable;
use LWP::UserAgent;
use XMLRPC::Lite;
use Digest::MD5 qw/ md5_hex /;
use LJ::Protocol;
use LJ::Talk;
use base 'TheSchwartz::Worker';
=head1 Saving API
All Saving API functions take the target user as their first argument,
followed by a consistent hashref that is passed to every function.
=head2 C<< $class->merge_trust( $user, $hashref, $friends ) >>
$friends is a reference to an array of hashrefs, each hashref having the following format:
{
userid => ..., # local userid of the friend
groupmask => 1, # groupmask
}
=cut
sub merge_trust {
    my ( $class, $u, $opts, $friends ) = @_;

    # One trust edge is added per entry in $friends.  $opts is accepted for
    # interface consistency with the other Saving API calls but is not read.
    for my $entry ( @{$friends} ) {
        my $target = LJ::load_userid( $entry->{userid} );

        # NOTE(review): nonotify presumably suppresses the usual notification
        # for a new edge -- confirm against add_edge.
        my %trust_args = (
            mask     => $entry->{groupmask},
            nonotify => 1,
        );

        $u->add_edge( $target, trust => \%trust_args );
    }
}
=head2 C<< $class->merge_watch( $user, $hashref, $friends ) >>
$friends is a reference to an array of hashrefs, each hashref having the following format:
{
userid => ..., # local userid of the friend
fgcolor => '#ff0000', # foreground color
bgcolor => '#00ff00', # background color
}
=cut
sub merge_watch {
    my ( $class, $u, $opts, $friends ) = @_;

    # One watch edge is added per entry in $friends.  $opts is accepted for
    # interface consistency with the other Saving API calls but is not read.
    for my $entry ( @{$friends} ) {
        my $target = LJ::load_userid( $entry->{userid} );

        # Both colors are passed through LJ::color_todb before being handed
        # to add_edge.
        my %watch_args = (
            nonotify => 1,
            fgcolor  => LJ::color_todb( $entry->{fgcolor} ),
            bgcolor  => LJ::color_todb( $entry->{bgcolor} ),
        );

        $u->add_edge( $target, watch => \%watch_args );
    }
}
=head1 Helper Functions
=head2 C<< $class->import_data( $userid, $import_data_id ) >>
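Returns a hashref describing the import we're using as source (the matching import_data
row, with its stored options thawed), or undef if there is no such row.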
=cut
sub import_data {
    my ( $class, $userid, $impid ) = @_;

    my $dbh = LJ::get_db_writer();
    croak 'unable to get global database master' unless $dbh;

    my $sql =
          'SELECT userid, hostname, username, usejournal, password_md5, import_data_id, options '
        . 'FROM import_data WHERE userid = ? AND import_data_id = ?';

    my $row = $dbh->selectrow_hashref( $sql, undef, $userid, $impid );
    croak $dbh->errstr if $dbh->err;

    # The options column holds a Storable-frozen structure; thaw it in place,
    # falling back to an empty hashref if thawing yields nothing.  A row with
    # an empty options column is returned untouched.
    if ( $row && $row->{options} ) {
        $row->{options} = Storable::thaw( $row->{options} ) || {};
    }

    return $row;
}
=head2 C<< $class->userids_to_message( $userid ) >>
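For communities, this returns the userids of all of the admins so that we can let them know
what has happened with their import. For any other account (or if the user cannot be loaded)
it returns the userid that was passed in.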
=cut
sub userids_to_message {
    my ( $class, $uid ) = @_;

    my $u = LJ::load_userid($uid);

    # An account that cannot be loaded, or that is not a community, is
    # messaged directly under the userid we were given.
    return $uid if !$u || !$u->is_community;

    # Communities: hand back whatever maintainer_userids reports.
    return $u->maintainer_userids;
}
=head2 C<< $class->_should_exit( $message ) >>
Determine whether or not this failure message should be considered to be more than just a
job failure. I.e., whether we should exit the worker to try to get a new IP.
=cut
sub _should_exit {
    my ( $class, $message ) = @_;

    # Only a (case-insensitive) connection failure message counts as a
    # reason to exit; everything else is an ordinary job failure.
    return ( $message =~ /Failed to connect to the server/i ) ? 1 : 0;
}
=head2 C<< $class->fail( $import_data, $item, $job, "text", [arguments, ...] ) >>
Permanently fail this import job.
=cut
sub fail {
    # Permanently fail an import job.
    #
    #   $imp   - import data hashref; userid and import_data_id are read
    #   $item  - name of the import item being processed
    #   $job   - job object; permanent_failure is called on it
    #   $msgt  - sprintf format string for the failure message
    #   @args  - arguments for the format string
    #
    # Marks the import_items row as failed, fires an ImportStatus 'fail'
    # event for every userid returned by userids_to_message, and fails the
    # job.  Always returns nothing.
    my ( $class, $imp, $item, $job, $msgt, @args ) = @_;
    $0 = 'content-importer [bored]';

    # clear "request" cache of db handles to force revalidation in case one we need now
    # has been idle during a long import
    $LJ::DBIRole->clear_req_cache();

    # Best effort: if no writer handle is available the status row is simply
    # left as it was, and the failure is still reported below.
    if ( my $dbh = LJ::get_db_writer() ) {
        $dbh->do(
            "UPDATE import_items SET status = 'failed', last_touch = UNIX_TIMESTAMP() "
                . "WHERE userid = ? AND item = ? AND import_data_id = ?",
            undef, $imp->{userid}, $item, $imp->{import_data_id}
        );
        warn "IMPORTER ERROR: " . $dbh->errstr . "\n" if $dbh->err;
    }

    my $msg = sprintf( $msgt, @args );
    warn "Permanent failure: $msg\n"
        if $LJ::IS_DEV_SERVER;

    # fire an event for the user to know that it failed
    foreach my $uid ( $class->userids_to_message( $imp->{userid} ) ) {
        LJ::Event::ImportStatus->new( $uid, $item, { type => 'fail', msg => $msg } )->fire;
    }

    $job->permanent_failure($msg);

    if ( $class->_should_exit($msg) ) {
        $log->fatal('Important failure: exiting worker.');

        # Drop an empty "<pid>.please_die" flag file under $LJ::VAR
        # (presumably picked up by the worker loop -- confirm).  A lexical
        # handle is used so we never touch a package-global FILE handle that
        # other code might share.  If the flag cannot be written, exit
        # directly instead.
        open my $flag_fh, '>', "$LJ::VAR/$$.please_die" or exit 2;
        close $flag_fh;
    }

    return;
}
=head2 C<< $class->temp_fail( $import_data, $item, $job, "text", [arguments, ...] ) >>
Temporarily fail this import job, it will get retried if it hasn't failed too many times.
=cut
sub temp_fail {
    # Temporarily fail an import job so that it can be retried later.
    #
    #   $imp   - import data hashref; userid is read here
    #   $item  - name of the import item being processed
    #   $job   - job object; failures, funcname and failed are used
    #   $msgt  - sprintf format string for the failure message
    #   @args  - arguments for the format string
    #
    # If counting this failure reaches $class->max_retries, the call is
    # handed over to fail() and becomes a permanent failure.  Otherwise an
    # ImportStatus 'temp_fail' event is fired for every userid returned by
    # userids_to_message and the job is marked failed.  Returns nothing
    # (or whatever fail() returns when delegating).
    my ( $class, $imp, $item, $job, $msgt, @args ) = @_;
    $0 = 'content-importer [bored]';

    # clear "request" cache of db handles to force revalidation in case one we need now
    # has been idle during a long import
    $LJ::DBIRole->clear_req_cache();

    # Check if we are out of failures
    my $max_fails = $class->max_retries;
    my $this_fail = $job->failures + 1;    # Add this failure on.
    return $class->fail( $imp, $item, $job, $msgt, @args ) if $this_fail >= $max_fails;

    my $msg = sprintf( $msgt, @args );
    warn "Temporary failure: $msg\n"
        if $LJ::IS_DEV_SERVER;

    # fire an event for the user to know that it failed (temporarily)
    foreach my $uid ( $class->userids_to_message( $imp->{userid} ) ) {
        LJ::Event::ImportStatus->new(
            $uid, $item,
            {
                type     => 'temp_fail',
                msg      => $msg,
                failures => $job->failures,
                retries  => $job->funcname->max_retries,
            }
        )->fire;
    }

    $job->failed($msg);

    if ( $class->_should_exit($msg) ) {
        $log->fatal('Important failure: exiting worker.');

        # Drop an empty "<pid>.please_die" flag file under $LJ::VAR
        # (presumably picked up by the worker loop -- confirm).  A lexical
        # handle is used so we never touch a package-global FILE handle that
        # other code might share.  If the flag cannot be written, exit
        # directly instead.
        open my $flag_fh, '>', "$LJ::VAR/$$.please_die" or exit 2;
        close $flag_fh;
    }

    return;
}
=head2 C<< $class->ok( $import_data, $item, $job, $show ) >>
Successfully end this import job.
=cut
sub ok {
    my ( $class, $imp, $item, $job, $show ) = @_;
    $0 = 'content-importer [bored]';

    # Drop the per-request cache of db handles so that a handle which sat
    # idle through a long import is revalidated before we use it.
    $LJ::DBIRole->clear_req_cache();

    # Record the success; skipped silently when no writer handle is available.
    my $dbh = LJ::get_db_writer();
    if ($dbh) {
        my $sql = "UPDATE import_items SET status = 'succeeded', last_touch = UNIX_TIMESTAMP() "
            . "WHERE userid = ? AND item = ? AND import_data_id = ?";
        $dbh->do( $sql, undef, $imp->{userid}, $item, $imp->{import_data_id} );
        warn "IMPORTER ERROR: " . $dbh->errstr . "\n" if $dbh->err;
    }

    # Tell the user it finished, unless the caller explicitly passed a
    # $show that compares equal to 0.
    if ( !defined $show || $show != 0 ) {
        my @recipients = $class->userids_to_message( $imp->{userid} );
        for my $recipient (@recipients) {
            LJ::Event::ImportStatus->new( $recipient, $item, { type => 'ok' } )->fire;
        }
    }

    $job->completed;
    return;
}
=head2 C<< $class->decline( $job, %opts ) >>
Decline to process the job for now. Will retry again later. (Does not count against the maximum number of retries)
=cut
sub decline {
    my ( $class, $job, %opts ) = @_;

    # Use the caller's delay (in seconds) when it is a true value; otherwise
    # push the job back by a full day.
    my $delay = $opts{delay} || 24 * 3600;

    $job->run_after( time() + $delay );
    $job->declined(1);
    $job->save();

    return;
}
=head2 C<< $class->enabled( $data ) >>
Check whether this import source is enabled.
=cut
sub enabled {
    my $class = shift;
    my ($data) = @_;

    # Importing is switched on or off per source hostname.
    my $source_host = $data->{hostname};
    return LJ::is_enabled( 'importing', $source_host );
}
=head2 C<< $class->status( $import_data, $item, $args ) >>
This creates an LJ::Event::ImportStatus item for the user to look at. Note that $args
is a hashref that is passed straight through in the item.
=cut
sub status {
    my ( $class, $imp, $item, $args ) = @_;

    my @recipients = $class->userids_to_message( $imp->{userid} );

    for my $recipient (@recipients) {

        # Each event gets its own payload.  Keys from $args are merged in
        # after the default type, so a 'type' key in $args takes precedence.
        my %payload = ( type => 'status', %{ $args || {} } );
        LJ::Event::ImportStatus->new( $recipient, $item, \%payload )->fire;
    }
}
1;