檢視原始碼 Phoenix.Channel 行為 (Phoenix v1.7.14)

定義 Phoenix Channel。

Channels 提供雙向通訊的管道給客戶端使用,它與 Phoenix.PubSub 這個模組整合在一起,可執行軟性的即時功能。

概念概要請看 Channels 指南。

主題及回呼

每次加入一個 Channel 時,您需要自訂您要收聽的特定主題。主題只是一個識別碼,但根據慣例,它經常由兩個部份組成: "topic:subtopic"。使用 "topic:subtopic" 的方法與 Phoenix.Socket.channel/3 搭配非常好,因為這允許您與所有以某個前輟字開頭的主題匹配,而且您可以在主題模式中將星號 ( * 字元) 當作最後一個字元才能進行匹配

channel "room:*", MyAppWeb.RoomChannel

在上例中,任何以 "room:" 前輟字輸入路由的主題都會分配給 MyAppWeb.RoomChannel。主題也可以在您的 Channel 的 join/3 回呼中使用模式匹配,以找出特定的模式

# handles the special `"lobby"` subtopic
def join("room:lobby", _payload, socket) do
  {:ok, socket}
end

# handles any other subtopic as the room ID, for example `"room:12"`, `"room:34"`
def join("room:" <> room_id, _payload, socket) do
  {:ok, socket}
end

授權

客戶端必須加入某個 Channel 才能在該 Channel 上傳送及接收 PubSub 事件。您的 Channel 必須實作 join/3 回呼,才能對應於特定主題授權 Socket。例如,您可以檢查使用者是否有權利加入某個特定房間。

要授權 join/3 中的 Socket,請返回 {:ok, socket}。若要拒絕在 join/3 中授權,請返回 {:error, reply}

輸入事件

客戶端成功加入 Channel 之後,透過 Channel 的 handle_in/3 回呼,路由輸入事件從客戶端而來。在這些回呼中,您可以執行任何動作。輸入回呼必須返回 socket 來維護短暫狀態。

通常,您會使用 broadcast!/3 將訊息轉發給所有收聽者,或直接回應某個客戶端事件,以進行請求/回應模式訊息傳送。

一般訊息資訊會以地圖集接收到

def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  ...
  {:reply, :ok, socket}
end

二進制資料酬載傳遞成 {:binary, data} 元組

def handle_in("file_chunk", {:binary, chunk}, socket) do
  ...
  {:reply, :ok, socket}
end

廣播

以下是從一個用戶端接收 "new_msg" 事件範例,並將訊息廣播給此插座所有主題訂閱者。

def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  broadcast!(socket, "new_msg", %{uid: uid, body: body})
  {:noreply, socket}
end

回覆

回覆有助於確認用戶端訊息或回應操作結果。回覆僅傳送給連線到目前頻道程序的用戶端。幕後包含用戶端訊息 ref,讓用戶端可以將收到的回覆與傳送的訊息作關聯。

例如,想像建立資源並用建立的記錄回覆

def handle_in("create:post", attrs, socket) do
  changeset = Post.changeset(%Post{}, attrs)

  if changeset.valid? do
    post = Repo.insert!(changeset)
    response = MyAppWeb.PostView.render("show.json", %{post: post})
    {:reply, {:ok, response}, socket}
  else
    response = MyAppWeb.ChangesetView.render("errors.json", %{changeset: changeset})
    {:reply, {:error, response}, socket}
  end
end

或者,你可能只想要確認操作成功

def handle_in("create:post", attrs, socket) do
  changeset = Post.changeset(%Post{}, attrs)

  if changeset.valid? do
    Repo.insert!(changeset)
    {:reply, :ok, socket}
  else
    {:reply, :error, socket}
  end
end

透過 {:binary, data} 元組,回覆也支援二進制資料

{:reply, {:ok, {:binary, bin}}, socket}

如果你不想要傳送回覆給用戶端,你可以回傳

{:noreply, socket}

你可以這麼做的情況之一是,如果你需要稍後再回覆;請參閱 reply/2

推送

