檢視原始碼 Phoenix.Presence 行為 (Phoenix v1.7.14)

提供追蹤臨在狀態給程序和頻道。

此行為提供臨在狀態功能,例如提取給定主題的臨在狀態,以及處理在即時發生的加入和離開事件的差異。使用這個模組會定義一個監視程式和一個實作 Phoenix.Tracker 行為的模組,該行為使用 Phoenix.PubSub 廣播追蹤臨在狀態的更新。

如果你只想要使用 Phoenix.Presence 所提供的功能中的某個子集,例如追蹤程序但沒有廣播更新,我們建議你查看 phoenix_pubsub 計畫中的 Phoenix.Tracker 功能。

範例用法

從在你應用程式內定義一個臨在狀態模組開始,該模組使用 Phoenix.Presence 並提供包含你的組態的 :otp_app,以及 :pubsub_server

defmodule MyAppWeb.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub
end

:pubsub_server 必須指向一個在你的應用程式中執行的現有 pubsub 伺服器,對於新的應用程式,此伺服器預設包含在 MyApp.PubSub 中。

接下來,在 lib/my_app/application.ex 的監視程式樹中新增新的監視程式。它必須在 PubSub 子項的後面,端點的前面

children = [
  ...
  {Phoenix.PubSub, name: MyApp.PubSub},
  MyAppWeb.Presence,
  MyAppWeb.Endpoint
]

新增後,就能在加入你的頻道後追蹤臨在狀態

defmodule MyAppWeb.MyChannel do
  use MyAppWeb, :channel
  alias MyAppWeb.Presence

  def join("some:topic", _params, socket) do
    send(self(), :after_join)
    {:ok, assign(socket, :user_id, ...)}
  end

  def handle_info(:after_join, socket) do
    {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
      online_at: inspect(System.system_time(:second))
    })

    push(socket, "presence_state", Presence.list(socket))
    {:noreply, socket}
  end
end

在上方的範例中,Presence.track 用於註冊此頻道的程序作為一個符有使用者 ID 的 socket 的臨在狀態,並提供一份臨在狀態資料。接下來,socket 的主題的目前臨在狀態資訊會以 "presence_state" 事件推送到用戶端。

最後,臨在狀態的加入和離開事件的差異會在即時發生時以 "presence_diff" 事件傳送給用戶端。差異結構會是 :joins:leaves 的映射,格式如下

%{
  joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]}},
  leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
},

請參閱 list/1 瞭解臨在狀態資料結構的詳細資訊。

提取臨在狀態資訊

應該將身分資料最小化,用來儲存較小、暫時性的狀態,例如使用者的「線上」或「離開中」狀態。較詳細資料,例如需要從資料庫取得的使用者詳細資料,可透過覆寫 fetch/2 函式完成。

fetch/2 回呼會在使用 list/1 和每次更新時觸發,並作為一種機制,可以在將資訊廣播給所有頻道訂閱者之前單次擷取身分資訊。這可避免 N 個查詢問題,並提供單一處可將孤立資料擷取分組以延伸身分資料的位置。

此函式必須回傳符合概述的身分資料結構的資料映射表,包含 :metas 金鑰,但可延伸資訊映射表以包含任何其他資訊。例如

def fetch(_topic, presences) do
  users = presences |> Map.keys() |> Accounts.get_users_map()

  for {key, %{metas: metas}} <- presences, into: %{} do
    {key, %{metas: metas, user: users[String.to_integer(key)]}}
  end
end

Account.get_users_map/1 可實作如下

def get_users_map(ids) do
  query =
    from u in User,
      where: u.id in ^ids,
      select: {u.id, u}

  query |> Repo.all() |> Enum.into(%{})
end

上述 fetch/2 函式會從已註冊提供給定主題的身分狀態資料庫中取得所有使用者。接著會將身分狀態資訊延伸到使用者的資訊身分狀態的 :user 金鑰,同時仍保留原始身分狀態資料中的必備 :metas 欄位。

將 Elixir 作為身分狀態用戶端使用

