檢視原始碼 通道

先決條件:本指南預設您已完成入門指南並已完成Phoenix 應用程式的設定。

通道是 Phoenix 中令人振奮的部分,它能支援與數百萬個已連線的客戶之間的即時通訊。

以下是幾個可能的用例:

  • 聊天室和訊息應用程式 API
  • 最新消息,例如「進球了」或「地震即將來臨」
  • 在地圖上追蹤火車、卡車或比賽參加者
  • 多人遊戲中的事件
  • 監控感測器和控制燈光
  • 通知瀏覽器網頁的 CSS 或 JavaScript 已變更(這在開發時很方便)

在概念上,通道非常簡單。

首先,客戶端使用 Websocket 等傳輸方式連線至伺服器。連線後,客戶端會加入一個或多個主題。舉例來說,要與公開聊天室互動,客戶端可能會加入名為 public_chat 的主題,而要收到 ID 為 7 的商品更新,他們可能需要加入名為 product_updates:7 的主題。

客戶端可以將訊息推播至其已加入的主題,也可以從中接收訊息。反過來說,通道伺服器會收到已連線客戶端的訊息,也能將訊息推播至客戶端。

伺服器能夠將訊息廣播至訂閱特定主題的所有客戶端。下圖說明了此情況:

                                                                  +----------------+
                                                     +--Topic X-->| Mobile Client  |
                                                     |            +----------------+
                              +-------------------+  |
+----------------+            |                   |  |            +----------------+
| Browser Client |--Topic X-->| Phoenix Server(s) |--+--Topic X-->| Desktop Client |
+----------------+            |                   |  |            +----------------+
                              +-------------------+  |
                                                     |            +----------------+
                                                     +--Topic X-->|   IoT Client   |
                                                                  +----------------+

廣播功能即使應用程式運行在多個節點/電腦上仍能正常運作。換句話說,如果兩個客戶端的 Socket 連線到不同的應用程式節點,並訂閱同一個主題 T,則這兩個客戶端都會收到廣播至 T 的訊息。這要歸功於內部的 PubSub 機制。

通道支援任何類型的客戶端:瀏覽器、原生應用程式、智慧手錶、嵌入式裝置或任何其他能連上網路的裝置。客戶端只需要合適的程式庫;請參閱下方客戶端程式庫部分。每個客戶端程式庫會使用通道所理解的其中一種「傳輸」方式進行通訊。目前為 Websockets 或長輪詢,但未來可能會加入其他傳輸方式。

與無狀態的 HTTP 連線不同,通道支援長時間連線,每個連線都由一個輕量的 BEAM 程序做後端支撐,同時進行平行處理和維護其自己的狀態。

這種架構具有良好的擴充性;Phoenix 通道 可以在單一裝置上容納數百萬個訂閱者,同時還能維持合理的延遲,每秒發送數十萬則訊息。而這種容量可以透過在叢集中新增更多節點的方式加以擴充。

各個組成部分

儘管對於客戶端使用者來說,通道很簡單易用,但路由訊息到客戶端(跨越叢集中的伺服器)牽涉到不少組成部分。讓我們來了解它們吧。

概觀

在開始溝通之前,客戶端會使用某種傳輸方式(例如,Websocket 或長輪詢)連接到某個節點(某個 Phoenix 伺服器),並使用這個網路連線加入一個或多個頻道。針對每個客戶端和每個主題,會建立一個頻道伺服器輕量級程序。每個頻道會保留 `%Phoenix.Socket{}`,並可以在其 `socket.assigns` 中維護任何需要的狀態。

在建立連線之後,來自客戶端的每則訊息都會根據其主題,路由到正確的頻道伺服器。如果頻道伺服器要求廣播某則訊息,這則訊息會被傳送到本機 PubSub,然後由後者把它傳送到連接到同一台伺服器且訂閱了這個主題的任何客戶端。

如果這則訊息在叢集中有其他節點,本機 PubSub 也會將訊息轉發到這些節點的 PubSub,而這些 PubSub 會將其傳送到自己的訂閱者。由於每增加一個節點只會傳送一則訊息,因此新增節點的效能成本可以忽略不計,而且每個新節點都能支援更多訂閱者。

訊息流的流程類似於下圖:

                                 Channel   +-------------------------+      +--------+
                                  route    | Sending Client, Topic 1 |      | Local  |
                              +----------->|     Channel.Server      |----->| PubSub |--+
