#!/usr/bin/perl
#
# bin/worker/ses-incoming-email
#
# Worker that processes incoming email delivered via AWS SES. Polls an
# SQS queue that receives SNS notifications from SES receipt rules.
# Each notification references an email stored in S3.
#
# Pipeline: SES -> S3 (store email) -> SNS -> SQS -> this worker
#
# This replaces the Postfix + incoming-mail-inject.pl pipeline with
# a fully AWS-native solution.
#
# Authors:
#     Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2026 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself.  For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

use v5.10;
use strict;

BEGIN {
    require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use Getopt::Long;
use JSON;
use Log::Log4perl;
use Paws;
use Scalar::Util qw( blessed );
use Time::HiRes qw/ time /;

use DW::IncomingEmail;

my $log = Log::Log4perl->get_logger(__PACKAGE__);

# NOTE(review): --verbose is accepted, but $verbose is not read anywhere else
# in this script.
my $verbose = 0;
GetOptions( 'verbose|v' => \$verbose );

# Worker lifecycle limits (same pattern as other DW workers). Both limits are
# checked only between receive batches, so a batch in progress always finishes.
my $exit_after_secs     = 300 + int( rand() * 600 );    # 5-15 minutes
my $exit_after_messages = 100;
my $message_timeout     = 300;                           # 5 minutes per message

my $start_time     = time();
my $messages_count = 0;

# SQS queue name — configure via $LJ::SES_INCOMING_EMAIL_QUEUE or use default
my $queue_name = $LJ::SES_INCOMING_EMAIL_QUEUE || 'dw-prod-ses-incoming-email';

# Initialize AWS clients ($s3 is also used by process_message below)
my $paws = Paws->new(
    config => {
        region => $LJ::SQS{region} || 'us-east-1',
    },
);
my $sqs = $paws->service('SQS');
my $s3  = $paws->service('S3');

# Resolve queue URL once at startup; the worker cannot run without it.
my $queue_url;
{
    my $res = eval { $sqs->GetQueueUrl( QueueName => $queue_name ) };
    if ( my $err = $@ ) {
        die "Failed to get SQS queue URL for $queue_name: " . _aws_error_message($err);
    }
    $queue_url = $res->QueueUrl;
    $log->info("Polling SQS queue: $queue_name ($queue_url)");
}

# Main work loop
while (1) {

    # Check lifecycle limits
    last if ( time() - $start_time ) >= $exit_after_secs;
    last if $messages_count >= $exit_after_messages;

    # Long-poll for messages
    my $res = eval {
        $sqs->ReceiveMessage(
            QueueUrl            => $queue_url,
            MaxNumberOfMessages => 10,
            WaitTimeSeconds     => 10,
        );
    };
    if ( my $err = $@ ) {
        $log->error( "SQS receive error: " . _aws_error_message($err) );
        sleep 5;
        next;
    }

    my $messages = $res->Messages;
    next unless $messages && ref $messages eq 'ARRAY' && @$messages;

    foreach my $msg (@$messages) {
        $messages_count++;

        # Per-message timeout. The handler stays installed until after the
        # final alarm(0). The alarm is cancelled inside the eval, so a timer
        # firing just after process_message returns is still caught there
        # rather than dying outside any eval and killing the worker.
        local $SIG{ALRM} = sub { die "Message processing timeout\n" };
        my $ok = eval {
            alarm($message_timeout);
            my $rv = process_message( $msg->Body );
            alarm(0);
            $rv;
        };
        my $err = $@;

        # Also cancel here in case process_message died before its alarm(0).
        alarm(0);

        if ($err) {
            $log->error("Error processing message: $err");

            # Don't delete — let SQS visibility timeout handle retry
            next;
        }

        # Transient failure: leave the message for redelivery.
        next unless $ok;

        # Delete immediately rather than batching deletes after the loop.
        # A batch can take up to 10 x $message_timeout. Messages that are
        # still undeleted when the worker dies, or whose visibility timeout
        # lapses first, would be redelivered and processed a second time.
        eval {
            $sqs->DeleteMessage(
                QueueUrl      => $queue_url,
                ReceiptHandle => $msg->ReceiptHandle,
            );
        };
        if ( my $del_err = $@ ) {
            $log->error( "Failed to delete messages: " . _aws_error_message($del_err) );
        }
    }
}

$log->info("Worker exiting after $messages_count messages, "
        . int( time() - $start_time )
        . "s runtime" );

# Turn an exception caught from a Paws call into a log-friendly string:
# Paws::Exception objects contribute their message; anything else
# (plain die strings, unblessed refs) is stringified as-is.
sub _aws_error_message {
    my ($err) = @_;
    return blessed($err) && $err->isa('Paws::Exception') ? $err->message : "$err";
}

# Parse an SQS message body (SNS notification wrapping SES event),
# fetch the email from S3, and process it.
#
# Returns 0 on a transient failure that should be retried.
#
# Returns a true value when the queue message is finished with. That covers:
#   - a successfully processed email;
#   - a dropped message: malformed JSON, not a "Received" notification,
#     spam/virus FAIL verdict, no usable S3 location, or a missing/empty
#     S3 object.
#
# Once the email reaches DW::IncomingEmail->process, that call's return value
# is passed through unchanged, and any exception it throws propagates.
sub process_message {
    my ($body) = @_;

    my $ses = _decode_ses_notification($body);
    return 1 unless $ses;    # drop malformed messages (already logged)

    unless ( ( $ses->{notificationType} // '' ) eq 'Received' ) {
        $log->info("Ignoring non-Received notification: $ses->{notificationType}");
        return 1;
    }

    # Check SES spam/virus verdicts before we even fetch the email.
    # SES populates these when scan_enabled is true on the receipt rule.
    my $receipt       = ref $ses->{receipt} eq 'HASH' ? $ses->{receipt} : {};
    my $spam_verdict  = $receipt->{spamVerdict}{status}  || 'MISSING';
    my $virus_verdict = $receipt->{virusVerdict}{status} || 'MISSING';
    my $source        = $ses->{mail}{source}             || 'unknown';
    my $recipients    = join( ', ', @{ $ses->{mail}{destination} || [] } );

    $log->info("Incoming email from=$source to=$recipients "
            . "spam=$spam_verdict virus=$virus_verdict" );

    return 1 if $spam_verdict eq 'FAIL';
    return 1 if $virus_verdict eq 'FAIL';

    # Extract S3 location from the SES receipt action. A non-hash action is
    # treated like a missing one instead of dying on the dereference.
    my $action      = $receipt->{action};
    my $action_type = ref $action eq 'HASH' ? $action->{type} : undef;
    unless ( ( $action_type // '' ) eq 'S3' ) {
        $log->error( "SES notification missing S3 action: " . ( $action_type || 'unknown' ) );
        return 1;
    }

    my $bucket = $action->{bucketName};
    my $key    = $action->{objectKey};

    unless ( $bucket && $key ) {
        $log->error("SES notification missing bucket/key");
        return 1;
    }

    # Fetch the raw email from S3
    my $s3_res = eval {
        $s3->GetObject(
            Bucket => $bucket,
            Key    => $key,
        );
    };
    if ( my $err = $@ ) {
        my $is_paws = blessed($err) && $err->isa('Paws::Exception');
        $log->error( "S3 fetch failed: " . ( $is_paws ? $err->message : $err ) );

        # A missing object will not come back, so retrying cannot succeed.
        # This can happen when an earlier attempt processed the email and
        # deleted the object below, but the SQS message was not removed.
        if ( $is_paws
            && ( ( $err->code // '' ) eq 'NoSuchKey' || ( $err->http_status // '' ) eq '404' ) )
        {
            $log->warn("s3://$bucket/$key no longer exists; dropping message");
            return 1;
        }
        return 0;    # transient failure, retry
    }

    my $raw_email = $s3_res->Body;
    unless ( defined $raw_email && length $raw_email ) {
        $log->error("Empty email from S3: s3://$bucket/$key");
        return 1;
    }

    $log->info( "Processing email from s3://$bucket/$key (" . length($raw_email) . " bytes)" );

    # Process the email using the shared pipeline
    my $rv = DW::IncomingEmail->process($raw_email);

    # Clean up the S3 object after successful processing (best effort)
    if ($rv) {
        eval {
            $s3->DeleteObject(
                Bucket => $bucket,
                Key    => $key,
            );
        };
        if ( my $err = $@ ) {
            $log->warn( "Failed to delete s3://$bucket/$key: "
                    . ( blessed($err) && $err->isa('Paws::Exception') ? $err->message : $err ) );
        }
    }

    return $rv;
}

# Decode an SQS message body into the SES notification hash.
#
# Accepts two body shapes:
#   - the standard SNS envelope, with the SES JSON encoded in its "Message"
#     field;
#   - SNS raw message delivery, where the body already is the SES JSON.
#
# Logs and returns nothing (false) when either layer is not a JSON object.
sub _decode_ses_notification {
    my ($body) = @_;

    # Parse the SNS envelope
    my $sns     = eval { decode_json($body) };
    my $sns_err = $@;
    unless ( ref $sns eq 'HASH' ) {
        $log->error( "Failed to parse SNS JSON: " . ( $sns_err || "not a JSON object\n" ) );
        return;
    }

    # Raw message delivery: no envelope to unwrap.
    return $sns if !exists $sns->{Message} && exists $sns->{notificationType};

    # The SES notification is JSON-encoded inside the SNS Message field
    my $ses     = eval { decode_json( $sns->{Message} ) };
    my $ses_err = $@;
    unless ( ref $ses eq 'HASH' ) {
        $log->error( "Failed to parse SES notification: " . ( $ses_err || "not a JSON object\n" ) );
        return;
    }

    return $ses;
}
