#!/usr/bin/perl
#
# bin/worker/ses-incoming-email
#
# Worker that processes incoming email delivered via AWS SES. Polls an
# SQS queue that receives SNS notifications from SES receipt rules.
# Each notification references an email stored in S3.
#
# Pipeline: SES -> S3 (store email) -> SNS -> SQS -> this worker
#
# This replaces the Postfix + incoming-mail-inject.pl pipeline with
# a fully AWS-native solution.
#
# Authors:
#      Mark Smith
#
# Copyright (c) 2026 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

use v5.10;
use strict;

BEGIN {
    require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use Getopt::Long;
use JSON;
use Log::Log4perl;
use Paws;
use Time::HiRes qw/ time /;

use DW::IncomingEmail;

my $log = Log::Log4perl->get_logger(__PACKAGE__);

# -v/--verbose is accepted on the command line; nothing in the code visible
# here consults $verbose afterwards.
my $verbose = 0;
GetOptions( 'verbose|v' => \$verbose );

# Worker lifecycle limits (same pattern as other DW workers)
my $exit_after_secs     = 300 + int( rand() * 600 );    # 5-15 minutes
my $exit_after_messages = 100;
my $message_timeout     = 300;                          # 5 minutes per message
my $start_time          = time();
my $messages_count      = 0;

# SQS queue name — configure via $LJ::SES_INCOMING_EMAIL_QUEUE or use default
my $queue_name = $LJ::SES_INCOMING_EMAIL_QUEUE || 'dw-prod-ses-incoming-email';

# Initialize AWS clients
my $paws = Paws->new(
    config => {
        region => $LJ::SQS{region} || 'us-east-1',
    },
);
my $sqs = $paws->service('SQS');
my $s3  = $paws->service('S3');

# Resolve queue URL. Without it the worker cannot do anything, so a failure
# here is fatal.
my $queue_url;
{
    my $res = eval { $sqs->GetQueueUrl( QueueName => $queue_name ) };
    if ( my $err = $@ ) {
        die "Failed to get SQS queue URL for $queue_name: " . _aws_error_text($err);
    }
    $queue_url = $res->QueueUrl;
    $log->info("Polling SQS queue: $queue_name ($queue_url)");
}

# Main work loop
while (1) {

    # Check lifecycle limits. These are only evaluated between batches, so
    # the message limit may be overshot by up to one batch.
    last if ( time() - $start_time ) >= $exit_after_secs;
    last if $messages_count >= $exit_after_messages;

    # Long-poll for messages
    my $res = eval {
        $sqs->ReceiveMessage(
            QueueUrl            => $queue_url,
            MaxNumberOfMessages => 10,
            WaitTimeSeconds     => 10,
        );
    };
    if ( my $err = $@ ) {
        $log->error( "SQS receive error: " . _aws_error_text($err) );
        sleep 5;
        next;
    }

    my $messages = $res->Messages;
    next unless $messages && ref $messages eq 'ARRAY' && @$messages;

    my @completed_handles;

    foreach my $msg (@$messages) {
        $messages_count++;

        my ( $ok, $err );

        # The alarm is armed and disarmed inside the outer eval so that a
        # SIGALRM arriving at any point while it is pending is caught here
        # instead of killing the worker. The inner eval catches errors from
        # process_message itself (including the timeout die).
        my $finished = eval {
            local $SIG{ALRM} = sub { die "Message processing timeout\n" };
            alarm($message_timeout);
            my $ran = eval { $ok = process_message( $msg->Body ); 1 };
            $err = $ran ? undef : ( $@ || 'unknown error' );
            alarm(0);
            1;
        };
        $err = $@ || 'unknown error' unless $finished;

        if ($err) {
            $log->error("Error processing message: $err");

            # Don't delete — let SQS visibility timeout handle retry
            next;
        }

        # A false return means a transient failure: leave the message queued.
        push @completed_handles, $msg->ReceiptHandle if $ok;
    }

    # Delete successfully processed messages
    if (@completed_handles) {
        my $idx = 0;
        my $del = eval {
            $sqs->DeleteMessageBatch(
                QueueUrl => $queue_url,
                Entries  => [ map { +{ Id => $idx++, ReceiptHandle => $_ } } @completed_handles ],
            );
        };
        if ( my $err = $@ ) {
            $log->error( "Failed to delete messages: " . _aws_error_text($err) );
        }
        elsif ($del) {

            # A batch call can succeed as a whole while individual entries
            # fail; those messages stay on the queue and will be redelivered.
            foreach my $failure ( @{ $del->Failed || [] } ) {
                $log->error(
                    sprintf(
                        "Failed to delete message (batch entry %s): %s: %s",
                        $failure->Id      // '?',
                        $failure->Code    // 'unknown',
                        $failure->Message // ''
                    )
                );
            }
        }
    }
}

$log->info(
    "Worker exiting after $messages_count messages, " . int( time() - $start_time ) . "s runtime" );

# Turn an eval error into loggable text: the ->message of a Paws::Exception,
# or the error value itself for anything else.
sub _aws_error_text {
    my ($err) = @_;
    my $is_paws = ref $err && $err->isa('Paws::Exception');
    return $is_paws ? $err->message : $err;
}

# Parse an SQS message body (SNS notification wrapping SES event),
# fetch the email from S3, and process it.
#
# Returns 1 on success/drop, 0 on transient failure.
sub process_message {

    # Arguments: $body - the raw SQS message body (a JSON string).
    #
    # Returns a true value when the message is finished with (processed, or
    # deliberately dropped because it is malformed, spam, infected, or can
    # never succeed) and a false value when it should stay on the queue for
    # another attempt. The result of DW::IncomingEmail->process is passed
    # through unchanged. Exceptions from that call propagate to the caller.
    my ($body) = @_;

    # Payload fields come from outside; never dereference one without
    # checking its type, or a malformed message would die on every
    # redelivery instead of being dropped.
    my $as_hash = sub { ref( $_[0] ) eq 'HASH' ? $_[0] : {} };

    # Parse the SNS envelope
    my $sns = eval { decode_json($body) };
    if ( my $err = $@ ) {
        $log->error("Failed to parse SNS JSON: $err");
        return 1;    # drop malformed messages
    }
    unless ( ref($sns) eq 'HASH' ) {
        $log->error("SNS envelope is not a JSON object");
        return 1;
    }

    # The SES notification is JSON-encoded inside the SNS Message field
    my $ses = eval { decode_json( $sns->{Message} ) };
    if ( my $err = $@ ) {
        $log->error("Failed to parse SES notification: $err");
        return 1;
    }
    unless ( ref($ses) eq 'HASH' ) {
        $log->error("SES notification is not a JSON object");
        return 1;
    }

    my $type = $ses->{notificationType} // '';
    unless ( $type eq 'Received' ) {
        $log->info("Ignoring non-Received notification: $type");
        return 1;
    }

    # Check SES spam/virus verdicts before we even fetch the email.
    # SES populates these when scan_enabled is true on the receipt rule.
    my $receipt       = $as_hash->( $ses->{receipt} );
    my $mail          = $as_hash->( $ses->{mail} );
    my $spam_verdict  = $as_hash->( $receipt->{spamVerdict} )->{status}  || 'MISSING';
    my $virus_verdict = $as_hash->( $receipt->{virusVerdict} )->{status} || 'MISSING';
    my $source        = $mail->{source} || 'unknown';
    my $destination   = ref( $mail->{destination} ) eq 'ARRAY' ? $mail->{destination} : [];
    my $recipients    = join( ', ', @$destination );

    $log->info( "Incoming email from=$source to=$recipients "
            . "spam=$spam_verdict virus=$virus_verdict" );

    return 1 if $spam_verdict eq 'FAIL';
    return 1 if $virus_verdict eq 'FAIL';

    # Extract S3 location from the SES receipt action
    my $action      = $as_hash->( $receipt->{action} );
    my $action_type = $action->{type} || 'unknown';
    unless ( $action_type eq 'S3' ) {
        $log->error("SES notification missing S3 action: $action_type");
        return 1;
    }

    my $bucket = $action->{bucketName};
    my $key    = $action->{objectKey};
    unless ( $bucket && $key ) {
        $log->error("SES notification missing bucket/key");
        return 1;
    }

    # Fetch the raw email from S3
    my $s3_res = eval { $s3->GetObject( Bucket => $bucket, Key => $key ) };
    if ( my $err = $@ ) {
        my $is_paws = ref $err && eval { $err->isa('Paws::Exception') };
        $log->error( "S3 fetch failed: " . ( $is_paws ? $err->message : $err ) );

        # A missing object will never appear on retry (for example a
        # duplicate SQS delivery after the email was already processed and
        # its object deleted), so drop the message rather than retrying
        # until it expires.
        if ( $is_paws && ( $err->code // '' ) eq 'NoSuchKey' ) {
            $log->warn("Dropping message; object no longer exists: s3://$bucket/$key");
            return 1;
        }

        return 0;    # transient failure, retry
    }

    my $raw_email = $s3_res->Body;
    unless ( defined $raw_email && length $raw_email ) {
        $log->error("Empty email from S3: s3://$bucket/$key");
        return 1;
    }

    $log->info( "Processing email from s3://$bucket/$key (" . length($raw_email) . " bytes)" );

    # Process the email using the shared pipeline
    my $rv = DW::IncomingEmail->process($raw_email);

    # Clean up the S3 object after successful processing. Best effort: a
    # failed delete is logged but does not change the result.
    if ($rv) {
        eval { $s3->DeleteObject( Bucket => $bucket, Key => $key ) };
        if ( my $err = $@ ) {
            $log->warn("Failed to delete s3://$bucket/$key: $err");
        }
    }

    return $rv;
}