Files
self 4658b76241
CI / Test (push) Failing after 22s
feat: prefix Kobold plugin slug
Rename Kobold and Trust plugin identity references to tribe-one-prefixed slugs across runtime API, e2e fixtures, and docs.
2026-06-17 22:33:25 +02:00

639 lines
20 KiB
Elixir

defmodule TribeOne.TribesPlugin.Kobold.API do
@moduledoc false
alias TribeOne.TribesPlugin.Kobold
alias TribeOne.TribesPlugin.Kobold.Bookmark
alias TribeOne.TribesPlugin.Kobold.Commit
alias TribeOne.TribesPlugin.Kobold.Dataset
alias TribeOne.TribesPlugin.Kobold.Projector
alias TribeOne.TribesPlugin.Kobold.RecordProjection
alias TribeOne.TribesPlugin.Kobold.ResourceDefinition
@doc """
Reports plugin liveness.

Returns `{:ok, map}` where the map carries `ok: true`, the plugin slug and
the version string produced by `manifest_version/0`.
"""
def health do
  payload = %{ok: true, plugin: "tribe-one-kobold", version: manifest_version()}
  {:ok, payload}
end
# Management entry points. Each one ignores the context argument and forwards
# to the context-free API function of this module named in its body.
def management_health(_context, _params) do
  health()
end

def management_schema(_context, _params) do
  schema()
end

def management_reset(_context, params) do
  reset(params)
end

def management_create_dataset(_context, params) do
  create_dataset(params)
end

def management_create_resource_definition(_context, params) do
  create_resource_definition(params)
end

def management_upsert_record(_context, params) do
  upsert_record(params)
end

def management_state(_context, params) do
  state(params)
end

def management_access_rule_create(_context, params) do
  create_access_rule(params)
end

def management_rebuild_projections(_context, params) do
  rebuild_projections(params)
end
@doc """
Describes the storage schema and the capability flags of the plugin.

Each entry under `tables` is the boolean result of `table_present?/1` for the
corresponding `kobold_*` table; the capability flags are fixed values.
"""
def schema do
  tables = %{
    datasets: table_present?("kobold_datasets"),
    resource_definitions: table_present?("kobold_resource_definitions"),
    commits: table_present?("kobold_commits"),
    bookmarks: table_present?("kobold_bookmarks"),
    record_projections: table_present?("kobold_record_projections")
  }

  capabilities = %{
    public_dataset_sync: true,
    private_cluster_sync: true,
    commit_chunks: true,
    private_local_datasets: false,
    schema_as_data: true
  }

  description = %{version: 2, tables: tables, capabilities: capabilities}
  {:ok, %{ok: true, schema: description}}
end
@doc """
Deletes every row tagged with `run_id` from the Kobold tables.

Expects a string-keyed map with a non-empty binary `"run_id"`.

Returns `{:ok, %{ok: true, run_id: run_id, deleted: counts}}`, where `counts`
maps each table name to the number of rows removed. If a DELETE fails, the
remaining tables are skipped and `{:error, reason}` is returned instead of
raising a `MatchError`. Any other input returns `{:error, :invalid_run_id}`.
"""
def reset(%{"run_id" => run_id}) when is_binary(run_id) and run_id != "" do
  # Deletion order is kept from the original list (presumably dependents
  # before the datasets they reference -- confirm against the migrations).
  tables = [
    "kobold_record_projections",
    "kobold_bookmarks",
    "kobold_commits",
    "kobold_resource_definitions",
    "kobold_datasets"
  ]

  outcome =
    Enum.reduce_while(tables, {:ok, %{}}, fn table, {:ok, deleted} ->
      # `table` only ever comes from the fixed list above, so interpolating
      # it is safe; the caller-supplied run_id is bound as a query parameter.
      sql = "DELETE FROM #{table} WHERE run_id = $1"

      case Ecto.Adapters.SQL.query(Tribes.Repo, sql, [run_id]) do
        {:ok, %{num_rows: num_rows}} ->
          {:cont, {:ok, Map.put(deleted, table, num_rows)}}

        {:error, reason} ->
          {:halt, {:error, reason}}
      end
    end)

  with {:ok, deleted} <- outcome do
    {:ok, %{ok: true, run_id: run_id, deleted: deleted}}
  end
