檢視原始碼 任務和 gen_tcp
在本章中,我們將學習如何使用 Erlang 的 :gen_tcp
模組 來服務請求。這提供了探索 Elixir 的 Task
模組的絕佳機會。在後面的章節中,我們將擴充我們的伺服器,以便它實際上可以服務命令。
Echo 伺服器
我們將首先實作一個 echo 伺服器來啟動我們的 TCP 伺服器。它將傳送一個回應,其中包含在請求中收到的文字。我們將逐步改善我們的伺服器,直到它受到監督並準備好處理多個連線。
廣義來說,TCP 伺服器執行下列步驟
- 監聽埠口,直到埠口可用並取得 socket
- 在該埠口等待用戶端連線並接受它
- 讀取用戶端請求並寫入回應
讓我們實作這些步驟。移至 apps/kv_server
應用程式,開啟 lib/kv_server.ex
,並新增下列函式
defmodule KVServer do
require Logger
def accept(port) do
# The options below mean:
#
# 1. `:binary` - receives data as binaries (instead of lists)
# 2. `packet: :line` - receives data line by line
# 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
# 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
#
{:ok, socket} =
:gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
Logger.info("Accepting connections on port #{port}")
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
我們將透過呼叫 KVServer.accept(4040)
來啟動我們的伺服器,其中 4040 是埠口。accept/1
中的第一個步驟是監聽埠口,直到 socket 可用,然後呼叫 loop_acceptor/1
。loop_acceptor/1
是接受用戶端連線的迴圈。對於每個已接受的連線,我們呼叫 serve/1
。
serve/1
是另一個從 socket 讀取一行並將這些行寫回 socket 的迴圈。請注意,serve/1
函數使用管道運算子 |>/2
來表達此作業流程。管道運算子會評估左側並將其結果作為右側函數的第一個引數傳遞。以上的範例
socket |> read_line() |> write_line(socket)
等於
write_line(read_line(socket), socket)
read_line/1
實作使用 :gen_tcp.recv/2
從 socket 接收資料,而 write_line/2
使用 :gen_tcp.send/2
寫入 socket。
請注意,serve/1
是在 loop_acceptor/1
內部循序呼叫的無限迴圈,因此永遠不會到達對 loop_acceptor/1
的尾端呼叫,而且可以避免。不過,正如我們將看到的,我們需要在一個獨立的程序中執行 serve/1
,因此我們很快就會需要那個尾端呼叫。
這幾乎就是我們實作 echo 伺服器所需要的一切。讓我們來試試看!
使用 iex -S mix
在 kv_server
應用程式內啟動一個 IEx 會話。在 IEx 內,執行
iex> KVServer.accept(4040)
伺服器現在正在執行,你甚至會注意到主控台被封鎖了。讓我們使用 一個 telnet
客户端 來存取我們的伺服器。大多數作業系統都有可用的客户端,而且它們的命令列通常類似
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
輸入「hello」,按下 Enter,你就會收到「hello」。太棒了!
我可以透過輸入 ctrl + ]
、輸入 quit
,然後按下 <Enter>
來退出我的特定 telnet 客户端,但你的客户端可能需要不同的步驟。
一旦你退出 telnet 客户端,你可能會在 IEx 會話中看到一個錯誤
** (MatchError) no match of right hand side value: {:error, :closed}
(kv_server) lib/kv_server.ex:45: KVServer.read_line/1
(kv_server) lib/kv_server.ex:37: KVServer.serve/1
(kv_server) lib/kv_server.ex:30: KVServer.loop_acceptor/1
那是因為我們預期會從 :gen_tcp.recv/2
接收資料,但客户端關閉了連線。我們需要在伺服器的未來版本中更好地處理此類情況。
現在,有一個更重要的錯誤需要我們修復:如果我們的 TCP 接收器崩潰了會發生什麼事?由於沒有監督,伺服器會掛掉,我們將無法處理更多要求,因為它不會重新啟動。這就是我們必須將伺服器移到一個監督樹的原因。
任務
我們已經了解代理、通用伺服器和監督者。它們都是用於處理多則訊息或管理狀態。但當我們只需要執行某項任務時,我們使用什麼呢?
Task
模組確切地提供了此功能。例如,它有一個 Task.start_link/1
函式,它接收一個匿名函式,並在一個新程序中執行它,而該程序將成為監督樹的一部分。
讓我們嘗試一下。開啟 lib/kv_server/application.ex
,並將 start/2
函式中的監督者變更為以下內容
def start(_type, _args) do
children = [
{Task, fn -> KVServer.accept(4040) end}
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
和往常一樣,我們已傳遞一個二元組作為子項規格,而它又會呼叫 Task.start_link/1
。
透過這個變更,我們表示我們想要執行 KVServer.accept(4040)
作為一個任務。我們現在對埠進行硬編碼,但這可以用幾種方式變更,例如,在啟動應用程式時從系統環境中讀取埠
port = String.to_integer(System.get_env("PORT") || "4040")
# ...
{Task, fn -> KVServer.accept(port) end}
在您的程式碼中插入這些變更,現在您可以使用以下命令啟動您的應用程式 PORT=4321 mix run --no-halt
,請注意我們如何將埠作為變數傳遞,但如果沒有給定埠,則預設為 4040。
現在伺服器已成為監督樹的一部分,當我們執行應用程式時,它應該會自動啟動。啟動您的伺服器,現在傳遞埠,並再次使用 telnet
用戶端以確保一切仍然正常運作
$ telnet 127.0.0.1 4321
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me
是的,它運作了!但是,它能擴充嗎?
嘗試同時連接兩個 telnet 用戶端。當您這樣做時,您會注意到第二個用戶端不會產生回音
$ telnet 127.0.0.1 4321
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?
它似乎完全無法運作。這是因為我們在接受連線的同一個程序中服務要求。當一個用戶端連線時,我們無法接受另一個用戶端。
任務監督者
為了讓我們的伺服器處理同時連線,我們需要一個程序作為接受者,它會產生其他程序來服務要求。一個解決方案是變更
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
以同時使用 Task.start_link/1
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Task.start_link(fn -> serve(client) end)
loop_acceptor(socket)
end
我們正在直接從接受者程序啟動一個連結的任務。但我們已經犯過一次這個錯誤了。您還記得嗎?
這類似於我們直接從登錄檔呼叫 KV.Bucket.start_link/1
時所犯的錯誤。這表示任何儲存區的失敗都會導致整個登錄檔中斷。
上述程式碼會有相同的缺陷:如果我們將 serve(client)
任務連結到接受者,則在服務要求時發生崩潰會導致接受者和所有其他連線中斷。
我們透過使用一個簡單的一對一監督者來修復登錄檔的問題。我們將在此處使用相同的策略,只不過這個模式在任務中很常見,因此 Task
已經附帶了一個解決方案:一個簡單的一對一監督者,它會將臨時任務作為我們監督樹的一部分啟動。
讓我們再次變更 start/2
,以將監控器新增至我們的樹狀結構
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor},
{Task, fn -> KVServer.accept(port) end}
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
我們現在將以 KVServer.TaskSupervisor
名稱啟動 Task.Supervisor
程序。請記住,由於接受器任務取決於此監控器,因此必須先啟動監控器。
現在,我們需要變更 loop_acceptor/1
以使用 Task.Supervisor
來服務每個請求
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end
您可能會注意到我們新增了一行,:ok = :gen_tcp.controlling_process(client, pid)
。這會讓子程序成為 client
socket 的「控制程序」。如果我們沒有執行此操作,則如果接受器發生故障,接受器會中斷所有用戶端,因為 socket 會繫結到已接受它們的程序 (這是預設行為)。
使用 PORT=4040 mix run --no-halt
啟動新的伺服器,現在我們可以開啟許多並行的 Telnet 用戶端。您還會注意到,退出用戶端不會中斷接受器。太棒了!
以下是完整的回音伺服器實作
defmodule KVServer do
require Logger
@doc """
Starts accepting connections on the given `port`.
"""
def accept(port) do
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false, reuseaddr: true])
Logger.info "Accepting connections on port #{port}"
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
由於我們已變更監控器規格,因此我們需要詢問:我們的監控策略是否仍然正確?
在這種情況下,答案是肯定的:如果接受器發生故障,則不需要中斷現有的連線。另一方面,如果任務監控器發生故障,則不需要也中斷接受器。
不過,仍有一個問題尚未解決,那就是重新啟動策略。預設情況下,任務的 :restart
值設定為 :temporary
,這表示它們不會重新啟動。這是透過 Task.Supervisor
啟動的連線的絕佳預設值,因為重新啟動失敗的連線沒有意義,但對於接受器來說卻是一個糟糕的選擇。如果接受器發生故障,我們希望讓接受器重新啟動並執行。
讓我們解決這個問題。我們知道對於形狀為 {Task, fun}
的子代,Elixir 會呼叫 Task.child_spec(fun)
來擷取底層子代規格。因此,有人可能會想像,若要將 {Task, fun}
規格變更為 :restart
為 :permanent
,我們需要變更 Task
模組。然而,這是不可能的,因為 Task
模組定義為 Elixir 標準函式庫的一部分(即使有可能,這也不太可能是個好主意)。幸運的是,這可以使用 Supervisor.child_spec/2
來完成,它允許我們使用新值來設定子代規格。讓我們再次在 KVServer.Application
中改寫 start/2
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor},
Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
現在我們有一個持續運作的接收器,它會在持續運作的任務監督器下啟動暫時任務程序。
在下一章,我們將開始剖析客戶端要求並傳送回應,完成我們的伺服器。