#!/usr/bin/perl
#
# Danga's Distributed Lock Daemon
#
# Copyright 2004, Danga Interactive
# Copyright 2005-2006, Six Apart, Ltd.
#
# Authors:
#   Brad Fitzpatrick <brad@danga.com>
#   Jonathan Steinert <jsteinert@sixapart.com>
#
# License:
#   terms of Perl itself.
#

use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();
## Load-tracking storage: %IVs = ( interval_start_time => count, ... )
## where count is incremented once per trylock request in that interval.
our %IVs = ();
## how many load buckets we want
our $Bucket_qty = 100; #
## the total duration of load measurement, in seconds
our $Total_duration = 60 * 5; # 5 minutes
## the duration (in seconds) handled by one bucket
our $IV_duration = int($Total_duration / $Bucket_qty); 
## how many buckets may be held before we purge the old ones
our $Bucket_limit =  2 * $Bucket_qty;

use vars qw($DEBUG);
$DEBUG = 0;

# Command-line settings.  Undefined means "option not given".
my (
    $daemonize,     # -d: detach and run in the background
    $nokeepalive,   # -n: do not enable TCP keep-alive on client sockets
    $hostname,      # -h: hostname handed to the dbi backend
    $table,         # -g: table name handed to the dbi backend
   );
my $conf_port = 7002;         # -p: TCP port to listen on
my $lock_type = "internal";   # -t: lock backend (internal, dlmfs or dbi)


# GetOptions returns false when it meets an unknown or malformed option.
# Stop here in that case rather than silently running with defaults.
Getopt::Long::GetOptions(
    'd|daemon'       => \$daemonize,
    'p|port=i'       => \$conf_port,
    'debug=i'        => \$DEBUG,
    'n|no-keepalive' => \$nokeepalive,
    't|type=s'       => \$lock_type,
    'h|hostname=s'   => \$hostname,
    'g|table=s'  => \$table,
   ) or die "Usage: $0 [-d] [-p port] [--debug=level] [-n] "
          . "[-t internal|dlmfs|dbi] [-h hostname] [-g table]\n";

# Statistics counters (reported by the "status" command)
my $lock_successes = 0;
my $lock_failures  = 0;

# Choose the Client subclass that implements the requested lock backend,
# and collect the arguments its _setup class method needs.
my $client_class;
my @client_options;
if ($lock_type eq 'internal') {
    $client_class = "Client::Internal";
}
elsif($lock_type eq 'dlmfs') {
    $client_class = "Client::DLMFS";
}
elsif($lock_type eq 'dbi') {
    # Both options are mandatory in dbi mode.  The flags named in the
    # messages must match the GetOptions spec (-h and -g).
    defined( $hostname ) && length( $hostname )
        or die( "-h (--hostname) must be included with a hostname in dbi mode\n" );
    defined( $table ) && length( $table )
        or die( "-g (--table) must be included with a table name in dbi mode\n" );
    $client_class = "Client::DBI";
    @client_options = ( $hostname, $table );
}
else {
    die( "Unknown lock type of '$lock_type' specified.\n" );
}

# One-time backend initialisation, done before we detach from the terminal.
$client_class->_setup( @client_options );

daemonize() if $daemonize;

use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);

# Linux-specific socket option numbers for TCP keep-alive tuning.  They are
# passed to setsockopt at the IPPROTO_TCP level by the accept handler below.
use constant TCP_KEEPIDLE  => 4; # Start keepalives after this period
use constant TCP_KEEPINTVL => 5; # Interval between keepalives
use constant TCP_KEEPCNT   => 6; # Number of keepalives before death

# Ignore SIGPIPE instead of letting it terminate the process.
$SIG{'PIPE'} = "IGNORE";  # handled manually

# establish SERVER socket, bind and listen.
# The listener is created non-blocking, with address reuse enabled and a
# listen backlog of 10, on the port given by -p (default 7002).
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
                                   Type      => SOCK_STREAM,
                                   Proto     => IPPROTO_TCP,
                                   Blocking  => 0,
                                   Reuse     => 1,
                                   Listen    => 10 )
    or die "Error creating socket: $@\n";

# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this.  In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);

# Callback registered for the listening socket's file descriptor.  It
# accepts one pending connection, makes it non-blocking, applies the TCP
# options and wraps it in an object of the selected client class.
my $accept_handler = sub {
    my $csock = $server->accept()
        or return;

    if ($DEBUG) {
        printf("Listen child making a Client for %d.\n", fileno($csock));
    }

    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    # Enable keep alive
    if (!$nokeepalive) {
        # [ level, option, value ] triples, applied in this order; the
        # first failure aborts.
        my @keepalive_opts = (
            [ SOL_SOCKET,  SO_KEEPALIVE,  1  ],
            [ IPPROTO_TCP, TCP_KEEPIDLE,  30 ],
            [ IPPROTO_TCP, TCP_KEEPCNT,   10 ],
            [ IPPROTO_TCP, TCP_KEEPINTVL, 30 ],
        );
        foreach my $opt (@keepalive_opts) {
            my ($level, $name, $value) = @$opt;
            setsockopt($csock, $level, $name, pack("l", $value))
                or die "Couldn't set keep-alive settings on socket (Not on Linux?)";
        }
    }

    # Hand the socket to the backend-specific client object and start
    # watching it for readability.
    my $client = $client_class->new($csock);
    $client->watch_read(1);
};

# Register the listener with the event loop and run it.
Client->OtherFds(fileno($server) => $accept_handler);
Client->EventLoop();

# Detach the process from its controlling terminal using a double fork.
# Both parents exit with status 0; the surviving grandchild continues with
# cwd "/", umask 0 and stdin/stdout/stderr attached to /dev/null.
# Croaks if fork, setsid, chdir or reopening the standard handles fails.
sub daemonize {
    ## Fork and exit parent
    my $pid = fork;
    # fork returns undef on failure.  Without this check the failing
    # parent would carry on as if it were the child.
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless POSIX::setsid();

    ## Prevent possibility of acquiring a controlling terminal
    $SIG{'HUP'} = 'IGNORE';
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Change working directory
    chdir "/" or croak "Cannot chdir to /: $!";

    ## Clear file creation mask
    umask 0;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stderr, stdout, stdin to /dev/null
    open(STDIN,  "+>/dev/null") or croak "Cannot reopen STDIN on /dev/null: $!";
    open(STDOUT, "+>&STDIN")    or croak "Cannot dup STDIN onto STDOUT: $!";
    open(STDERR, "+>&STDIN")    or croak "Cannot dup STDIN onto STDERR: $!";

    return 1;
}

#####################################################################
### C L I E N T   C L A S S
#####################################################################
package Client;

use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'locks',  # hashref of locks held by this connection. values are 1
            'read_buf',
            );

# TODO: our %waiters, lock -> arrayref of client waiters (waker should check not closed)

# Constructor.  Accepts either a class name or an already-allocated
# object, forwards the remaining arguments to the parent constructor and
# starts with an empty lock set and an empty input buffer.
sub new {
    my Client $self = shift;
    unless (ref $self) {
        $self = fields::new($self);
    }
    $self->SUPER::new( @_ );

    $self->{read_buf} = '';
    $self->{locks}    = {};
    return $self;
}

# Client
# Read-readiness callback.  Appends up to 1024 newly read bytes to the
# connection's input buffer and dispatches every complete line found
# there.  The connection is closed when read() returns undef.
sub event_read {
    my Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    # Drain ALL complete lines: a client may send several commands in one
    # packet, and no further read event fires for data already buffered.
    # The pattern accepts empty lines so that a stray blank line is
    # consumed instead of blocking every command queued behind it.
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        next unless length $line;   # blank lines carry no command
        $self->process_line( $line );
    }
}