end

def reset(_params), do: {:error, :invalid_run_id}
@doc """
Creates a dataset from string-keyed `params`.

`"name"` is required. `"visibility"` selects the public or private create
action and defaults to private. After a successful create,
`maybe_seed_dataset_policy/1` runs before the JSON payload is built.

Returns `{:ok, %{ok: true, dataset: map}}`, the unchanged result of whichever
step did not succeed, or `{:error, message}` when an `ArgumentError` is raised
while handling the request.
"""
def create_dataset(params) do
  attrs = %{
    run_id: optional_string(params, "run_id"),
    name: required_string!(params, "name"),
    description: optional_string(params, "description"),
    owner_pubkey: optional_string(params, "owner_pubkey"),
    type: optional_string(params, "type") || "org.tribes.kobold.dataset",
    schema_name: optional_string(params, "schema_name"),
    metadata: map_param(params, "metadata")
  }

  action =
    case visibility(params) do
      :public -> :create_public_dataset
      _not_public -> :create_private_dataset
    end

  case apply(Kobold, action, [attrs, [authorize?: false]]) do
    {:ok, dataset} ->
      case maybe_seed_dataset_policy(dataset) do
        :ok -> {:ok, %{ok: true, dataset: dataset_json(dataset)}}
        seed_failure -> seed_failure
      end

    create_failure ->
      create_failure
  end
rescue
  error in ArgumentError -> {:error, error.message}
end
@doc """
Creates a resource definition inside an existing dataset.

Requires a non-empty binary `"dataset_id"` and a `"name"`. The definition
inherits `run_id` from the dataset, and the dataset's visibility selects the
public or private create action.

Returns `{:ok, %{ok: true, resource: map}}`, the unchanged result of a lookup
or create call that did not succeed, `{:error, message}` when an
`ArgumentError` is raised, or `{:error, :invalid_dataset_id}` when
`"dataset_id"` is missing or not a non-empty binary.
"""
def create_resource_definition(%{"dataset_id" => dataset_id} = params)
    when is_binary(dataset_id) and dataset_id != "" do
  case Kobold.get_dataset(dataset_id, authorize?: false) do
    {:ok, dataset} ->
      attrs = %{
        dataset_id: dataset.id,
        run_id: dataset.run_id,
        name: required_string!(params, "name"),
        display_name: optional_string(params, "display_name"),
        fields: fields_param(params),
        metadata: map_param(params, "metadata")
      }

      action =
        case dataset.visibility do
          :public -> :create_public_resource_definition
          _not_public -> :create_private_resource_definition
        end

      case apply(Kobold, action, [attrs, [authorize?: false]]) do
        {:ok, definition} ->
          {:ok, %{ok: true, resource: resource_definition_json(definition)}}

        create_failure ->
          create_failure
      end

    lookup_failure ->
      lookup_failure
  end
rescue
  error in ArgumentError -> {:error, error.message}
end

def create_resource_definition(_params), do: {:error, :invalid_dataset_id}
@doc """
Upserts one record of a resource within a dataset.

Steps, in order: load the dataset, check "write" access via
`authorize_dataset/3`, load the resource definition, validate `"fields"`
against it, create a commit holding a single upsert operation, point the
"main" bookmark at that commit, and read back the record projection.

`"record_id"` is optional; a UUID is generated when it is absent.

Returns `{:ok, map}` with `commit`, `bookmark`, `event` and `record` entries,
the unchanged result of whichever step did not succeed, `{:error, message}`
when an `ArgumentError` is raised, or `{:error, :invalid_dataset_id}` when
`"dataset_id"` is missing or not a non-empty binary.
"""
def upsert_record(%{"dataset_id" => dataset_id} = params)
    when is_binary(dataset_id) and dataset_id != "" do
  resource_name = required_string!(params, "resource_name")

  with {:ok, dataset} <- Kobold.get_dataset(dataset_id, authorize?: false),
       :ok <- authorize_dataset(params, dataset, "write"),
       {:ok, definition} <-
         Kobold.get_resource_definition(dataset.id, resource_name, authorize?: false),
       fields = map_param(params, "fields"),
       :ok = validate_fields!(definition.fields, fields),
       record_id = optional_string(params, "record_id") || Ash.UUID.generate(),
       operation = record_upsert_operation(resource_name, record_id, fields, params),
       {:ok, commit} <- create_commit(dataset, [operation], params),
       {:ok, bookmark} <- upsert_main_bookmark(dataset, commit),
       {:ok, projection} <- record_projection(dataset.id, resource_name, record_id) do
    response = %{
      ok: true,
      commit: commit_json(commit),
      bookmark: bookmark_json(bookmark),
      event: operation_event_json(commit, operation),
      record: record_projection_json(projection)
    }

    {:ok, response}
  end
