檢視原始程式碼 Phoenix.ChannelTest (Phoenix v1.7.14)

用於測試 Phoenix 通道的便利工具。

在管道測試中,我們透過程序通訊來與管道互動,傳送和接收訊息。訂閱管道所訂閱的主題也很常見,這樣讓我們能夠確認特定訊息是否已廣播。

管道測試

要開始,請在測試案例中定義模組屬性 @endpoint,並指向應用程式端點。

接著,您可以直接建立一個 socket 並 subscribe_and_join/4 主題和管道

{:ok, _, socket} =
  socket(UserSocket, "user:id", %{some_assigns: 1})
  |> subscribe_and_join(RoomChannel, "room:lobby", %{"id" => 3})

您通常想設定相同的 ID,並指定 UserSocket.connect/3 回呼會設定的那個叫用。或者,您可以使用 connect/3 幫手程式呼叫 UserSocket.connect/3 回呼,並使用 socket id 初始化 socket。

{:ok, socket} = connect(UserSocket, %{"some" => "params"}, %{})
{:ok, _, socket} = subscribe_and_join(socket, "room:lobby", %{"id" => 3})

呼叫 subscribe_and_join/4 之後,它將訂閱目前的測試程序至「room:lobby」主題,並在另一個程序中啟動管道。它會傳回 {:ok, reply, socket}{:error, reply}

現在,就像管道有代表要推播到客戶端的通訊的 socket 一樣。我們的測試有代表要推播到伺服器的通訊的 socket。

例如,我們可以在測試中使用 push/3 函數將訊息推播到管道(它會呼叫 handle_in/3

push(socket, "my_event", %{"some" => "data"})

類似地,我們可以從測試本身在測試和管道都已訂閱的主題上廣播訊息,並在管道上觸發 handle_out/3

broadcast_from(socket, "my_event", %{"some" => "data"})

請注意,只有 broadcast_from/3broadcast_from!/3 可在測試中使用,以避免廣播訊息重複傳送至測試程序。

當以上函數將資料推播至管道(伺服器)時,我們可以使用 assert_push/3 來驗證管道已將訊息推播至客戶端

assert_push "my_event", %{"some" => "data"}

甚至可以斷言有些訊息已廣播至 pubsub

assert_broadcast "my_event", %{"some" => "data"}

最後,每當訊息推播至管道時,就會傳回一個參考。我們可以使用此參考來斷言伺服器已傳送特定回復

ref = push(socket, "counter", %{})
assert_reply ref, :ok, %{"counter" => 1}

檢查副作用

常常會需要在傳播訊道中執行副作用,例如寫入資料庫,並在測試中驗證這些副作用。

想像下面的 handle_in/3 在一個傳播訊道中

def handle_in("publish", %{"id" => id}, socket) do
  Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
  {:noreply, socket}
end

由於整個通訊是異步的,因此下面的測試會非常脆弱

push(socket, "publish", %{"id" => 3})
assert Repo.get_by(Post, id: 3, published: true)

問題在於我們沒有保證在呼叫 push/3 之後,傳播訊道已處理我們的訊息。最好的解決方案是在執行任何其他斷言之前,斷言傳播訊道已傳送回覆給我們。首先,讓傳播訊道開始傳送回覆

def handle_in("publish", %{"id" => id}, socket) do
  Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
  {:reply, :ok, socket}
end

然後在測試中期待它們

ref = push(socket, "publish", %{"id" => 3})
assert_reply ref, :ok
assert Repo.get_by(Post, id: 3, published: true)

離開和關閉

這個模組也提供了模擬離開和關閉傳播訊道的功能。一旦你離開或關閉一個傳播訊道,因為傳播訊道在加入時連結到測試程序,所以它會中斷測試程序

leave(socket)
** (EXIT from #PID<...>) {:shutdown, :leave}

你可以在測試中,透過取消連結傳播訊道程序來避免這個問題

Process.unlink(socket.channel_pid)

請注意,leave/1 是異步的,因此它也會傳回一個參考,你可以用來檢查回覆

ref = leave(socket)
assert_reply ref, :ok

另一方面,關閉總是同步的,而且它只會在保證傳播訊道程序已終止後傳回

:ok = close(socket)

這模擬了客戶端中的既有行為。

為了斷言傳播訊道以異步的方式關閉或發生錯誤,你可以使用 Elixir 提供的工具來監控傳播訊道程序,並等待 :DOWN 訊息。想像一下 handle_info/2 的實作會在收到 :some_message 時關閉傳播訊道的功能

def handle_info(:some_message, socket) do
  {:stop, :normal, socket}
end

在你的測試中,你可以透過下列方式斷言關閉已發生

Process.monitor(socket.channel_pid)
send(socket.channel_pid, :some_message)
assert_receive {:DOWN, _, _, _, :normal}

摘要

函式

斷言傳播訊道已在 timeout 內,將包含指定事件和負載的訊息推回給客戶端。

從 pid 到所有 socket 主題訂閱者的廣播事件。

broadcast_from/3 相同,但如果廣播失敗,則會引發例外。

模擬客戶端關閉 socket。

為 socket 處理常式啟動傳輸連線。

加入指定主題和資訊的頻道。

模擬用戶端離開頻道。

推送訊息至頻道。

斷言頻道尚未在 timeout 內向用戶端推送與指定事件和資訊相符的訊息。

使用指定的 socket_module 建立 socket。

使用指定的 socket_module、id 和 assigns 建立 socket。

訂閱指定的專題並加入指定專題和資訊下的頻道。

subscribe_and_join/4 相同,但會傳回 socket 或擲回錯誤。

函數

連結到此巨集

assert_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))

