111 lines
2.9 KiB
Ruby
111 lines
2.9 KiB
Ruby
# Post-processes the per-item results of an indexing batch for one indexer.
#
# Each item in the batch is expected to be a single-entry hash keyed by the
# action (update/index/delete) whose value carries the document "_id" and,
# when the action failed, an "error" entry (presumably an Elasticsearch bulk
# response -- confirm against the caller).
#
# Failure history is kept per document in the REDIS hash "<indexer>:failures"
# as a JSON array of { "error" => ..., "count" => ... } entries. Documents are
# re-queued until the same error has been recorded FAILURE_LIMIT times.
class IndexSweeper
  REDIS = AsyncIndexer::REDIS

  # Number of occurrences of the same error after which a document is treated
  # as permanently failed and is no longer re-queued.
  FAILURE_LIMIT = 3

  # Queues a "cleanup" job for every expected ID that is missing from found_ids.
  #
  # Both lists are coerced with #to_i before comparison so that IDs given as
  # strings and IDs given as integers still match each other.
  #
  # @param klass passed through unchanged to AsyncIndexer.index
  # @param expected_ids [Enumerable] IDs that should have been found
  # @param found_ids [Enumerable] IDs that actually were found
  # @return whatever AsyncIndexer.index returns, or nil when nothing is missing
  def self.async_cleanup(klass, expected_ids, found_ids)
    deleted_ids = expected_ids.map(&:to_i) - found_ids.map(&:to_i)
    AsyncIndexer.index(klass, deleted_ids, "cleanup") if deleted_ids.any?
  end

  # Returns a list of all permanent failures associated with the given indexer.
  # Used for testing purposes. (Can be used for diagnostic purposes, as well.)
  #
  # @param indexer interpolated into the failures key
  # @return [Array<Hash>] one { id => error } hash per error that has been
  #   recorded at least FAILURE_LIMIT times
  def self.permanent_failures(indexer)
    REDIS.hgetall(failures_key(indexer)).flat_map do |id, value|
      JSON.parse(value)
          .select { |info| info["count"] >= FAILURE_LIMIT }
          .map { |info| { id.to_s => info["error"] } }
    end
  end

  # Name of the REDIS hash holding the failure history for the given indexer.
  #
  # @param indexer interpolated into the key via #to_s
  # @return [String]
  def self.failures_key(indexer)
    "#{indexer}:failures"
  end

  # @param batch [Hash, nil] batch result containing an "items" array
  # @param indexer the indexer the batch belongs to; it must respond to
  #   find_elasticsearch_ids and may respond to handle_success
  def initialize(batch, indexer)
    @batch = batch
    @indexer = indexer
    @success_ids = []
    @rerun_ids = []
  end

  # Walks every item in the batch, updates the stored failure history, reports
  # the successful IDs to the indexer (when it responds to handle_success) and
  # re-queues the failed IDs that have not yet reached FAILURE_LIMIT.
  #
  # Does nothing when the batch is nil or has no items: looking up the failure
  # history with zero fields would otherwise be rejected by the REDIS client.
  def process_batch
    return if batch_items.empty?

    load_errors
    batch_items.each { |item| process_document(item) }
    save_errors

    if @success_ids.present? && @indexer.respond_to?(:handle_success)
      @indexer.handle_success(@success_ids)
    end

    return unless @rerun_ids.any?

    AsyncIndexer.new(@indexer, "failures").enqueue_ids(
      @indexer.find_elasticsearch_ids(@rerun_ids)
    )
  end

  private

  # The items of this batch, or an empty array when the batch is nil or has
  # no "items" entry.
  def batch_items
    @batch_items ||= (@batch && @batch["items"]) || []
  end

  # REDIS hash key holding the failure history for this sweeper's indexer.
  def failures_key
    self.class.failures_key(@indexer)
  end

  # Calculate which IDs were included in this batch.
  def batch_ids
    @batch_ids ||= batch_items.map { |item| item.values.first["_id"].to_s }
  end

  # Load information about previous errors for all the items in this batch.
  # IDs without a stored history get an empty list.
  def load_errors
    @errors = REDIS.mapped_hmget(failures_key, *batch_ids)
    @errors.transform_values! { |value| JSON.parse(value || "[]") }
  end

  # Save information about all the errors for all the items in this batch.
  def save_errors
    return unless @errors.present?

    # Clear out the IDs whose history is now empty.
    cleared = @errors.select { |_, history| history.blank? }.keys
    REDIS.hdel(failures_key, cleared) if cleared.present?

    # Save the IDs that still have errors, serialized back to JSON.
    failing = @errors.reject { |_, history| history.blank? }
    REDIS.mapped_hmset(failures_key, failing.transform_values(&:to_json)) if failing.present?
  end

  # Record the outcome of a single batch item: a failure is counted and
  # re-queued while it is below FAILURE_LIMIT; a success wipes the document's
  # failure history.
  def process_document(item)
    document = item.values.first # keyed by the action: update/index/delete
    id = document["_id"]
    error = document["error"]

    if error
      @rerun_ids << id if add_error(id, error) < FAILURE_LIMIT
    else
      @errors[id.to_s].clear
      @success_ids << id
    end
  end

  # Add an error for the given document ID. Return the total number of times
  # that error has occurred.
  def add_error(id, error)
    history = @errors[id.to_s]
    known = history.find { |info| info["error"] == error }
    return known["count"] += 1 if known

    # The error hasn't been seen before, so we need to add it with a count of 1.
    history << { "error" => error, "count" => 1 }
    1 # we return the count
  end
end
|