檢視原始碼 使用 GenServer 進行用戶端伺服器通訊

前一章節 中,我們使用代理來表示我們的儲存區。在 mix 簡介 中,我們指定我們想要命名每個儲存區,以便我們可以執行下列操作

CREATE shopping
OK

PUT shopping milk 1
OK

GET shopping milk
1
OK

在上述的階段中,我們與「購物」儲存區互動。

由於代理是程序,因此每個儲存區都有程序識別碼 (PID),但儲存區沒有名稱。回到 程序章節 中,我們已經瞭解到,我們可以透過賦予原子名稱來在 Elixir 中註冊程序

iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.43.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1

但是,使用原子來命名動態程序是一個可怕的想法!如果我們使用原子,我們需要將儲存區名稱(通常從外部用戶端接收)轉換為原子,而且 我們絕不應該將使用者輸入轉換為原子。這是因為原子不會被垃圾回收。一旦建立原子,它就永遠不會被回收。從使用者輸入中產生原子表示使用者可以注入足夠不同的名稱來耗盡我們的系統記憶體!

實際上,在記憶體用盡之前,你更有可能達到 Erlang VM 對原子最大數量的限制,無論如何,這都會使你的系統當機。

我們不會濫用內建名稱功能,而是會建立我們自己的 程序登錄檔,將儲存區名稱與儲存區程序關聯起來。

登錄檔需要保證它始終是最新的。例如,如果其中一個儲存區程序因錯誤而崩潰,登錄檔必須注意到這個變更,並避免提供過時的項目。在 Elixir 中,我們說登錄檔需要 監控 每個儲存區。由於我們的 登錄檔 需要能夠接收和處理來自系統的臨時訊息,因此 Agent API 不夠用。

我們將使用 GenServer 來建立一個可以監控儲存區程序的登錄檔程序。GenServer 提供工業級功能,用於在 Elixir 和 OTP 中建構伺服器。

如果您尚未閱讀 GenServer 模組文件,請先閱讀以取得概觀。閱讀完畢後,我們就可以繼續進行了。

GenServer 回呼

GenServer 是一個在特定條件下呼叫有限函數集的程序。當我們使用 Agent 時,我們會將客戶端程式碼和伺服器程式碼並排放置,如下所示

def put(bucket, key, value) do
  Agent.update(bucket, &Map.put(&1, key, value))
end

讓我們將該程式碼分解一下

def put(bucket, key, value) do
  # Here is the client code
  Agent.update(bucket, fn state ->
    # Here is the server code
    Map.put(state, key, value)
  end)
  # Back to the client code
end

在上面的程式碼中,我們有一個程序,我們稱之為「客戶端」,它向代理程式「伺服器」發送請求。請求包含一個匿名函數,必須由伺服器執行。

在 GenServer 中,上面的程式碼將是兩個獨立的函數,大致如下

def put(bucket, key, value) do
  # Send the server a :put "instruction"
  GenServer.call(bucket, {:put, key, value})
end

# Server callback

def handle_call({:put, key, value}, _from, state) do
  {:reply, :ok, Map.put(state, key, value)}
end

GenServer 程式碼中還有更多繁瑣的程式,但正如我們將看到的,它也帶來了一些好處。

現在,我們只會為我們的儲存區註冊邏輯撰寫伺服器回呼,而不會提供適當的 API,我們稍後會執行此操作。

lib/kv/registry.ex 中建立一個新檔案,其內容如下

defmodule KV.Registry do
  use GenServer

  ## Missing Client API - will add this later

  ## Defining GenServer Callbacks

  @impl true
  def init(:ok) do
    {:ok, %{}}
  end

  @impl true
  def handle_call({:lookup, name}, _from, names) do
    {:reply, Map.fetch(names, name), names}
  end

  @impl true
  def handle_cast({:create, name}, names) do
    if Map.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link([])
      {:noreply, Map.put(names, name, bucket)}
    end
  end
end