rescue
  error in ArgumentError -> {:error, error.message}
end

def upsert_record(_params), do: {:error, :invalid_dataset_id}
@doc """
Returns a snapshot of datasets and their related rows.

When `"run_id"` is given, datasets and commits are looked up for that run;
otherwise all datasets are listed. Datasets are then filtered by "read"
access through `authorize_datasets/3`, and resources, commits, bookmarks and
record projections are collected for the datasets that remain.

Returns `{:ok, map}` or the unchanged result of the first step that did not
succeed.
"""
def state(params) do
  run_id = optional_string(params, "run_id")

  with {:ok, candidates} <- datasets_for_run(run_id),
       {:ok, visible} <- authorize_datasets(params, candidates, "read"),
       {:ok, resources} <- resources_for_datasets(visible),
       {:ok, commits} <- commits_for_run(run_id, visible),
       {:ok, bookmarks} <- bookmarks_for_datasets(visible),
       {:ok, records} <- record_projections_for_datasets(visible) do
    snapshot = %{
      ok: true,
      run_id: run_id,
      datasets: for(dataset <- visible, do: dataset_json(dataset)),
      resources: for(resource <- resources, do: resource_definition_json(resource)),
      commits: for(commit <- commits, do: commit_json(commit)),
      bookmarks: for(bookmark <- bookmarks, do: bookmark_json(bookmark)),
      events: commits_to_event_json(commits),
      records: for(record <- records, do: record_projection_json(record))
    }

    {:ok, snapshot}
  end
end
@doc """
Stores an access rule for the "tribe-one-kobold" plugin.

`"resource_type"` and `"action"` are required. The remaining keys fall back
to the defaults visible below (`"*"`, `"tribe"`, `"allow"`, `"none"`).

Returns `{:ok, %{ok: true, rule: rule}}`, the unchanged result of
`Tribes.Access.put_rule/1` when it is not `{:ok, rule}`, or
`{:error, message}` when an `ArgumentError` is raised.
"""
def create_access_rule(params) do
  rule_attrs = %{
    plugin_name: "tribe-one-kobold",
    resource_type: required_string!(params, "resource_type"),
    resource_id: Map.get(params, "resource_id", "*"),
    action: required_string!(params, "action"),
    subject_type: Map.get(params, "subject_type", "tribe"),
    subject_id: Map.get(params, "subject_id", "*"),
    effect: Map.get(params, "effect", "allow"),
    condition: Map.get(params, "condition", "none"),
    min_trust_score: Map.get(params, "min_trust_score"),
    note: optional_string(params, "note")
  }

  case Tribes.Access.put_rule(rule_attrs) do
    {:ok, rule} -> {:ok, %{ok: true, rule: rule}}
    failure -> failure
  end
rescue
  error in ArgumentError -> {:error, error.message}
end
@doc """
Rebuilds record projections from the stored commits.

Datasets and commits are selected by the optional `"run_id"`. The projections
of those datasets are deleted, then every commit is re-applied in order.

Returns `{:ok, %{ok: true, rebuilt: total, commits: count}}`, where `total`
is the sum of the counts reported by the projector, or the unchanged result
of the first step that did not succeed.
"""
def rebuild_projections(params) do
  run_id = optional_string(params, "run_id")

  with {:ok, datasets} <- datasets_for_run(run_id),
       {:ok, commits} <- commits_for_run(run_id, datasets),
       :ok <- clear_projections(datasets),
       {:ok, total_applied} <- apply_commits(commits) do
    summary = %{ok: true, rebuilt: total_applied, commits: length(commits)}
    {:ok, summary}
  end
