Replace Cachex with Nebulex: Object

This commit is contained in:
Mark Felder 2025-04-04 19:11:48 -07:00
parent e94d597506
commit 7faad7f49a
27 changed files with 80 additions and 84 deletions

View file

@ -158,7 +158,6 @@ defmodule Pleroma.Application do
[
build_cachex("used_captcha", ttl_interval: seconds_valid_interval()),
build_cachex("user", default_ttl: 25_000, ttl_interval: 1000, limit: 2500),
build_cachex("object", default_ttl: 25_000, ttl_interval: 1000, limit: 2500),
build_cachex("scrubber", limit: 2500),
build_cachex("scrubber_management", limit: 2500),
build_cachex("idempotency", expiration: idempotency_expiration(), limit: 2500),

View file

@ -0,0 +1,12 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.CachingNebulex do
@callback put(key :: any(), value :: any(), opts :: Keyword.t()) :: :ok
@callback put(key :: any(), value :: any()) :: :ok
@callback get(key :: any(), opts :: Keyword.t()) :: any()
@callback get(key :: any()) :: any()
@callback delete(key :: any(), opts :: Keyword.t()) :: :ok
@callback delete(key :: any()) :: :ok
end

View file

@ -540,7 +540,7 @@ defmodule Pleroma.Notification do
# For some activities, only notify the author of the object
def get_potential_receiver_ap_ids(%{data: %{"type" => type, "object" => object_id}})
when type in ~w{Like Announce EmojiReact} do
case Object.get_cached_by_ap_id(object_id) do
case Object.get_by_ap_id(object_id) do
%Object{data: %{"actor" => actor}} ->
[actor]

View file

@ -4,11 +4,13 @@
defmodule Pleroma.Object do
use Ecto.Schema
use Nebulex.Caching
import Ecto.Query
import Ecto.Changeset
alias Pleroma.Activity
alias Pleroma.Cache
alias Pleroma.Config
alias Pleroma.Hashtag
alias Pleroma.Object
@ -25,6 +27,7 @@ defmodule Pleroma.Object do
@derive {Jason.Encoder, only: [:data]}
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@nebulex Pleroma.Config.get([:nebulex, :provider], Cache)
schema "objects" do
field(:data, :map)
@ -97,11 +100,16 @@ defmodule Pleroma.Object do
defp hashtags_changed?(_, _), do: false
def get_by_id(nil), do: nil
def get_by_id(id), do: Repo.get(Object, id)
def get_by_id(id), do: do_get_by_id(id)
@decorate cacheable(cache: @nebulex, key: {Object, id}, opts: [ttl: 25_000])
defp do_get_by_id(id), do: Repo.get(Object, id)
def get_by_ap_id(nil), do: nil
def get_by_ap_id(ap_id), do: do_get_by_ap_id(ap_id)
def get_by_ap_id(ap_id) do
@decorate cacheable(cache: @nebulex, key: {Object, ap_id}, opts: [ttl: 25_000])
defp do_get_by_ap_id(ap_id) do
Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
end
@ -165,7 +173,7 @@ defmodule Pleroma.Object do
end
true ->
get_cached_by_ap_id(ap_id)
get_by_ap_id(ap_id)
end
end
@ -183,20 +191,6 @@ defmodule Pleroma.Object do
# Legacy objects can be accessed by anybody
def authorize_access(%Object{}, %User{}), do: :ok
@spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
def get_cached_by_ap_id(ap_id) do
key = "object:#{ap_id}"
with {:ok, nil} <- @cachex.get(:object_cache, key),
object when not is_nil(object) <- get_by_ap_id(ap_id),
{:ok, true} <- @cachex.put(:object_cache, key, object) do
object
else
{:ok, object} -> object
nil -> nil
end
end
def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
%ObjectTombstone{
id: id,
@ -221,7 +215,7 @@ defmodule Pleroma.Object do
def delete(%Object{data: %{"id" => id}} = object) do
with {:ok, _obj} = swap_object_with_tombstone(object),
deleted_activity = Activity.delete_all_by_object_ap_id(id),
{:ok, _} <- invalid_object_cache(object) do
{:ok, _} <- evict_cache(object) do
cleanup_attachments(
Config.get([:instance, :cleanup_attachments]),
object
@ -242,25 +236,20 @@ defmodule Pleroma.Object do
def prune(%Object{data: %{"id" => _id}} = object) do
with {:ok, object} <- Repo.delete(object),
{:ok, _} <- invalid_object_cache(object) do
{:ok, _} <- evict_cache(object) do
{:ok, object}
end
end
def invalid_object_cache(%Object{data: %{"id" => id}}) do
with {:ok, true} <- @cachex.del(:object_cache, "object:#{id}") do
@cachex.del(:web_resp_cache, URI.parse(id).path)
end
end
def set_cache(%Object{data: %{"id" => ap_id}} = object) do
@cachex.put(:object_cache, "object:#{ap_id}", object)
def evict_cache(%Object{data: %{"id" => id}} = object) do
@cachex.del(:web_resp_cache, URI.parse(id).path)
@nebulex.delete({Object, id})
{:ok, object}
end
def update_and_set_cache(changeset) do
def update_and_evict_cache(changeset) do
with {:ok, object} <- Repo.update(changeset) do
set_cache(object)
evict_cache(object)
end
end
@ -282,7 +271,7 @@ defmodule Pleroma.Object do
)
|> Repo.update_all([])
|> case do
{1, [object]} -> set_cache(object)
{1, [object]} -> evict_cache(object)
_ -> {:error, "Not found"}
end
end
@ -309,7 +298,7 @@ defmodule Pleroma.Object do
)
|> Repo.update_all([])
|> case do
{1, [object]} -> set_cache(object)
{1, [object]} -> evict_cache(object)
_ -> {:error, "Not found"}
end
end
@ -332,7 +321,7 @@ defmodule Pleroma.Object do
)
|> Repo.update_all([])
|> case do
{1, [object]} -> set_cache(object)
{1, [object]} -> evict_cache(object)
_ -> {:error, "Not found"}
end
end
@ -355,7 +344,7 @@ defmodule Pleroma.Object do
)
|> Repo.update_all([])
|> case do
{1, [object]} -> set_cache(object)
{1, [object]} -> evict_cache(object)
_ -> {:error, "Not found"}
end
end
@ -384,7 +373,7 @@ defmodule Pleroma.Object do
object
|> Object.change(%{data: data})
|> update_and_set_cache()
|> update_and_evict_cache()
else
_ -> :noop
end