查看原始碼 (巨集)

斷言傳播訊道已在 timeout 內廣播訊息。

在斷言任一廣播訊息之前,我們必須先在測試過程中訂閱頻道的專題

@endpoint.subscribe("foo:ok")

現在我們可以透過模式比對事件和資訊

assert_broadcast "some_event", %{"data" => _}

上述斷言中,我們不必特別在乎所傳送的資料為何,只要有傳送資料即可。

計時單位為毫秒,預設為 :ex_unit 應用程式上設定的 :assert_receive_timeout (預設為 100 毫秒)。

連結到此巨集

assert_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))

查看原始碼 (巨集)

斷言傳播訊道已在 timeout 內,將包含指定事件和負載的訊息推回給客戶端。

請注意,事件和資訊都是模式。這表示程式可以撰寫

assert_push "some_event", %{"data" => _}

上述斷言中,我們不必特別在乎所傳送的資料為何,只要有傳送資料即可。

計時單位為毫秒,預設為 :ex_unit 應用程式上設定的 :assert_receive_timeout (預設為 100 毫秒)。

注意:由於 event 和 payload 是模式,因此它們將會被比對。這表示如果你希望斷言接收到的 payload 等同於既有的變數,那麼你需要在斷言表達式中固定這個變數。

expected_payload = %{foo: "bar"}
assert_push "some_event", ^expected_payload

expected_payload = %{foo: "bar"}
assert_push "some_event", expected_payload
# The code above does not assert the payload matches the described map.
連結到此巨集

assert_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))

檢視原始碼 (巨集)

斷言傳播訊道已在 timeout 內回覆給指定的訊息。

請注意,狀態和 payload 是模式。這表示可以撰寫

ref = push(channel, "some_event")
assert_reply ref, :ok, %{"data" => _}

在上面的斷言中,我們不會特別在意傳送的資料,而只在意是否有回應到。

計時單位為毫秒,預設為 :ex_unit 應用程式上設定的 :assert_receive_timeout (預設為 100 毫秒)。

連結到此函式

broadcast_from(socket, event, message)

檢視原始碼

從 pid 到所有 socket 主題訂閱者的廣播事件。

測試程序不會收到已發佈的訊息。這會觸發通道中的 handle_out/3 回呼。

範例

iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
連結到此函式

broadcast_from!(socket, event, message)

檢視原始碼

broadcast_from/3 相同,但如果廣播失敗,則會引發例外。

連結到此函式

close(socket, timeout \\ 5000)

檢視原始碼

模擬客戶端關閉 socket。

關閉 socket 是同步的,且預設逾時時間為 5000 毫秒。

連結到此巨集

connect(handler, params, options \\ quote do [] end)

檢視原始碼 (巨集)

為 socket 處理常式啟動傳輸連線。

適用於 UserSocket 驗證的測試。傳回處理常式 connect/3 的結果。

請參閱 join/4

連結到此函式

join(socket, topic, payload)

檢視原始碼

請參閱 join/4

連結到此函式

join(socket, channel, topic, payload \\ %{})

檢視原始碼

加入指定主題和資訊的頻道。

指定的通道在一個獨立的程序中加入,該程序連結到測試程序。