end
# Creates a commit for `dataset` holding `operations`.
#
# The commit's parent is the commit the "main" bookmark currently points at,
# when such a bookmark with a binary commit id can be read; otherwise the
# parent list is empty. The author is "actor_pubkey", falling back to
# "remote_tribe_pubkey". The message defaults to "Update <dataset name>".
defp create_commit(%Dataset{} = dataset, operations, params) do
  parents =
    case Kobold.get_bookmark(dataset.id, "main", authorize?: false) do
      {:ok, %{commit_id: head_id}} when is_binary(head_id) -> [head_id]
      _no_head -> []
    end

  message = optional_string(params, "message") || "Update #{dataset.name}"

  author =
    optional_string(params, "actor_pubkey") || optional_string(params, "remote_tribe_pubkey")

  commit_attrs = %{
    dataset_id: dataset.id,
    run_id: dataset.run_id,
    parent_commit_ids: parents,
    message: message,
    operations: operations,
    metadata: map_param(params, "metadata"),
    author_pubkey: author
  }

  Kobold.create_commit(commit_attrs, authorize?: false)
end
# Points the dataset's "main" bookmark at the given commit (binary id only),
# with empty metadata and the dataset's run_id.
defp upsert_main_bookmark(%Dataset{} = dataset, %{id: commit_id}) when is_binary(commit_id) do
  bookmark_attrs = %{
    dataset_id: dataset.id,
    run_id: dataset.run_id,
    name: "main",
    commit_id: commit_id,
    metadata: %{}
  }

  Kobold.upsert_bookmark(bookmark_attrs, authorize?: false)
end
# Builds the string-keyed "upsert" operation map stored inside a commit.
# The actor is "actor_pubkey", falling back to "remote_tribe_pubkey".
defp record_upsert_operation(resource_name, record_id, fields, params) do
  metadata = map_param(params, "metadata")

  actor =
    optional_string(params, "actor_pubkey") || optional_string(params, "remote_tribe_pubkey")

  %{
    "op" => "upsert",
    "resource_name" => resource_name,
    "record_id" => record_id,
    "fields" => fields,
    "metadata" => metadata,
    "actor_pubkey" => actor
  }
end
# Seeds default access rules for a freshly created dataset.
#
# Public datasets get two rules, stored in this order: "read" and then
# "advertise", each allowing any tribe subject under the "min_trust_score"
# condition with a minimum score of 0. Datasets with any other visibility
# get no rules. Returns :ok or the {:error, reason} of the failing rule.
defp maybe_seed_dataset_policy(%Dataset{visibility: :public} = dataset) do
  read_rule =
    default_tribe_rule(
      dataset,
      "read",
      "Default public dataset read access after Trust handshake"
    )

  advertise_rule =
    default_tribe_rule(
      dataset,
      "advertise",
      "Default public dataset advertisement after Trust handshake"
    )

  with {:ok, _stored_read} <- Tribes.Access.put_rule(read_rule),
       {:ok, _stored_advertise} <- Tribes.Access.put_rule(advertise_rule) do
    :ok
  else
    {:error, _reason} = failure -> failure
  end
end

defp maybe_seed_dataset_policy(%Dataset{}), do: :ok

# Attributes of a default allow rule on one dataset for every tribe subject.
defp default_tribe_rule(dataset, action, note) do
  %{
    plugin_name: "tribe-one-kobold",
    resource_type: "kobold.dataset",
    resource_id: dataset.id,
    action: action,
    subject_type: "tribe",
    subject_id: "*",
    effect: "allow",
    condition: "min_trust_score",
    min_trust_score: 0,
    note: note
  }