View file

@ -28,7 +28,7 @@ defmodule Pleroma.Object.Fetcher do
with {:ok, new_data, _} <- ObjectValidator.validate(new_data, %{}),
{:ok, new_data} <- MRF.filter(new_data),
{:ok, new_object, _} <-
Object.Updater.do_update_and_invalidate_cache(
Object.Updater.do_update(
object,
new_data,
_touch_changeset? = true
@ -67,7 +67,7 @@ defmodule Pleroma.Object.Fetcher do
@spec fetch_object_from_id(String.t(), list()) ::
{:ok, Object.t()} | {fetcher_errors(), any()} | Pipeline.errors()
def fetch_object_from_id(id, options \\ []) do
with {_, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
with {_, nil} <- {:fetch_object, Object.get_by_ap_id(id)},
{_, true} <- {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
{_, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
{_, nil} <- {:normalize, Object.normalize(data, fetch: false)},

View file

@ -256,7 +256,7 @@ defmodule Pleroma.Object.Updater do
defp maybe_touch_changeset(changeset, _), do: changeset
def do_update_and_invalidate_cache(orig_object, updated_object, touch_changeset? \\ false) do
def do_update(orig_object, updated_object, touch_changeset? \\ false) do
orig_object_ap_id = updated_object["id"]
orig_object_data = orig_object.data
@ -272,9 +272,7 @@ defmodule Pleroma.Object.Updater do
|> Object.change(%{data: updated_object_data})
|> maybe_touch_changeset(touch_changeset?)
with {:ok, new_object} <- Repo.update(changeset),
{:ok, _} <- Object.invalid_object_cache(new_object),
{:ok, _} <- Object.set_cache(new_object),
with {:ok, new_object} <- Object.update_and_evict_cache(changeset),
# The metadata/utils.ex uses the object id for the cache.
{:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
if used_history_in_new_object? do

View file

@ -1832,7 +1832,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
def enqueue_pin_fetches(%{pinned_objects: pins}) do
# enqueue a task to fetch all pinned objects
Enum.each(pins, fn {ap_id, _} ->
if is_nil(Object.get_cached_by_ap_id(ap_id)) do
if is_nil(Object.get_by_ap_id(ap_id)) do
Pleroma.Workers.RemoteFetcherWorker.new(%{
"op" => "fetch_remote",
"id" => ap_id,

View file

@ -80,7 +80,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
def object(%{assigns: assigns} = conn, _) do
with ap_id <- Endpoint.url() <> conn.request_path,
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
%Object{} = object <- Object.get_by_ap_id(ap_id),
user <- Map.get(assigns, :user, nil),
{_, true} <- {:visible?, Visibility.visible_for_user?(object, user)} do
conn

View file

@ -81,7 +81,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.AnnounceValidator do
with actor when is_binary(actor) <- get_field(cng, :actor),
object when is_binary(object) <- get_field(cng, :object),
%User{} = actor <- User.get_cached_by_ap_id(actor),
%Object{} = object <- Object.get_cached_by_ap_id(object),
%Object{} = object <- Object.get_by_ap_id(object),
false <- Visibility.public?(object) do
same_actor = object.data["actor"] == actor.ap_id
recipients = get_field(cng, :to) ++ get_field(cng, :cc)

View file

@ -57,7 +57,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.CommonValidations do
cng
|> validate_change(field_name, fn field_name, object_id ->
object = Object.get_cached_by_ap_id(object_id) || Activity.get_by_ap_id(object_id)
object = Object.get_by_ap_id(object_id) || Activity.get_by_ap_id(object_id)
cond do
!object ->

View file

@ -57,7 +57,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.CreateChatMessageValidator do
def validate_object_nonexistence(cng) do
cng
|> validate_change(:object, fn :object, object_id ->
if Object.get_cached_by_ap_id(object_id) do
if Object.get_by_ap_id(object_id) do
[{:object, "The object to create already exists"}]
else
[]

View file

@ -112,7 +112,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.CreateGenericValidator do
def validate_object_nonexistence(cng) do
cng
|> validate_change(:object, fn :object, object_id ->
if Object.get_cached_by_ap_id(object_id) do
if Object.get_by_ap_id(object_id) do
[{:object, "The object to create already exists"}]
else
[]

View file

@ -470,7 +470,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
{:ok, _, updated} =
Object.Updater.do_update_and_invalidate_cache(orig_object, updated_object)
Object.Updater.do_update(orig_object, updated_object)
if updated do
object

View file

@ -728,7 +728,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
replies_uris =
with limit when limit > 0 <-
Pleroma.Config.get([:activitypub, :note_replies_output_limit], 0),
%Object{} = object <- Object.get_cached_by_ap_id(obj_data["id"]) do
%Object{} = object <- Object.get_by_ap_id(obj_data["id"]) do
object
|> Object.self_replies()
|> select([o], fragment("?->>'id'", o.data))

View file

@ -327,7 +327,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
object
|> Changeset.change(data: data)
|> Object.update_and_set_cache()
|> Object.update_and_evict_cache()
end
@spec add_emoji_reaction_to_object(Activity.t(), Object.t()) ::
@ -889,7 +889,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
{:ok, object} =
activity.object
|> Object.change(%{data: object_data})
|> Object.update_and_set_cache()
|> Object.update_and_evict_cache()
activity_data =
activity.data

View file

@ -281,7 +281,7 @@ defmodule Pleroma.Web.ActivityPub.UserView do
pinned_objects
|> Enum.sort_by(fn {_, pinned_at} -> pinned_at end, &>=/2)
|> Enum.map(fn {id, _} ->
ObjectView.render("object.json", %{object: Object.get_cached_by_ap_id(id)})
ObjectView.render("object.json", %{object: Object.get_by_ap_id(id)})
end)
%{

View file

@ -357,7 +357,7 @@ defmodule Pleroma.Web.CommonAPI do
Activity.normalize(activity.data)
end)
object = Object.get_cached_by_ap_id(object.data["id"])
object = Object.get_by_ap_id(object.data["id"])
{:ok, answer_activities, object}
end
end
@ -679,7 +679,7 @@ defmodule Pleroma.Web.CommonAPI do
{:ok, object} =
object
|> Object.change(%{data: new_data})
|> Object.update_and_set_cache()
|> Object.update_and_evict_cache()
{:ok, Map.put(activity, :object, object)}
end

View file

@ -119,7 +119,7 @@ defmodule Pleroma.Web.Feed.FeedView do
end
def get_href(id) do
with %Object{data: %{"external_url" => external_url}} <- Object.get_cached_by_ap_id(id) do
with %Object{data: %{"external_url" => external_url}} <- Object.get_by_ap_id(id) do
external_url
else
_e -> id

View file

@ -55,26 +55,6 @@ defmodule Pleroma.ObjectTest do
assert found_object.data["type"] == "Tombstone"
end
test "ensures cache is cleared for the object" do
object = insert(:note)
cached_object = Object.get_cached_by_ap_id(object.data["id"])
assert object == cached_object
Cachex.put(:web_resp_cache, URI.parse(object.data["id"]).path, "cofe")
Object.delete(cached_object)
{:ok, nil} = Cachex.get(:object_cache, "object:#{object.data["id"]}")
{:ok, nil} = Cachex.get(:web_resp_cache, URI.parse(object.data["id"]).path)
cached_object = Object.get_cached_by_ap_id(object.data["id"])
refute object == cached_object
assert cached_object.data["type"] == "Tombstone"
end
end
describe "delete attachments" do

View file

@ -35,7 +35,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.AnnounceValidationTest do
test "keeps announced object context", %{valid_announce: valid_announce} do
assert %Object{data: %{"context" => object_context}} =
Object.get_cached_by_ap_id(valid_announce["object"])
Object.get_by_ap_id(valid_announce["object"])
{:ok, %{"context" => context}, _} =
valid_announce

View file

@ -41,7 +41,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.DeleteValidationTest do
{:ok, _object} =
object
|> Ecto.Changeset.change(%{data: data})
|> Object.update_and_set_cache()
|> Object.update_and_evict_cache()
{:error, cng} = ObjectValidator.validate(valid_post_delete, [])
assert {:object, {"object not in allowed types", []}} in cng.errors

View file

@ -47,13 +47,10 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier.DeleteHandlingTest do
test "it works for incoming when the object has been pruned" do
activity = insert(:note_activity)
{:ok, object} =
{:ok, _object} =
Object.normalize(activity.data["object"], fetch: false)
|> Repo.delete()
# TODO: mock cachex
Cachex.del(:object_cache, "object:#{object.data["id"]}")
deleting_user = insert(:user)
data =

View file

@ -258,7 +258,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusViewTest do
|> Map.put("content", nil)
Object.change(note_object, %{data: data})
|> Object.update_and_set_cache()
|> Object.update_and_evict_cache()
User.get_cached_by_ap_id(note.data["actor"])

View file

@ -67,6 +67,8 @@ defmodule Pleroma.DataCase do
def setup_multi_process_mode(tags) do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(Pleroma.Repo)
Mox.stub_with(Pleroma.NebulexMock, Pleroma.VoidCache)
if tags[:async] do
Mox.stub_with(Pleroma.CachexMock, Pleroma.NullCache)
Mox.set_mox_private()

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
Mox.defmock(Pleroma.CachexMock, for: Pleroma.Caching)
Mox.defmock(Pleroma.NebulexMock, for: Pleroma.CachingNebulex)
Mox.defmock(Pleroma.Web.ActivityPub.ObjectValidatorMock,
for: Pleroma.Web.ActivityPub.ObjectValidator.Validating

View file

@ -0,0 +1,19 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.VoidCache do
@moduledoc """
A module simulating a permanently empty cache.
"""
@behaviour Pleroma.CachingNebulex
@impl true
def put(_, _, _ \\ []), do: :ok
@impl true
def get(_, _ \\ []), do: nil
@impl true
def delete(_, _ \\ []), do: :ok
end

View file

@ -18,7 +18,6 @@ Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, :manual)
Mox.defmock(Pleroma.ReverseProxy.ClientMock, for: Pleroma.ReverseProxy.Client)
Mox.defmock(Pleroma.GunMock, for: Pleroma.Gun)
Mox.defmock(Pleroma.NebulexMock, for: Nebulex.Cache)
{:ok, _} = Application.ensure_all_started(:ex_machina)