# Parse one protocol line of the form "<command> <url-encoded-args>" and
# dispatch it to the matching cmd_<command> sub of this package.
# Handlers are called as $handler->($self, \%args).
# Returns the handler's return value, or the result of
# err_line('unknown_command') when the line does not name a known command.
sub process_line {
    my Client $self = shift;
    my $line = shift;

    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);

        # $cmd is restricted to \w characters by the pattern above.
        # can() is used instead of a symbolic glob lookup so that unknown,
        # client-chosen command names are only looked up, not created.
        my $cmd_handler = __PACKAGE__->can("cmd_$cmd");
        if ($cmd_handler) {
            my $decoded = decode_url_args(\$args);
            # Return directly.  A "next" here would leave this sub and act
            # on whatever loop the caller happens to be running.
            return $cmd_handler->($self, $decoded);
        }
    }

    return $self->err_line('unknown_command');
}

# Tear down the connection: release every lock it still holds, then let
# the parent class close the socket.
sub close {
    my Client $self = shift;

    my @held = keys %{$self->{locks}};
    foreach my $name (@held) {
        $self->_release_lock($name);
    }

    $self->SUPER::close;
}


# Client
# Error callback: drop the connection (and, through close, its locks).
sub event_err {
    my $self = shift;
    $self->close;
}

# Hang-up callback: handled the same way as an error.
sub event_hup {
    my $self = shift;
    $self->close;
}

# "status" command: writes the lock success/failure counters, the seconds
# elapsed since program start and the current load figure, followed by an
# empty line.  Always returns 1.
sub cmd_status {
    my Client $self = shift;

    my $runtime = time - $^T;

    my $load = current_load();
    my @report = (
        "STATUS: OK\n",
        "SUCCESSES: $lock_successes\n",
        "FAILURES: $lock_failures\n",
        "RUNTIME: $runtime\n",
        "LOAD: $load\n",
        "\n",
    );
    foreach my $chunk (@report) {
        $self->write($chunk);
    }

    return 1;
}

# Map a timestamp to the start of the load interval that contains it,
# i.e. round it down to a multiple of $main::IV_duration.
sub iv_for_time {
    my ($when) = @_;
    my $offset = $when % $main::IV_duration;
    return $when - $offset;
}

# Return the start times of the $main::Bucket_qty most recent intervals,
# newest first, ending with the interval that contains the given time
# (the current time when the argument is missing or false).
sub latest_ivs {
    my $time = shift || time;
    my $newest = iv_for_time($time);

    my @starts;
    foreach my $age (0 .. $main::Bucket_qty - 1) {
        push @starts, $newest - $age * $main::IV_duration;
    }
    return @starts;
}


# Sum the counters of the most recent intervals (as listed by latest_ivs).
# Intervals with no recorded activity contribute nothing.
sub current_load {
    my @counts = grep { $_ } @main::IVs{latest_ivs()};

    my $total = 0;
    foreach my $count (@counts) {
        $total += $count;
    }
    return $total;
}

# "load" command: writes the current load figure followed by an empty
# line.  Always returns 1.
sub cmd_load {
    my Client $self = shift;

    my $load = current_load();
    foreach my $chunk ("LOAD: $load\n", "\n") {
        $self->write($chunk);
    }

    return 1;
}

# Count one request in the interval containing the current time.  When the
# table has grown past $main::Bucket_limit entries, it is rebuilt so that
# only the most recent intervals remain.
sub increase_load {
    my $now = time;
    $main::IVs{ iv_for_time($now) }++;

    if (keys(%main::IVs) > $main::Bucket_limit) {
        # purge IVs
        my %kept;
        foreach my $iv (latest_ivs($now)) {
            $kept{$iv} = $main::IVs{$iv};
        }
        %main::IVs = %kept;
    }
}

# "trylock" command: ask the backend for the lock named by the "lock"
# argument.  The backend's _trylock writes the protocol reply; this sub
# records the request in the load table and the success/failure counters
# and returns the backend's result.
sub cmd_trylock {
    my Client $self = shift;
    my $args = shift;

    my $lockstate = $self->_trylock( $args->{lock} );

    increase_load();
    if (!$lockstate) {
        $lock_failures++;
    }
    else {
        $lock_successes++;
    }

    return $lockstate;
}