end
# Narrows `datasets` to those the remote tribe may perform `action` on.
# Without a "remote_tribe_pubkey" in `params` the list is returned unfiltered.
defp authorize_datasets(params, datasets, action) do
  permitted =
    case remote_subject(params) do
      nil ->
        datasets

      subject ->
        Enum.filter(datasets, fn dataset -> dataset_allowed?(subject, dataset, action) end)
    end

  {:ok, permitted}
end
# Checks `action` on a single dataset for the remote tribe named in `params`.
# Returns :ok when no "remote_tribe_pubkey" is present or access is allowed,
# otherwise {:error, :access_denied}.
defp authorize_dataset(params, dataset, action) do
  subject = remote_subject(params)

  cond do
    is_nil(subject) -> :ok
    dataset_allowed?(subject, dataset, action) -> :ok
    true -> {:error, :access_denied}
  end
end
# Asks Tribes.Access whether `subject` may perform `action` on the
# "kobold.dataset" resource identified by the dataset's id.
defp dataset_allowed?(subject, dataset, action) do
  resource = Tribes.Access.resource("kobold.dataset", dataset.id)
  Tribes.Access.allowed?(subject, action, resource)
end
# Builds the tribe access subject from "remote_tribe_pubkey", or returns nil
# when that param is absent, empty or not a binary.
defp remote_subject(params) do
  pubkey = optional_string(params, "remote_tribe_pubkey")

  if is_nil(pubkey) do
    nil
  else
    Tribes.Access.subject(:tribe, pubkey)
  end
end
# Lists all datasets when no run id is given, otherwise only that run's.
defp datasets_for_run(run_id) do
  if is_nil(run_id) do
    Kobold.list_datasets(authorize?: false)
  else
    Kobold.list_datasets_by_run(run_id, authorize?: false)
  end
end
# Reads every resource definition and keeps those belonging to `datasets`,
# sorted by name. A read result other than {:ok, _} is returned unchanged.
defp resources_for_datasets(datasets) do
  wanted = dataset_ids(datasets)

  case Ash.read(ResourceDefinition, authorize?: false) do
    {:ok, definitions} ->
      kept =
        for definition <- definitions, MapSet.member?(wanted, definition.dataset_id) do
          definition
        end

      {:ok, Enum.sort_by(kept, fn definition -> definition.name end)}

    failure ->
      failure
  end
end
# Returns the commits that belong to `datasets`, sorted by inserted_at.
#
# An empty dataset list yields no commits without reading anything. With a
# nil run id all commits are read; otherwise only those of the run. A read
# result other than {:ok, _} is returned unchanged.
defp commits_for_run(_run_id, []), do: {:ok, []}

defp commits_for_run(run_id, datasets) do
  wanted = dataset_ids(datasets)

  fetched =
    if is_nil(run_id) do
      Ash.read(Commit, authorize?: false)
    else
      Kobold.list_commits_by_run(run_id, authorize?: false)
    end

  with {:ok, commits} <- fetched do
    kept = Enum.filter(commits, fn commit -> MapSet.member?(wanted, commit.dataset_id) end)
    {:ok, Enum.sort_by(kept, fn commit -> commit.inserted_at end)}
  end
end
# Reads every bookmark and keeps those belonging to `datasets`, sorted by
# name. An empty dataset list short-circuits to an empty result.
defp bookmarks_for_datasets([]), do: {:ok, []}

defp bookmarks_for_datasets(datasets) do
  wanted = dataset_ids(datasets)

  case Ash.read(Bookmark, authorize?: false) do
    {:ok, bookmarks} ->
      kept = for bookmark <- bookmarks, MapSet.member?(wanted, bookmark.dataset_id), do: bookmark
      {:ok, Enum.sort_by(kept, fn bookmark -> bookmark.name end)}

    failure ->
      failure
  end
end
# Reads every record projection and keeps the non-deleted ones belonging to
# `datasets`, sorted by {dataset_id, resource_name, record_id}. An empty
# dataset list short-circuits to an empty result.
defp record_projections_for_datasets([]), do: {:ok, []}

