mirror of
https://git.pleroma.social/pleroma/pleroma.git
synced 2025-04-24 05:47:41 -04:00
Merge branch 'relay-actor-type' into 'develop'
Use 'Application' for relay actor type See merge request pleroma/pleroma!3934
This commit is contained in:
commit
360d897e71
9 changed files with 196 additions and 8 deletions
1
changelog.d/relay-actor-type.fix
Normal file
1
changelog.d/relay-actor-type.fix
Normal file
|
@ -0,0 +1 @@
|
|||
Instance relay now uses actor type Application
|
|
@ -218,7 +218,8 @@ defmodule Pleroma.Application do
|
|||
if Application.get_env(:pleroma, __MODULE__)[:background_migrators] do
|
||||
[
|
||||
Pleroma.Migrators.HashtagsTableMigrator,
|
||||
Pleroma.Migrators.ContextObjectsDeletionMigrator
|
||||
Pleroma.Migrators.ContextObjectsDeletionMigrator,
|
||||
Pleroma.Migrators.RelayActorTypeMigrator
|
||||
]
|
||||
else
|
||||
[]
|
||||
|
|
|
@ -45,4 +45,5 @@ defmodule Pleroma.DataMigration do
|
|||
|
||||
def populate_hashtags_table, do: get_by_name("populate_hashtags_table")
|
||||
def delete_context_objects, do: get_by_name("delete_context_objects")
|
||||
def update_relay_actor_type, do: get_by_name("update_relay_actor_type")
|
||||
end
|
||||
|
|
144
lib/pleroma/migrators/relay_actor_type_migrator.ex
Normal file
144
lib/pleroma/migrators/relay_actor_type_migrator.ex
Normal file
|
@ -0,0 +1,144 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2024 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Migrators.RelayActorTypeMigrator do
|
||||
defmodule State do
|
||||
use Pleroma.Migrators.Support.BaseMigratorState
|
||||
|
||||
@impl Pleroma.Migrators.Support.BaseMigratorState
|
||||
defdelegate data_migration(), to: Pleroma.DataMigration, as: :update_relay_actor_type
|
||||
end
|
||||
|
||||
import Ecto.Changeset
|
||||
|
||||
use Pleroma.Migrators.Support.BaseMigrator
|
||||
|
||||
alias Pleroma.Migrators.Support.BaseMigrator
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.Builder
|
||||
alias Pleroma.Web.ActivityPub.Pipeline
|
||||
|
||||
@doc "This migration updates relay actor type to 'Application'."
|
||||
|
||||
@impl BaseMigrator
|
||||
def feature_config_path, do: [:features, :update_relay_actor_type]
|
||||
|
||||
@impl BaseMigrator
|
||||
def fault_rate_allowance, do: 0
|
||||
|
||||
@impl BaseMigrator
|
||||
def perform do
|
||||
# data_migration_id = data_migration_id()
|
||||
max_processed_id = get_stat(:max_processed_id, 0)
|
||||
|
||||
Logger.info("Migrating local relay actor types (from uid: #{max_processed_id})...")
|
||||
|
||||
query()
|
||||
|> Repo.chunk_stream(1, :batches, timeout: :infinity)
|
||||
|> Stream.each(fn users ->
|
||||
user_ids = Enum.map(users, fn user -> user.id end)
|
||||
|
||||
results = Enum.map(users, &update_relay_actor_type(&1))
|
||||
|
||||
# failed_ids =
|
||||
# results
|
||||
# |> Enum.filter(&(elem(&1, 0) == :error))
|
||||
# |> Enum.map(&elem(&1, 1))
|
||||
|
||||
chunk_affected_count =
|
||||
results
|
||||
# |> Enum.filter(&(elem(&1, 0) == :ok))
|
||||
|> length()
|
||||
|
||||
# for failed_id <- failed_ids do
|
||||
# _ =
|
||||
# Repo.query(
|
||||
# "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
|
||||
# "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
|
||||
# [data_migration_id, failed_id]
|
||||
# )
|
||||
# end
|
||||
|
||||
# _ =
|
||||
# Repo.query(
|
||||
# "DELETE FROM data_migration_failed_ids " <>
|
||||
# "WHERE data_migration_id = $1 AND record_id = ANY($2)",
|
||||
# [data_migration_id, user_ids -- failed_ids]
|
||||
# )
|
||||
|
||||
max_user_id = Enum.at(user_ids, -1)
|
||||
|
||||
put_stat(:max_processed_id, max_user_id)
|
||||
increment_stat(:iteration_processed_count, length(user_ids))
|
||||
increment_stat(:processed_count, length(user_ids))
|
||||
increment_stat(:failed_count, 0)
|
||||
increment_stat(:affected_count, chunk_affected_count)
|
||||
put_stat(:records_per_second, records_per_second())
|
||||
persist_state()
|
||||
end)
|
||||
|> Stream.run()
|
||||
end
|
||||
|
||||
@impl BaseMigrator
|
||||
def query do
|
||||
ap_id = Pleroma.Web.ActivityPub.Relay.ap_id()
|
||||
|
||||
from(
|
||||
u in User,
|
||||
where: u.local == true and u.ap_id == ^ap_id and u.actor_type == "Person"
|
||||
)
|
||||
end
|
||||
|
||||
@spec update_relay_actor_type(User.t()) :: {:ok | :error, integer()}
|
||||
defp update_relay_actor_type(user) do
|
||||
with changeset <- cast(user, %{actor_type: "Application"}, [:actor_type]),
|
||||
{:ok, unpersisted_user} <- Ecto.Changeset.apply_action(changeset, :update),
|
||||
updated_object <-
|
||||
Pleroma.Web.ActivityPub.UserView.render("user.json", user: unpersisted_user)
|
||||
|> Map.delete("@context"),
|
||||
{:ok, update_data, []} <- Builder.update(user, updated_object),
|
||||
{:ok, _update, _} <-
|
||||
Pipeline.common_pipeline(update_data,
|
||||
local: true,
|
||||
user_update_changeset: changeset
|
||||
) do
|
||||
{:ok, user.id}
|
||||
else
|
||||
_ -> {:error, user.id}
|
||||
end
|
||||
end
|
||||
|
||||
@impl BaseMigrator
|
||||
def retry_failed do
|
||||
data_migration_id = data_migration_id()
|
||||
|
||||
failed_objects_query()
|
||||
|> Repo.chunk_stream(100, :one)
|
||||
|> Stream.each(fn user ->
|
||||
with {res, _} when res != :error <- update_relay_actor_type(user) do
|
||||
_ =
|
||||
Repo.query(
|
||||
"DELETE FROM data_migration_failed_ids " <>
|
||||
"WHERE data_migration_id = $1 AND record_id = $2",
|
||||
[data_migration_id, user.id]
|
||||
)
|
||||
end
|
||||
end)
|
||||
|> Stream.run()
|
||||
|
||||
put_stat(:failed_count, failures_count())
|
||||
persist_state()
|
||||
|
||||
force_continue()
|
||||
end
|
||||
|
||||
defp failed_objects_query do
|
||||
from(u in User)
|
||||
|> join(:inner, [u], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
|
||||
on: dmf.record_id == u.id
|
||||
)
|
||||
|> where([_u, dmf], dmf.data_migration_id == ^data_migration_id())
|
||||
|> order_by([u], asc: u.id)
|
||||
end
|
||||
end
|
|
@ -2235,12 +2235,12 @@ defmodule Pleroma.User do
|
|||
Creates an internal service actor by URI if missing.
|
||||
Optionally takes nickname for addressing.
|
||||
"""
|
||||
@spec get_or_create_service_actor_by_ap_id(String.t(), String.t()) :: User.t() | nil
|
||||
def get_or_create_service_actor_by_ap_id(uri, nickname) do
|
||||
@spec get_or_create_service_actor_by_ap_id(String.t(), String.t(), String.t()) :: User.t() | nil
|
||||
def get_or_create_service_actor_by_ap_id(uri, nickname, actor_type \\ "Service") do
|
||||
{_, user} =
|
||||
case get_cached_by_ap_id(uri) do
|
||||
nil ->
|
||||
with {:error, %{errors: errors}} <- create_service_actor(uri, nickname) do
|
||||
with {:error, %{errors: errors}} <- create_service_actor(uri, nickname, actor_type) do
|
||||
Logger.error("Cannot create service actor: #{uri}/.\n#{inspect(errors)}")
|
||||
{:error, nil}
|
||||
end
|
||||
|
@ -2262,15 +2262,16 @@ defmodule Pleroma.User do
|
|||
|> update_and_set_cache()
|
||||
end
|
||||
|
||||
@spec create_service_actor(String.t(), String.t()) ::
|
||||
@spec create_service_actor(String.t(), String.t(), String.t()) ::
|
||||
{:ok, User.t()} | {:error, Ecto.Changeset.t()}
|
||||
defp create_service_actor(uri, nickname) do
|
||||
defp create_service_actor(uri, nickname, actor_type) do
|
||||
%User{
|
||||
invisible: true,
|
||||
local: true,
|
||||
ap_id: uri,
|
||||
nickname: nickname,
|
||||
follower_address: uri <> "/followers"
|
||||
follower_address: uri <> "/followers",
|
||||
actor_type: actor_type
|
||||
}
|
||||
|> change
|
||||
|> put_private_key()
|
||||
|
|
|
@ -11,12 +11,13 @@ defmodule Pleroma.Web.ActivityPub.Relay do
|
|||
require Logger
|
||||
|
||||
@nickname "relay"
|
||||
@actor_type "Application"
|
||||
|
||||
@spec ap_id() :: String.t()
|
||||
def ap_id, do: "#{Pleroma.Web.Endpoint.url()}/#{@nickname}"
|
||||
|
||||
@spec get_actor() :: User.t() | nil
|
||||
def get_actor, do: User.get_or_create_service_actor_by_ap_id(ap_id(), @nickname)
|
||||
def get_actor, do: User.get_or_create_service_actor_by_ap_id(ap_id(), @nickname, @actor_type)
|
||||
|
||||
@spec follow(String.t()) :: {:ok, Activity.t()} | {:error, any()}
|
||||
def follow(target_instance) do
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2024 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Repo.Migrations.DataMigrationUpdateRelayActorType do
|
||||
use Ecto.Migration
|
||||
|
||||
def up do
|
||||
dt = NaiveDateTime.utc_now()
|
||||
|
||||
execute(
|
||||
"INSERT INTO data_migrations(name, inserted_at, updated_at) " <>
|
||||
"VALUES ('update_relay_actor_type', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;"
|
||||
)
|
||||
end
|
||||
|
||||
def down do
|
||||
execute("DELETE FROM data_migrations WHERE name = 'update_relay_actor_type';")
|
||||
end
|
||||
end
|
|
@ -68,6 +68,20 @@ defmodule Pleroma.UserTest do
|
|||
end) =~ "Cannot create service actor:"
|
||||
end
|
||||
|
||||
test "returns user of given type" do
|
||||
uri = "#{Pleroma.Web.Endpoint.url()}/relay"
|
||||
followers_uri = "#{uri}/followers"
|
||||
|
||||
assert %User{
|
||||
nickname: "relay",
|
||||
invisible: true,
|
||||
local: true,
|
||||
ap_id: ^uri,
|
||||
follower_address: ^followers_uri,
|
||||
actor_type: "Application"
|
||||
} = User.get_or_create_service_actor_by_ap_id(uri, "relay", "Application")
|
||||
end
|
||||
|
||||
test "returns invisible actor" do
|
||||
uri = "#{Pleroma.Web.Endpoint.url()}/internal/fetch-test"
|
||||
followers_uri = "#{uri}/followers"
|
||||
|
|
|
@ -24,6 +24,11 @@ defmodule Pleroma.Web.ActivityPub.RelayTest do
|
|||
assert User.invisible?(user)
|
||||
end
|
||||
|
||||
test "relay actor is of type Application" do
|
||||
user = Relay.get_actor()
|
||||
assert user.actor_type == "Application"
|
||||
end
|
||||
|
||||
describe "follow/1" do
|
||||
test "returns errors when user not found" do
|
||||
assert capture_log(fn ->
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue