244 lines
6.9 KiB
Perl
Executable file
244 lines
6.9 KiB
Perl
Executable file
#!/usr/bin/perl
|
|
#
|
|
# bin/worker/ses-incoming-email
|
|
#
|
|
# Worker that processes incoming email delivered via AWS SES. Polls an
|
|
# SQS queue that receives SNS notifications from SES receipt rules.
|
|
# Each notification references an email stored in S3.
|
|
#
|
|
# Pipeline: SES -> S3 (store email) -> SNS -> SQS -> this worker
|
|
#
|
|
# This replaces the Postfix + incoming-mail-inject.pl pipeline with
|
|
# a fully AWS-native solution.
|
|
#
|
|
# Authors:
|
|
# Mark Smith <mark@dreamwidth.org>
|
|
#
|
|
# Copyright (c) 2026 by Dreamwidth Studios, LLC.
|
|
#
|
|
# This program is free software; you may redistribute it and/or modify it under
|
|
# the same terms as Perl itself. For a copy of the license, please reference
|
|
# 'perldoc perlartistic' or 'perldoc perlgpl'.
|
|
#
|
|
|
|
use v5.10;
|
|
use strict;
|
|
|
|
BEGIN {
|
|
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
|
|
}
|
|
|
|
use Getopt::Long;
|
|
use JSON;
|
|
use Log::Log4perl;
|
|
use Paws;
|
|
use Time::HiRes qw/ time /;
|
|
|
|
use DW::IncomingEmail;
|
|
|
|
# Logger for this script, named after the current package.
my $log = Log::Log4perl->get_logger(__PACKAGE__);

# Command-line switches: -v / --verbose sets $verbose.
# NOTE(review): $verbose is not read anywhere else in the visible script.
my $verbose = 0;
GetOptions( 'verbose|v' => \$verbose );

# Worker lifecycle limits (same pattern as other DW workers). The run time
# cap is randomized so it lands somewhere in the 300-899 second range.
my $exit_after_secs     = 300 + int( rand(600) );
my $exit_after_messages = 100;
my $message_timeout     = 300;    # seconds allowed per message

# Bookkeeping consulted by the work loop's lifecycle checks.
my $start_time     = time();
my $messages_count = 0;

# Queue to poll: $LJ::SES_INCOMING_EMAIL_QUEUE when set, otherwise the
# built-in default name.
my $queue_name = $LJ::SES_INCOMING_EMAIL_QUEUE || 'dw-prod-ses-incoming-email';

# AWS service clients, all created from one Paws instance. The region comes
# from $LJ::SQS{region}, falling back to us-east-1.
my $paws = Paws->new( config => { region => $LJ::SQS{region} || 'us-east-1' } );
my $sqs  = $paws->service('SQS');
my $s3   = $paws->service('S3');

