檢視原始碼 Phoenix.Presence 行為 (Phoenix v1.7.14)
提供追蹤臨在狀態給程序和頻道。
此行為提供臨在狀態功能,例如提取給定主題的臨在狀態,以及處理在即時發生的加入和離開事件的差異。使用這個模組會定義一個監視程式和一個實作 Phoenix.Tracker
行為的模組,該行為使用 Phoenix.PubSub
廣播追蹤臨在狀態的更新。
如果你只想要使用 Phoenix.Presence
所提供的功能中的某個子集,例如追蹤程序但沒有廣播更新,我們建議你查看 phoenix_pubsub
計畫中的 Phoenix.Tracker
功能。
範例用法
從在你應用程式內定義一個臨在狀態模組開始,該模組使用 Phoenix.Presence
並提供包含你的組態的 :otp_app
,以及 :pubsub_server
。
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
end
此 :pubsub_server
必須指向一個在你的應用程式中執行的現有 pubsub 伺服器,對於新的應用程式,此伺服器預設包含在 MyApp.PubSub
中。
接下來,在 lib/my_app/application.ex
的監視程式樹中新增新的監視程式。它必須在 PubSub 子項的後面,端點的前面
children = [
...
{Phoenix.PubSub, name: MyApp.PubSub},
MyAppWeb.Presence,
MyAppWeb.Endpoint
]
新增後,就能在加入你的頻道後追蹤臨在狀態
defmodule MyAppWeb.MyChannel do
use MyAppWeb, :channel
alias MyAppWeb.Presence
def join("some:topic", _params, socket) do
send(self(), :after_join)
{:ok, assign(socket, :user_id, ...)}
end
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
push(socket, "presence_state", Presence.list(socket))
{:noreply, socket}
end
end
在上方的範例中,Presence.track
用於註冊此頻道的程序作為一個符有使用者 ID 的 socket 的臨在狀態,並提供一份臨在狀態資料。接下來,socket 的主題的目前臨在狀態資訊會以 "presence_state"
事件推送到用戶端。
最後,臨在狀態的加入和離開事件的差異會在即時發生時以 "presence_diff" 事件傳送給用戶端。差異結構會是 :joins
和 :leaves
的映射,格式如下
%{
joins: %{"123" => %{metas: [%{status: "away", phx_ref: ...}]}},
leaves: %{"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
},
請參閱 list/1
瞭解臨在狀態資料結構的詳細資訊。
提取臨在狀態資訊
應該將身分資料最小化,用來儲存較小、暫時性的狀態,例如使用者的「線上」或「離開中」狀態。較詳細資料,例如需要從資料庫取得的使用者詳細資料,可透過覆寫 fetch/2
函式完成。
fetch/2
回呼會在使用 list/1
和每次更新時觸發,並作為一種機制,可以在將資訊廣播給所有頻道訂閱者之前單次擷取身分資訊。這可避免 N 個查詢問題,並提供單一處可將孤立資料擷取分組以延伸身分資料的位置。
此函式必須回傳符合概述的身分資料結構的資料映射表,包含 :metas
金鑰,但可延伸資訊映射表以包含任何其他資訊。例如
def fetch(_topic, presences) do
users = presences |> Map.keys() |> Accounts.get_users_map()
for {key, %{metas: metas}} <- presences, into: %{} do
{key, %{metas: metas, user: users[String.to_integer(key)]}}
end
end
Account.get_users_map/1
可實作如下
def get_users_map(ids) do
query =
from u in User,
where: u.id in ^ids,
select: {u.id, u}
query |> Repo.all() |> Enum.into(%{})
end
上述 fetch/2
函式會從已註冊提供給定主題的身分狀態資料庫中取得所有使用者。接著會將身分狀態資訊延伸到使用者的資訊身分狀態的 :user
金鑰,同時仍保留原始身分狀態資料中的必備 :metas
欄位。
將 Elixir 作為身分狀態用戶端使用
身分狀態非常適合外部用戶端使用,例如 JavaScript 應用程式,但它也能用於從 Elixir 用戶端處理作業,在伺服器發生時追蹤身分狀態變更。這可藉由實作身分狀態模組中的非必備 init/1
和 handle_metas/4
回呼來完成。例如,下列回呼會接收身分狀態資料變更,並廣播其他 Elixir 處理作業使用者加入和離開
defmodule MyApp.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
def init(_opts) do
{:ok, %{}} # user-land state
end
def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
# fetch existing presence information for the joined users and broadcast the
# event to all subscribers
for {user_id, presence} <- joins do
user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)}
msg = {MyApp.PresenceClient, {:join, user_data}}
Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
end
# fetch existing presence information for the left users and broadcast the
# event to all subscribers
for {user_id, presence} <- leaves do
metas =
case Map.fetch(presences, user_id) do
{:ok, presence_metas} -> presence_metas
:error -> []
end
user_data = %{user: presence.user, metas: metas}
msg = {MyApp.PresenceClient, {:leave, user_data}}
Phoenix.PubSub.local_broadcast(MyApp.PubSub, topic, msg)
end
{:ok, state}
end
end
handle_metas/4
回呼會接收主題、身分狀態差異、具有其身分資料的該主題的目前的狀態,以及從 init 和後續 handle_metas/4
呼叫累積的任何使用者端狀態。在我們的實作範例中,會在 diff 的 :joins
和 :leaves
中存取,並根據已知身分資料資訊填妥一個完整的狀態。然後會廣播給本機節點訂閱者關於使用者加入和離開。
身分狀態測試
每次呼叫 fetch
回呼,都會從個別處理作業中進行呼叫。由於這些處理作業會非同步執行,因此通常需要確保在每個測試結束時關閉它們。可透過使用 ExUnit 的 on_exit
鉤子加上 fetchers_pids
函式來達成
on_exit(fn ->
for pid <- MyAppWeb.Presence.fetchers_pids() do
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, _, _, _}, 1000
end
end)
摘要
回呼
使用附加數據延伸在場訊息。
傳回 Socket/主題金鑰配對的在場資料的對應。
接收在場資料的變更。
初始化在場客戶端狀態。
傳回 Socket/主題的在場訊息。
追蹤頻道的處理程序為一個在場狀態。
追蹤任意處理程序為一個在場狀態。
停止追蹤頻道的處理程序。
停止追蹤一個處理程序。
更新頻道在場狀態的資料。
更新處理程序在場狀態的資料。
類型
回呼
使用附加數據延伸在場訊息。
當 list/1
用於列出指定 topic
的所有在場狀態時,此回呼會觸發一次,以在廣播給所有頻道用戶端之前,修改結果。此舉可以避免 N 次查詢問題,並提供一個擴充在場資料的單一位置。您必須傳回一組與原始結果相符的數據,包含 :metas
金鑰,但也可以延伸對應,包含任何額外資訊。
預設實作只會將 presences
直接傳遞,而不做任何變更。
範例
def fetch(_topic, presences) do
query =
from u in User,
where: u.id in ^Map.keys(presences),
select: {u.id, u}
users = query |> Repo.all() |> Enum.into(%{})
for {key, %{metas: metas}} <- presences, into: %{} do
{key, %{metas: metas, user: users[key]}}
end
end
@callback get_by_key(Phoenix.Socket.t() | topic(), key :: String.t()) :: [presence()]
傳回 Socket/主題金鑰配對的在場資料的對應。
範例
使用與 list/1
中每個在場狀態相同的數據格式,但僅傳回主題和金鑰配對下在場狀態的資料。例如,金鑰為 "user1"
的用戶,從兩台設備連線到同一個聊天室 "room:1"
,可以傳回
iex> MyPresence.get_by_key("room:1", "user1")
[%{name: "User 1", metas: [%{device: "Desktop"}, %{device: "Mobile"}]}]
如同 list/1
,在場資料會傳遞至您的在場模組的 fetch
回呼,以取得任何附加資訊。
@callback handle_metas( topic :: String.t(), diff :: map(), presences :: map(), state :: term() ) :: {:ok, term()}
接收在場資料的變更。
初始化在場客戶端狀態。
當你的 `presence` 模組啟動時呼叫,允許動態提供處理 `presence` 資訊的初始狀態。
@callback list(socket_or_topic :: Phoenix.Socket.t() | topic()) :: presences()
傳回 Socket/主題的在場訊息。
`presence` 資料結構
`presence` 資訊會被回傳為一個 `map`,其中 `presence` 是以字串形式做為主鍵並加以群組,並累積以下形式的資訊
%{key => %{metas: [%{phx_ref: ..., ...}, ...]}}
例如,假設有一個用戶 ID 為 123
,他從兩個不同的裝置上線,以及一個用戶 ID 為 456
,他只從一個裝置上線。以下為可能回傳的 `presence` 資訊
%{"123" => %{metas: [%{status: "away", phx_ref: ...},
%{status: "online", phx_ref: ...}]},
"456" => %{metas: [%{status: "online", phx_ref: ...}]}}
`map` 的主鍵通常會指向一個資源 ID。該值會包含一個含有 :metas
鍵的 `map`,其中包含每個資源的一系列資訊。此外,每個資訊項都會包含一個 :phx_ref
鍵,可用於針對指定鍵唯一地識別資訊。如果資訊先前已更新,將會出現包含先前 :phx_ref
值的 :phx_ref_prev
鍵。
@callback track(socket :: Phoenix.Socket.t(), key :: String.t(), meta :: map()) :: {:ok, ref :: binary()} | {:error, reason :: term()}
追蹤頻道的處理程序為一個在場狀態。
追蹤的 `presence` 會依據轉換為字串的 key
加以群組。例如,若要群組每個使用者的頻道,請使用使用者 ID 作為鍵。每個 `presence` 可以與資訊 `map` 關聯在一起,以儲存暫時的小狀態,例如使用者的上線狀態。若要儲存詳細資訊,請參閱 fetch/2
。
範例
alias MyApp.Presence
def handle_info(:after_join, socket) do
{:ok, _} = Presence.track(socket, socket.assigns.user_id, %{
online_at: inspect(System.system_time(:second))
})
{:noreply, socket}
end
@callback track(pid(), topic(), key :: String.t(), meta :: map()) :: {:ok, ref :: binary()} | {:error, reason :: term()}
追蹤任意處理程序為一個在場狀態。
與 track/3
相同,除了透過 topic
和 key
追蹤任何程序。
@callback untrack(socket :: Phoenix.Socket.t(), key :: String.t()) :: :ok
停止追蹤頻道的處理程序。
停止追蹤一個處理程序。
@callback update( socket :: Phoenix.Socket.t(), key :: String.t(), meta :: map() | (map() -> map()) ) :: {:ok, ref :: binary()} | {:error, reason :: term()}
更新頻道在場狀態的資料。
透過傳遞一個新的 `map` 或一個接收當前 `map` 並回傳一個新的 `map` 的函式,取代 `presence` 的資訊。
@callback update(pid(), topic(), key :: String.t(), meta :: map() | (map() -> map())) :: {:ok, ref :: binary()} | {:error, reason :: term()}
更新處理程序在場狀態的資料。
與 update/3
相同,但使用一個任意的程序。