+----------------+            |            +-------------------------+      +--------+  |
| Sending Client |-Transport--+                                                  |      |
+----------------+                         +-------------------------+           |      |
                                           | Sending Client, Topic 2 |           |      |
                                           |     Channel.Server      |           |      |
                                           +-------------------------+           |      |
                                                                                 |      |
                                           +-------------------------+           |      |
+----------------+                         | Browser Client, Topic 1 |           |      |
| Browser Client |<-------Transport--------|     Channel.Server      |<----------+      |
+----------------+                         +-------------------------+                  |
                                                                                        |
                                                                                        |
                                                                                        |
                                           +-------------------------+                  |
+----------------+                         |  Phone Client, Topic 1  |                  |
|  Phone Client  |<-------Transport--------|     Channel.Server      |<-+               |
+----------------+                         +-------------------------+  |   +--------+  |
                                                                        |   | Remote |  |
                                           +-------------------------+  +---| PubSub |<-+
+----------------+                         |  Watch Client, Topic 1  |  |   +--------+  |
|  Watch Client  |<-------Transport--------|     Channel.Server      |<-+               |
+----------------+                         +-------------------------+                  |
                                                                                        |
                                                                                        |
                                           +-------------------------+      +--------+  |
+----------------+                         |   IoT Client, Topic 1   |      | Remote |  |
|   IoT Client   |<-------Transport--------|     Channel.Server      |<-----| PubSub |<-+
+----------------+                         +-------------------------+      +--------+

終端點

在你的 Phoenix 應用程式的 `Endpoint` 模組中,`socket` 聲明指定哪個 socket 處理器會在給定的網址上接收連線。

socket "/socket", HelloWeb.UserSocket,
  websocket: true,
  longpoll: false

Phoenix 提供兩種預設的傳輸方式:websocket 和 longpoll。你可以透過 `socket` 聲明直接設定它們。

Socket 處理器

在客戶端,你將建立一個 socket 連線到上述路由。

let socket = new Socket("/socket", {params: {token: window.userToken}})

在伺服器上,Phoenix 會呼叫 `HelloWeb.UserSocket.connect/2`,傳遞你的參數和最初的 socket 狀態。在 socket 中,你可以驗證和辨識 socket 連線,並設定 socket 預設值。socket 同時也是你定義頻道路由的地方。

頻道路由

頻道路由會與主題字串相符,並將相符要求傳送到指定的頻道模組。

星號字元 * 用作萬用字元匹配器,因此在以下範例路徑中,會將對於 room:lobbyroom:123 的要求都派發到 RoomChannel。在 UserSocket 中,將會有

channel "room:*", HelloWeb.RoomChannel

頻道

頻道會處理來自客戶端的事件,因此類似於控制器,但有兩個關鍵差異。頻道事件可以朝兩個方向進行-傳入和傳出。頻道連線也會持續存在,不僅限於單一的要求/回應週期。頻道是 Phoenix 中用於即時通訊元件的最高層次抽象。

每個頻道將實作這些四個呼叫回函式的其中一個或多個子句 - join/3terminate/2handle_in/3handle_out/3

主題

主題是字串識別碼 - 各種層級所使用的名稱,用來確保訊息會傳送到正確的位置。如上所述,主題可以使用萬用字元。這允許使用一個有用的 "topic:subtopic" 慣例。通常,會使用應用程式層級中的記錄識別碼組成主題,例如 "users:123"

訊息

Phoenix.Socket.Message 模組定義一個包含以下關鍵字的結構,表示一則有效訊息。來自於 Phoenix.Socket.Message 文件

  • topic - 字串主題或 "topic:subtopic" 對的名稱空間,例如 "messages""messages:123"
  • event - 字串事件名稱,例如 "phx_join"
  • payload - 訊息的有效負載
  • ref - 唯一的字串參照

PubSub

PubSub 由 Phoenix.PubSub 模組提供。有興趣的一方可以透過訂閱主題來接收事件。其他程序可以對特定主題廣播事件。

這有助於在頻道上廣播訊息,也適用於一般的應用程式開發。例如,讓所有已連線的即時檢視了解已在貼文中新增新的留言。

PubSub 系統負責從一個節點傳送訊息到另一個節點,以便它們可以傳送給叢集中的所有訂閱者。預設會使用Phoenix.PubSub.PG2來執行此作業,其中使用原生 BEAM 訊息傳遞。