呼叫 push/3 允許你傳送訊息給用戶端,這並非特定用戶端訊息的回覆。由於非回覆,所以推播訊息不包含用戶端訊息 ref;沒有先前用戶端訊息可以與之關聯。

可能的用例包括通知用戶端

  • 已自動儲存使用者的文件
  • 使用者的遊戲即將結束
  • 應該更新 IoT 設備的設定

例如,你可以在收到 PubSub 與其相關訊息後,在 handle_info/3push/3 訊息給用戶端。

alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
  push(socket, event, payload)
  {:noreply, socket}
end

推播資料可以提供成地圖或標記 {:binary, data} 元組形式

# client asks for their current rank. reply contains it, and client
# is also pushed a leader board and a badge image
def handle_in("current_rank", _, socket) do
  push(socket, "leaders", %{leaders: Game.get_leaders(socket.assigns.game_id)})
  push(socket, "badge", {:binary, File.read!(socket.assigns.badge_path)})
  {:reply, %{val: Game.get_rank(socket.assigns[:user])}, socket}
end

注意,在此範例中,push/3 是從 handle_in/3 呼叫;透過這種方式,你基本上可以針對來自用戶端的單一訊息回覆 N 次。請參閱 reply/2,瞭解為何這可能是必要的。

攔截傳出事件

當使用 broadcast/3 廣播事件時,每個頻道訂閱者都可以選擇攔截事件,並讓他們的 handle_out/3 回呼觸發。這允許根據 socket 來自訂事件的訊息負載,以附加額外資訊,或有條件地篩選訊息傳送。如果事件沒有使用 Phoenix.Channel.intercept/1 攔截,則訊息將會直接推播至用戶端

intercept ["new_msg", "user_joined"]

# for every socket subscribing to this topic, append an `is_editable`
# value for client metadata.
def handle_out("new_msg", msg, socket) do
  push(socket, "new_msg", Map.merge(msg,
    %{is_editable: User.can_edit_message?(socket.assigns[:user], msg)}
  ))
  {:noreply, socket}
end

# do not send broadcasted `"user_joined"` events if this socket's user
# is ignoring the user who joined.
def handle_out("user_joined", msg, socket) do
  unless User.ignoring?(socket.assigns[:user], msg.user_id) do
    push(socket, "user_joined", msg)
  end
  {:noreply, socket}
end

廣播至外部主題

在某些情況下,您將會希望廣播訊息而沒有 socket 的內容。這可能是為了從您的頻道內廣播至外部主題,或從您的應用程式其他位置(例如控制器或其他流程)廣播。這些都可以透過您的端點來執行

# within channel
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  ...
  broadcast_from!(socket, "new_msg", %{uid: uid, body: body})
  MyAppWeb.Endpoint.broadcast_from!(self(), "room:superadmin",
    "new_msg", %{uid: uid, body: body})
  {:noreply, socket}
end

# within controller
def create(conn, params) do
  ...
  MyAppWeb.Endpoint.broadcast!("room:" <> rid, "new_msg", %{uid: uid, body: body})
  MyAppWeb.Endpoint.broadcast!("room:superadmin", "new_msg", %{uid: uid, body: body})
  redirect(conn, to: "/")
end

終止

在終止時,頻道回呼 terminate/2 將使用錯誤原因和 socket 來呼叫。

如果我們終止是因為用戶端離開,原因將會是 {:shutdown, :left}。類似地,如果我們終止是因為用戶端連線已關閉,原因將會是 {:shutdown, :closed}

如果任一回呼傳回一個 :stop tuple,它也會呼叫原因列在 tuple 中的終止。

terminate/2 然而,在錯誤情況或退出情況下不會呼叫。這與您在像 GenServer 等 Elixir 抽象中找到的行為相同。與 GenServer 相似,您也可以做 :trap_exit 來確保呼叫 terminate/2。不過不鼓勵這種作法。

一般來說,如果您想清除某些東西,最好監控您的頻道流程,並從另一個流程進行清除。所有頻道回呼,包括 join/3,都是從頻道流程內呼叫。因此,self() 在任何這些回呼中傳回您要監控的 PID。

停止頻道時的退出原因

當頻道回呼傳回一個 :stop tuple 時,例如