defp record_projections_for_datasets(datasets) do
  wanted = dataset_ids(datasets)

  case Ash.read(RecordProjection, authorize?: false) do
    {:ok, projections} ->
      live =
        for projection <- projections,
            MapSet.member?(wanted, projection.dataset_id) and not projection.deleted? do
          projection
        end

      sort_key = fn projection ->
        {projection.dataset_id, projection.resource_name, projection.record_id}
      end

      {:ok, Enum.sort_by(live, sort_key)}

    failure ->
      failure
  end
end
# Finds the projection of one record among the dataset's projections.
# Returns {:ok, projection}, {:error, :record_projection_not_found} when no
# projection matches, or the listing's {:error, reason} unchanged.
defp record_projection(dataset_id, resource_name, record_id) do
  matches_target = fn projection ->
    projection.resource_name == resource_name and projection.record_id == record_id
  end

  with {:ok, projections} <-
         Kobold.list_record_projections_by_dataset(dataset_id, authorize?: false),
       %RecordProjection{} = found <- Enum.find(projections, matches_target) do
    {:ok, found}
  else
    {:error, _reason} = failure -> failure
    nil -> {:error, :record_projection_not_found}
  end
end
# Deletes all record projections of the given datasets with one DELETE whose
# IN list has a numbered placeholder ($1, $2, ...) per dataset id. An empty
# dataset list is a no-op.
defp clear_projections([]), do: :ok

defp clear_projections(datasets) do
  ids = for dataset <- datasets, do: dataset.id

  placeholders =
    ids
    |> Enum.with_index(1)
    |> Enum.map_join(",", fn {_id, position} -> "$#{position}" end)

  sql = "DELETE FROM kobold_record_projections WHERE dataset_id IN (#{placeholders})"

  case Ecto.Adapters.SQL.query(Tribes.Repo, sql, ids) do
    {:error, reason} -> {:error, reason}
    {:ok, _result} -> :ok
  end
end
# Applies commits in order through the projector, summing the counts it
# reports. Stops at the first {:error, reason} and returns it.
defp apply_commits(commits) do
  step = fn commit, {:ok, total} ->
    result = Projector.apply_commit(commit)

    case result do
      {:ok, applied} -> {:cont, {:ok, total + applied}}
      {:error, reason} -> {:halt, {:error, reason}}
    end
  end

  Enum.reduce_while(commits, {:ok, 0}, step)
end
# Collects the ids of `datasets` into a MapSet for membership checks.
defp dataset_ids(datasets) do
  MapSet.new(datasets, fn dataset -> dataset.id end)
end
# Validates record `fields` against a resource definition's field `schema`.
#
# For every schema entry: a field that is present is type-checked (the
# declared type defaults to "string"); a field that is absent raises
# ArgumentError when the definition marks it as required. Fields that are not
# in the schema are not inspected. Returns :ok.
defp validate_fields!(schema, fields) do
  Enum.each(schema, fn {name, definition} ->
    case Map.fetch(fields, name) do
      {:ok, value} ->
        validate_field_type!(name, Map.get(definition, "type", "string"), value)

      :error ->
        if truthy?(Map.get(definition, "required")) do
          raise ArgumentError, "missing required field #{name}"
        end
    end
  end)

  :ok
end
# Returns :ok when `value` satisfies the declared `type` ("string",
# "integer", "number", "boolean", "map" or "any"); raises ArgumentError for a
# mismatch or an unknown type name.
defp validate_field_type!(name, type, value) do
  valid? =
    case type do
      "string" -> is_binary(value)
      "integer" -> is_integer(value)
      "number" -> is_number(value)
      "boolean" -> is_boolean(value)
      "map" -> is_map(value)
      "any" -> true
      _unknown -> false
    end

  if valid? do
    :ok
  else
    raise ArgumentError, "invalid #{type} field #{name}"
  end
