#!/usr/bin/perl
# This code was forked from the LiveJournal project owned and operated
# by Live Journal, Inc. The code has been modified and expanded by 
# Dreamwidth Studios, LLC. These files were originally licensed under
# the terms of the license supplied by Live Journal, Inc, which can
# currently be found at:
#
# http://code.livejournal.org/trac/livejournal/browser/trunk/LICENSE-LiveJournal.txt
#
# In accordance with the original license, this code and all its
# modifications are provided under the GNU General Public License. 
# A copy of that license can be found in the LICENSE file included as
# part of this distribution.

package LJ::Worker::ExpungeUsers;

use strict;
BEGIN {
    require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use base 'LJ::Worker::Manual';

my $days_before_expunge =  60; # 60 days before expiration
my $lock_name           = "worker_expunge_user_query";

my $sleep_when_idle     = 10;
my $sleep_for_lock      =  3;

my $target_queue_depth  =  100; # try to keep queue depth near this number
my $enqueue_pct         =  0.3; # percentage of target queue depth at which we should enqueue more jobs

my $cache_sock; # [ $sock, connected_at ]

die "No \$LJ::MOVE_SERVER defined\n"
    unless $LJ::MOVE_SERVER;

sub debug {
    LJ::Worker::Manual->cond_debug(@_);
}

sub work {
    my $class = shift;
    my @uids = @_;

    # is the job server free enough that we should enqueue more jobs?
    debug("starting work");
    return 0 unless should_enqueue();

    # load userids from db
    my $lock = get_lock();
    debug("got lock: $lock_name");

    # inside our lock, still need to add more?
    return 0 unless should_enqueue();

    # load users for expunge
    my @users = load_expungable_users(@uids);

    # nothing to do?
    debug("got users: " . scalar @users);
    return 0 unless @users;

    # register jobs with the move server
    my $rv = register_expunge_jobs(@users);
    return 0 unless $rv;

    # lock is released when it falls out of scope
    debug("releasing lock: $lock_name");

    return 1;
}

sub on_idle {
    debug("sleeping while idle");
    return sleep $sleep_when_idle;
}

################################################################################
# Utility Functions
#

sub get_lock {
    my $lock = undef;
    while (! $lock) { 
        debug("trying lock: $lock_name");
        $lock = LJ::locker()->trylock($lock_name);
        last if $lock;
        sleep $sleep_for_lock;
    }

    return $lock;
}

sub load_expungable_users {
    my @uids = @_;

    my $dbr = LJ::get_db_reader()
        or die "unable to contact global db reader";

    unless (@uids) {
        debug("querying within $days_before_expunge days");
        my $sth = $dbr->prepare
            ("SELECT userid FROM user WHERE clusterid>0 AND statusvis='D' AND journaltype IN ('P','C') " .
             "AND statusvisdate < NOW() - INTERVAL $days_before_expunge DAY LIMIT $target_queue_depth");
        $sth->execute;
        die "Error selecting users for expunge: " . $dbr->errstr if $dbr->err;

        while (my $uid = $sth->fetchrow_array) {
            push @uids, $uid;
        }
    }

    # load users for move
    my $us = LJ::load_userids(@uids);
    die "Unable to load user objects" unless $us;

    # filter out users that we can't expunge
    @uids = grep { $us->{$_} && $us->{$_}->can_expunge } @uids;

    return map { $us->{$_} } @uids;
}

sub get_server_sock {

    if ($cache_sock) {
        my ($sock, $at) = @$cache_sock;
        return $sock if time() < $at + 60;

        # invalidate cache
        $sock->close;
        undef $cache_sock;
    }

    debug("connecting move server: $LJ::MOVE_SERVER");
    my $sock = IO::Socket::INET->new(PeerAddr => $LJ::MOVE_SERVER,
                                     Proto    => 'tcp')
        or die "Unable to contact move servers: $LJ::MOVE_SERVER";
    
    # save sock in cache
    $cache_sock = [ $sock, time() ];

    return $sock;
}

sub register_expunge_jobs {
    my @users = @_;

    my $sock = get_server_sock();

    my $before_ct = outstanding_job_ct($sock);

    my @jobs = (); # "uid:src_cid:dest_cid opt1=1 opt2=1", ...
    foreach my $u (@users) {
        push @jobs, join(" ", join(":", $u->id, $u->clusterid, "0"), "delete=1", "expungedel=1");
    }
    debug("expunging users: " . join(",", map { $_->{user} } @users));

    my $job_cmd = "add_jobs " . join(", ", @jobs);
    $sock->send("$job_cmd\r\n");

    # look for a valid response
    while (my $res = <$sock>) {
        last if $res =~ /^END/;
    }

    my $after_ct = outstanding_job_ct();

    # if the delta between before_ct and after_ct is at least n% of
    # what we tried to insert, then we know new non-dup jobs went in,
    # so let's return 1 to signal that there are non-dups being worked on
    my $delta              = $after_ct - $before_ct;
    my $required_delta_pct = 1 - $enqueue_pct;
    my $required_delta     = $target_queue_depth * $required_delta_pct;

    debug("registered expunge jobs: want_pct=$required_delta_pct, delta=$delta, want_delta=$required_delta");

    return $delta > $required_delta ? 1 : 0;
}

sub outstanding_job_ct {
    my $sock = get_server_sock();

    my $list_cmd = "list_jobs";
    $sock->send("$list_cmd\r\n");

    # look for a valid response
    my $in_queue = 0;
    while (my $res = <$sock>) {
        if ($res =~ /(\d+) queued jobs/) {
            $in_queue = $1;
            next;
        }
        last if $res =~ /^END/;
    }

    return $in_queue;
}

sub should_enqueue {
    my $in_queue = outstanding_job_ct();

    # if the number of jobs in queue is less than n% of the target
    # queue depth, then we'll enqueue more

    my $enqueue_at = $target_queue_depth * $enqueue_pct;
    debug("in_queue: $in_queue, enqueue_at: $enqueue_at, target_depth: $target_queue_depth");

    return $in_queue < $enqueue_at ? 1 : 0;
}

################################################################################

# in this mode, it's just debug... don't write to db.
if (@ARGV) {
    my $user = shift;
    my $u = LJ::load_user($user);
    __PACKAGE__->work($u->{userid});
} else {
    __PACKAGE__->run();
}