身分狀態非常適合外部用戶端使用,例如 JavaScript 應用程式,但它也能用於從 Elixir 用戶端處理作業,在伺服器發生時追蹤身分狀態變更。這可藉由實作身分狀態模組中的非必備 init/1handle_metas/4 回呼來完成。例如,下列回呼會接收身分狀態資料變更,並廣播其他 Elixir 處理作業使用者加入和離開

defmodule MyApp.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub

  def init(_opts) do
    {:ok, %{}} # user-land state
  end

  def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
    # fetch existing presence information for the joined users and broadcast the
    # event to all subscribers
    for {user_id, presence} <- joins do
      user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)}
      msg = {MyApp.PresenceClient, {:join, user_data}}
      Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
    end

    # fetch existing presence information for the left users and broadcast the
    # event to all subscribers
    for {user_id, presence} <- leaves do
      metas =
        case Map.fetch(presences, user_id) do
          {:ok, presence_metas} -> presence_metas
          :error -> []
        end

      user_data = %{user: presence.user, metas: metas}
      msg = {MyApp.PresenceClient, {:leave, user_data}}
      Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
    end

    {:ok, state}
  end
end

handle_metas/4 回呼會接收主題、身分狀態差異、具有其身分資料的該主題的目前的狀態,以及從 init 和後續 handle_metas/4 呼叫累積的任何使用者端狀態。在我們的實作範例中,會在 diff 的 :joins:leaves 中存取,並根據已知身分資料資訊填妥一個完整的狀態。然後會廣播給本機節點訂閱者關於使用者加入和離開。

身分狀態測試

每次呼叫 fetch 回呼,都會從個別處理作業中進行呼叫。由於這些處理作業會非同步執行,因此通常需要確保在每個測試結束時關閉它們。可透過使用 ExUnit 的 on_exit 鉤子加上 fetchers_pids 函式來達成

on_exit(fn ->
  for pid <- MyAppWeb.Presence.fetchers_pids() do
    ref = Process.monitor(pid)
    assert_receive {:DOWN, ^ref, _, _, _}, 1000
  end
end)

摘要

回呼

使用附加數據延伸在場訊息。

傳回 Socket/主題金鑰配對的在場資料的對應。

接收在場資料的變更。

初始化在場客戶端狀態。

傳回 Socket/主題的在場訊息。

追蹤頻道的處理程序為一個在場狀態。

追蹤任意處理程序為一個在場狀態。

停止追蹤頻道的處理程序。

停止追蹤一個處理程序。

更新頻道在場狀態的資料。

更新處理程序在場狀態的資料。

類型

@type presence() :: %{key: String.t(), meta: map()}
@type presences() :: %{required(String.t()) => %{metas: [map()]}}
@type topic() :: String.t()

回呼

連結到此回呼

fetch(topic, presences)

查看程式碼
@callback fetch(topic(), presences()) :: presences()

使用附加數據延伸在場訊息。

list/1 用於列出指定 topic 的所有在場狀態時,此回呼會觸發一次,以在廣播給所有頻道用戶端之前,修改結果。此舉可以避免 N 次查詢問題,並提供一個擴充在場資料的單一位置。您必須傳回一組與原始結果相符的數據,包含 :metas 金鑰,但也可以延伸對應,包含任何額外資訊。

預設實作只會將 presences 直接傳遞,而不做任何變更。

範例

def fetch(_topic, presences) do
  query =
    from u in User,
      where: u.id in ^Map.keys(presences),
      select: {u.id, u}

  users = query |> Repo.all() |> Enum.into(%{})
  for {key, %{metas: metas}} <- presences, into: %{} do
    {key, %{metas: metas, user: users[key]}}
  end
end
@callback get_by_key(Phoenix.Socket.t() | topic(), key :: String.t()) :: [presence()]

傳回 Socket/主題金鑰配對的在場資料的對應。

範例

使用與 list/1 中每個在場狀態相同的數據格式,但僅傳回主題和金鑰配對下在場狀態的資料。例如,金鑰為 "user1" 的用戶,從兩台設備連線到同一個聊天室 "room:1",可以傳回

