檢視原始碼 任務和 gen_tcp

在本章中,我們將學習如何使用 Erlang 的 :gen_tcp 模組 來服務請求。這提供了探索 Elixir 的 Task 模組的絕佳機會。在後面的章節中,我們將擴充我們的伺服器,以便它實際上可以服務命令。

Echo 伺服器

我們將首先實作一個 echo 伺服器來啟動我們的 TCP 伺服器。它將傳送一個回應,其中包含在請求中收到的文字。我們將逐步改善我們的伺服器,直到它受到監督並準備好處理多個連線。

廣義來說,TCP 伺服器執行下列步驟

  1. 監聽埠口,直到埠口可用並取得 socket
  2. 在該埠口等待用戶端連線並接受它
  3. 讀取用戶端請求並寫入回應

讓我們實作這些步驟。移至 apps/kv_server 應用程式,開啟 lib/kv_server.ex,並新增下列函式

defmodule KVServer do
  require Logger

  def accept(port) do
    # The options below mean:
    #
    # 1. `:binary` - receives data as binaries (instead of lists)
    # 2. `packet: :line` - receives data line by line
    # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
    # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
    #
    {:ok, socket} =
      :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info("Accepting connections on port #{port}")
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    serve(client)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

我們將透過呼叫 KVServer.accept(4040) 來啟動我們的伺服器,其中 4040 是埠口。accept/1 中的第一個步驟是監聽埠口,直到 socket 可用,然後呼叫 loop_acceptor/1loop_acceptor/1 是接受用戶端連線的迴圈。對於每個已接受的連線,我們呼叫 serve/1

serve/1 是另一個從 socket 讀取一行並將這些行寫回 socket 的迴圈。請注意,serve/1 函數使用管道運算子 |>/2 來表達此作業流程。管道運算子會評估左側並將其結果作為右側函數的第一個引數傳遞。以上的範例

socket |> read_line() |> write_line(socket)

等於

write_line(read_line(socket), socket)

read_line/1 實作使用 :gen_tcp.recv/2 從 socket 接收資料,而 write_line/2 使用 :gen_tcp.send/2 寫入 socket。

請注意,serve/1 是在 loop_acceptor/1 內部循序呼叫的無限迴圈,因此永遠不會到達對 loop_acceptor/1 的尾端呼叫,而且可以避免。不過,正如我們將看到的,我們需要在一個獨立的程序中執行 serve/1,因此我們很快就會需要那個尾端呼叫。

這幾乎就是我們實作 echo 伺服器所需要的一切。讓我們來試試看!

使用 iex -S mixkv_server 應用程式內啟動一個 IEx 會話。在 IEx 內,執行

iex> KVServer.accept(4040)

伺服器現在正在執行,你甚至會注意到主控台被封鎖了。讓我們使用 一個 telnet 客户端 來存取我們的伺服器。大多數作業系統都有可用的客户端,而且它們的命令列通常類似

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

輸入「hello」,按下 Enter,你就會收到「hello」。太棒了!

我可以透過輸入 ctrl + ]、輸入 quit,然後按下 <Enter> 來退出我的特定 telnet 客户端,但你的客户端可能需要不同的步驟。

一旦你退出 telnet 客户端,你可能會在 IEx 會話中看到一個錯誤

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:45: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:37: KVServer.serve/1
    (kv_server) lib/kv_server.ex:30: KVServer.loop_acceptor/1

那是因為我們預期會從 :gen_tcp.recv/2 接收資料,但客户端關閉了連線。我們需要在伺服器的未來版本中更好地處理此類情況。

現在,有一個更重要的錯誤需要我們修復:如果我們的 TCP 接收器崩潰了會發生什麼事?由於沒有監督,伺服器會掛掉,我們將無法處理更多要求,因為它不會重新啟動。這就是我們必須將伺服器移到一個監督樹的原因。

任務

我們已經了解代理、通用伺服器和監督者。它們都是用於處理多則訊息或管理狀態。但當我們只需要執行某項任務時,我們使用什麼呢?

Task 模組確切地提供了此功能。例如,它有一個 Task.start_link/1 函式,它接收一個匿名函式,並在一個新程序中執行它,而該程序將成為監督樹的一部分。

