檢視原始碼 Phoenix.Tracker 行為 (Phoenix.PubSub v2.1.3)
提供分散式存在追蹤處理程序。
Tracker 分片使用心跳通訊協定與 CRDT 來複寫叢集中存在資訊,使其最終保持一致且無衝突。依此設計,不存在單一的真實資訊來源或全域處理程序。每個節點都執行 Tracker 池,且節點本地的變更會在叢集中複寫,並當作變更的 diff 在本地處理。
implementing-a-tracker
實作 Tracker
若要開始 Tracker,請先將 Tracker 加入管理監督樹
children = [
# ...
{MyTracker, [name: MyTracker, pubsub_server: MyApp.PubSub]}
]
接著實作 MyTracker
並支援 Phoenix.Tracker
行為回呼。此為基本 tracker 的範例,可能會包含
defmodule MyTracker do
use Phoenix.Tracker
def start_link(opts) do
opts = Keyword.merge([name: __MODULE__], opts)
Phoenix.Tracker.start_link(__MODULE__, opts, opts)
end
def init(opts) do
server = Keyword.fetch!(opts, :pubsub_server)
{:ok, %{pubsub_server: server, node_name: Phoenix.PubSub.node_name(server)}}
end
def handle_diff(diff, state) do
for {topic, {joins, leaves}} <- diff do
for {key, meta} <- joins do
IO.puts "presence join: key \"#{key}\" with meta #{inspect meta}"
msg = {:join, key, meta}
Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
end
for {key, meta} <- leaves do
IO.puts "presence leave: key \"#{key}\" with meta #{inspect meta}"
msg = {:leave, key, meta}
Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
end
end
{:ok, state}
end
end
Tracker 必須實作 start_link/1
、init/1
和 handle_diff/2
。init/1
回呼讓 Tracker 能管理自身狀態,且需在 Phoenix.Tracker
伺服器執行內執行。handle_diff
回呼會呼叫存在參與和離開事件的 diff,並依主題進行分組。當複製品心跳和複製資料時,本機 Tracker 狀態會與遠端資料合併,且 diff 會傳送給回呼。這些處理常式可利用上述資訊來通知訂閱者事件發生。
也可選擇呼叫 handle_info/2
回呼,以在 Tracker 中處理應用程式特定訊息。
special-considerations
特別注意事項
handle_diff/2
內的運作發生於 Tracker 伺服器上下文中。因此,應盡量避免區塊運作,並於需要時卸載到管理監督任務。另外,handle_diff/2
的異常終止會造成 Tracker 伺服器異常終止,因此應將可能會造成伺服器異常終止的運作卸載至產生 Task.Supervisor
處理程序
連結至該區段 總結
函數
傳回在監督員底下啟動這個模組的規格。
取得在給定主題與金鑰配對底下追蹤的臨場感。
透過廣播 permdown 到所有複製品來從容關閉。
列出所有在給定主題底下追蹤的臨場感。
更新臨場感的元資料。
連結到這個區塊 類型
連結到這個區塊 回呼函數
連結到這個區塊 函數
傳回在監督員底下啟動這個模組的規格。
參閱 Supervisor
。
取得在給定主題與金鑰配對底下追蹤的臨場感。
server_name
- 追蹤器伺服器的註冊名稱topic
-Phoenix.PubSub
主題key
- 臨場感的金鑰
傳回臨場元資料清單。
舉例
舉例
iex> Phoenix.Tracker.get_by_key(MyTracker, "lobby", "user1")
[{#PID<0.88.0>, %{name: "User 1"}, {#PID<0.89.0>, %{name: "User 1"}]
@spec graceful_permdown(atom()) :: :ok
透過廣播 permdown 到所有複製品來從容關閉。
舉例
舉例
iex> Phoenix.Tracker.graceful_permdown(MyTracker)
:ok
列出所有在給定主題底下追蹤的臨場感。
server_name
- 追蹤器伺服器的註冊名稱topic
-Phoenix.PubSub
主題
傳回金鑰/元資料組配對中的臨場感清單。
舉例
舉例
iex> Phoenix.Tracker.list(MyTracker, "lobby")
[{123, %{name: "user 123"}}, {456, %{name: "user 456"}}]
啟動追蹤池。
tracker
- 實作Phoenix.Tracker
行為的追蹤器模組tracker_arg
- 傳遞給追蹤器處理函數init/1
的引數pool_opts
- 建構分片池時使用的選項清單
必需的-pool_opts
pool_opts
必需參數
:name
- 伺服器名稱,例如:MyApp.Tracker
,這也會形成所有分片名稱的共用字首:pubsub_server
- PubSub 伺服器名稱,例如:MyApp.PubSub
可選的-pool_opts
pool_opts
可選參數
:broadcast_period
- 在叢集間發送 delta 廣播的毫秒數間隔。預設1500
:max_silent_periods
- 沒有發送 delta 廣播的最大廣播週期整數。預設10
(15 秒心跳):down_period
- 將複寫旗標為暫時關閉的毫秒數間隔。預設broadcast_period * max_silent_periods * 2
(30 秒關閉偵測)。請注意:這必須至少是broadcast_period
的 2 倍。:permdown_period
- 將複寫旗標為永久關閉,並捨棄其狀態的毫秒數間隔。請注意:這必須至少大於down_period
。預設1_200_000
(20 分鐘):clock_sample_periods
- 在合併和要求轉移之前採樣遠端時鐘的心跳視窗數量。預設2
:max_delta_sizes
- 在退回傳送整個狀態之前要保留的 delta 產生大小清單。預設[100, 1000, 10_000]
。:log_level
- 記錄事件的紀錄層級,預設為:debug
,並可以使用false
停用:pool_size
- 要啟動的追蹤器分片的數量。預設1
@spec track(atom(), pid(), topic(), term(), map()) :: {:ok, ref :: binary()} | {:error, reason :: term()}
追蹤臨場感。
server_name
- 追蹤器伺服器的註冊名稱pid
- 要追蹤的 Pidtopic
- 這個臨場事件的Phoenix.PubSub
主題key
- 識別這個臨場事件的金鑰meta
- 要附加到這個臨場事件的元資料映射
只要對特定行程的任何先前呼叫來說,主題和金鑰配對是唯一的,就可以多次追蹤一個行程。
範例
範例
iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:ok, "1WpAofWYIAA="}
iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:error, {:already_tracked, #PID<0.56.0>, "lobby", "123"}}
取消追蹤臨場感。
server_name
- 追蹤器伺服器的註冊名稱pid
- 要解除追蹤的 Pidtopic
- 要為這個臨場事件解除追蹤的Phoenix.PubSub
主題key
- 識別這個臨場事件的金鑰
可以在給定 Pid 的情況下,使用此函式的 Phoenix.Tracker.untrack/2
簽章解除追蹤所有臨場事件。
範例
範例
iex> Phoenix.Tracker.untrack(MyTracker, self(), "lobby", u.id)
:ok
iex> Phoenix.Tracker.untrack(MyTracker, self())
:ok
@spec update(atom(), pid(), topic(), term(), map() | (map() -> map())) :: {:ok, ref :: binary()} | {:error, reason :: term()}
更新臨場感的元資料。
server_name
- 追蹤器伺服器的註冊名稱pid
- 正在追蹤的 Pidtopic
- 要針對此臨場狀況更新的Phoenix.PubSub
主題key
- 識別這個臨場事件的金鑰meta
- 附加到此臨場狀況的新元資料映射或函數。此函數會接收目前的元資料作為輸入,而回傳值會作為新的元資料
範例
範例
iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, %{stat: "zzz"})
{:ok, "1WpAofWYIAA="}
iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, fn meta -> Map.put(meta, :away, true) end)
{:ok, "1WpAofWYIAA="}