end
# Converts a dataset into its atom-keyed response map. Visibility and
# replication mode are stringified and timestamps go through datetime_json/1;
# every other field is copied as-is.
defp dataset_json(%Dataset{} = dataset) do
  copied_keys = [
    :id,
    :run_id,
    :name,
    :description,
    :owner_pubkey,
    :type,
    :schema_name,
    :schema_version,
    :origin_dataset_id,
    :metadata
  ]

  # Map.fetch!/2 keeps the strict "field must exist" behaviour of dot access.
  copied = Map.new(copied_keys, fn key -> {key, Map.fetch!(dataset, key)} end)

  Map.merge(copied, %{
    visibility: to_string(dataset.visibility),
    replication_mode: to_string(dataset.replication_mode),
    inserted_at: datetime_json(dataset.inserted_at),
    updated_at: datetime_json(dataset.updated_at)
  })
end
# Converts a resource definition into its atom-keyed response map, with
# timestamps rendered by datetime_json/1.
defp resource_definition_json(%ResourceDefinition{} = definition) do
  copied_keys = [:id, :dataset_id, :run_id, :name, :display_name, :fields, :metadata]

  # Map.fetch!/2 keeps the strict "field must exist" behaviour of dot access.
  copied = Map.new(copied_keys, fn key -> {key, Map.fetch!(definition, key)} end)

  Map.merge(copied, %{
    inserted_at: datetime_json(definition.inserted_at),
    updated_at: datetime_json(definition.updated_at)
  })
end
# Converts a commit into its atom-keyed response map. nil/false parent ids
# and operations are rendered as empty lists; timestamps go through
# datetime_json/1.
defp commit_json(commit) do
  parents = commit.parent_commit_ids || []
  operations = commit.operations || []

  timestamps = %{
    inserted_at: datetime_json(commit.inserted_at),
    updated_at: datetime_json(commit.updated_at)
  }

  Map.merge(
    %{
      id: commit.id,
      dataset_id: commit.dataset_id,
      run_id: commit.run_id,
      change_id: commit.change_id,
      parent_commit_ids: parents,
      message: commit.message,
      operations: operations,
      metadata: commit.metadata,
      author_pubkey: commit.author_pubkey
    },
    timestamps
  )
end
# Converts a bookmark into its atom-keyed response map, with timestamps
# rendered by datetime_json/1.
defp bookmark_json(bookmark) do
  timestamps = %{
    inserted_at: datetime_json(bookmark.inserted_at),
    updated_at: datetime_json(bookmark.updated_at)
  }

  Map.merge(
    %{
      id: bookmark.id,
      dataset_id: bookmark.dataset_id,
      run_id: bookmark.run_id,
      name: bookmark.name,
      commit_id: bookmark.commit_id,
      metadata: bookmark.metadata
    },
    timestamps
  )
end
# Renders one commit operation as an event map. Identity, run and timestamps
# come from the commit (the commit id is used for both `id` and `commit_id`);
# resource, record, fields, metadata and actor come from the string-keyed
# operation. `sequence` is always nil.
defp operation_event_json(commit, operation) do
  from_commit = %{
    id: commit.id,
    commit_id: commit.id,
    dataset_id: commit.dataset_id,
    run_id: commit.run_id,
    sequence: nil,
    inserted_at: datetime_json(commit.inserted_at),
    updated_at: datetime_json(commit.updated_at)
  }

  from_operation = %{
    resource_name: Map.get(operation, "resource_name"),
    record_id: Map.get(operation, "record_id"),
    event_type: operation_event_type(operation),
    fields: Map.get(operation, "fields", %{}),
    metadata: Map.get(operation, "metadata", %{}),
    actor_pubkey: Map.get(operation, "actor_pubkey")
  }

  Map.merge(from_commit, from_operation)
end
# Flattens the operations of all commits into one list of event maps,
# keeping commit order and, within a commit, operation order.
defp commits_to_event_json(commits) do
  for commit <- commits,
      operation <- List.wrap(commit.operations) do
    operation_event_json(commit, operation)
  end
end
# Maps an operation's "op" to an event type: "delete" and "record_delete"
# become "record_delete"; anything else (including a missing "op") becomes
# "record_upsert".
defp operation_event_type(operation) do
  if Map.get(operation, "op") in ["delete", "record_delete"] do
    "record_delete"
  else
    "record_upsert"
  end
