mourningdove/bin/worker/expunge-users

222 lines
6 KiB
Text
Raw Permalink Normal View History

2026-05-24 01:03:05 +00:00
#!/usr/bin/perl
# This code was forked from the LiveJournal project owned and operated
# by Live Journal, Inc. The code has been modified and expanded by
# Dreamwidth Studios, LLC. These files were originally licensed under
# the terms of the license supplied by Live Journal, Inc, which can
# currently be found at:
#
# http://code.livejournal.org/trac/livejournal/browser/trunk/LICENSE-LiveJournal.txt
#
# In accordance with the original license, this code and all its
# modifications are provided under the GNU General Public License.
# A copy of that license can be found in the LICENSE file included as
# part of this distribution.
package LJ::Worker::ExpungeUsers;
use strict;
BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}
use base 'LJ::Worker::Manual';
# ------------------------------------------------------------------------
# File-level tunables shared by the subs below.
# ------------------------------------------------------------------------

# Interpolated into the selection query in load_expungable_users(): only
# users whose statusvisdate is older than this many days are picked up.
my $days_before_expunge = 60; # 60 days before expiration

# Name handed to LJ::locker()->trylock() in get_lock().
my $lock_name = "worker_expunge_user_query";

# Seconds on_idle() sleeps for.
my $sleep_when_idle = 10;

# Seconds get_lock() sleeps between failed trylock attempts.
my $sleep_for_lock = 3;

# Also used as the LIMIT of the selection query in load_expungable_users().
my $target_queue_depth = 100; # try to keep queue depth near this number

# NOTE: despite the name this is a fraction (0..1), not a percent value; it
# is multiplied directly with $target_queue_depth in should_enqueue() and
# register_expunge_jobs().
my $enqueue_pct = 0.3; # fraction of target queue depth below which we should enqueue more jobs

# Connection cache used by get_server_sock(); the socket is reused until it
# is 60 seconds old.
my $cache_sock; # [ $sock, connected_at ]

