檢視原始碼 Phoenix.Tracker 行為 (Phoenix.PubSub v2.1.3)

提供分散式存在追蹤處理程序。

Tracker 分片使用心跳通訊協定與 CRDT 來複寫叢集中存在資訊,使其最終保持一致且無衝突。依此設計,不存在單一的真實資訊來源或全域處理程序。每個節點都執行 Tracker 池,且節點本地的變更會在叢集中複寫,並當作變更的 diff 在本地處理。

implementing-a-tracker

實作 Tracker

若要開始 Tracker,請先將 Tracker 加入管理監督樹

children = [
  # ...
  {MyTracker, [name: MyTracker, pubsub_server: MyApp.PubSub]}
]

接著實作 MyTracker 並支援 Phoenix.Tracker 行為回呼。此為基本 tracker 的範例,可能會包含

defmodule MyTracker do
  use Phoenix.Tracker

  def start_link(opts) do
    opts = Keyword.merge([name: __MODULE__], opts)
    Phoenix.Tracker.start_link(__MODULE__, opts, opts)
  end

  def init(opts) do
    server = Keyword.fetch!(opts, :pubsub_server)
    {:ok, %{pubsub_server: server, node_name: Phoenix.PubSub.node_name(server)}}
  end

  def handle_diff(diff, state) do
    for {topic, {joins, leaves}} <- diff do
      for {key, meta} <- joins do
        IO.puts "presence join: key \"#{key}\" with meta #{inspect meta}"
        msg = {:join, key, meta}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
      end
      for {key, meta} <- leaves do
        IO.puts "presence leave: key \"#{key}\" with meta #{inspect meta}"
        msg = {:leave, key, meta}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
      end
    end
    {:ok, state}
  end
end

Tracker 必須實作 start_link/1init/1handle_diff/2init/1 回呼讓 Tracker 能管理自身狀態,且需在 Phoenix.Tracker 伺服器執行內執行。handle_diff 回呼會呼叫存在參與和離開事件的 diff,並依主題進行分組。當複製品心跳和複製資料時,本機 Tracker 狀態會與遠端資料合併,且 diff 會傳送給回呼。這些處理常式可利用上述資訊來通知訂閱者事件發生。

也可選擇呼叫 handle_info/2 回呼,以在 Tracker 中處理應用程式特定訊息。

special-considerations

特別注意事項

handle_diff/2 內的運作發生於 Tracker 伺服器上下文中。因此,應盡量避免區塊運作,並於需要時卸載到管理監督任務。另外,handle_diff/2 的異常終止會造成 Tracker 伺服器異常終止,因此應將可能會造成伺服器異常終止的運作卸載至產生 Task.Supervisor 處理程序

連結至該區段 總結

函數

傳回在監督員底下啟動這個模組的規格。

取得在給定主題與金鑰配對底下追蹤的臨場感。

透過廣播 permdown 到所有複製品來從容關閉。

列出所有在給定主題底下追蹤的臨場感。

取消追蹤臨場感。

更新臨場感的元資料。

連結到這個區塊 類型

@type presence() :: {key :: String.t(), meta :: map()}
@type topic() :: String.t()

連結到這個區塊 回呼函數

@callback handle_diff(
  %{required(topic()) => {joins :: [presence()], leaves :: [presence()]}},
  state :: term()
) :: {:ok, state :: term()}
連結到這個回呼函數

handle_info(message, state)

檢視原始程式碼 (可選)
@callback handle_info(message :: term(), state :: term()) :: {:noreply, state :: term()}
@callback init(Keyword.t()) :: {:ok, state :: term()} | {:error, reason :: term()}

連結到這個區塊 函數

傳回在監督員底下啟動這個模組的規格。

參閱 Supervisor

連結到這個函數

get_by_key(tracker_name, topic, key)

檢視原始程式碼
@spec get_by_key(atom(), topic(), term()) :: [presence()]

取得在給定主題與金鑰配對底下追蹤的臨場感。

  • server_name - 追蹤器伺服器的註冊名稱
  • topic - Phoenix.PubSub 主題
  • key - 臨場感的金鑰

傳回臨場元資料清單。

舉例

舉例

