檢視原始碼 Phoenix.Channel 行為 (Phoenix v1.7.14)
定義 Phoenix Channel。
Channels 提供雙向通訊的管道給客戶端使用,它與 Phoenix.PubSub
這個模組整合在一起,可執行軟性的即時功能。
概念概要請看 Channels 指南。
主題及回呼
每次加入一個 Channel 時,您需要自訂您要收聽的特定主題。主題只是一個識別碼,但根據慣例,它經常由兩個部份組成: "topic:subtopic"
。使用 "topic:subtopic"
的方法與 Phoenix.Socket.channel/3
搭配非常好,因為這允許您與所有以某個前輟字開頭的主題匹配,而且您可以在主題模式中將星號 ( *
字元) 當作最後一個字元才能進行匹配
channel "room:*", MyAppWeb.RoomChannel
在上例中,任何以 "room:"
前輟字輸入路由的主題都會分配給 MyAppWeb.RoomChannel
。主題也可以在您的 Channel 的 join/3
回呼中使用模式匹配,以找出特定的模式
# handles the special `"lobby"` subtopic
def join("room:lobby", _payload, socket) do
{:ok, socket}
end
# handles any other subtopic as the room ID, for example `"room:12"`, `"room:34"`
def join("room:" <> room_id, _payload, socket) do
{:ok, socket}
end
授權
客戶端必須加入某個 Channel 才能在該 Channel 上傳送及接收 PubSub 事件。您的 Channel 必須實作 join/3
回呼,才能對應於特定主題授權 Socket。例如,您可以檢查使用者是否有權利加入某個特定房間。
要授權 join/3
中的 Socket,請返回 {:ok, socket}
。若要拒絕在 join/3
中授權,請返回 {:error, reply}
。
輸入事件
客戶端成功加入 Channel 之後,透過 Channel 的 handle_in/3
回呼,路由輸入事件從客戶端而來。在這些回呼中,您可以執行任何動作。輸入回呼必須返回 socket
來維護短暫狀態。
通常,您會使用 broadcast!/3
將訊息轉發給所有收聽者,或直接回應某個客戶端事件,以進行請求/回應模式訊息傳送。
一般訊息資訊會以地圖集接收到
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
...
{:reply, :ok, socket}
end
二進制資料酬載傳遞成 {:binary, data}
元組
def handle_in("file_chunk", {:binary, chunk}, socket) do
...
{:reply, :ok, socket}
end
廣播
以下是從一個用戶端接收 "new_msg"
事件範例,並將訊息廣播給此插座所有主題訂閱者。
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
broadcast!(socket, "new_msg", %{uid: uid, body: body})
{:noreply, socket}
end
回覆
回覆有助於確認用戶端訊息或回應操作結果。回覆僅傳送給連線到目前頻道程序的用戶端。幕後包含用戶端訊息 ref
,讓用戶端可以將收到的回覆與傳送的訊息作關聯。
例如,想像建立資源並用建立的記錄回覆
def handle_in("create:post", attrs, socket) do
changeset = Post.changeset(%Post{}, attrs)
if changeset.valid? do
post = Repo.insert!(changeset)
response = MyAppWeb.PostView.render("show.json", %{post: post})
{:reply, {:ok, response}, socket}
else
response = MyAppWeb.ChangesetView.render("errors.json", %{changeset: changeset})
{:reply, {:error, response}, socket}
end
end
或者,你可能只想要確認操作成功
def handle_in("create:post", attrs, socket) do
changeset = Post.changeset(%Post{}, attrs)
if changeset.valid? do
Repo.insert!(changeset)
{:reply, :ok, socket}
else
{:reply, :error, socket}
end
end
透過 {:binary, data}
元組,回覆也支援二進制資料
{:reply, {:ok, {:binary, bin}}, socket}
如果你不想要傳送回覆給用戶端,你可以回傳
{:noreply, socket}
你可以這麼做的情況之一是,如果你需要稍後再回覆;請參閱 reply/2
。
推送
呼叫 push/3
允許你傳送訊息給用戶端,這並非特定用戶端訊息的回覆。由於非回覆,所以推播訊息不包含用戶端訊息 ref
;沒有先前用戶端訊息可以與之關聯。
可能的用例包括通知用戶端
- 已自動儲存使用者的文件
- 使用者的遊戲即將結束
- 應該更新 IoT 設備的設定
例如,你可以在收到 PubSub
與其相關訊息後,在 handle_info/3
中 push/3
訊息給用戶端。
alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
push(socket, event, payload)
{:noreply, socket}
end
推播資料可以提供成地圖或標記 {:binary, data}
元組形式
# client asks for their current rank. reply contains it, and client
# is also pushed a leader board and a badge image
def handle_in("current_rank", _, socket) do
push(socket, "leaders", %{leaders: Game.get_leaders(socket.assigns.game_id)})
push(socket, "badge", {:binary, File.read!(socket.assigns.badge_path)})
{:reply, %{val: Game.get_rank(socket.assigns[:user])}, socket}
end
注意,在此範例中,push/3
是從 handle_in/3
呼叫;透過這種方式,你基本上可以針對來自用戶端的單一訊息回覆 N 次。請參閱 reply/2
,瞭解為何這可能是必要的。
攔截傳出事件
當使用 broadcast/3
廣播事件時,每個頻道訂閱者都可以選擇攔截事件,並讓他們的 handle_out/3
回呼觸發。這允許根據 socket 來自訂事件的訊息負載,以附加額外資訊,或有條件地篩選訊息傳送。如果事件沒有使用 Phoenix.Channel.intercept/1
攔截,則訊息將會直接推播至用戶端
intercept ["new_msg", "user_joined"]
# for every socket subscribing to this topic, append an `is_editable`
# value for client metadata.
def handle_out("new_msg", msg, socket) do
push(socket, "new_msg", Map.merge(msg,
%{is_editable: User.can_edit_message?(socket.assigns[:user], msg)}
))
{:noreply, socket}
end
# do not send broadcasted `"user_joined"` events if this socket's user
# is ignoring the user who joined.
def handle_out("user_joined", msg, socket) do
unless User.ignoring?(socket.assigns[:user], msg.user_id) do
push(socket, "user_joined", msg)
end
{:noreply, socket}
end
廣播至外部主題
在某些情況下,您將會希望廣播訊息而沒有 socket
的內容。這可能是為了從您的頻道內廣播至外部主題,或從您的應用程式其他位置(例如控制器或其他流程)廣播。這些都可以透過您的端點來執行
# within channel
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
...
broadcast_from!(socket, "new_msg", %{uid: uid, body: body})
MyAppWeb.Endpoint.broadcast_from!(self(), "room:superadmin",
"new_msg", %{uid: uid, body: body})
{:noreply, socket}
end
# within controller
def create(conn, params) do
...
MyAppWeb.Endpoint.broadcast!("room:" <> rid, "new_msg", %{uid: uid, body: body})
MyAppWeb.Endpoint.broadcast!("room:superadmin", "new_msg", %{uid: uid, body: body})
redirect(conn, to: "/")
end
終止
在終止時,頻道回呼 terminate/2
將使用錯誤原因和 socket 來呼叫。
如果我們終止是因為用戶端離開,原因將會是 {:shutdown, :left}
。類似地,如果我們終止是因為用戶端連線已關閉,原因將會是 {:shutdown, :closed}
。
如果任一回呼傳回一個 :stop
tuple,它也會呼叫原因列在 tuple 中的終止。
terminate/2
然而,在錯誤情況或退出情況下不會呼叫。這與您在像 GenServer
等 Elixir 抽象中找到的行為相同。與 GenServer
相似,您也可以做 :trap_exit
來確保呼叫 terminate/2
。不過不鼓勵這種作法。
一般來說,如果您想清除某些東西,最好監控您的頻道流程,並從另一個流程進行清除。所有頻道回呼,包括 join/3
,都是從頻道流程內呼叫。因此,self()
在任何這些回呼中傳回您要監控的 PID。
停止頻道時的退出原因
當頻道回呼傳回一個 :stop
tuple 時,例如
{:stop, :shutdown, socket}
{:stop, {:error, :enoent}, socket}
第二個引數是退出原因,它遵循與標準 GenServer
退出相同的行為。
關閉頻道時,您有三個選項可供選擇
:normal
- 在這種情況下,退出不會被記錄,且連結的流程不會退出:shutdown
或{:shutdown, 終結}
- 在這種情況,結束時不會被記錄且連接程序以相同原因結束,除非它們正在捕捉結束任何其他終結 - 在這種情況,結束時會被記錄且連接程序以相同原因結束,除非它們正在捕捉結束
訂閱外部主題
有時你可能需要對套接字透過程式訂閱外部主題,除了內部 套接字。主題
。例如,想像你有一個競標系統,其中遠端客戶端動態設定偏好,關於他們想要收到競標通知的商品。不用為每個偏好需要一個獨特管道程式的管道路徑,取而代之的方法是透過端點為一個管道訂閱相關通知,會更有效率且簡單。例如
defmodule MyAppWeb.Endpoint.NotificationChannel do
use Phoenix.Channel
def join("notification:" <> user_id, %{"ids" => ids}, socket) do
topics = for product_id <- ids, do: "product:#{product_id}"
{:ok, socket
|> assign(:topics, [])
|> put_new_topics(topics)}
end
def handle_in("watch", %{"product_id" => id}, socket) do
{:reply, :ok, put_new_topics(socket, ["product:#{id}"])}
end
def handle_in("unwatch", %{"product_id" => id}, socket) do
{:reply, :ok, MyAppWeb.Endpoint.unsubscribe("product:#{id}")}
end
defp put_new_topics(socket, topics) do
Enum.reduce(topics, socket, fn topic, acc ->
topics = acc.assigns.topics
if topic in topics do
acc
else
:ok = MyAppWeb.Endpoint.subscribe(topic)
assign(acc, :topics, [topic | topics])
end
end)
end
end
注意:呼叫者必須負責防止重複訂閱。從你的端點呼叫 訂閱/1
之後,相同的流程套用於在你的管道處理常規 Elixir 訊息。大部分情況下,你會只傳送 %Phoenix.Socket.Broadcast{}
事件和載荷
alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
push(socket, event, payload)
{:noreply, socket}
end
休眠
從 Erlang/OTP 20 後,管道會自動休眠以節省記憶體,在 15_000 毫秒的非活動之後。這可以透過傳遞 :hibernate_after
選項至 使用 Phoenix.Channel
來自訂
use Phoenix.Channel, hibernate_after: 60_000
你也可以設定為 :infinity
來完全停用它
關閉
你可以設定套用於應用程式關閉時,每個管道關閉行為,方法是在使用時設定 :shutdown
值
use Phoenix.Channel, shutdown: 5_000
它預設為 5_000。支援的值在 Supervisor
模組文件中有說明。
記錄
預設情況下,管道 "加入"
和 "處理_中"
事件會被記錄,分別使用程度 :資訊
和 :除錯
。你可以變更用於每個事件的程度,或停用記錄,透過事件類型,在使用 Phoenix.Channel
時設定 :log_join
和 :log_handle_in
選項。例如,以下組態記錄加入事件為 :資訊
,但停用對於傳入事件的記錄
use Phoenix.Channel, log_join: :info, log_handle_in: false
請注意,變更事件類型的程度不會影響所記錄的內容,除非你將其設為 假
,它會影響關聯程度。
摘要
回呼
處理常規 GenServer 呼叫訊息。
處理常規 GenServer 呼叫訊息。
處理傳入的 event
訊息。
處理常規 Elixir 程序訊息。
攔截傳出的 event
訊息。
以 topic
處理頻道加入的動作。
在頻道程序即將結束時呼叫。
功能
將事件廣播給 socket 主題的所有訂閱者。
與 broadcast/3
相同,但廣播失敗時會引發錯誤。
將事件從 pid 廣播給 socket 主題的所有訂閱者。
與 broadcast_from/3
相同,但廣播失敗時會引發錯誤。
定義要攔截哪些頻道事件,以進行 handle_out/3
回呼。
直接將事件傳送給已連線的用戶端,而不需要用戶端先傳送訊息。
非同步回復至 socket 推播。
為非同步回覆產生 socket_ref
。
類型
回呼
@callback code_change(old_vsn, Phoenix.Socket.t(), extra :: term()) :: {:ok, Phoenix.Socket.t()} | {:error, reason :: term()} when old_vsn: term() | {:down, term()}
@callback handle_call( msg :: term(), from :: {pid(), tag :: term()}, socket :: Phoenix.Socket.t() ) :: {:reply, response :: term(), Phoenix.Socket.t()} | {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
處理常規 GenServer 呼叫訊息。
@callback handle_cast(msg :: term(), socket :: Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
處理常規 GenServer 呼叫訊息。
@callback handle_in( event :: String.t(), payload :: payload(), socket :: Phoenix.Socket.t() ) :: {:noreply, Phoenix.Socket.t()} | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate} | {:reply, reply(), Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()} | {:stop, reason :: term(), reply(), Phoenix.Socket.t()}
處理傳入的 event
訊息。
在傳送前使用設定的序列化器對負載進行序列化。
範例
def handle_in("ping", payload, socket) do
{:reply, {:ok, payload}, socket}
end
@callback handle_info(msg :: term(), socket :: Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
處理常規 Elixir 程序訊息。
@callback handle_out( event :: String.t(), payload :: payload(), socket :: Phoenix.Socket.t() ) :: {:noreply, Phoenix.Socket.t()} | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate} | {:stop, reason :: term(), Phoenix.Socket.t()}
攔截傳出的 event
訊息。
請參閱 intercept/1
。
@callback join(topic :: binary(), payload :: payload(), socket :: Phoenix.Socket.t()) :: {:ok, Phoenix.Socket.t()} | {:ok, reply :: payload(), Phoenix.Socket.t()} | {:error, reason :: map()}
以 topic
處理頻道加入的動作。
若要授權 Socket,請傳回 {:ok, socket}
或 {:ok, reply, socket}
。若要拒絕授權,請傳回 {:error, reason}
。
在傳送前使用設定的序列化器對負載進行序列化。
範例
def join("room:lobby", payload, socket) do
if authorized?(payload) do
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
@callback terminate( reason :: :normal | :shutdown | {:shutdown, :left | :closed | term()}, Phoenix.Socket.t() ) :: term()
在頻道程序即將結束時呼叫。
函式
將事件廣播給 socket 主題的所有訂閱者。
事件訊息必須是序列化對應或標記 {:binary, data}
tuple,其中 data
是二進位資料。
範例
iex> broadcast(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> broadcast(socket, "new_message", {:binary, "hello"})
:ok
與 broadcast/3
相同,但廣播失敗時會引發錯誤。
將事件從 pid 廣播給 socket 主題的所有訂閱者。
擁有 Socket 的頻道不會收到發佈的訊息。事件訊息必須是序列化對應或標記 {:binary, data}
tuple,其中 data
是二進位資料。
範例
iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> broadcast_from(socket, "new_message", {:binary, "hello"})
:ok
與 broadcast_from/3
相同,但廣播失敗時會引發錯誤。
定義要攔截哪些頻道事件,以進行 handle_out/3
回呼。
預設情況下,廣播事件直接推送到用戶端,但攔截事件可讓你的頻道有機會客製化用戶端的事件,以附加額外資訊或過濾不再傳送的訊息。
注意:許多訂閱者必須客製化訊息時,攔截事件可能會帶來更多額外負擔,因為廣播將被編碼 N 次,而不是在所有訂閱者間共用單一分享編碼。
範例
intercept ["new_msg"]
def handle_out("new_msg", payload, socket) do
push(socket, "new_msg", Map.merge(payload,
is_editable: User.can_edit_message?(socket.assigns[:user], payload)
))
{:noreply, socket}
end
handle_out/3
回呼必須傳回其中一個
{:noreply, Socket.t} |
{:noreply, Socket.t, timeout | :hibernate} |
{:stop, reason :: term, Socket.t}
直接將事件傳送給已連線的用戶端,而不需要用戶端先傳送訊息。
事件訊息必須是序列化對應或標記 {:binary, data}
tuple,其中 data
是二進位資料。
請注意,與某些用戶端程式庫不同,伺服器端的 push/3
不會傳回指令。如果您需要從用戶端取得回覆,並將該回覆與您已推播的訊息相關聯,則需要在訊息中包含特定識別碼,並在 Channel 的狀態中追蹤它,讓用戶端能夠將它包含在回覆中,當回覆送至 handle_in/3
時,則檢查 ref。
範例
iex> push(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> push(socket, "new_message", {:binary, "hello"})
:ok
@spec reply(socket_ref(), reply()) :: :ok
非同步回復至 socket 推播。
回覆用戶端訊息的常見方法是從 handle_in/3
傳回元組,例如
{:reply, {status, payload}, socket}
但有時您需要非同步地回覆推播,這表示在 handle_in/3
回呼完成後。例如,您可能需要在另一個處理序中執行工作,並在完成後回覆。
您可以透過使用 socket_ref/1
產生一個對應插座的指令,然後在您準備回覆時,呼叫 reply/2
搭配該 ref。
注意: 需要一個 socket_ref
,這樣 socket
本身就不會洩漏到頻道外部。socket
含有許多資訊,例如分配和傳輸配置,因此,將這個資訊複製到擁有它的頻道之外是很重要的。
技術上而言,reply/2
允許您對同一用戶端訊息回覆多次,且每次回覆都會包含用戶端訊息 ref
。但用戶端可能只會預期一個回覆;在這種情況下,會比較適合使用 push/3
來傳送其他訊息。
在傳送前使用設定的序列化器對負載進行序列化。
範例
def handle_in("work", payload, socket) do
Worker.perform(payload, socket_ref(socket))
{:noreply, socket}
end
def handle_info({:work_complete, result, ref}, socket) do
reply(ref, {:ok, result})
{:noreply, socket}
end
@spec socket_ref(Phoenix.Socket.t()) :: socket_ref()
為非同步回覆產生 socket_ref
。
請參閱 reply/2
以取得使用範例。