{:stop, :shutdown, socket}
{:stop, {:error, :enoent}, socket}

第二個引數是退出原因,它遵循與標準 GenServer 退出相同的行為。

關閉頻道時,您有三個選項可供選擇

  • :normal - 在這種情況下,退出不會被記錄,且連結的流程不會退出

  • :shutdown{:shutdown, 終結} - 在這種情況,結束時不會被記錄且連接程序以相同原因結束,除非它們正在捕捉結束

  • 任何其他終結 - 在這種情況,結束時會被記錄且連接程序以相同原因結束,除非它們正在捕捉結束

訂閱外部主題

有時你可能需要對套接字透過程式訂閱外部主題,除了內部 套接字。主題。例如,想像你有一個競標系統,其中遠端客戶端動態設定偏好,關於他們想要收到競標通知的商品。不用為每個偏好需要一個獨特管道程式的管道路徑,取而代之的方法是透過端點為一個管道訂閱相關通知,會更有效率且簡單。例如

defmodule MyAppWeb.Endpoint.NotificationChannel do
  use Phoenix.Channel

  def join("notification:" <> user_id, %{"ids" => ids}, socket) do
    topics = for product_id <- ids, do: "product:#{product_id}"

    {:ok, socket
          |> assign(:topics, [])
          |> put_new_topics(topics)}
  end

  def handle_in("watch", %{"product_id" => id}, socket) do
    {:reply, :ok, put_new_topics(socket, ["product:#{id}"])}
  end

  def handle_in("unwatch", %{"product_id" => id}, socket) do
    {:reply, :ok, MyAppWeb.Endpoint.unsubscribe("product:#{id}")}
  end

  defp put_new_topics(socket, topics) do
    Enum.reduce(topics, socket, fn topic, acc ->
      topics = acc.assigns.topics
      if topic in topics do
        acc
      else
        :ok = MyAppWeb.Endpoint.subscribe(topic)
        assign(acc, :topics, [topic | topics])
      end
    end)
  end
end

注意:呼叫者必須負責防止重複訂閱。從你的端點呼叫 訂閱/1 之後,相同的流程套用於在你的管道處理常規 Elixir 訊息。大部分情況下,你會只傳送 %Phoenix.Socket.Broadcast{} 事件和載荷

alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
  push(socket, event, payload)
  {:noreply, socket}
end

休眠

從 Erlang/OTP 20 後,管道會自動休眠以節省記憶體,在 15_000 毫秒的非活動之後。這可以透過傳遞 :hibernate_after 選項至 使用 Phoenix.Channel 來自訂

use Phoenix.Channel, hibernate_after: 60_000

你也可以設定為 :infinity 來完全停用它

關閉

你可以設定套用於應用程式關閉時,每個管道關閉行為,方法是在使用時設定 :shutdown

use Phoenix.Channel, shutdown: 5_000

它預設為 5_000。支援的值在 Supervisor 模組文件中有說明。

記錄

預設情況下,管道 "加入""處理_中" 事件會被記錄,分別使用程度 :資訊:除錯。你可以變更用於每個事件的程度,或停用記錄,透過事件類型,在使用 Phoenix.Channel 時設定 :log_join:log_handle_in 選項。例如,以下組態記錄加入事件為 :資訊,但停用對於傳入事件的記錄

use Phoenix.Channel, log_join: :info, log_handle_in: false

請注意,變更事件類型的程度不會影響所記錄的內容,除非你將其設為 ,它會影響關聯程度。

摘要

回呼

處理常規 GenServer 呼叫訊息。

處理常規 GenServer 呼叫訊息。

處理傳入的 event 訊息。

處理常規 Elixir 程序訊息。

攔截傳出的 event 訊息。

topic 處理頻道加入的動作。

在頻道程序即將結束時呼叫。

功能

將事件廣播給 socket 主題的所有訂閱者。

broadcast/3 相同,但廣播失敗時會引發錯誤。

將事件從 pid 廣播給 socket 主題的所有訂閱者。

broadcast_from/3 相同,但廣播失敗時會引發錯誤。

定義要攔截哪些頻道事件,以進行 handle_out/3 回呼。