iex> Phoenix.Tracker.get_by_key(MyTracker, "lobby", "user1")
[{#PID<0.88.0>, %{name: "User 1"}, {#PID<0.89.0>, %{name: "User 1"}]
連結到這個函數

graceful_permdown(tracker_name)

檢視原始程式碼
@spec graceful_permdown(atom()) :: :ok

透過廣播 permdown 到所有複製品來從容關閉。

舉例

舉例

iex> Phoenix.Tracker.graceful_permdown(MyTracker)
:ok
@spec list(atom(), topic()) :: [presence()]

列出所有在給定主題底下追蹤的臨場感。

  • server_name - 追蹤器伺服器的註冊名稱
  • topic - Phoenix.PubSub 主題

傳回金鑰/元資料組配對中的臨場感清單。

舉例

舉例

iex> Phoenix.Tracker.list(MyTracker, "lobby")
[{123, %{name: "user 123"}}, {456, %{name: "user 456"}}]
連結到這個函數

start_link(tracker, tracker_arg, pool_opts)

檢視原始程式碼

啟動追蹤池。

  • tracker - 實作 Phoenix.Tracker 行為的追蹤器模組
  • tracker_arg - 傳遞給追蹤器處理函數 init/1 的引數
  • pool_opts - 建構分片池時使用的選項清單

必需的-pool_opts

pool_opts 必需參數

  • :name - 伺服器名稱,例如:MyApp.Tracker,這也會形成所有分片名稱的共用字首
  • :pubsub_server - PubSub 伺服器名稱,例如:MyApp.PubSub

可選的-pool_opts

pool_opts 可選參數

  • :broadcast_period - 在叢集間發送 delta 廣播的毫秒數間隔。預設 1500
  • :max_silent_periods - 沒有發送 delta 廣播的最大廣播週期整數。預設 10(15 秒心跳)
  • :down_period - 將複寫旗標為暫時關閉的毫秒數間隔。預設 broadcast_period * max_silent_periods * 2(30 秒關閉偵測)。請注意:這必須至少是 broadcast_period 的 2 倍。
  • :permdown_period - 將複寫旗標為永久關閉,並捨棄其狀態的毫秒數間隔。請注意:這必須至少大於 down_period。預設 1_200_000(20 分鐘)
  • :clock_sample_periods - 在合併和要求轉移之前採樣遠端時鐘的心跳視窗數量。預設 2
  • :max_delta_sizes - 在退回傳送整個狀態之前要保留的 delta 產生大小清單。預設 [100, 1000, 10_000]
  • :log_level - 記錄事件的紀錄層級,預設為 :debug,並可以使用 false 停用
  • :pool_size - 要啟動的追蹤器分片的數量。預設 1
連結到這個函數

track(tracker_name, pid, topic, key, meta)

檢視原始程式碼
@spec track(atom(), pid(), topic(), term(), map()) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

追蹤臨場感。

  • server_name - 追蹤器伺服器的註冊名稱
  • pid - 要追蹤的 Pid
  • topic - 這個臨場事件的 Phoenix.PubSub 主題
  • key - 識別這個臨場事件的金鑰
  • meta - 要附加到這個臨場事件的元資料映射

只要對特定行程的任何先前呼叫來說,主題和金鑰配對是唯一的,就可以多次追蹤一個行程。

範例

範例

iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:ok, "1WpAofWYIAA="}

iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:error, {:already_tracked, #PID<0.56.0>, "lobby", "123"}}
連結到這個函數

untrack(tracker_name, pid, topic, key)

檢視原始程式碼
@spec untrack(atom(), pid(), topic(), term()) :: :ok

取消追蹤臨場感。

  • server_name - 追蹤器伺服器的註冊名稱
  • pid - 要解除追蹤的 Pid
  • topic - 要為這個臨場事件解除追蹤的 Phoenix.PubSub 主題
  • key - 識別這個臨場事件的金鑰

可以在給定 Pid 的情況下,使用此函式的 Phoenix.Tracker.untrack/2 簽章解除追蹤所有臨場事件。

範例

範例

iex> Phoenix.Tracker.untrack(MyTracker, self(), "lobby", u.id)
:ok
iex> Phoenix.Tracker.untrack(MyTracker, self())
:ok
連結到這個函數

update(tracker_name, pid, topic, key, meta)

檢視原始程式碼
@spec update(atom(), pid(), topic(), term(), map() | (map() -> map())) ::
  {:ok, ref :: binary()} | {:error, reason :: term()}

更新臨場感的元資料。

  • server_name - 追蹤器伺服器的註冊名稱
  • pid - 正在追蹤的 Pid
  • topic - 要針對此臨場狀況更新的 Phoenix.PubSub 主題
  • key - 識別這個臨場事件的金鑰
  • meta - 附加到此臨場狀況的新元資料映射或函數。此函數會接收目前的元資料作為輸入,而回傳值會作為新的元資料

範例

範例

iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, %{stat: "zzz"})
{:ok, "1WpAofWYIAA="}

iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, fn meta -> Map.put(meta, :away, true) end)
{:ok, "1WpAofWYIAA="}