如果部署環境不支援 Elixir 分散式或者伺服器之間直接通訊,Phoenix 也會提供一個 Redis Adapter,使用 Redis 交換 PubSub 資料。欲了解更多資訊,請參閱 Phoenix.PubSub 文件

用戶端程式庫

只要有用戶端程式庫,任何網路裝置都可以連線到 Phoenix Channels。目前有以下程式庫,而且我們隨時歡迎新的;若要撰寫自己的程式庫,請參閱我們的操作指南 撰寫 Channels 用戶端

官方

Phoenix 提供一個 JavaScript 用戶端,它在產生新的 Phoenix 專案時可用。JavaScript 模組的文件可在 https://hexdocs.dev.org.tw/phoenix/js/ 找到,程式碼位於 多個 js 檔案

第三方

統整所有這些

讓我們透過建立一個簡單的聊天應用程式來統整所有這些概念。請確定 你已經建立了一個新的 Phoenix 應用程式,現在我們準備產生 UserSocket

產生一個 Socket

讓我們呼叫 Socket 產生器來開始

$ mix phx.gen.socket User

它會產生兩個檔案,客戶端程式碼在 assets/js/user_socket.js,伺服器端對應程式碼在 lib/hello_web/channels/user_socket.ex。執行後,產生器還會要求將以下程式碼加入 lib/hello_web/endpoint.ex

defmodule HelloWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :hello

  socket "/socket", HelloWeb.UserSocket,
    websocket: true,
    longpoll: false

  ...
end

產生器也會要求我們匯入用戶端程式碼,我們稍後再進行。

接下來,我們將設定 Socket 以確保訊息會導向到正確的頻道。為此,我們會取消註解 "room:*" 頻道定義

defmodule HelloWeb.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "room:*", HelloWeb.RoomChannel
  ...

現在,每當用戶端傳送訊息時,如果其主題以 "room:" 開頭,它會導向我們的 RoomChannel。接下來,我們會定義 HelloWeb.RoomChannel 模組來管理我們的聊天室訊息。

加入頻道

頻道的首要任務是授權用戶端加入特定主題。為了授權,我們必須在 lib/hello_web/channels/room_channel.ex 中實作 join/3

defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:lobby", _message, socket) do
    {:ok, socket}
  end

  def join("room:" <> _private_room_id, _params, _socket) do
    {:error, %{reason: "unauthorized"}}
  end
end

在我們的聊天應用程式中,我們允許所有人加入 "room:lobby" 主題,但其他任何房間都被視為私人,需要特殊授權 (例如來自資料庫)。(在這個練習中,我們不會擔心私人聊天室,但我們完成後,歡迎你自行探索。)

使用就位的頻道,讓我們讓用戶端和伺服器開始交談。

產生的 assets/js/user_socket.js 定義了一個簡單的用戶端,基於 Phoenix 附帶的 Socket 實作。

我們可以使用那個函式庫連線到我們的 Socket 並加入我們的頻道,我們只需要在該檔案中將我們的房間名稱設定為 "room:lobby"

// assets/js/user_socket.js
// ...
socket.connect()