end
# Converts a record projection into its atom-keyed response map; the
# struct's `deleted?` field is exposed under the key `deleted`.
defp record_projection_json(%RecordProjection{} = projection) do
  copied_keys = [:dataset_id, :run_id, :resource_name, :record_id, :fields, :head_commit_id]

  # Map.fetch!/2 keeps the strict "field must exist" behaviour of dot access.
  copied = Map.new(copied_keys, fn key -> {key, Map.fetch!(projection, key)} end)

  Map.put(copied, :deleted, projection.deleted?)
end
# Returns true when `to_regclass('public.<table>')` yields a name, i.e. the
# query returns exactly one row holding a binary; false for any other result,
# including query errors.
defp table_present?(table) do
  lookup =
    Ecto.Adapters.SQL.query(Tribes.Repo, "SELECT to_regclass($1)::text", ["public.#{table}"])

  match?({:ok, %{rows: [[name]]}} when is_binary(name), lookup)
end
# Returns the non-empty binary stored under `key`, or raises ArgumentError
# when the value is missing, empty or not a binary.
defp required_string!(params, key) do
  with value when is_binary(value) and value != "" <- Map.get(params, key) do
    value
  else
    _invalid -> raise ArgumentError, "missing or invalid #{key}"
  end
end
# Returns the non-empty binary stored under `key`, or nil when the value is
# missing, empty or not a binary.
defp optional_string(params, key) do
  value = Map.get(params, key)

  if is_binary(value) and value != "" do
    value
  else
    nil
  end
end
# Reads "visibility" (default "private") and returns :public or :private.
# Accepts the two strings as well as the two atoms; raises ArgumentError for
# anything else.
defp visibility(params) do
  requested = Map.get(params, "visibility", "private")

  cond do
    requested in ["public", :public] -> :public
    requested in ["private", :private] -> :private
    true -> raise ArgumentError, "visibility must be public or private"
  end
end
# Returns the map stored under `key` (an empty map when the key is absent);
# raises ArgumentError when the value is present but not a map.
defp map_param(params, key) do
  value = Map.get(params, key, %{})

  if is_map(value) do
    value
  else
    raise ArgumentError, "#{key} must be an object"
  end
end
# Reads and validates the "fields" param of a resource definition request.
#
# "fields" must be a map (it defaults to an empty map) from binary field
# names to definition maps. Each definition's "type" defaults to "string"
# and must be one of the supported type names listed below.
#
# Returns the fields map unchanged. Raises ArgumentError for any violation,
# which the public callers rescue and turn into {:error, message}.
defp fields_param(params) do
  fields = map_param(params, "fields")

  Enum.each(fields, fn
    {name, definition} when is_binary(name) and is_map(definition) ->
      type = Map.get(definition, "type", "string")

      if type not in ["string", "integer", "number", "boolean", "map", "any"] do
        # A non-string type (for example a JSON object) cannot be interpolated
        # directly: it would raise Protocol.UndefinedError, which callers do
        # not rescue. Render such values with inspect/1 instead.
        label = if is_binary(type), do: type, else: inspect(type)
        raise ArgumentError, "unsupported field type #{label}"
      end

    _other ->
      raise ArgumentError, "fields must map names to field definitions"
  end)

  fields
end
# True only for exactly `true`, the string "true" or the integer 1.
defp truthy?(value) do
  value in [true, "true", 1]
end
# Returns the `:vsn` of the :tribe_one_kobold application as a string, or
# "0.0.0" when that key cannot be read.
defp manifest_version do
  case Application.spec(:tribe_one_kobold, :vsn) do
    nil -> "0.0.0"
    vsn -> List.to_string(vsn)
  end
end
# Renders a timestamp as an ISO 8601 string. nil stays nil; a NaiveDateTime
# is treated as being in "Etc/UTC" before formatting.
defp datetime_json(nil) do
  nil
end

defp datetime_json(%DateTime{} = datetime) do
  DateTime.to_iso8601(datetime)
end

defp datetime_json(%NaiveDateTime{} = naive) do
  utc = DateTime.from_naive!(naive, "Etc/UTC")
  DateTime.to_iso8601(utc)
end
end