You've already forked tribes-plugin-aether
forked from tribes/tribes-plugin-template
d257221dc8
Pin marmot-ts 0.5.1, expose Marmot chat routes/configuration, and keep the web UI disabled until transport, storage, and signing adapters are implemented.
237 lines
7.0 KiB
Elixir
237 lines
7.0 KiB
Elixir
defmodule Aether.Chat do
|
|
@moduledoc """
|
|
Chat domain and provider facade for Aether.
|
|
"""
|
|
|
|
use Ash.Domain,
|
|
otp_app: :aether
|
|
|
|
require Ash.Query
|
|
|
|
alias Aether.Chat.{Channel, Message}
|
|
|
|
@default_channel_slug "general"
|
|
@default_channel_title "General Chat"
|
|
@pubsub Tribes.PubSub
|
|
|
|
resources do
|
|
resource Channel do
|
|
define(:create_channel, action: :create)
|
|
define(:get_channel, action: :by_id, args: [:id])
|
|
define(:get_channel_by_slug, action: :by_slug, args: [:slug])
|
|
define(:list_channels, action: :read)
|
|
end
|
|
|
|
resource Message do
|
|
define(:create_message, action: :create)
|
|
define(:get_message, action: :by_id, args: [:id])
|
|
define(:list_all_messages, action: :read)
|
|
end
|
|
end
|
|
|
|
def supported_conversation_kinds, do: [:group, :context_group]
|
|
|
|
def supported_backends, do: [:public_sync, :marmot]
|
|
|
|
def default_channel_slug, do: @default_channel_slug
|
|
|
|
def standalone_path(%Channel{slug: slug}), do: standalone_path(slug)
|
|
def standalone_path(slug) when is_binary(slug), do: "/aether/chat/" <> slug
|
|
|
|
def embed_path(%Channel{slug: slug}), do: embed_path(slug)
|
|
def embed_path(slug) when is_binary(slug), do: "/aether/chat/embed/" <> slug
|
|
|
|
def marmot_path(%Channel{slug: slug}), do: marmot_path(slug)
|
|
def marmot_path(slug) when is_binary(slug), do: "/aether/chat/marmot/" <> slug
|
|
|
|
def ensure_context_channel(provider, type, id, attrs \\ %{}, opts \\ [])
|
|
when is_binary(provider) and is_binary(type) and is_binary(id) and is_map(attrs) do
|
|
attrs
|
|
|> Map.merge(%{
|
|
context_provider: provider,
|
|
context_type: type,
|
|
context_id: id
|
|
})
|
|
|> ensure_channel(opts)
|
|
end
|
|
|
|
def ash_opts(context \\ nil) do
|
|
[
|
|
authorize?: false,
|
|
context: %{
|
|
private: %{
|
|
system?: true,
|
|
system_purpose: :aether_chat,
|
|
plugin_management_context: context
|
|
}
|
|
}
|
|
]
|
|
end
|
|
|
|
def ensure_channel(attrs \\ %{}, opts \\ []) when is_map(attrs) do
|
|
attrs = normalize_channel_attrs(attrs)
|
|
ash_opts = Keyword.get(opts, :ash_opts, ash_opts())
|
|
|
|
case get_existing_channel(attrs.slug, ash_opts) do
|
|
{:ok, nil} -> create_channel(attrs, ash_opts)
|
|
{:ok, %Channel{} = channel} -> {:ok, channel}
|
|
{:error, _reason} = error -> error
|
|
end
|
|
end
|
|
|
|
defp get_existing_channel(slug, ash_opts) do
|
|
Channel
|
|
|> Ash.Query.filter(slug == ^slug)
|
|
|> Ash.Query.limit(1)
|
|
|> Ash.read_one(ash_opts)
|
|
end
|
|
|
|
def list_messages(channel_or_id, opts \\ []) do
|
|
channel_id = channel_id(channel_or_id)
|
|
limit = Keyword.get(opts, :limit, 100)
|
|
ash_opts = Keyword.get(opts, :ash_opts, ash_opts())
|
|
|
|
result =
|
|
Message
|
|
|> Ash.Query.filter(channel_id == ^channel_id)
|
|
|> Ash.Query.sort(inserted_at: :desc, id: :desc)
|
|
|> Ash.Query.limit(limit)
|
|
|> Ash.read(ash_opts)
|
|
|
|
case result do
|
|
{:ok, messages} -> {:ok, Enum.reverse(messages)}
|
|
{:error, _reason} = error -> error
|
|
end
|
|
end
|
|
|
|
def post_message(%Channel{} = channel, attrs, opts \\ []) when is_map(attrs) do
|
|
ash_opts = Keyword.get(opts, :ash_opts, ash_opts())
|
|
|
|
attrs
|
|
|> normalize_message_attrs(channel)
|
|
|> create_message(ash_opts)
|
|
|> maybe_broadcast_message()
|
|
end
|
|
|
|
def subscribe_channel(%Channel{} = channel), do: subscribe_channel(channel.id)
|
|
|
|
def subscribe_channel(channel_id) when is_binary(channel_id) do
|
|
if pubsub_started?() do
|
|
Phoenix.PubSub.subscribe(@pubsub, topic(channel_id))
|
|
else
|
|
{:error, :pubsub_not_started}
|
|
end
|
|
end
|
|
|
|
def topic(channel_id) when is_binary(channel_id), do: "aether:chat:" <> channel_id
|
|
|
|
defp maybe_broadcast_message({:ok, %Message{} = message}) do
|
|
broadcast_message(message)
|
|
{:ok, message}
|
|
end
|
|
|
|
defp maybe_broadcast_message({:error, _reason} = error), do: error
|
|
|
|
defp broadcast_message(%Message{} = message) do
|
|
if pubsub_started?() do
|
|
Phoenix.PubSub.broadcast_from(
|
|
@pubsub,
|
|
self(),
|
|
topic(message.channel_id),
|
|
{:aether_chat, :message, message}
|
|
)
|
|
end
|
|
end
|
|
|
|
defp pubsub_started?, do: Process.whereis(@pubsub) != nil
|
|
|
|
defp normalize_channel_attrs(attrs) do
|
|
attrs = atomize_known_keys(attrs)
|
|
title = attrs[:title] || @default_channel_title
|
|
slug = attrs[:slug] || context_slug(attrs) || slugify(title) || @default_channel_slug
|
|
|
|
%{
|
|
slug: slug,
|
|
title: title,
|
|
description: attrs[:description],
|
|
backend: attrs[:backend] || :public_sync,
|
|
conversation_kind: attrs[:conversation_kind] || default_conversation_kind(attrs),
|
|
context_provider: attrs[:context_provider],
|
|
context_type: attrs[:context_type],
|
|
context_id: attrs[:context_id],
|
|
metadata: attrs[:metadata] || %{}
|
|
}
|
|
end
|
|
|
|
defp normalize_message_attrs(attrs, %Channel{} = channel) do
|
|
attrs = atomize_known_keys(attrs)
|
|
|
|
%{
|
|
channel_id: channel.id,
|
|
author_id: attrs[:author_id],
|
|
author_pubkey: attrs[:author_pubkey],
|
|
body: String.trim(attrs[:body] || ""),
|
|
client_message_id: attrs[:client_message_id],
|
|
metadata: attrs[:metadata] || %{}
|
|
}
|
|
end
|
|
|
|
defp atomize_known_keys(attrs) do
|
|
Enum.reduce(attrs, %{}, fn
|
|
{key, value}, acc when is_atom(key) -> Map.put(acc, key, value)
|
|
{key, value}, acc when is_binary(key) -> put_known_key(acc, key, value)
|
|
end)
|
|
end
|
|
|
|
defp put_known_key(acc, key, value) do
|
|
case key do
|
|
"slug" -> Map.put(acc, :slug, value)
|
|
"title" -> Map.put(acc, :title, value)
|
|
"description" -> Map.put(acc, :description, value)
|
|
"backend" -> Map.put(acc, :backend, parse_atom(value))
|
|
"conversation_kind" -> Map.put(acc, :conversation_kind, parse_atom(value))
|
|
"context_provider" -> Map.put(acc, :context_provider, value)
|
|
"context_type" -> Map.put(acc, :context_type, value)
|
|
"context_id" -> Map.put(acc, :context_id, value)
|
|
"metadata" -> Map.put(acc, :metadata, value)
|
|
"author_id" -> Map.put(acc, :author_id, value)
|
|
"author_pubkey" -> Map.put(acc, :author_pubkey, value)
|
|
"body" -> Map.put(acc, :body, value)
|
|
"client_message_id" -> Map.put(acc, :client_message_id, value)
|
|
_other -> acc
|
|
end
|
|
end
|
|
|
|
defp parse_atom(value) when is_atom(value), do: value
|
|
defp parse_atom("public_sync"), do: :public_sync
|
|
defp parse_atom("marmot"), do: :marmot
|
|
defp parse_atom("group"), do: :group
|
|
defp parse_atom("context_group"), do: :context_group
|
|
defp parse_atom(_value), do: nil
|
|
|
|
defp context_slug(%{context_provider: provider, context_type: type, context_id: id})
|
|
when is_binary(provider) and is_binary(type) and is_binary(id) do
|
|
Enum.map_join([provider, type, id], "-", &slugify/1)
|
|
end
|
|
|
|
defp context_slug(_attrs), do: nil
|
|
|
|
defp default_conversation_kind(%{context_provider: provider}) when is_binary(provider),
|
|
do: :context_group
|
|
|
|
defp default_conversation_kind(_attrs), do: :group
|
|
|
|
defp channel_id(%Channel{id: id}), do: id
|
|
defp channel_id(id) when is_binary(id), do: id
|
|
|
|
defp slugify(value) when is_binary(value) do
|
|
slug =
|
|
value
|
|
|> String.downcase()
|
|
|> String.replace(~r/[^a-z0-9]+/u, "-")
|
|
|> String.trim("-")
|
|
|
|
if slug == "", do: nil, else: slug
|
|
end
|
|
end
|