# Translate the queue name into its URL once at startup; the worker cannot do
# anything useful without it, so a lookup failure is fatal.
my $queue_url = do {
    my $lookup = eval { $sqs->GetQueueUrl( QueueName => $queue_name ) };
    if ( my $err = $@ ) {

        # Paws exceptions carry a readable ->message; anything else is
        # reported as-is.
        my $reason = ( ref $err && $err->isa('Paws::Exception') ) ? $err->message : $err;
        die "Failed to get SQS queue URL for $queue_name: " . $reason;
    }
    $lookup->QueueUrl;
};
$log->info("Polling SQS queue: $queue_name ($queue_url)");
|
|
|
|
# Main work loop.
#
# Each pass long-polls the SQS queue for up to 10 messages, runs every message
# body through process_message() under a per-message alarm, and then deletes
# (in one batch call) the messages process_message() reported as finished.
# Messages that raised an error or returned false are left on the queue so
# the SQS visibility timeout makes them available again.
while (1) {

    # Check lifecycle limits. These are only evaluated between batches, so
    # the message cap can be overshot by the size of the last batch.
    last if ( time() - $start_time ) >= $exit_after_secs;
    last if $messages_count >= $exit_after_messages;

    # Long-poll for messages
    my $res = eval {
        $sqs->ReceiveMessage(
            QueueUrl            => $queue_url,
            MaxNumberOfMessages => 10,
            WaitTimeSeconds     => 10,
        );
    };
    if ( my $err = $@ ) {
        $log->error( "SQS receive error: "
                . ( ref $err && $err->isa('Paws::Exception') ? $err->message : $err ) );

        # Back off briefly so a persistent failure does not spin the loop.
        sleep 5;
        next;
    }

    my $messages = $res->Messages;
    next unless $messages && ref $messages eq 'ARRAY' && @$messages;

    my @completed_handles;

    foreach my $msg (@$messages) {
        $messages_count++;

        # Set alarm for message timeout
        local $SIG{ALRM} = sub { die "Message processing timeout\n" };
        alarm($message_timeout);

        my $ok = eval {
            my $rv = process_message( $msg->Body );

            # Cancel the timer while still inside the eval, so a late alarm
            # cannot fire after a successful run and die outside of it.
            alarm(0);
            $rv;
        };
        my $err = $@;

        # Also cancel on the failure path, where the eval was left early.
        alarm(0);

        if ($err) {
            $log->error("Error processing message: $err");

            # Don't delete — let SQS visibility timeout handle retry
            next;
        }

        # A false return is a transient failure: leave the message queued.
        push @completed_handles, $msg->ReceiptHandle if $ok;
    }

    # Delete successfully processed messages
    next unless @completed_handles;

    my $idx      = 0;
    my $del_res  = eval {
        $sqs->DeleteMessageBatch(
            QueueUrl => $queue_url,
            Entries  => [ map { +{ Id => $idx++, ReceiptHandle => $_ } } @completed_handles ],
        );
    };
    if ( my $err = $@ ) {
        $log->error( "Failed to delete messages: "
                . ( ref $err && $err->isa('Paws::Exception') ? $err->message : $err ) );
        next;
    }

    # The batch call can succeed as a whole while individual entries fail,
    # so report every entry listed under Failed; those messages will be
    # redelivered by SQS.
    my $failed = $del_res ? $del_res->Failed : undef;
    if ( $failed && ref $failed eq 'ARRAY' ) {
        foreach my $entry (@$failed) {
            $log->error(
                sprintf(
                    "Failed to delete message (batch id %s): %s %s",
                    $entry->Id      // '?',
                    $entry->Code    // '',
                    $entry->Message // '',
                )
            );
        }
    }
}

$log->info( "Worker exiting after $messages_count messages, "
        . int( time() - $start_time )
        . "s runtime" );
|
|
|
|
# Parse an SQS message body (an SNS notification wrapping an SES event),
# fetch the referenced email from S3, and hand it to DW::IncomingEmail.
#
# Arguments:
#   $body - SQS message body: JSON text of the SNS envelope, whose Message
#           field holds the JSON-encoded SES notification.
#
# Returns a true value when the SQS message is finished with: either the
# email was processed, or the notification was deliberately dropped
# (malformed, not a 'Received' notification, failed spam/virus verdict, no
# usable S3 location, object already gone, empty object). Returns a false
# value when the work should be retried (S3 fetch failure, or a false result
# from DW::IncomingEmail->process).
#
# Exceptions raised by DW::IncomingEmail->process are not caught here; the
# caller is expected to trap them.
sub process_message {
    my ($body) = @_;

    # Coerce anything that is not a hash ref into an empty hash, so malformed
    # but syntactically valid JSON is dropped instead of raising an exception
    # that the caller would treat as retryable.
    my $as_hash = sub { ref $_[0] eq 'HASH' ? $_[0] : {} };

    # Parse the SNS envelope
    my $sns = eval { decode_json($body) };
    if ( my $err = $@ ) {
        $log->error("Failed to parse SNS JSON: $err");
        return 1;    # drop malformed messages
    }

    # The SES notification is JSON-encoded inside the SNS Message field. A
    # non-object envelope or a missing Message fails inside this eval.
    my $ses = eval { decode_json( $sns->{Message} ) };
    if ( my $err = $@ ) {
        $log->error("Failed to parse SES notification: $err");
        return 1;
    }
    unless ( ref $ses eq 'HASH' ) {
        $log->error("Failed to parse SES notification: payload is not a JSON object");
        return 1;
    }

    my $type = $ses->{notificationType} // '';
    unless ( $type eq 'Received' ) {
        $log->info("Ignoring non-Received notification: $type");
        return 1;
    }

    # Check SES spam/virus verdicts before we even fetch the email.
    # SES populates these when scan_enabled is true on the receipt rule.
    my $receipt       = $as_hash->( $ses->{receipt} );
    my $mail          = $as_hash->( $ses->{mail} );
    my $spam_verdict  = $as_hash->( $receipt->{spamVerdict} )->{status}  || 'MISSING';
    my $virus_verdict = $as_hash->( $receipt->{virusVerdict} )->{status} || 'MISSING';
    my $source        = $mail->{source} || 'unknown';
    my $destination   = ref $mail->{destination} eq 'ARRAY' ? $mail->{destination} : [];
    my $recipients    = join( ', ', @$destination );

    $log->info( "Incoming email from=$source to=$recipients "
            . "spam=$spam_verdict virus=$virus_verdict" );

    # Only an explicit FAIL drops the email; any other verdict (including a
    # missing one) lets it through.
    return 1 if $spam_verdict eq 'FAIL';
    return 1 if $virus_verdict eq 'FAIL';

    # Extract S3 location from the SES receipt action
    my $action = $as_hash->( $receipt->{action} );
    unless ( ( $action->{type} // '' ) eq 'S3' ) {
        $log->error( "SES notification missing S3 action: " . ( $action->{type} || 'unknown' ) );
        return 1;
    }

    my $bucket = $action->{bucketName};
    my $key    = $action->{objectKey};

    unless ( $bucket && $key ) {
        $log->error("SES notification missing bucket/key");
        return 1;
    }

    # Fetch the raw email from S3
    my $s3_res = eval { $s3->GetObject( Bucket => $bucket, Key => $key ) };
    if ( my $err = $@ ) {
        my $is_paws = ref $err && $err->isa('Paws::Exception');
        $log->error( "S3 fetch failed: " . ( $is_paws ? $err->message : $err ) );

        # This sub deletes the object after a successful run, so a
        # redelivered notification finds nothing to fetch. Retrying cannot
        # help in that case; drop the notification.
        # NOTE(review): relies on S3 reporting a missing key with the error
        # code 'NoSuchKey'. If the role cannot list the bucket, S3 answers
        # AccessDenied instead and the message stays on the retry path.
        if ( $is_paws && ( $err->code // '' ) eq 'NoSuchKey' ) {
            $log->warn("s3://$bucket/$key no longer exists, dropping notification");
            return 1;
        }

        return 0;    # transient failure, retry
    }

    my $raw_email = $s3_res->Body;
    unless ( defined $raw_email && length $raw_email ) {
        $log->error("Empty email from S3: s3://$bucket/$key");
        return 1;
    }

    $log->info( "Processing email from s3://$bucket/$key (" . length($raw_email) . " bytes)" );

    # Process the email using the shared pipeline
    my $rv = DW::IncomingEmail->process($raw_email);

    # Clean up the S3 object after successful processing. A failed delete is
    # only logged: the email itself has already been handled.
    if ($rv) {
        eval { $s3->DeleteObject( Bucket => $bucket, Key => $key ) };
        if ( my $err = $@ ) {
            $log->warn("Failed to delete s3://$bucket/$key: $err");
        }
    }

    return $rv;
}
|