Reduce latency to post to an Inbox

This moves the expensive work into the Oban worker which also can reduce system load by restricting the concurrency of these actions to the maximum number of Oban workers allowed per the queue.
This commit is contained in:
Mark Felder 2023-12-05 13:50:45 -05:00
parent 6a6a631c81
commit c48af2cbe7
2 changed files with 31 additions and 16 deletions

View file

@ -272,16 +272,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
end
end
def inbox(%{assigns: %{valid_signature: true}} = conn, %{"nickname" => nickname} = params) do
with %User{} = recipient <- User.get_cached_by_nickname(nickname),
{:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(params["actor"]),
true <- Utils.recipient_in_message(recipient, actor, params),
params <- Utils.maybe_splice_recipient(recipient.ap_id, params) do
Federator.incoming_ap_doc(params)
json(conn, "ok")
end
end
def inbox(%{assigns: %{valid_signature: true}} = conn, params) do
Federator.incoming_ap_doc(params)
json(conn, "ok")

View file

@ -3,24 +3,49 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
@impl Oban.Worker
def perform(%Job{
args: %{"op" => "incoming_ap_doc", "params" => params = %{"nickname" => nickname}}
}) do
with {:nickname, %User{} = recipient} <- {:nickname, User.get_cached_by_nickname(nickname)},
{:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(params["actor"]),
{:in_message, true} <-
{:in_message, Utils.recipient_in_message(recipient, actor, params)},
params <- Utils.maybe_splice_recipient(recipient.ap_id, params),
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
{:ok, res}
else
e -> process_errors(e)
end
end
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
{:ok, res}
else
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
{:error, :already_present} -> {:cancel, :already_present}
{:error, {:validate_object, reason}} -> {:cancel, reason}
{:error, {:error, {:validate, reason}}} -> {:cancel, reason}
{:error, {:reject, reason}} -> {:cancel, reason}
e -> e
e -> process_errors(e)
end
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(5)
defp process_errors(errors) do
case errors do
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
{:error, :already_present} -> {:cancel, :already_present}
{:error, {:validate_object, reason}} -> {:cancel, reason}
{:error, {:error, {:validate, reason}}} -> {:cancel, reason}
{:error, {:reject, reason}} -> {:cancel, reason}
{:nickname, {:error, reason}} -> {:cancel, reason}
{:in_message, false} -> {:cancel, "Recipient not in message"}
e -> e
end
end
end