iex> MyPresence.get_by_key("room:1", "user1")
[%{name: "User 1", metas: [%{device: "Desktop"}, %{device: "Mobile"}]}]

如同 list/1,在場資料會傳遞至您的在場模組的 fetch 回呼,以取得任何附加資訊。

連結到此回呼

handle_metas(topic, diff, presences, state)

查看程式碼 (可選)
@callback handle_metas(
  topic :: String.t(),
  diff :: map(),
  presences :: map(),
  state :: term()
) ::
  {:ok, term()}

接收在場資料的變更。

@callback init(state :: term()) :: {:ok, new_state :: term()}

初始化在場客戶端狀態。

當你的 `presence` 模組啟動時呼叫,允許動態提供處理 `presence` 資訊的初始狀態。

@callback list(socket_or_topic :: Phoenix.Socket.t() | topic()) :: presences()

傳回 Socket/主題的在場訊息。

`presence` 資料結構

`presence` 資訊會被回傳為一個 `map`,其中 `presence` 是以字串形式做為主鍵並加以群組,並累積以下形式的資訊

%{key => %{metas: [%{phx_ref: ..., ...}, ...]}}

例如,假設有一個用戶 ID 為 123,他從兩個不同的裝置上線,以及一個用戶 ID 為 456,他只從一個裝置上線。以下為可能回傳的 `presence` 資訊

%{"123" => %{metas: [%{status: "away", phx_ref: ...},
                     %{status: "online", phx_ref: ...}]},
  "456" => %{metas: [%{status: "online", phx_ref: ...}]}}

`map` 的主鍵通常會指向一個資源 ID。該值會包含一個含有 :metas 鍵的 `map`,其中包含每個資源的一系列資訊。此外,每個資訊項都會包含一個 :phx_ref 鍵,可用於針對指定鍵唯一地識別資訊。如果資訊先前已更新,將會出現包含先前 :phx_ref 值的 :phx_ref_prev 鍵。

連結到此回呼

track(socket, key, meta)

查看程式碼
@callback track(socket :: Phoenix.Socket.t(), key :: String.t(), meta :: map()) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

追蹤頻道的處理程序為一個在場狀態。

追蹤的 `presence` 會依據轉換為字串的 key 加以群組。例如,若要群組每個使用者的頻道,請使用使用者 ID 作為鍵。每個 `presence` 可以與資訊 `map` 關聯在一起,以儲存暫時的小狀態,例如使用者的上線狀態。若要儲存詳細資訊,請參閱 fetch/2

範例

alias MyApp.Presence
def handle_info(:after_join, socket) do
  {:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
    online_at: inspect(System.system_time(:second))
  })
  {:noreply, socket}
end
連結到此回呼

track(pid, topic, key, meta)

查看程式碼
@callback track(pid(), topic(), key :: String.t(), meta :: map()) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

追蹤任意處理程序為一個在場狀態。

track/3 相同,除了透過 topickey 追蹤任何程序。

@callback untrack(socket :: Phoenix.Socket.t(), key :: String.t()) :: :ok

停止追蹤頻道的處理程序。

連結到此回呼

untrack(pid, topic, key)

查看程式碼
@callback untrack(pid(), topic(), key :: String.t()) :: :ok

停止追蹤一個處理程序。

連結到此回呼

update(socket, key, meta)

查看程式碼
@callback update(
  socket :: Phoenix.Socket.t(),
  key :: String.t(),
  meta :: map() | (map() -> map())
) :: {:ok, ref :: binary()} | {:error, reason :: term()}

更新頻道在場狀態的資料。

透過傳遞一個新的 `map` 或一個接收當前 `map` 並回傳一個新的 `map` 的函式,取代 `presence` 的資訊。

連結到此回呼

update(pid, topic, key, meta)

查看程式碼
@callback update(pid(), topic(), key :: String.t(), meta :: map() | (map() -> map())) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

更新處理程序在場狀態的資料。

update/3 相同,但使用一個任意的程序。