檢視原始碼 使用 ETS 加速

每次我們需要查詢一個儲存區時,我們都需要傳送一則訊息給註冊表。如果我們的註冊表同時被多個程序存取,註冊表可能會變成瓶頸!

在這一章,我們將學習 ETS(Erlang Term Storage)以及如何將它用作快取機制。

警告!不要過早將 ETS 用作快取!記錄並分析您的應用程式效能,找出哪些部分是瓶頸,這樣您就知道是否應該快取,以及應該快取什麼。本章僅是一個範例,說明在您確定需要後,如何使用 ETS。

ETS 作為快取

ETS 讓我們可以將任何 Elixir 項目儲存在記憶體中的表格中。使用 ETS 表格是透過 Erlang 的 :ets 模組

iex> table = :ets.new(:buckets_registry, [:set, :protected])
#Reference<0.1885502827.460455937.234656>
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]

在建立 ETS 表格時,需要兩個引數:表格名稱和一組選項。從可用選項中,我們傳遞了表格類型及其存取規則。我們選擇了 :set 類型,這表示鍵值不能重複。我們也將表格的存取設定為 :protected,表示只有建立表格的程序可以寫入,但所有程序都可以讀取。可能的存取控制

:public — 所有程序都可以讀取/寫入。

:protected — 所有程序都可以讀取。只有擁有者程序可以寫入。這是預設值。

:private — 只有擁有者程序可以讀取/寫入。

請注意,如果您的讀取/寫入呼叫違反了存取控制,操作會引發 ArgumentError。最後,由於 :set:protected 是預設值,我們從現在開始將略過它們。

ETS 表格也可以命名,讓我們可以使用給定的名稱存取它們

iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]

讓我們變更 KV.Registry 以使用 ETS 表格。第一個變更是要修改我們的註冊表以需要一個名稱引數,我們會使用它來命名 ETS 表格和註冊表程序本身。ETS 名稱和程序名稱儲存在不同的位置,因此不會有衝突。

開啟 lib/kv/registry.ex,並讓我們變更其實作。我們已在原始碼中加入註解,以突顯我們所做的變更

defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry with the given options.

  `:name` is always required.
  """
  def start_link(opts) do
    # 1. Pass the name to GenServer's init
    server = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, server, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    # 2. Lookup is now done directly in ETS, without accessing the server
    case :ets.lookup(server, name) do
      [{^name, pid}] -> {:ok, pid}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  @impl true
  def init(table) do
    # 3. We have replaced the names map by the ETS table
    names = :ets.new(table, [:named_table, read_concurrency: true])
    refs  = %{}
    {:ok, {names, refs}}
  end

  # 4. The previous handle_call callback for lookup was removed

  @impl true
  def handle_cast({:create, name}, {names, refs}) do
    # 5. Read and write to the ETS table instead of the map
    case lookup(names, name) do
      {:ok, _pid} ->
        {:noreply, {names, refs}}

      :error ->
        {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
        ref = Process.monitor(pid)
        refs = Map.put(refs, ref, name)
        :ets.insert(names, {name, pid})
        {:noreply, {names, refs}}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    # 6. Delete from the ETS table instead of the map
    {name, refs} = Map.pop(refs, ref)
    :ets.delete(names, name)
    {:noreply, {names, refs}}
  end

  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

請注意,在我們變更之前,KV.Registry.lookup/2 會將要求傳送給伺服器,但現在它會直接從 ETS 表格中讀取,而此表格會在所有程序間共用。這是我們正在實作的快取機制的背後主旨。

為了讓快取機制運作,所建立的 ETS 表格需要有 :protected(預設值)存取權,因此所有用戶端都能從中讀取,而只有 KV.Registry 程序會寫入其中。我們在啟動表格時也設定 read_concurrency: true,針對並發讀取作業的常見情況最佳化表格。

我們在上方所做的變更已中斷我們的測試,因為註冊表在啟動時需要 :name 選項。此外,某些註冊表作業(例如 lookup/2)需要將名稱作為引數提供,而不是 PID,因此我們可以執行 ETS 表格查詢。讓我們變更 test/kv/registry_test.exs 中的設定函式,以修正這兩個問題

  setup context do
    _ = start_supervised!({KV.Registry, name: context.test})
    %{registry: context.test}
  end

由於每個測試都有獨一無二的名稱,我們使用測試名稱來命名我們的註冊表。這樣一來,我們不再需要傳遞註冊表 PID,而是透過測試名稱來識別它。另請注意,我們將 start_supervised! 的結果指定給底線 (_)。此慣用語通常用於表示我們對 start_supervised! 的結果不感興趣。

一旦我們變更 setup,某些測試仍會繼續失敗。您甚至會注意到測試在執行之間傳遞時會不一致地通過和失敗。例如,「spawns buckets」測試

test "spawns buckets", %{registry: registry} do
  assert KV.Registry.lookup(registry, "shopping") == :error

  KV.Registry.create(registry, "shopping")
  assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

  KV.Bucket.put(bucket, "milk", 1)
  assert KV.Bucket.get(bucket, "milk") == 1
end

可能會在此行失敗

{:ok, bucket} = KV.Registry.lookup(registry, "shopping")

如果我們剛在前一行建立儲存區,這行怎麼會失敗?

這些失敗發生的原因是因為,出於教學目的,我們犯了兩個錯誤

  1. 我們過早進行最佳化(透過新增此快取層)
  2. 我們使用 cast/2(而我們應該使用 call/2

競爭條件?

使用 Elixir 進行開發並不會讓您的程式碼免於競爭條件。然而,Elixir 的抽象化(預設情況下不共用任何項目)讓找出競爭條件的根本原因變得更加容易。

在我們的測試中發生的事情是,在操作和我們可以在 ETS 表中觀察到此變更的時間之間存在延遲。以下是我們預期會發生的事情

  1. 我們呼叫 KV.Registry.create(registry, "shopping")
  2. 註冊表會建立儲存區並更新快取表
  3. 我們使用 KV.Registry.lookup(registry, "shopping") 從表中存取資訊
  4. 上述指令會傳回 {:ok, bucket}

然而,由於 KV.Registry.create/2 是傳送操作,因此指令會在我們實際寫入表之前傳回!換句話說,會發生以下情況

  1. 我們呼叫 KV.Registry.create(registry, "shopping")
  2. 我們使用 KV.Registry.lookup(registry, "shopping") 從表中存取資訊
  3. 上述指令會傳回 :error
  4. 註冊表會建立儲存區並更新快取表

若要修正失敗,我們需要使用 call/2 而不是 cast/2,讓 KV.Registry.create/2 同步。這將保證客戶端僅在對表進行變更後才會繼續。讓我們回到 lib/kv/registry.ex 並變更函數及其回呼,如下所示

def create(server, name) do
  GenServer.call(server, {:create, name})
end
@impl true
def handle_call({:create, name}, _from, {names, refs}) do
  case lookup(names, name) do
    {:ok, pid} ->
      {:reply, pid, {names, refs}}

    :error ->
      {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      :ets.insert(names, {name, pid})
      {:reply, pid, {names, refs}}
  end
end

我們將回呼從 handle_cast/2 變更為 handle_call/3,並將其變更為以建立的儲存區的 PID 回覆。一般來說,Elixir 開發人員偏好使用 call/2 而不是 cast/2,因為它也提供反壓力,您會封鎖直到收到回覆。在不必要時使用 cast/2 也可視為過早最佳化。

讓我們再次執行測試。不過,這次我們將傳遞 --trace 選項

$ mix test --trace

當您的測試發生死結或有競爭條件時,--trace 選項很有用,因為它會同步執行所有測試(async: true 無效),並顯示每個測試的詳細資訊。如果您多次執行測試,您可能會看到此間歇性失敗

  1) test removes buckets on exit (KV.RegistryTest)
     test/kv/registry_test.exs:19
     Assertion with == failed
     code:  assert KV.Registry.lookup(registry, "shopping") == :error
     left:  {:ok, #PID<0.109.0>}
     right: :error
     stacktrace:
       test/kv/registry_test.exs:23

根據失敗訊息,我們預期儲存區不再存在於表中,但它仍然存在!這個問題與我們剛剛解決的問題相反:雖然先前在建立儲存區和更新表的指令之間有延遲,但現在在儲存區程序終止和從表中移除其條目之間有延遲。由於這是競爭條件,您可能無法在您的機器上重現它,但它確實存在。

上次我們透過將非同步操作,一個 cast,替換為同步的 call 來修復競爭條件。不幸的是,我們用於接收 :DOWN 訊息並從 ETS 表格中刪除條目的 handle_info/2 回呼沒有同步的等效項。這次,我們需要找出一個方法來保證註冊表已處理在 bucket 程序終止時傳送的 :DOWN 通知。

一個簡單的方法是在我們進行 bucket 查詢之前,傳送一個同步請求到註冊表。Agent.stop/2 操作是同步的,並且只會在 bucket 程序終止後才回傳。因此,一旦 Agent.stop/2 回傳,註冊表已收到 :DOWN 訊息,但它可能尚未處理它。為了保證處理 :DOWN 訊息,我們可以進行一個同步請求。由於訊息會依序處理,一旦註冊表回覆同步請求,那麼 :DOWN 訊息肯定已經被處理了。

讓我們在 test/kv/registry_test.exs 中的兩個「移除」測試中,在 Agent.stop/2 之後,透過建立一個同步請求的「虛假」bucket 來這麼做

  test "removes buckets on exit", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    Agent.stop(bucket)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus")
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

  test "removes bucket on crash", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Stop the bucket with non-normal reason
    Agent.stop(bucket, :shutdown)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus")
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

我們的測試現在(總是)應該會通過!

這結束了我們的最佳化章節。我們已將 ETS 用作快取機制,其中讀取可以從任何程序發生,但寫入仍透過單一程序序列化。更重要的是,我們還了解到,一旦資料可以非同步讀取,我們需要知道它可能會引發的競爭條件。

在實務上,如果你發現自己需要一個動態程序的註冊表,你應該使用 Elixir 提供的 Registry 模組。它提供與我們使用 GenServer + :ets 建立的類似功能,同時也能夠同時執行寫入和讀取。它已被基準測試,即使在有 40 個核心的機器上也能在所有核心上擴充

接下來,讓我們討論外部和內部依賴項,以及 Mix 如何幫助我們管理大型程式碼庫。