409 lines
16 KiB
Text
409 lines
16 KiB
Text
|
|
#!/usr/bin/perl
|
||
|
|
#
|
||
|
|
# bin/worker/sphinx-copier
|
||
|
|
#
|
||
|
|
# Responsible for ensuring a user is up to date when they make a new post or
|
||
|
|
# edit an existing one.
|
||
|
|
#
|
||
|
|
# Authors:
|
||
|
|
# Mark Smith <mark@dreamwidth.org>
|
||
|
|
#
|
||
|
|
# Copyright (c) 2009-2013 by Dreamwidth Studios, LLC.
|
||
|
|
#
|
||
|
|
# This program is free software; you may redistribute it and/or modify it under
|
||
|
|
# the same terms as Perl itself. For a copy of the license, please reference
|
||
|
|
# 'perldoc perlartistic' or 'perldoc perlgpl'.
|
||
|
|
#
|
||
|
|
|
||
|
|
use strict;
|
||
|
|
BEGIN {
|
||
|
|
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
|
||
|
|
}
|
||
|
|
|
||
|
|
use LJ::Worker::TheSchwartz;
|
||
|
|
|
||
|
|
schwartz_decl( 'DW::Worker::Sphinx::Copier' );
|
||
|
|
schwartz_work();
|
||
|
|
|
||
|
|
# ============================================================================
|
||
|
|
package DW::Worker::Sphinx::Copier;
|
||
|
|
use base 'TheSchwartz::Worker';
|
||
|
|
use Encode;
|
||
|
|
use Carp qw/ croak /;
|
||
|
|
|
||
|
|
sub sphinx_db {
|
||
|
|
my $dbsx = LJ::get_dbh( 'sphinx_search' )
|
||
|
|
or croak "Unable to connect to Sphinx search database.";
|
||
|
|
|
||
|
|
# We have to use utf8 when we write to the db; Sphinx requires that data
|
||
|
|
# actually be properly encoded.
|
||
|
|
$dbsx->do( q{SET NAMES 'utf8'} );
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
return $dbsx;
|
||
|
|
}
|
||
|
|
|
||
|
|
sub work {
|
||
|
|
my ( $class, $job ) = @_;
|
||
|
|
my $a = $job->arg;
|
||
|
|
$job->grabbed_until( time() + 3600*12 );
|
||
|
|
$job->save;
|
||
|
|
|
||
|
|
my $u = LJ::load_userid( $a->{userid} )
|
||
|
|
or croak "Invalid userid: $a->{userid}.";
|
||
|
|
warn "[$$] Sphinx copier started for " . $u->user . "(" . $u->id . "), source " .
|
||
|
|
( $a->{source} // 'unknown' ) . ".\n";
|
||
|
|
return $job->completed unless $u->is_person || $u->is_community;
|
||
|
|
|
||
|
|
# annotate copier is going
|
||
|
|
$0 = sprintf( 'sphinx-copier [%s(%d) - %d, %d]', $u->user, $u->id,
|
||
|
|
$a->{jitemid} // 0, $a->{jtalkid} // 0 );
|
||
|
|
|
||
|
|
# We copy comments for paid users, allowing them to search through the
|
||
|
|
# comments to their journal.
|
||
|
|
my $copy_comments = $u->is_paid ? 1 : 0;
|
||
|
|
|
||
|
|
# There are several modes. Either we can do a full import (no arguments),
|
||
|
|
# we can import a particular entry (edited or something), or we can import
|
||
|
|
# a specific comment (again, edited or posted).
|
||
|
|
#
|
||
|
|
# These are all best effort. If they fail, we don't do anything fancy and
|
||
|
|
# assume that the next time the user posts or edits something, we'll end
|
||
|
|
# up fixing up whatever was forgotten.
|
||
|
|
if ( exists $a->{jitemid} ) {
|
||
|
|
warn "[$$] * Requested copy of only entry $a->{jitemid}.\n";
|
||
|
|
copy_entry( $u, $a->{jitemid}, !$copy_comments );
|
||
|
|
} elsif ( exists $a->{jtalkid} ) {
|
||
|
|
warn "[$$] * Requested copy of only comment $a->{jtalkid}.\n";
|
||
|
|
copy_comment( $u, $a->{jtalkid} ) if $copy_comments;
|
||
|
|
} else {
|
||
|
|
warn "[$$] * Requested complete recopy of user.\n";
|
||
|
|
my $time = LJ::MemCache::get( [ $u->id, "sphinx-copy-full2:" . $u->id ] );
|
||
|
|
if ( $time > time() - 86400 ) {
|
||
|
|
warn "[$$] * Copied less than a day ago. Skipping.\n";
|
||
|
|
$job->completed;
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
LJ::MemCache::set( [ $u->id, "sphinx-copy-full2:" . $u->id ], time() );
|
||
|
|
copy_entry( $u, undef, 1 );
|
||
|
|
copy_comment( $u ) if $copy_comments;
|
||
|
|
}
|
||
|
|
|
||
|
|
$0 = 'sphinx-copier [bored]';
|
||
|
|
$job->completed;
|
||
|
|
|
||
|
|
# If memory usage is over 300MB after a job finishes, terminate.
|
||
|
|
my $rssmb = LJ::gtop()->proc_mem($$)->resident/1024/1024;
|
||
|
|
exit 0 if $rssmb > 300;
|
||
|
|
}
|
||
|
|
|
||
|
|
sub copy_comment {
|
||
|
|
my ( $u, $only_jtalkid ) = @_;
|
||
|
|
my $dbsx = sphinx_db()
|
||
|
|
or croak "Sphinx database not available.";
|
||
|
|
my $dbfrom = LJ::get_cluster_master( $u->clusterid )
|
||
|
|
or croak "User cluster master not available.";
|
||
|
|
|
||
|
|
# If the parameter is not an arrayref, then make it one if it's defined.
|
||
|
|
$only_jtalkid = [ $only_jtalkid ]
|
||
|
|
if defined $only_jtalkid && !ref $only_jtalkid;
|
||
|
|
|
||
|
|
# A full comment import. We slice it by 1000 comment groups to make the
|
||
|
|
# memory usage something that isn't insane. (This codepath exists because
|
||
|
|
# of cfud and sixwordstories. Congrats!)
|
||
|
|
if ( !defined $only_jtalkid ) {
|
||
|
|
my $maxid = $dbfrom->selectrow_array(
|
||
|
|
'SELECT MAX(jtalkid) FROM talk2 WHERE journalid = ?',
|
||
|
|
undef, $u->id
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
|
||
|
|
my $n = 0;
|
||
|
|
while ( $n < $maxid ) {
|
||
|
|
my $m = $n + 1000;
|
||
|
|
$m = $maxid if $m > $maxid;
|
||
|
|
|
||
|
|
$0 = sprintf( 'sphinx-copier [%s(%d) - comments: %0.2f%% of %d]',
|
||
|
|
$u->user, $u->id, $m / $maxid * 100, $maxid );
|
||
|
|
copy_comment( $u, [ $n+1..$m ] );
|
||
|
|
$n = $m;
|
||
|
|
}
|
||
|
|
warn "[$$] Done with mass-copy.\n";
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
my ( $entries, $comments );
|
||
|
|
my ( @copy_jitemids, @delete_jtalkids );
|
||
|
|
my $allowpublic = $u->include_in_global_search ? 1 : 0;
|
||
|
|
|
||
|
|
my $in = join ',', @$only_jtalkid;
|
||
|
|
$comments = $dbfrom->selectall_hashref(
|
||
|
|
qq{SELECT jtalkid, nodeid, state, posterid, UNIX_TIMESTAMP(datepost) AS 'datepost'
|
||
|
|
FROM talk2 WHERE journalid = ? AND jtalkid IN ($in)},
|
||
|
|
'jtalkid', undef, $u->id
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
return unless ref $comments eq 'HASH' && %$comments;
|
||
|
|
|
||
|
|
# Now we have some comments, get data we need to build security for the
|
||
|
|
# entries we're going to use.
|
||
|
|
{
|
||
|
|
my %jitemids;
|
||
|
|
$jitemids{$comments->{$_}->{nodeid}} = 1 foreach keys %$comments;
|
||
|
|
my $inlist = join(',', map { '?' } keys %jitemids);
|
||
|
|
$entries = $dbfrom->selectall_hashref(
|
||
|
|
qq{SELECT jitemid, security, allowmask FROM log2
|
||
|
|
WHERE journalid = ? AND jitemid IN ($inlist)},
|
||
|
|
'jitemid', undef, $u->id, keys %jitemids
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
|
||
|
|
foreach my $row ( values %$entries ) {
|
||
|
|
# Auto-convert usemask-with-no-groups to private.
|
||
|
|
$row->{security} = 'private'
|
||
|
|
if $row->{security} eq 'usemask' && $row->{allowmask} == 0;
|
||
|
|
|
||
|
|
# We need extra security bits for some metadata. We have to do this this way because
|
||
|
|
# it makes it easier to later do searches on various combinations of things at the same
|
||
|
|
# time... Also, even though these are bits, we're not going to ever use them as actual bits.
|
||
|
|
my @extrabits;
|
||
|
|
push @extrabits, 101 if $row->{security} eq 'private';
|
||
|
|
push @extrabits, 102 if $row->{security} eq 'public';
|
||
|
|
|
||
|
|
# have to do some more munging
|
||
|
|
$row->{allowmask} = join ',', LJ::bit_breakdown( $row->{allowmask} ), @extrabits;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
# Comment loop.
|
||
|
|
my @jtalkids;
|
||
|
|
foreach my $jtalkid ( keys %$comments ) {
|
||
|
|
my $state = $comments->{$jtalkid}->{state};
|
||
|
|
my $force_private = 0; # Override security to private.
|
||
|
|
|
||
|
|
if ( $state eq 'D' ) {
|
||
|
|
push @delete_jtalkids, int($jtalkid);
|
||
|
|
next;
|
||
|
|
} elsif ( $state eq 'S' || ( $state ne 'A' && $state ne 'F' ) ) {
|
||
|
|
# If it's screened or in an unexpected state, make it private so
|
||
|
|
# only owners can see it.
|
||
|
|
$force_private = 1;
|
||
|
|
}
|
||
|
|
|
||
|
|
push @jtalkids, [ $jtalkid, $force_private ];
|
||
|
|
}
|
||
|
|
|
||
|
|
while (my @items = splice(@jtalkids, 0, 1000)) {
|
||
|
|
last unless @items;
|
||
|
|
|
||
|
|
my @l_jtalkids = map { $_->[0] } @items;
|
||
|
|
my %private = map { $_->[0] => $_->[1] } @items;
|
||
|
|
my $in = join ',', @l_jtalkids;
|
||
|
|
|
||
|
|
my $text = $dbfrom->selectall_hashref(
|
||
|
|
qq{SELECT jtalkid, subject, body
|
||
|
|
FROM talktext2 WHERE journalid = ? AND jtalkid IN ($in)},
|
||
|
|
'jtalkid', undef, $u->id
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
|
||
|
|
foreach my $jtd ( keys %$text ) {
|
||
|
|
my ( $subj, $body ) = ( $text->{$jtd}->{subject}, $text->{$jtd}->{body} );
|
||
|
|
LJ::text_uncompress( \$subj );
|
||
|
|
$text->{$jtd}->{subject} = Encode::decode( 'utf8', $subj );
|
||
|
|
LJ::text_uncompress( \$body );
|
||
|
|
$text->{$jtd}->{body} = Encode::decode( 'utf8', $body );
|
||
|
|
}
|
||
|
|
|
||
|
|
my $old_ids = $dbsx->selectall_hashref(
|
||
|
|
qq{SELECT jtalkid, id FROM items_raw WHERE journalid = ? AND jtalkid IN ($in)},
|
||
|
|
'jtalkid', undef, $u->id
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
foreach my $jid ( keys %$text ) {
|
||
|
|
my $allowmask = $entries->{$comments->{$jid}->{nodeid}}->{allowmask} // '101';
|
||
|
|
$allowmask = '101' if $private{$jid};
|
||
|
|
|
||
|
|
my $id = $old_ids->{$jid}->{id} || LJ::alloc_global_counter('X');
|
||
|
|
$dbsx->do(
|
||
|
|
q{REPLACE INTO items_raw (id, journalid, jtalkid, jitemid, poster_id,
|
||
|
|
date_posted, title, data, security_bits, allow_global_search, touchtime)
|
||
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, COMPRESS(?), ?, ?, UNIX_TIMESTAMP())},
|
||
|
|
undef, $id, $u->id, $jid,
|
||
|
|
(map { $comments->{$jid}->{$_} } qw/ nodeid posterid datepost /),
|
||
|
|
$text->{$jid}->{subject}, $text->{$jid}->{body}, $allowmask, $allowpublic,
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
# let the viewer know what they missed
|
||
|
|
# warn "[$$] Inserted comment #$jid for " . $u->user . "(" . $u->id . ") as Sphinx id $id.\n";
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
# deletes are easy...
|
||
|
|
if ( @delete_jtalkids ) {
|
||
|
|
my $ct = $dbsx->do( 'DELETE FROM items_raw WHERE journalid = ? AND jtalkid IN (' .
|
||
|
|
join( ',', @delete_jtalkids ) . ')', undef, $u->id ) + 0;
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
warn "[$$] Actually deleted $ct comments.\n" if $ct > 0;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
sub copy_entry {
|
||
|
|
my ( $u, $only_jitemid, $skip_comments ) = @_;
|
||
|
|
my $dbsx = sphinx_db()
|
||
|
|
or croak "Sphinx database not available.";
|
||
|
|
my $dbfrom = LJ::get_cluster_master( $u->clusterid )
|
||
|
|
or croak "User cluster master not available.";
|
||
|
|
|
||
|
|
# If we're being asked to look at one post, that simplifies our processing
|
||
|
|
# quite a bit.
|
||
|
|
my ( $sphinx_times, $db_times, %comment_jitemids );
|
||
|
|
my ( @copy_jitemids, @delete_jitemids, %sphinx_ids );
|
||
|
|
|
||
|
|
if ( $only_jitemid ) {
|
||
|
|
$sphinx_times = $dbsx->selectall_hashref(
|
||
|
|
'SELECT id, jitemid FROM items_raw WHERE journalid = ? AND jitemid = ? AND jtalkid = 0',
|
||
|
|
'jitemid', undef, $u->id, $only_jitemid
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
$db_times = $dbfrom->selectall_hashref(
|
||
|
|
q{SELECT jitemid, UNIX_TIMESTAMP(logtime) AS 'createtime'
|
||
|
|
FROM log2 WHERE journalid = ? AND jitemid = ?},
|
||
|
|
'jitemid', undef, $u->id, $only_jitemid
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
} else {
|
||
|
|
$sphinx_times = $dbsx->selectall_hashref(
|
||
|
|
'SELECT id, jitemid FROM items_raw WHERE journalid = ? AND jtalkid = 0',
|
||
|
|
'jitemid', undef, $u->id
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
$db_times = $dbfrom->selectall_hashref(
|
||
|
|
q{SELECT jitemid, UNIX_TIMESTAMP(logtime) AS 'createtime'
|
||
|
|
FROM log2 WHERE journalid = ?},
|
||
|
|
'jitemid', undef, $u->id
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
}
|
||
|
|
|
||
|
|
# This mostly just keeps track of the internal Sphinx document ID. We need to
|
||
|
|
# keep that as stable as we can.
|
||
|
|
foreach my $jitemid ( keys %$db_times ) {
|
||
|
|
$sphinx_ids{$jitemid} = $sphinx_times->{$jitemid}->{id}
|
||
|
|
if exists $sphinx_times->{$jitemid};
|
||
|
|
push @copy_jitemids, $jitemid;
|
||
|
|
$comment_jitemids{$jitemid} = 1;
|
||
|
|
}
|
||
|
|
|
||
|
|
# now find deleted posts
|
||
|
|
foreach my $jitemid ( keys %$sphinx_times ) {
|
||
|
|
next if exists $db_times->{$jitemid};
|
||
|
|
|
||
|
|
warn "[$$] Deleting post #$jitemid.\n";
|
||
|
|
push @delete_jitemids, $jitemid;
|
||
|
|
$comment_jitemids{$jitemid} = 1;
|
||
|
|
}
|
||
|
|
|
||
|
|
# deletes are easy...
|
||
|
|
if ( @delete_jitemids ) {
|
||
|
|
my $ct = $dbsx->do( 'DELETE FROM items_raw WHERE journalid = ? AND jtalkid = 0 AND jitemid IN (' .
|
||
|
|
join( ',', @delete_jitemids ) . ')', undef, $u->id ) + 0;
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
warn "[$$] Actually deleted $ct posts.\n";
|
||
|
|
}
|
||
|
|
|
||
|
|
# now to copy entries. this is not done enmasse since the major case will be after a user
|
||
|
|
# already has most of their posts copied and they are just updating one or two.
|
||
|
|
foreach my $jitemid ( @copy_jitemids ) {
|
||
|
|
my $row = $dbfrom->selectrow_hashref(
|
||
|
|
q{SELECT l.journalid, l.jitemid, l.posterid, l.security, l.allowmask, l.logtime, lt.subject, lt.event
|
||
|
|
FROM log2 l INNER JOIN logtext2 lt ON (l.journalid = lt.journalid AND l.jitemid = lt.jitemid)
|
||
|
|
WHERE l.journalid = ? AND l.jitemid = ?},
|
||
|
|
undef, $u->id, $jitemid
|
||
|
|
);
|
||
|
|
croak $dbfrom->errstr if $dbfrom->err;
|
||
|
|
|
||
|
|
# just make sure, in case we don't have a corresponding logtext2 row
|
||
|
|
next unless $row;
|
||
|
|
|
||
|
|
# Auto-convert usemask-with-no-groups to private.
|
||
|
|
$row->{security} = 'private'
|
||
|
|
if $row->{security} eq 'usemask' && $row->{allowmask} == 0;
|
||
|
|
|
||
|
|
# we need extra security bits for some metadata. we have to do this this way because
|
||
|
|
# it makes it easier to later do searches on various combinations of things at the same
|
||
|
|
# time... also, even though these are bits, we're not going to ever use them as actual bits.
|
||
|
|
my @extrabits;
|
||
|
|
push @extrabits, 101 if $row->{security} eq 'private';
|
||
|
|
push @extrabits, 102 if $row->{security} eq 'public';
|
||
|
|
|
||
|
|
# have to do some more munging
|
||
|
|
$row->{allowmask} = join ',', LJ::bit_breakdown( $row->{allowmask} ), @extrabits;
|
||
|
|
$row->{allowpublic} = $u->include_in_global_search ? 1 : 0;
|
||
|
|
|
||
|
|
# very important, the search engine can't index compressed crap...
|
||
|
|
foreach ( qw/ subject event / ) {
|
||
|
|
LJ::text_uncompress( \$row->{$_} );
|
||
|
|
|
||
|
|
# required, we store raw-bytes in our own database but the Sphinx system expects
|
||
|
|
# things to be proper UTF-8, this does it.
|
||
|
|
$row->{$_} = Encode::decode( 'utf8', $row->{$_} );
|
||
|
|
}
|
||
|
|
|
||
|
|
# insert
|
||
|
|
my $id = $sphinx_ids{$jitemid} // LJ::alloc_global_counter('X');
|
||
|
|
$dbsx->do(
|
||
|
|
q{REPLACE INTO items_raw (id, journalid, jitemid, jtalkid, poster_id,
|
||
|
|
security_bits, allow_global_search, date_posted, title, data, touchtime)
|
||
|
|
VALUES (?, ?, ?, 0, ?, ?, ?, UNIX_TIMESTAMP(?), ?, COMPRESS(?), UNIX_TIMESTAMP())},
|
||
|
|
undef, $id, map { $row->{$_} } qw/ journalid jitemid posterid
|
||
|
|
allowmask allowpublic logtime subject event /
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
# let the viewer know what they missed
|
||
|
|
#warn "[$$] Inserted post #$jitemid for " . $u->user . "(" . $u->id . ") as Sphinx id $id.\n";
|
||
|
|
}
|
||
|
|
|
||
|
|
unless ( $skip_comments ) {
|
||
|
|
my %commentids;
|
||
|
|
foreach my $jitemid ( keys %comment_jitemids ) {
|
||
|
|
# Comments we know about (we do this so that we can delete them if they've
|
||
|
|
# been removed).
|
||
|
|
my $jtalkids = $dbsx->selectcol_arrayref(
|
||
|
|
q{SELECT jtalkid FROM items_raw WHERE journalid = ? AND jitemid = ? AND jtalkid > 0},
|
||
|
|
undef, $u->id, $jitemid
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
if ( $jtalkids && ref $jtalkids eq 'ARRAY' ) {
|
||
|
|
$commentids{$_} = 1
|
||
|
|
foreach @$jtalkids;
|
||
|
|
}
|
||
|
|
|
||
|
|
# And this catches comments that we don't know about yet.
|
||
|
|
my $jtalkids2 = $dbfrom->selectcol_arrayref(
|
||
|
|
q{SELECT jtalkid FROM talk2 WHERE journalid = ? AND nodetype = 'L' AND nodeid = ?},
|
||
|
|
undef, $u->id, $jitemid
|
||
|
|
);
|
||
|
|
croak $dbsx->errstr if $dbsx->err;
|
||
|
|
|
||
|
|
if ( $jtalkids2 && ref $jtalkids2 eq 'ARRAY' ) {
|
||
|
|
$commentids{$_} = 1
|
||
|
|
foreach @$jtalkids2;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
copy_comment( $u, $_ ) foreach keys %commentids;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
sub keep_exit_status_for { 0 }
|
||
|
|
sub grab_for { 1800 }
|
||
|
|
sub max_retries { 3 }
|
||
|
|
sub retry_delay { 1800 }
|