讓我們嘗試一下。開啟 lib/kv_server/application.ex,並將 start/2 函式中的監督者變更為以下內容

  def start(_type, _args) do
    children = [
      {Task, fn -> KVServer.accept(4040) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

和往常一樣,我們已傳遞一個二元組作為子項規格,而它又會呼叫 Task.start_link/1

透過這個變更,我們表示我們想要執行 KVServer.accept(4040) 作為一個任務。我們現在對埠進行硬編碼,但這可以用幾種方式變更,例如,在啟動應用程式時從系統環境中讀取埠

port = String.to_integer(System.get_env("PORT") || "4040")
# ...
{Task, fn -> KVServer.accept(port) end}

在您的程式碼中插入這些變更,現在您可以使用以下命令啟動您的應用程式 PORT=4321 mix run --no-halt,請注意我們如何將埠作為變數傳遞,但如果沒有給定埠,則預設為 4040。

現在伺服器已成為監督樹的一部分,當我們執行應用程式時,它應該會自動啟動。啟動您的伺服器,現在傳遞埠,並再次使用 telnet 用戶端以確保一切仍然正常運作

$ telnet 127.0.0.1 4321
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

是的,它運作了!但是,它能擴充嗎?

嘗試同時連接兩個 telnet 用戶端。當您這樣做時,您會注意到第二個用戶端不會產生回音

$ telnet 127.0.0.1 4321
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

它似乎完全無法運作。這是因為我們在接受連線的同一個程序中服務要求。當一個用戶端連線時,我們無法接受另一個用戶端。

任務監督者

為了讓我們的伺服器處理同時連線,我們需要一個程序作為接受者,它會產生其他程序來服務要求。一個解決方案是變更

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

以同時使用 Task.start_link/1

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

我們正在直接從接受者程序啟動一個連結的任務。但我們已經犯過一次這個錯誤了。您還記得嗎?

這類似於我們直接從登錄檔呼叫 KV.Bucket.start_link/1 時所犯的錯誤。這表示任何儲存區的失敗都會導致整個登錄檔中斷。

上述程式碼會有相同的缺陷:如果我們將 serve(client) 任務連結到接受者,則在服務要求時發生崩潰會導致接受者和所有其他連線中斷。

我們透過使用一個簡單的一對一監督者來修復登錄檔的問題。我們將在此處使用相同的策略,只不過這個模式在任務中很常見,因此 Task 已經附帶了一個解決方案:一個簡單的一對一監督者,它會將臨時任務作為我們監督樹的一部分啟動。

讓我們再次變更 start/2,以將監控器新增至我們的樹狀結構

  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || "4040")

    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      {Task, fn -> KVServer.accept(port) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

我們現在將以 KVServer.TaskSupervisor 名稱啟動 Task.Supervisor 程序。請記住,由於接受器任務取決於此監控器,因此必須先啟動監控器。

現在,我們需要變更 loop_acceptor/1 以使用 Task.Supervisor 來服務每個請求

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end

您可能會注意到我們新增了一行,:ok = :gen_tcp.controlling_process(client, pid)。這會讓子程序成為 client socket 的「控制程序」。如果我們沒有執行此操作,則如果接受器發生故障,接受器會中斷所有用戶端,因為 socket 會繫結到已接受它們的程序 (這是預設行為)。

使用 PORT=4040 mix run --no-halt 啟動新的伺服器,現在我們可以開啟許多並行的 Telnet 用戶端。您還會注意到,退出用戶端不會中斷接受器。太棒了!

以下是完整的回音伺服器實作

defmodule KVServer do
  require Logger

  @doc """
  Starts accepting connections on the given `port`.
  """
  def accept(port) do
    {:ok, socket} = :gen_tcp.listen(port,
                      [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

由於我們已變更監控器規格,因此我們需要詢問:我們的監控策略是否仍然正確?

在這種情況下,答案是肯定的:如果接受器發生故障,則不需要中斷現有的連線。另一方面,如果任務監控器發生故障,則不需要也中斷接受器。

不過,仍有一個問題尚未解決,那就是重新啟動策略。預設情況下,任務的 :restart 值設定為 :temporary,這表示它們不會重新啟動。這是透過 Task.Supervisor 啟動的連線的絕佳預設值,因為重新啟動失敗的連線沒有意義,但對於接受器來說卻是一個糟糕的選擇。如果接受器發生故障,我們希望讓接受器重新啟動並執行。

讓我們解決這個問題。我們知道對於形狀為 {Task, fun} 的子代,Elixir 會呼叫 Task.child_spec(fun) 來擷取底層子代規格。因此,有人可能會想像,若要將 {Task, fun} 規格變更為 :restart:permanent,我們需要變更 Task 模組。然而,這是不可能的,因為 Task 模組定義為 Elixir 標準函式庫的一部分(即使有可能,這也不太可能是個好主意)。幸運的是,這可以使用 Supervisor.child_spec/2 來完成,它允許我們使用新值來設定子代規格。讓我們再次在 KVServer.Application 中改寫 start/2

  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || "4040")

    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

現在我們有一個持續運作的接收器,它會在持續運作的任務監督器下啟動暫時任務程序。

在下一章,我們將開始剖析客戶端要求並傳送回應,完成我們的伺服器。