它傳回 {:ok, reply, socket}{:error, reply}

@spec leave(Phoenix.Socket.t()) :: reference()

模擬用戶端離開頻道。

連結到此函式

push(socket, event, payload \\ %{})

檢視原始碼
@spec push(Phoenix.Socket.t(), String.t(), map()) :: reference()

推送訊息至頻道。

這會觸發通道中的 handle_in/3 回呼。

範例

iex> push(socket, "new_message", %{id: 1, content: "hello"})
reference
連結到此巨集

refute_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))

檢視原始碼 (巨集)

斷言頻道尚未在 timeout 內廣播訊息。

assert_broadcast 類似,event 和 payload 都是模式。

逾時時間以毫秒為單位,預設值為 :ex_unit 應用程式設定的 :refute_receive_timeout (預設為 100 毫秒)。請記得此巨集會按逾時值封鎖測試,因此僅在必要時使用,因過度使用肯定會讓你的測試組件速度變慢。

連結到此巨集

refute_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))

檢視原始碼 (巨集)

斷言頻道尚未在 timeout 內向用戶端推送與指定事件和資訊相符的訊息。

assert_push 類似,event 和 payload 都是模式。

逾時時間以毫秒為單位,預設值為 :ex_unit 應用程式設定的 :refute_receive_timeout (預設為 100 毫秒)。請記得此巨集會按逾時值封鎖測試,因此僅在必要時使用,因過度使用肯定會讓你的測試組件速度變慢。

連結到此巨集

refute_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))

檢視原始碼 (巨集)

斷言頻道尚未在 timeout 內以相符的資訊回覆訊息。

assert_reply 相似,事件和負載都是樣式。

逾時時間以毫秒為單位,預設值為 :ex_unit 應用程式設定的 :refute_receive_timeout (預設為 100 毫秒)。請記得此巨集會按逾時值封鎖測試,因此僅在必要時使用,因過度使用肯定會讓你的測試組件速度變慢。

連結到此巨集

socket(socket_module)

檢視原始碼 (宏)

使用指定的 socket_module 建立 socket。

然後使用套接字訂閱和加入頻道。當您想要建立空白套接字傳遞給函式(例如 UserSocket.connect/3)時,使用這個函式。

否則,如果您想要使用現有 ID 和指定項建立套接字,請使用 socket/4

範例

socket(MyApp.UserSocket)
連結到此巨集

socket(socket_module, socket_id, socket_assigns, options \\ [])

檢視原始碼 (宏)

使用指定的 socket_module、id 和 assigns 建立 socket。

範例

socket(MyApp.UserSocket, "user_id", %{some: :assign})

如果您需要在測試程序以外的程序中存取套接字,可以在第 4 個引數中提供測試程序的 pid

範例

test "connect in a task" do
  pid = self()
  task = Task.async(fn -> 
    socket = socket(MyApp.UserSocket, "user_id", %{some: :assign}, test_process: pid)
    broadcast_from!(socket, "default", %{"foo" => "bar"})
    assert_push "default", %{"foo" => "bar"}
  end)
  Task.await(task)
end
連結到此函式

subscribe_and_join(socket, topic)

檢視原始碼

請見 subscribe_and_join/4

連結到此函式

subscribe_and_join(socket, topic, payload)

檢視原始碼

請見 subscribe_and_join/4

連結到此函式

subscribe_and_join(socket, channel, topic, payload \\ %{})

檢視原始碼

訂閱指定的專題並加入指定專題和資訊下的頻道。

透過訂閱主題,我們可以使用 assert_broadcast/3 來驗證訊息已透過 pubsub 層傳送。

透過加入頻道,我們可以直接與之互動。給定的頻道會在與測試程序連結的獨立程序中加入。

如果未提供頻道模組,套接字的處理常式會用於對給定的主題尋找相符頻道。

它傳回 {:ok, reply, socket}{:error, reply}

連結到此函式

subscribe_and_join!(socket, topic)

檢視原始碼

請見 subscribe_and_join!/4

連結到此函式

subscribe_and_join!(socket, topic, payload)

檢視原始碼

請見 subscribe_and_join!/4

連結到此函式

subscribe_and_join!(socket, channel, topic, payload \\ %{})

檢視原始碼

subscribe_and_join/4 相同,但會傳回 socket 或擲回錯誤。

當您不會測試加入頻道,而僅需要套接字時,這會很有用處。