您可以向 GenServer 發送兩種類型的請求:呼叫和投遞。呼叫是同步的,伺服器必須對此類請求發送回應。當伺服器計算回應時,客戶端正在等待。投遞是異步的:伺服器不會發送回應,因此客戶端不會等待回應。這兩個請求都是傳送給伺服器的訊息,並且將按順序處理。在上述實作中,我們對 :create 訊息進行模式比對,以作為投遞處理,並對 :lookup 訊息進行模式比對,以作為呼叫處理。

為了呼叫上述回呼,我們需要使用對應的 GenServer 函數。讓我們啟動一個註冊表,建立一個命名儲存區,然後查詢它

iex> {:ok, registry} = GenServer.start_link(KV.Registry, :ok)
{:ok, #PID<0.136.0>}
iex> GenServer.cast(registry, {:create, "shopping"})
:ok
iex> {:ok, bk} = GenServer.call(registry, {:lookup, "shopping"})
{:ok, #PID<0.174.0>}

我們的 KV.Registry 程序收到一個帶有 {:create, "shopping"} 的 cast 和一個帶有 {:lookup, "shopping"} 的呼叫,依此順序。一旦訊息傳送給 registryGenServer.cast 將立即回傳。另一方面,GenServer.call 是我們會等待答案的地方,由上述 KV.Registry.handle_call 回呼提供。

你可能也注意到我們在每個回呼之前都加入了 @impl true@impl true 會告知編譯器我們後續函數定義的目的是定義回呼。如果我們在函數名稱或參數數量上不小心出錯,例如我們定義一個 handle_call/2,編譯器會警告我們沒有定義任何 handle_call/2,並提供 GenServer 模組所有已知回呼的完整清單。

這些都很好,但我們仍然想為使用者提供一個 API,讓我們可以隱藏我們的實作細節。

用戶端 API

GenServer 是由兩個部分實作的:用戶端 API 和伺服器回呼。你可以將這兩個部分合併到一個模組中,或者將它們分開成一個用戶端模組和一個伺服器模組。用戶端是呼叫用戶端函數的任何程序。伺服器永遠是我們會明確傳遞給用戶端 API 的程序識別碼或程序名稱。這裡我們將為伺服器回呼和用戶端 API 使用一個模組。

編輯 lib/kv/registry.ex 中的檔案,填入用戶端 API 的空白處

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

第一個函數是 start_link/1,它會啟動一個新的 GenServer,傳遞一個選項清單。 start_link/1 會呼叫 GenServer.start_link/3,它會接收三個參數

  1. 實作伺服器回呼的模組,在本例中是 __MODULE__(表示目前的模組)

  2. 初始化參數,在本例中是原子 :ok

  3. 一個選項清單,可用於指定伺服器名稱等事項。目前,我們將在 start_link/1 上收到的選項清單轉發給 GenServer.start_link/3

接下來的兩個函式 lookup/2create/2,負責將這些要求傳送至伺服器。在這個案例中,我們分別使用了 {:lookup, name}{:create, name}。要求通常指定為像這樣的元組,以便在第一個參數槽中提供多個「參數」。通常將要求的動作指定為元組的第一個元素,而將該動作的參數指定在剩下的元素中。請注意,要求必須與 handle_call/3handle_cast/2 的第一個參數相符。

這就是客戶端 API 的全部內容。在伺服器端,我們可以實作各種回呼,以保證伺服器初始化、終止和要求處理。這些回呼是選用的,目前我們只實作了我們關心的那些。讓我們回顧一下。

第一個是 init/1 回呼,它接收傳給 GenServer.start_link/3 的第二個參數,並傳回 {:ok, state},其中 state 是新的映射。我們已經注意到 GenServer API 如何讓客戶端/伺服器分離更為明顯。 start_link/3 發生在客戶端,而 init/1 是在伺服器上執行的對應回呼。

對於 call/2 要求,我們實作了一個 handle_call/3 回呼,它接收 request、我們從中收到要求的程序 (_from) 和目前的伺服器狀態 (names)。 handle_call/3 回呼傳回格式為 {:reply, reply, new_state} 的元組。元組的第一個元素 :reply 表示伺服器應將回覆傳送回客戶端。第二個元素 reply 是將傳送至客戶端的內容,而第三個元素 new_state 是新的伺服器狀態。

對於 cast/2 要求,我們實作了一個 handle_cast/2 回呼,它接收 request 和目前的伺服器狀態 (names)。 handle_cast/2 回呼傳回格式為 {:noreply, new_state} 的元組。請注意,在實際應用中,我們可能會使用同步呼叫而不是非同步 cast 來實作 :create 的回呼。我們這樣做是為了說明如何實作 cast 回呼。

還有其他元組格式,handle_call/3handle_cast/2 回呼函式可能會傳回。還有其他回呼函式,例如 terminate/2code_change/3,我們可以實作。歡迎您探索完整的 GenServer 文件,以深入了解這些內容。

現在,讓我們撰寫一些測試,以確保我們的 GenServer 能如預期般運作。

測試 GenServer

測試 GenServer 與測試代理程式沒有太大不同。我們將在設定回呼函式中產生伺服器,並在所有測試中使用它。在 test/kv/registry_test.exs 中建立一個檔案,內容如下

defmodule KV.RegistryTest do
  use ExUnit.Case, async: true

  setup do
    registry = start_supervised!(KV.Registry)
    %{registry: registry}
  end

  test "spawns buckets", %{registry: registry} do
    assert KV.Registry.lookup(registry, "shopping") == :error

    KV.Registry.create(registry, "shopping")
    assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    KV.Bucket.put(bucket, "milk", 1)
    assert KV.Bucket.get(bucket, "milk") == 1
  end
end

我們的測試案例首先斷言我們的註冊表中沒有儲存區,建立一個命名儲存區,檢索它,並斷言它的行為就像一個儲存區。

我們為 KV.Registry 編寫的 setup 區塊與為 KV.Bucket 編寫的區塊之間有一個重要的差異。我們沒有透過呼叫 KV.Registry.start_link/1 手動啟動註冊表,而是呼叫 ExUnit.Callbacks.start_supervised!/2 函式,傳遞 KV.Registry 模組。

透過 use ExUnit.Casestart_supervised! 函式注入到我們的測試模組中。它會透過呼叫 start_link/1 函式來啟動 KV.Registry 程序。使用 start_supervised! 的好處是 ExUnit 會保證在開始下一個測試之前關閉註冊表程序。換句話說,它有助於保證一個測試的狀態不會干擾下一個測試,以防它們依賴共用資源。

在測試期間啟動程序時,我們應該始終優先使用 start_supervised!。我們建議您變更 bucket_test.exs 中的 setup 區塊,以使用 start_supervised!

執行測試,它們應該全部通過!

監控需求

到目前為止,我們所做的一切都可以使用 Agent 來實作。在本節中,我們將看到 GenServer 可以實現的許多事情之一,而 Agent 無法實現。

讓我們從一個測試開始,描述當儲存區停止或崩潰時我們希望登錄檔如何運作

test "removes buckets on exit", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  Agent.stop(bucket)
  assert KV.Registry.lookup(registry, "shopping") == :error
end

以上的測試會在最後一個斷言失敗,因為即使我們停止儲存區程序,儲存區名稱仍留在登錄檔中。

為了修復這個錯誤,我們需要登錄檔監控它產生的每個儲存區。一旦我們設定監控,登錄檔就會在每次儲存區程序結束時收到通知,讓我們可以清除登錄檔。

讓我們先透過使用 iex -S mix 啟動新的控制台來使用監控

iex> {:ok, pid} = KV.Bucket.start_link([])
{:ok, #PID<0.66.0>}
iex> Process.monitor(pid)
#Reference<0.0.0.551>
iex> Agent.stop(pid)
:ok
iex> flush()
{:DOWN, #Reference<0.0.0.551>, :process, #PID<0.66.0>, :normal}

請注意 Process.monitor(pid) 會傳回一個唯一的參考,讓我們可以將即將到來的訊息與該監控參考配對。在我們停止代理程式後,我們可以 flush/0 所有訊息,並注意到 :DOWN 訊息已抵達,其參考與 monitor 傳回的參考完全相同,通知儲存區程序已退出,原因為 :normal

讓我們重新實作伺服器回呼以修復錯誤並讓測試通過。首先,我們會將 GenServer 狀態修改為兩個字典:一個包含 name -> pid,另一個包含 ref -> name。然後我們需要在 handle_cast/2 上監控儲存區,並實作 handle_info/2 回呼來處理監控訊息。完整的伺服器回呼實作如下所示

## Server callbacks

@impl true
def init(:ok) do
  names = %{}
  refs = %{}
  {:ok, {names, refs}}
end

@impl true
def handle_call({:lookup, name}, _from, state) do
  {names, _} = state
  {:reply, Map.fetch(names, name), state}
end

@impl true
def handle_cast({:create, name}, {names, refs}) do
  if Map.has_key?(names, name) do
    {:noreply, {names, refs}}
  else
    {:ok, bucket} = KV.Bucket.start_link([])
    ref = Process.monitor(bucket)
    refs = Map.put(refs, ref, name)
    names = Map.put(names, name, bucket)
    {:noreply, {names, refs}}
  end
end

@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
  {name, refs} = Map.pop(refs, ref)
  names = Map.delete(names, name)
  {:noreply, {names, refs}}
end

@impl true
def handle_info(msg, state) do
  require Logger
  Logger.debug("Unexpected message in KV.Registry: #{inspect(msg)}")
  {:noreply, state}
end

請注意,我們可以在不變更任何用戶端 API 的情況下大幅變更伺服器實作。這是明確區分伺服器和用戶端的好處之一。

最後,與其他回呼不同,我們已為 handle_info/2 定義一個「萬用」子句,用來捨棄並記錄任何未知訊息。為了了解原因,讓我們繼續下一個區段。

callcastinfo

到目前為止,我們已使用三個回呼:handle_call/3handle_cast/2handle_info/2。以下是我們在決定何時使用每個回呼時應考量的內容

  1. handle_call/3 必須用於同步要求。這應該是預設選項,因為等待伺服器回覆是一個有用的反壓機制。

  2. handle_cast/2 必須用於非同步請求,當您不關心回覆時。投送不保證伺服器已收到訊息,因此應謹慎使用。例如,本章中我們定義的 create/2 函數應使用 call/2。我們已使用 cast/2 作為教學目的。

  3. handle_info/2 必須用於伺服器可能接收到的所有其他訊息,這些訊息並非透過 GenServer.call/2GenServer.cast/2 傳送,包括使用 send/2 傳送的常規訊息。監控 :DOWN 訊息就是一個例子。

由於任何訊息,包括透過 send/2 傳送的訊息,都會傳送至 handle_info/2,因此伺服器可能會收到意外訊息。因此,如果我們未定義萬用子句,這些訊息可能會導致我們的註冊表崩潰,因為沒有子句會匹配。不過,我們不必擔心 handle_call/3handle_cast/2 的此類情況。呼叫和投送僅透過 GenServer API 進行,因此未知訊息很可能是開發人員錯誤。

為了幫助開發人員記住呼叫、投送和資訊之間的差異、支援的回傳值等,我們有一個小型的 GenServer 參考手冊

我們先前已在 程序章節 中瞭解連結。現在,註冊表已完成,您可能會想:我們什麼時候應該使用監控器,什麼時候應該使用連結?

連結是雙向的。如果您連結兩個程序,其中一個崩潰,另一個也會崩潰(除非它正在攔截退出)。監控器是單向的:只有監控程序會收到有關受監控程序的通知。換句話說:當您想要連結崩潰時使用連結,當您只想收到崩潰、退出等的通知時使用監控器。

回到我們的 handle_cast/2 實作,您可以看到註冊表正在連結和監控儲存區

{:ok, bucket} = KV.Bucket.start_link([])
ref = Process.monitor(bucket)

這是一個壞主意,因為我們不希望註冊表在儲存區崩潰時崩潰。正確的修復方法實際上是不將儲存區連結到註冊表。相反地,我們會將每個儲存區連結到一種稱為監控器的特殊程序類型,它們明確設計用於處理故障和崩潰。我們將在下一個章節中瞭解更多關於它們的資訊。