# "releaselock" command: give up a lock held by this connection.
# Replies "empty_lock" when no name was supplied and "didnthave" when this
# connection does not hold the named lock; otherwise it releases the lock
# and replies OK.
sub cmd_releaselock {
    my Client $self = shift;
    my $args = shift;

    my $lock = $args->{lock};
    unless (length($lock)) {
        return $self->err_line("empty_lock");
    }
    unless ($self->{locks}{$lock}) {
        return $self->err_line("didnthave");
    }

    $self->_release_lock($lock);
    return $self->ok_line;
}

# "locks" command: writes a "LOCKS:" header, then the backend's lock
# listing joined with newlines, then a final newline.  Always returns 1.
sub cmd_locks {
    my Client $self = shift;
    my $args = shift;

    $self->write("LOCKS:\n");
    my $listing = join( "\n", $self->_get_locks );
    $self->write( $listing );
    $self->write("\n");

    return 1;
}

# "noop" command: does nothing except reply OK.
sub cmd_noop {
    my Client $self = $_[0];
    # TODO: set self's last activity time so it isn't cleaned in a purge
    #       of stale connections?
    return $self->ok_line();
}

# Write a success reply: "OK " followed by the optional hashref of
# key/value pairs, URL-encoded and joined with "&", then CRLF.
# Always returns 1.
sub ok_line {
    my Client $self = shift;
    my $args = shift || {};

    my @pairs;
    foreach my $key (keys %$args) {
        push @pairs, eurl($key) . "=" . eurl($args->{$key});
    }
    my $argline = join('&', @pairs);

    $self->write("OK $argline\r\n");
    return 1;
}

