檢視原始程式碼 Phoenix.ChannelTest (Phoenix v1.7.14)
用於測試 Phoenix 通道的便利工具。
在管道測試中,我們透過程序通訊來與管道互動,傳送和接收訊息。訂閱管道所訂閱的主題也很常見,這樣讓我們能夠確認特定訊息是否已廣播。
管道測試
要開始,請在測試案例中定義模組屬性 @endpoint
,並指向應用程式端點。
接著,您可以直接建立一個 socket 並 subscribe_and_join/4
主題和管道
{:ok, _, socket} =
socket(UserSocket, "user:id", %{some_assigns: 1})
|> subscribe_and_join(RoomChannel, "room:lobby", %{"id" => 3})
您通常想設定相同的 ID,並指定 UserSocket.connect/3
回呼會設定的那個叫用。或者,您可以使用 connect/3
幫手程式呼叫 UserSocket.connect/3
回呼,並使用 socket id 初始化 socket。
{:ok, socket} = connect(UserSocket, %{"some" => "params"}, %{})
{:ok, _, socket} = subscribe_and_join(socket, "room:lobby", %{"id" => 3})
呼叫 subscribe_and_join/4
之後,它將訂閱目前的測試程序至「room:lobby」主題,並在另一個程序中啟動管道。它會傳回 {:ok, reply, socket}
或 {:error, reply}
。
現在,就像管道有代表要推播到客戶端的通訊的 socket 一樣。我們的測試有代表要推播到伺服器的通訊的 socket。
例如,我們可以在測試中使用 push/3
函數將訊息推播到管道(它會呼叫 handle_in/3
)
push(socket, "my_event", %{"some" => "data"})
類似地,我們可以從測試本身在測試和管道都已訂閱的主題上廣播訊息,並在管道上觸發 handle_out/3
broadcast_from(socket, "my_event", %{"some" => "data"})
請注意,只有
broadcast_from/3
和broadcast_from!/3
可在測試中使用,以避免廣播訊息重複傳送至測試程序。
當以上函數將資料推播至管道(伺服器)時,我們可以使用 assert_push/3
來驗證管道已將訊息推播至客戶端
assert_push "my_event", %{"some" => "data"}
甚至可以斷言有些訊息已廣播至 pubsub
assert_broadcast "my_event", %{"some" => "data"}
最後,每當訊息推播至管道時,就會傳回一個參考。我們可以使用此參考來斷言伺服器已傳送特定回復
ref = push(socket, "counter", %{})
assert_reply ref, :ok, %{"counter" => 1}
檢查副作用
常常會需要在傳播訊道中執行副作用,例如寫入資料庫,並在測試中驗證這些副作用。
想像下面的 handle_in/3
在一個傳播訊道中
def handle_in("publish", %{"id" => id}, socket) do
Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
{:noreply, socket}
end
由於整個通訊是異步的,因此下面的測試會非常脆弱
push(socket, "publish", %{"id" => 3})
assert Repo.get_by(Post, id: 3, published: true)
問題在於我們沒有保證在呼叫 push/3
之後,傳播訊道已處理我們的訊息。最好的解決方案是在執行任何其他斷言之前,斷言傳播訊道已傳送回覆給我們。首先,讓傳播訊道開始傳送回覆
def handle_in("publish", %{"id" => id}, socket) do
Repo.get!(Post, id) |> Post.publish() |> Repo.update!()
{:reply, :ok, socket}
end
然後在測試中期待它們
ref = push(socket, "publish", %{"id" => 3})
assert_reply ref, :ok
assert Repo.get_by(Post, id: 3, published: true)
離開和關閉
這個模組也提供了模擬離開和關閉傳播訊道的功能。一旦你離開或關閉一個傳播訊道,因為傳播訊道在加入時連結到測試程序,所以它會中斷測試程序
leave(socket)
** (EXIT from #PID<...>) {:shutdown, :leave}
你可以在測試中,透過取消連結傳播訊道程序來避免這個問題
Process.unlink(socket.channel_pid)
請注意,leave/1
是異步的,因此它也會傳回一個參考,你可以用來檢查回覆
ref = leave(socket)
assert_reply ref, :ok
另一方面,關閉總是同步的,而且它只會在保證傳播訊道程序已終止後傳回
:ok = close(socket)
這模擬了客戶端中的既有行為。
為了斷言傳播訊道以異步的方式關閉或發生錯誤,你可以使用 Elixir 提供的工具來監控傳播訊道程序,並等待 :DOWN
訊息。想像一下 handle_info/2
的實作會在收到 :some_message
時關閉傳播訊道的功能
def handle_info(:some_message, socket) do
{:stop, :normal, socket}
end
在你的測試中,你可以透過下列方式斷言關閉已發生
Process.monitor(socket.channel_pid)
send(socket.channel_pid, :some_message)
assert_receive {:DOWN, _, _, _, :normal}
摘要
函式
斷言傳播訊道已在 timeout
內廣播訊息。
斷言傳播訊道已在 timeout
內,將包含指定事件和負載的訊息推回給客戶端。
斷言傳播訊道已在 timeout
內回覆給指定的訊息。
從 pid 到所有 socket 主題訂閱者的廣播事件。
與 broadcast_from/3
相同,但如果廣播失敗,則會引發例外。
模擬客戶端關閉 socket。
為 socket 處理常式啟動傳輸連線。
加入指定主題和資訊的頻道。
模擬用戶端離開頻道。
推送訊息至頻道。
斷言頻道尚未在 timeout
內廣播訊息。
斷言頻道尚未在 timeout
內向用戶端推送與指定事件和資訊相符的訊息。
斷言頻道尚未在 timeout
內以相符的資訊回覆訊息。
使用指定的 socket_module
建立 socket。
使用指定的 socket_module
、id 和 assigns 建立 socket。
訂閱指定的專題並加入指定專題和資訊下的頻道。
與 subscribe_and_join/4
相同,但會傳回 socket 或擲回錯誤。
函數
assert_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))
查看原始碼 (巨集)斷言傳播訊道已在 timeout
內廣播訊息。
在斷言任一廣播訊息之前,我們必須先在測試過程中訂閱頻道的專題
@endpoint.subscribe("foo:ok")
現在我們可以透過模式比對事件和資訊
assert_broadcast "some_event", %{"data" => _}
上述斷言中,我們不必特別在乎所傳送的資料為何,只要有傳送資料即可。
計時單位為毫秒,預設為 :ex_unit
應用程式上設定的 :assert_receive_timeout
(預設為 100 毫秒)。
assert_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))
查看原始碼 (巨集)斷言傳播訊道已在 timeout
內,將包含指定事件和負載的訊息推回給客戶端。
請注意,事件和資訊都是模式。這表示程式可以撰寫
assert_push "some_event", %{"data" => _}
上述斷言中,我們不必特別在乎所傳送的資料為何,只要有傳送資料即可。
計時單位為毫秒,預設為 :ex_unit
應用程式上設定的 :assert_receive_timeout
(預設為 100 毫秒)。
注意:由於 event 和 payload 是模式,因此它們將會被比對。這表示如果你希望斷言接收到的 payload 等同於既有的變數,那麼你需要在斷言表達式中固定這個變數。
好
expected_payload = %{foo: "bar"}
assert_push "some_event", ^expected_payload
差
expected_payload = %{foo: "bar"}
assert_push "some_event", expected_payload
# The code above does not assert the payload matches the described map.
assert_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :assert_receive_timeout))
檢視原始碼 (巨集)斷言傳播訊道已在 timeout
內回覆給指定的訊息。
請注意,狀態和 payload 是模式。這表示可以撰寫
ref = push(channel, "some_event")
assert_reply ref, :ok, %{"data" => _}
在上面的斷言中,我們不會特別在意傳送的資料,而只在意是否有回應到。
計時單位為毫秒,預設為 :ex_unit
應用程式上設定的 :assert_receive_timeout
(預設為 100 毫秒)。
從 pid 到所有 socket 主題訂閱者的廣播事件。
測試程序不會收到已發佈的訊息。這會觸發通道中的 handle_out/3
回呼。
範例
iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
與 broadcast_from/3
相同,但如果廣播失敗,則會引發例外。
模擬客戶端關閉 socket。
關閉 socket 是同步的,且預設逾時時間為 5000 毫秒。
為 socket 處理常式啟動傳輸連線。
適用於 UserSocket 驗證的測試。傳回處理常式 connect/3
的結果。
請參閱 join/4
。
請參閱 join/4
。
加入指定主題和資訊的頻道。
指定的通道在一個獨立的程序中加入,該程序連結到測試程序。
它傳回 {:ok, reply, socket}
或 {:error, reply}
。
@spec leave(Phoenix.Socket.t()) :: reference()
模擬用戶端離開頻道。
@spec push(Phoenix.Socket.t(), String.t(), map()) :: reference()
推送訊息至頻道。
這會觸發通道中的 handle_in/3
回呼。
範例
iex> push(socket, "new_message", %{id: 1, content: "hello"})
reference
refute_broadcast(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))
檢視原始碼 (巨集)斷言頻道尚未在 timeout
內廣播訊息。
與 assert_broadcast
類似,event 和 payload 都是模式。
逾時時間以毫秒為單位,預設值為 :ex_unit
應用程式設定的 :refute_receive_timeout
(預設為 100 毫秒)。請記得此巨集會按逾時值封鎖測試,因此僅在必要時使用,因過度使用肯定會讓你的測試組件速度變慢。
refute_push(event, payload, timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))
檢視原始碼 (巨集)斷言頻道尚未在 timeout
內向用戶端推送與指定事件和資訊相符的訊息。
與 assert_push
類似,event 和 payload 都是模式。
逾時時間以毫秒為單位,預設值為 :ex_unit
應用程式設定的 :refute_receive_timeout
(預設為 100 毫秒)。請記得此巨集會按逾時值封鎖測試,因此僅在必要時使用,因過度使用肯定會讓你的測試組件速度變慢。
refute_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ Application.fetch_env!(:ex_unit, :refute_receive_timeout))
檢視原始碼 (巨集)斷言頻道尚未在 timeout
內以相符的資訊回覆訊息。
與 assert_reply
相似,事件和負載都是樣式。
逾時時間以毫秒為單位,預設值為 :ex_unit
應用程式設定的 :refute_receive_timeout
(預設為 100 毫秒)。請記得此巨集會按逾時值封鎖測試,因此僅在必要時使用,因過度使用肯定會讓你的測試組件速度變慢。
使用指定的 socket_module
建立 socket。
然後使用套接字訂閱和加入頻道。當您想要建立空白套接字傳遞給函式(例如 UserSocket.connect/3
)時,使用這個函式。
否則,如果您想要使用現有 ID 和指定項建立套接字,請使用 socket/4
。
範例
socket(MyApp.UserSocket)
使用指定的 socket_module
、id 和 assigns 建立 socket。
範例
socket(MyApp.UserSocket, "user_id", %{some: :assign})
如果您需要在測試程序以外的程序中存取套接字,可以在第 4 個引數中提供測試程序的 pid
。
範例
test "connect in a task" do
pid = self()
task = Task.async(fn ->
socket = socket(MyApp.UserSocket, "user_id", %{some: :assign}, test_process: pid)
broadcast_from!(socket, "default", %{"foo" => "bar"})
assert_push "default", %{"foo" => "bar"}
end)
Task.await(task)
end
訂閱指定的專題並加入指定專題和資訊下的頻道。
透過訂閱主題,我們可以使用 assert_broadcast/3
來驗證訊息已透過 pubsub 層傳送。
透過加入頻道,我們可以直接與之互動。給定的頻道會在與測試程序連結的獨立程序中加入。
如果未提供頻道模組,套接字的處理常式會用於對給定的主題尋找相符頻道。
它傳回 {:ok, reply, socket}
或 {:error, reply}
。
與 subscribe_and_join/4
相同,但會傳回 socket 或擲回錯誤。
當您不會測試加入頻道,而僅需要套接字時,這會很有用處。