直接將事件傳送給已連線的用戶端,而不需要用戶端先傳送訊息。

非同步回復至 socket 推播。

為非同步回覆產生 socket_ref

類型

@type payload() :: map() | term() | {:binary, binary()}
@type reply() :: status :: atom() | {status :: atom(), response :: payload()}
@type socket_ref() ::
  {transport_pid :: Pid, serializer :: module(), topic :: binary(),
   ref :: binary(), join_ref :: binary()}

回呼

連結此回呼

code_change(old_vsn, t, extra)

查看來源 (選用)
@callback code_change(old_vsn, Phoenix.Socket.t(), extra :: term()) ::
  {:ok, Phoenix.Socket.t()} | {:error, reason :: term()}
when old_vsn: term() | {:down, term()}
連結此回呼

handle_call(msg, from, socket)

查看來源 (選用)
@callback handle_call(
  msg :: term(),
  from :: {pid(), tag :: term()},
  socket :: Phoenix.Socket.t()
) ::
  {:reply, response :: term(), Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t()}
  | {:stop, reason :: term(), Phoenix.Socket.t()}

處理常規 GenServer 呼叫訊息。

請參閱 GenServer.handle_call/3

連結此回呼

handle_cast(msg, socket)

查看來源 (選用)
@callback handle_cast(msg :: term(), socket :: Phoenix.Socket.t()) ::
  {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}

處理常規 GenServer 呼叫訊息。

請參閱 GenServer.handle_cast/2

連結此回呼

handle_in(event, payload, socket)