// Now that you are connected, you can join channels with a topic:
let channel = socket.channel("room:lobby", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

在設定好之後,我們需要確認 assets/js/user_socket.js 已導入我們的應用程式 JavaScript 檔案。若要執行此動作,請取消 assets/js/app.js 中這行的註解。

// ...
import "./user_socket.js"

儲存該檔案,瀏覽器應會自動重新整理,感謝 Phoenix 的即時重新整理器。如果一切順利,我們應會在瀏覽器的 JavaScript 主控台中看見「加入成功」。我們的用戶端和伺服器現在透過持續連線進行交談。讓我們透過啟用聊天功能,讓它發揮作用。

lib/hello_web/controllers/page_html/home.html.heex 中,我們會以一個容器來取代現有的程式碼,以儲存我們的聊天訊息,以及一個輸入欄位來傳送它們

<div id="messages" role="log" aria-live="polite"></div>
<input id="chat-input" type="text">

現在我們要在 assets/js/user_socket.js 中新增幾個事件聆聽器

// ...
let channel           = socket.channel("room:lobby", {})
let chatInput         = document.querySelector("#chat-input")
let messagesContainer = document.querySelector("#messages")

chatInput.addEventListener("keypress", event => {
  if(event.key === 'Enter'){
    channel.push("new_msg", {body: chatInput.value})
    chatInput.value = ""
  }
})

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

我們所要做的就是偵測是否按了 Enter,然後藉由頻道 push 一個事件,並包含訊息主體。我們把事件命名為 "new_msg"。這麼做後,讓我們處理聊天應用程式的另一部分,也就是聆聽新訊息,並將它們附加到我們的訊息容器中。

// ...
let channel           = socket.channel("room:lobby", {})
let chatInput         = document.querySelector("#chat-input")
let messagesContainer = document.querySelector("#messages")

chatInput.addEventListener("keypress", event => {
  if(event.key === 'Enter'){
    channel.push("new_msg", {body: chatInput.value})
    chatInput.value = ""
  }
})

channel.on("new_msg", payload => {
  let messageItem = document.createElement("p")
  messageItem.innerText = `[${Date()}] ${payload.body}`
  messagesContainer.appendChild(messageItem)
})

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

我們使用 channel.on 聆聽 "new_msg" 事件,然後將訊息主體附加到 DOM 中。現在讓我們處理伺服器上的傳入和傳出事件,以完成這個畫面。

傳入事件

我們使用 handle_in/3 處理傳入事件。我們可以對像 "new_msg" 這樣的事件名稱進行樣式比對,然後取得用戶端經由頻道傳遞的有效負載。對於我們的聊天應用程式,我們只需要使用 broadcast!/3 來通知所有其他 room:lobby 訂閱者新的訊息即可。

defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:lobby", _message, socket) do
    {:ok, socket}
  end

  def join("room:" <> _private_room_id, _params, _socket) do
    {:error, %{reason: "unauthorized"}}
  end

  def handle_in("new_msg", %{"body" => body}, socket) do
    broadcast!(socket, "new_msg", %{body: body})
    {:noreply, socket}
  end
end

broadcast!/3 會通知此 socket 主題上所有已加入的用戶端,並呼叫他們的 handle_out/3 回呼。 handle_out/3 不是一個強制性的回呼,但它讓我們在 Broadcast 傳送給每個用戶端之前,自訂並篩選它。預設情況下, handle_out/3 已為我們實作,只需將訊息傳送給用戶端即可。連線到傳出事件有助於強大的訊息自訂和篩選。讓我們看看如何執行。

攔截傳出事件

我們不會在我們的應用程式中實作這個,但想像我們的聊天應用程式允許使用者忽略有關新使用者加入房間的訊息。我們可以用這種方式實作那個行為,說明 Phoenix 我們想要攔截哪個傳出的事件,接著為那些事件定義一個 handle_out/3 回呼。 (當然,這假設我們有 Accounts 環境,它有 ignoring_user?/2 函式,並且我們透過 assigns 地圖傳遞使用者)。很重要的一點是 handle_out/3 回呼會針對訊息的每個接收者呼叫,所以在包含在 handle_out/3 之前,應仔細考慮像存取資料庫這種較花費成本的操作。

intercept ["user_joined"]

def handle_out("user_joined", msg, socket) do
  if Accounts.ignoring_user?(socket.assigns[:user], msg.user_id) do
    {:noreply, socket}
  else
    push(socket, "user_joined", msg)
    {:noreply, socket}
  end
end

這就是我們的基本聊天應用程式的全部內容。開啟多個瀏覽器索引標籤,然後你會看到你的訊息被推播並廣播到所有視窗!

使用權杖驗證

當我們連線時,通常需要驗證客戶端。幸運的是,透過 Phoenix.Token,這是一個 4 步驟的程序。

步驟 1 - 在連線中指定權杖

假設我們的應用程式有一個稱為 OurAuth 的驗證外掛程式。當 OurAuth 驗證使用者時,它會設定 conn.assigns:current_user 鍵的值。由於 current_user 存在,我們可以簡單地在連線中指定使用者的權杖供配置使用。我們可以將那個行為包在一個私人函式外掛程式 put_user_token/2 中。也可以把它放在它自己的模組中。為了讓這一切運作,我們只要將 OurAuthput_user_token/2 新增到瀏覽器管線中就可以了。

pipeline :browser do
  ...
  plug OurAuth
  plug :put_user_token
end

defp put_user_token(conn, _) do
  if current_user = conn.assigns[:current_user] do
    token = Phoenix.Token.sign(conn, "user socket", current_user.id)
    assign(conn, :user_token, token)
  else
    conn
  end
end

現在我們的 conn.assigns 包含 current_useruser_token

步驟 2 - 將權杖傳遞給 JavaScript

接下來,我們需要將這個權杖傳遞給 JavaScript。我們可以在 lib/hello_web/components/layouts/app.html.heex 中的 app.js 腳本正上方的腳本標籤中這樣做

<script>window.userToken = "<%= assigns[:user_token] %>";</script>
<script src={~p"/assets/app.js"}></script>

步驟 3 - 將權杖傳遞給 Socket 建構函式並驗證

我們還需要將 :params 傳遞給 Socket 建構函式,並在 connect/3 函式中驗證使用者權杖。為此,請編輯 lib/hello_web/channels/user_socket.ex,如下所示

def connect(%{"token" => token}, socket, _connect_info) do
  # max_age: 1209600 is equivalent to two weeks in seconds
  case Phoenix.Token.verify(socket, "user socket", token, max_age: 1209600) do
    {:ok, user_id} ->
      {:ok, assign(socket, :current_user, user_id)}
    {:error, reason} ->
      :error
  end
end

在我們的 JavaScript 中,我們可以在建構 Socket 時使用先前設定的權杖

let socket = new Socket("/socket", {params: {token: window.userToken}})

我們使用 Phoenix.Token.verify/4 來驗證客戶端提供的使用者權杖。 Phoenix.Token.verify/4 會傳回 {:ok, user_id}{:error, reason}。我們可以在 case 語句中建立回傳結果的模式比對。使用已驗證的權杖時,我們會設定使用者的 ID 為 socket 中 :current_user 的值。否則,我們會傳回 :error

步驟 4 - 在 JavaScript 中連線至 socket

當身分驗證設定好之後,我們就可以從 JavaScript 連線至 socket 和通道。

let socket = new Socket("/socket", {params: {token: window.userToken}})
socket.connect()

現在我們已經連線,可以加入具有主題的通道

let channel = socket.channel("topic:subtopic", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

請注意權杖身分驗證是較好的方式,因為它不會在傳輸中變動,且非常適合長時間連線,例如通道,而非使用工作階段或其他身分驗證方式。

容錯能力和可靠性保證

伺服器會重新啟動,網路會中斷,而客戶端會失去連線功能。我們需要了解 Phoenix 如何回應這些事件及它提供的保證,才能設計出強固的系統。

處理重新連線

客戶端訂閱主題,而 Phoenix 會將這些訂閱儲存在記憶體中的 ETS 表格中。如果通道發生問題,客戶端需要重新連線至他們之前訂閱的主題。所幸,Phoenix JavaScript 客戶端知道如何執行此動作。伺服器會通知所有客戶端發生問題。這會觸發每個客戶端 Channel.onError 的回呼。客戶端會嘗試使用指數退避策略重新連線至伺服器。他們只要重新連線後,就會嘗試重新加入他們之前訂閱的主題。如果成功,他們就會像之前一樣開始收到來自這些主題的訊息。

重新傳送客戶端訊息

通道客戶端會將傳送訊息放入 PushBuffer,並在有連線時將它們傳送至伺服器。如果沒有任何可用連線,客戶端會暫停訊息,直到它可以建立新的連線。客戶端會在記憶體中暫停訊息,直到它建立連線或收到 timeout 事件為止。預設逾時時間設定為 5000 毫秒。客戶端不會在瀏覽器的儲存空間中保留訊息,因此如果瀏覽器分頁關閉,訊息就會消失。

重新傳送伺服器訊息

在將訊息傳送給客戶端時,Phoenix 使用至多一次策略。如果客戶端離線並遺漏訊息,則 Phoenix 就不會重新傳送它。Phoenix 沒有在伺服器上保留訊息。如果伺服器重新啟動,未傳送的訊息就會消失。如果我們的應用程式需要更強的訊息傳遞保證,就需要自行撰寫該程式碼。常見方法包括在伺服器上保留訊息,並讓客戶端要求取得遺失的訊息。舉例來說,請參閱 Chris McCord 的 Phoenix 訓練課程:客戶端程式碼伺服器程式碼

範例應用程式

若要查看我們剛剛建立的應用程式範例,請檢查專案 phoenix_chat_example