# Refuse to start at all when no move server is configured.
die "No \$LJ::MOVE_SERVER defined\n"
unless $LJ::MOVE_SERVER;
# Thin wrapper: hand every argument, untouched, to the cond_debug class
# method of LJ::Worker::Manual and pass its result back to the caller.
sub debug {
    my $worker_class = 'LJ::Worker::Manual';
    return $worker_class->cond_debug(@_);
}
# One pass of the worker.
#
# Arguments: the class name (unused) followed by an optional list of
#            userids; with no userids, load_expungable_users() runs its
#            own query.
# Returns:   1 when register_expunge_jobs() reported success, 0 in every
#            other case (queue already deep enough, no users, or the
#            registration did not report success).
sub work {
    my ($class, @uids) = @_;

    debug("starting work");

    # Cheap check first: if the job server already has enough queued there
    # is no point in contending for the lock.
    should_enqueue() or return 0;

    # Held in a lexical so it is released when this sub returns, whichever
    # exit path is taken.
    my $lock = get_lock();
    debug("got lock: $lock_name");

    # Re-check now that we own the lock; the queue depth may have changed
    # while we were waiting for it.
    should_enqueue() or return 0;

    my @users      = load_expungable_users(@uids);
    my $user_count = scalar @users;
    debug("got users: " . $user_count);
    return 0 if $user_count == 0;

    # Hand the users to the move server.
    register_expunge_jobs(@users) or return 0;

    debug("releasing lock: $lock_name");
    return 1;
}
# Idle hook: log, then sleep for $sleep_when_idle seconds.
# Returns whatever sleep() returns.
sub on_idle {
    debug("sleeping while idle");
    my $slept = sleep($sleep_when_idle);
    return $slept;
}
################################################################################
# Utility Functions
#
# Block until the lock named $lock_name has been obtained.
#
# Calls LJ::locker()->trylock() repeatedly, sleeping $sleep_for_lock
# seconds after each failed attempt, and returns the first true value
# trylock() hands back. There is no upper bound on the number of attempts.
sub get_lock {
    while (1) {
        debug("trying lock: $lock_name");

        my $lock = LJ::locker()->trylock($lock_name);
        return $lock if $lock;

        sleep $sleep_for_lock;
    }
}
# Resolve the set of users that may be expunged.
#
# Arguments: an optional list of userids. When the list is empty, up to
#            $target_queue_depth userids are selected from the `user`
#            table using the status/age criteria in the query below.
# Returns:   the loaded user objects (in userid-list order) for which
#            ->can_expunge is true; ids that did not load are dropped.
# Dies:      if no db reader is available, if the query reports an error,
#            or if LJ::load_userids() returns a false value.
sub load_expungable_users {
    my @candidate_ids = @_;

    my $dbr = LJ::get_db_reader();
    die "unable to contact global db reader" unless $dbr;

    if (!@candidate_ids) {
        debug("querying within $days_before_expunge days");

        my $sql =
            "SELECT userid FROM user WHERE clusterid>0 AND statusvis='D' AND journaltype IN ('P','C') "
          . "AND statusvisdate < NOW() - INTERVAL $days_before_expunge DAY LIMIT $target_queue_depth";

        my $sth = $dbr->prepare($sql);
        $sth->execute;
        if ($dbr->err) {
            die "Error selecting users for expunge: " . $dbr->errstr;
        }

        # single-column result, so the scalar-context fetch yields the userid
        while (my $uid = $sth->fetchrow_array) {
            push @candidate_ids, $uid;
        }
    }

    # turn ids into user objects
    my $user_for = LJ::load_userids(@candidate_ids);
    die "Unable to load user objects" unless $user_for;

    # keep only the ones that loaded and say they can be expunged
    my @expungable;
    for my $uid (@candidate_ids) {
        my $u = $user_for->{$uid};
        push @expungable, $u if $u && $u->can_expunge;
    }
    return @expungable;
}
# Return a TCP socket connected to $LJ::MOVE_SERVER.
#
# A connection is cached in $cache_sock together with the time it was
# opened and is handed back as-is until it is 60 seconds old; after that
# it is closed and a fresh connection is made. Callers must therefore not
# hold on to a returned handle across another call to this sub, since that
# call may close it.
#
# Returns: the connected socket object.
# Dies:    if the connection cannot be established; the message includes
#          the reason reported by IO::Socket::INET.
sub get_server_sock {
    if ($cache_sock) {
        my ($cached, $connected_at) = @$cache_sock;
        return $cached if time() < $connected_at + 60;

        # too old: drop it and reconnect below
        $cached->close;
        undef $cache_sock;
    }

    debug("connecting move server: $LJ::MOVE_SERVER");

    # Load the socket class explicitly rather than relying on some other
    # module having pulled it in already.
    require IO::Socket::INET;

    # On failure the constructor returns undef and leaves the reason in $@.
    my $sock = IO::Socket::INET->new(
        PeerAddr => $LJ::MOVE_SERVER,
        Proto    => 'tcp',
    ) or die "Unable to contact move servers: $LJ::MOVE_SERVER ($@)";

    # remember the socket and when it was opened
    $cache_sock = [ $sock, time() ];
    return $sock;
}
# Submit expunge jobs for the given users to the move server.
#
# Arguments: a list of user objects. Each must answer ->id and ->clusterid;
#            the {user} hash key is read only for the debug line.
# Returns:   1 if the server's queued-job count grew by more than
#            $target_queue_depth * (1 - $enqueue_pct) across the
#            submission, 0 otherwise (including when no users were given).
# Dies:      via get_server_sock() if the move server cannot be reached.
sub register_expunge_jobs {
    my @users = @_;

    # nothing to submit; don't send an empty add_jobs command
    return 0 unless @users;

    # Sample the queue depth BEFORE taking the socket we are going to write
    # to. outstanding_job_ct() calls get_server_sock() itself, and that call
    # closes and replaces the cached connection once it is 60 seconds old.
    # Taking our handle first could leave us holding a socket that has just
    # been closed underneath us.
    my $before_ct = outstanding_job_ct();
    my $sock      = get_server_sock();

    # one entry per user: "uid:src_cid:dest_cid opt1=1 opt2=1"
    my @jobs = map {
        join(" ", join(":", $_->id, $_->clusterid, "0"), "delete=1", "expungedel=1")
    } @users;

    debug("expunging users: " . join(",", map { $_->{user} } @users));

    my $job_cmd = "add_jobs " . join(", ", @jobs);
    $sock->send("$job_cmd\r\n");

    # drain the response up to and including the END line
    while (my $res = <$sock>) {
        last if $res =~ /^END/;
    }

    my $after_ct = outstanding_job_ct();

    # If the queue grew by enough, new non-duplicate jobs went in, so
    # return 1 to signal that there are non-dups being worked on.
    # NOTE(review): the threshold is derived from $target_queue_depth, not
    # from the number of jobs actually submitted above; confirm that this
    # is intended when fewer than $target_queue_depth users are passed in.
    my $delta              = $after_ct - $before_ct;
    my $required_delta_pct = 1 - $enqueue_pct;
    my $required_delta     = $target_queue_depth * $required_delta_pct;

    debug("registered expunge jobs: want_pct=$required_delta_pct, delta=$delta, want_delta=$required_delta");

    return $delta > $required_delta ? 1 : 0;
}
# Ask the move server how many jobs it currently has queued.
#
# Sends "list_jobs" over the socket from get_server_sock() and reads the
# reply line by line until a line starting with END (or end of stream).
# Takes no arguments.
# Returns: the number captured from the last "<N> queued jobs" line seen,
#          or 0 if no such line appeared.
sub outstanding_job_ct {
    my $sock = get_server_sock();

    $sock->send("list_jobs\r\n");

    my $queued = 0;
    while (defined(my $line = <$sock>)) {
        if ($line =~ /(\d+) queued jobs/) {
            $queued = $1;
        }
        elsif ($line =~ /^END/) {
            last;
        }
    }

    return $queued;
}
# Decide whether the move server's queue is shallow enough to top up.
#
# Returns 1 when the number of queued jobs is strictly below
# $target_queue_depth * $enqueue_pct, otherwise 0.
sub should_enqueue {
    my $threshold = $target_queue_depth * $enqueue_pct;
    my $queued    = outstanding_job_ct();

    debug("in_queue: $queued, enqueue_at: $threshold, target_depth: $target_queue_depth");

    return 1 if $queued < $threshold;
    return 0;
}
################################################################################
# in this mode, it's just debug... don't write to db.
# Entry point. With a username on the command line, run a single work()
# pass for just that user; with no arguments, enter the normal worker loop
# via run().
if (@ARGV) {
    my $user = shift @ARGV;

    # Fail loudly on a bad username instead of dereferencing undef and
    # calling work() with an undefined userid.
    my $u = LJ::load_user($user)
        or die "Unknown user: $user\n";

    __PACKAGE__->work($u->{userid});
}
else {
    __PACKAGE__->run();
}