查看來源 (選用)
@callback handle_in(
  event :: String.t(),
  payload :: payload(),
  socket :: Phoenix.Socket.t()
) ::
  {:noreply, Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
  | {:reply, reply(), Phoenix.Socket.t()}
  | {:stop, reason :: term(), Phoenix.Socket.t()}
  | {:stop, reason :: term(), reply(), Phoenix.Socket.t()}

處理傳入的 event 訊息。

在傳送前使用設定的序列化器對負載進行序列化。

範例

def handle_in("ping", payload, socket) do
  {:reply, {:ok, payload}, socket}
end
連結此回呼

handle_info(msg, socket)

檢視原始碼 (optional)
@callback handle_info(msg :: term(), socket :: Phoenix.Socket.t()) ::
  {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}

處理常規 Elixir 程序訊息。

請參閱 GenServer.handle_info/2

連結此回呼

handle_out(event, payload, socket)

檢視原始碼 (optional)
@callback handle_out(
  event :: String.t(),
  payload :: payload(),
  socket :: Phoenix.Socket.t()
) ::
  {:noreply, Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
  | {:stop, reason :: term(), Phoenix.Socket.t()}

攔截傳出的 event 訊息。

請參閱 intercept/1

連結此回呼

join(topic, payload, socket)

查看來源
@callback join(topic :: binary(), payload :: payload(), socket :: Phoenix.Socket.t()) ::
  {:ok, Phoenix.Socket.t()}
  | {:ok, reply :: payload(), Phoenix.Socket.t()}
  | {:error, reason :: map()}

topic 處理頻道加入的動作。

若要授權 Socket,請傳回 {:ok, socket}{:ok, reply, socket}。若要拒絕授權,請傳回 {:error, reason}

在傳送前使用設定的序列化器對負載進行序列化。

範例

def join("room:lobby", payload, socket) do
  if authorized?(payload) do
    {:ok, socket}
  else
    {:error, %{reason: "unauthorized"}}
  end
end
連結此回呼

terminate(reason, t)

檢視原始碼 (optional)
@callback terminate(
  reason :: :normal | :shutdown | {:shutdown, :left | :closed | term()},
  Phoenix.Socket.t()
) :: term()

在頻道程序即將結束時呼叫。

請參閱 GenServer.terminate/2

函式

連結至這個函式

broadcast(socket, event, message)

查看來源

將事件廣播給 socket 主題的所有訂閱者。

事件訊息必須是序列化對應或標記 {:binary, data} tuple,其中 data 是二進位資料。

範例

iex> broadcast(socket, "new_message", %{id: 1, content: "hello"})
:ok

iex> broadcast(socket, "new_message", {:binary, "hello"})
:ok
連結至這個函式

broadcast!(socket, event, message)

查看來源

broadcast/3 相同,但廣播失敗時會引發錯誤。

連結至這個函式

broadcast_from(socket, event, message)

查看來源

將事件從 pid 廣播給 socket 主題的所有訂閱者。

擁有 Socket 的頻道不會收到發佈的訊息。事件訊息必須是序列化對應或標記 {:binary, data} tuple,其中 data 是二進位資料。

範例

iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok

iex> broadcast_from(socket, "new_message", {:binary, "hello"})
:ok
連結至這個函式

broadcast_from!(socket, event, message)

查看來源

broadcast_from/3 相同,但廣播失敗時會引發錯誤。

連結至這個巨集

intercept(events)

檢視原始碼 (巨集)

定義要攔截哪些頻道事件,以進行 handle_out/3 回呼。

預設情況下,廣播事件直接推送到用戶端,但攔截事件可讓你的頻道有機會客製化用戶端的事件,以附加額外資訊或過濾不再傳送的訊息。

注意:許多訂閱者必須客製化訊息時,攔截事件可能會帶來更多額外負擔,因為廣播將被編碼 N 次,而不是在所有訂閱者間共用單一分享編碼。

範例

intercept ["new_msg"]

def handle_out("new_msg", payload, socket) do
  push(socket, "new_msg", Map.merge(payload,
    is_editable: User.can_edit_message?(socket.assigns[:user], payload)
  ))
  {:noreply, socket}
end

handle_out/3 回呼必須傳回其中一個

{:noreply, Socket.t} |
{:noreply, Socket.t, timeout | :hibernate} |
{:stop, reason :: term, Socket.t}
連結至這個函式

push(socket, event, message)

查看來源

直接將事件傳送給已連線的用戶端,而不需要用戶端先傳送訊息。

事件訊息必須是序列化對應或標記 {:binary, data} tuple,其中 data 是二進位資料。

請注意,與某些用戶端程式庫不同,伺服器端的 push/3 不會傳回指令。如果您需要從用戶端取得回覆,並將該回覆與您已推播的訊息相關聯,則需要在訊息中包含特定識別碼,並在 Channel 的狀態中追蹤它,讓用戶端能夠將它包含在回覆中,當回覆送至 handle_in/3 時,則檢查 ref。

範例

iex> push(socket, "new_message", %{id: 1, content: "hello"})
:ok

iex> push(socket, "new_message", {:binary, "hello"})
:ok
連結至這個函式

reply(socket_ref, status)

查看來源
@spec reply(socket_ref(), reply()) :: :ok

非同步回復至 socket 推播。

回覆用戶端訊息的常見方法是從 handle_in/3 傳回元組,例如

{:reply, {status, payload}, socket}

但有時您需要非同步地回覆推播,這表示在 handle_in/3 回呼完成後。例如,您可能需要在另一個處理序中執行工作,並在完成後回覆。

您可以透過使用 socket_ref/1 產生一個對應插座的指令,然後在您準備回覆時,呼叫 reply/2 搭配該 ref。

注意: 需要一個 socket_ref,這樣 socket 本身就不會洩漏到頻道外部。socket 含有許多資訊,例如分配和傳輸配置,因此,將這個資訊複製到擁有它的頻道之外是很重要的。

技術上而言,reply/2 允許您對同一用戶端訊息回覆多次,且每次回覆都會包含用戶端訊息 ref。但用戶端可能只會預期一個回覆;在這種情況下,會比較適合使用 push/3 來傳送其他訊息。

在傳送前使用設定的序列化器對負載進行序列化。

範例

def handle_in("work", payload, socket) do
  Worker.perform(payload, socket_ref(socket))
  {:noreply, socket}
end

def handle_info({:work_complete, result, ref}, socket) do
  reply(ref, {:ok, result})
  {:noreply, socket}
end
@spec socket_ref(Phoenix.Socket.t()) :: socket_ref()

為非同步回覆產生 socket_ref

請參閱 reply/2 以取得使用範例。