# Write an error reply: "ERR <code> <url-encoded text>" followed by CRLF.
# Only 'unknown_command' has a descriptive text; for every other code the
# text part is empty.  Always returns 0.
sub err_line {
    my Client $self = shift;
    my $err_code = shift;

    my %text_for = (
        'unknown_command' => "Unknown server command",
    );
    my $err_text = $text_for{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

# URL-encode a string.  Every character other than letters, digits,
# "_", ",", "-", ".", "/", "\", ":" and space becomes %XX in upper-case
# hex; spaces are then turned into "+".
sub eurl
{
    my ($str) = @_;
    $str =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/sprintf("%%%02X", ord($1))/eg;
    $str =~ tr/ /+/;
    return $str;
}

# URL-decode a string: "+" becomes a space and each %XX hex escape becomes
# the corresponding byte.  Malformed escapes are left untouched.
sub durl
{
    my $str = shift;
    $str =~ tr/+/ /;
    $str =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
    return $str;
}

# Decode a URL-encoded "k1=v1&k2=v2" argument string into a hashref.
#
# The argument may be the string itself or a reference to it.  Names and
# values are URL-decoded ("+" to space, %XX to byte).  When a name occurs
# more than once its values are joined with "\0".  A name without "="
# maps to the empty string.  Empty pairs (as in "a=1&&b=2") are skipped.
sub decode_url_args
{
    my $input = shift;
    my $buffer = ref $input ? $input : \$input;
    my $ret = {};

    foreach my $pair (split(/&/, $$buffer))
    {
        next unless length $pair;

        # Limit of 2: only the first "=" separates the name from the
        # value, so a value that itself contains "=" is kept whole.
        my ($name, $value) = split(/=/, $pair, 2);
        $value = '' unless defined $value;

        foreach my $part ($name, $value) {
            $part =~ tr/+/ /;
            $part =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        }

        # Test for existence, not truth: an earlier value of "0" or ""
        # must still be separated from the next value by "\0".
        if (exists $ret->{$name}) {
            $ret->{$name} .= "\0$value";
        }
        else {
            $ret->{$name} = $value;
        }
    }
    return $ret;
}

package Client::Internal;

use base 'Client';

# Map of lock name -> Client object currently holding that lock.
our %holder;
# TODO: our %waiters, lock -> arrayref of client waiters (waker should check not closed)

# Backend initialisation hook; the in-memory backend needs none.
sub _setup {
    return;
}

# Try to take the named lock for this connection.
# Replies "empty_lock" for an empty name and "taken" when some client
# already holds the lock; otherwise it records the holder and replies OK.
# Returns the value of the reply sub that was called.
sub _trylock {
    my Client::Internal $self = shift;
    my $lock = shift;

    unless (length($lock)) {
        return $self->err_line("empty_lock");
    }
    if (defined $holder{$lock}) {
        return $self->err_line("taken");
    }

    $self->{locks}{$lock} = 1;
    $holder{$lock} = $self;

    return $self->ok_line();
}

# Forget the named lock, both in the global holder table and in this
# connection's own lock set.  Always returns 1.
sub _release_lock {
    my Client::Internal $self = shift;
    my $lock = shift;

    # TODO: notify waiters
    delete $holder{$lock};
    delete $self->{locks}{$lock};
    return 1;
}

# Return one "  <lock> = <holder>" line per held lock, sorted by lock
# name, using each holder's as_string method.
sub _get_locks {
    my @lines;
    foreach my $name (sort keys %holder) {
        push @lines, "  $name = " . $holder{$name}->as_string;
    }
    return @lines;
}

package Client::DLMFS;

use base 'Client';
use Fcntl;
use Errno qw(EEXIST ETXTBSY);

# sysopen flags used for lock files: non-blocking, read/write, and
# create-exclusively (so the open fails if the file already exists).
sub FLAGS () { O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL }
# Directory under the /dlm mount in which lock files are created.
sub PATH () { "/dlm/ddlockd" };

# Backend initialisation: dies unless the /dlm directory exists, then
# attempts to create the lock directory.  The mkdir result is not checked.
sub _setup {
    die( "DLMFS mount at /dlm not found\n" ) unless -d "/dlm";
    mkdir( PATH );
}

# Try to take the named lock by exclusively creating a file of that name
# under PATH.
#
# Replies:
#   OK               the file was created; the lock is recorded as held
#   empty_lock       no lock name given
#   invalid_lock     the name cannot be used as a single file name
#   local taken      the file already exists (EEXIST)
#   remote taken     the open failed with ETXTBSY; the file is unlinked
#   unknown: <err>   any other error
# Returns the value of the reply sub that was called.
sub _trylock {
    my Client::DLMFS $self = shift;
    my $lock = shift;

    return $self->err_line("empty_lock") unless length($lock);

    # The name comes from the network and is appended to a directory path.
    # Refuse anything that could point outside PATH or name a directory.
    return $self->err_line("invalid_lock")
        if $lock =~ m{[/\0]} || $lock eq '.' || $lock eq '..';

    my $file = PATH . "/$lock";

    # NOTE(review): $handle goes out of scope, and is therefore closed, at
    # the end of this if/else statement.  Confirm that the dlmfs lock is
    # meant to outlive the open file handle.
    if (sysopen( my $handle, $file, FLAGS )) {
        $self->{locks}{$lock} = 1;
        return $self->ok_line();
    }
    else {
        if ($! == EEXIST) {
            return $self->err_line( "local taken" );
        }
        elsif( $! == ETXTBSY) {
            unlink( $file );
            return $self->err_line( "remote taken" );
        }
        else {
            return $self->err_line( "unknown: $!" );
        }
    }
}

# Release the named lock: drop it from this connection's lock set and
# remove its file under PATH.  The unlink result is not checked.
# Always returns 1.
sub _release_lock {
    # Typed as this package's own class (it was declared as
    # Client::Internal).
    my Client::DLMFS $self = shift;
    my $lock = shift;

    # TODO: notify waiters
    delete $self->{locks}{$lock};
    unlink( PATH . "/$lock" );
    return 1;
}

# Lock listing for the DLMFS backend.  Not implemented yet: it returns
# nothing, so the "locks" command prints an empty listing.
sub _get_locks {
# TODO
#    return map { "  $_ = " . $holder{$_}->as_string } (sort keys %holder);
    return;
}

package Client::DBI;

# CREATE TABLE

use base 'Client';
use fields qw(dbh);

# Backend-wide state, filled in once by _setup.
my $hostname;   # value of -h/--hostname
my $table;      # name of the table that stores one row per held lock
my $dbh;        # shared database handle (AutoCommit is off)

# Backend initialisation, called as a class method with
# ($hostname, $table).  Loads DBI and opens the shared database handle.
# Dies if DBI cannot be loaded or the connection fails.
#
# NOTE(review): $hostname is stored but not used in the DSN below, which
# names only the database "sixalock".  Confirm what it is meant for.
sub _setup {
    my $class = shift;
    ($hostname, $table) = @_;

    eval { require DBI; 1 } or die "No DBI available?";

    # Report why the connection failed instead of a bare "Died".
    $dbh = DBI->connect( 'dbi:mysql:dbname=sixalock', '', '', {AutoCommit => 0} )
        or die "Cannot connect to lock database: $DBI::errstr\n";
}

# Try to take the named lock by inserting a row for it into $table.
#
# Replies:
#   OK             the row was inserted and committed; the lock is recorded
#                  as held by this connection
#   empty_lock     no lock name given
#   local taken    this connection already holds the lock
#   remote taken   a row for the lock already exists
#   <other text>   a database step failed
# Every failure after the transaction has started rolls it back.
# Returns the value of the reply sub that was called.
sub _trylock {
    my Client::DBI $self = shift;
    my $lock = shift;

    return $self->err_line( "empty_lock" )
        unless defined( $lock ) && length( $lock );

    my $local_locks = $self->{locks};
    exists( $local_locks->{$lock} ) and return $self->err_line( "local taken" );

    $dbh->do( 'START TRANSACTION' ) or return $self->err_line( "Transaction failed to start" );

    # Abandon the open transaction and report the given error.
    my $fail = sub {
        my $message = shift;
        $dbh->do( 'ROLLBACK' );
        return $self->err_line( $message );
    };

    my $sth = $dbh->prepare( "SELECT * FROM $table WHERE name=?" )
        or return $fail->( "STH->prepare failed" );
    $sth->execute( $lock )
        or return $fail->( "STH->execute failed" );
    my $ary = $sth->fetchall_arrayref;
    ref($ary) eq 'ARRAY'
        or return $fail->( "DBI->selectall_arrayref returned non-arrayref" );
    scalar @$ary == 0
        or return $fail->( "remote taken" );

    $dbh->do( "INSERT INTO $table (name) VALUES (?)", {}, $lock )
        or return $fail->( "INSERT failed" );
    $dbh->do( 'COMMIT' )
        or return $fail->( "COMMIT failed" );

    # Remember that this connection owns the lock.  Without this the
    # "local taken" check above never fires and neither releaselock nor
    # close() can ever free the lock.
    $local_locks->{$lock} = 1;

    return $self->ok_line();
}

# Release the named lock if this connection holds it: forget it locally
# and delete its row from $table.
# Returns 1 when the lock was held by this connection, 0 otherwise.
# Database errors are not reported to the caller.
sub _release_lock {
    my Client::DBI $self = shift;
    my $lock = shift;

    my $locks = $self->{locks};
    return 0 unless exists( $locks->{$lock} );

    delete $locks->{$lock};

    # Plain DELETE: "SELECT AND DELETE *" is not valid SQL, so the row was
    # never removed.  The handle runs with AutoCommit off, so the delete
    # also has to be committed to become visible to other connections.
    $dbh->do( "DELETE FROM $table WHERE name=?", {}, $lock )
        and $dbh->do( 'COMMIT' );

    return 1;
}

# Return the name column of every row currently stored in $table.
sub _get_locks {
    my Client::DBI $self = shift;

    my $rows = $dbh->selectall_arrayref( "SELECT name FROM $table" );
    my @names = map { $_->[0] } @$rows;
    return @names;
}

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
