每次我们需要查找存储容器时,我们都需要向注册表发送一条消息。如果我们的注册表被多个进程同时访问,注册表可能会成为瓶颈!
在本章中,我们将了解 ETS(Erlang Term Storage)以及如何将其用作缓存机制。
警告!不要过早地将 ETS 用作缓存!记录并分析应用程序性能并确定哪些部分是瓶颈,这样您就知道是否应该缓存以及应该缓存什么。本章仅仅是一个示例,说明一旦您确定了需求,就可以如何使用 ETS。
ETS 作为缓存
ETS 允许我们将任何 Elixir 术语存储在内存表中。使用 ETS 表是通过 Erlang 的 :ets 模块完成的:
创建 ETS 表时,需要两个参数:表名和一组选项。从可用选项中,我们传递了表类型及其访问规则。我们选择了 :set 类型,这意味着键不能重复。我们还将表的访问权限设置为 :protected,这意味着只有创建表的进程可以写入它,但所有进程都可以从中读取。可能的访问控制:
:public — 所有进程都可以读取/写入。
:protected — 所有进程都可以读取。只有所有者进程可以写入。这是默认值。
:private — 仅限于所有者进程读取/写入。
请注意,如果您的读取/写入调用违反了访问控制,则操作将引发 ArgumentError。最后,由于 :set 和 :protected 是默认值,因此我们将从现在开始跳过它们。
ETS 表也可以命名,这样我们就可以通过给定的名称来访问它们:
让我们将 KV.Registry 更改为使用 ETS 表。第一个更改是修改我们的注册表以要求使用名称参数,我们将使用它来命名 ETS 表和注册表进程本身。ETS 名称和进程名称存储在不同的位置,因此不会发生冲突。
打开 lib/kv/registry.ex,让我们更改其实现。我们在源代码中添加了注释以突出显示我们所做的更改:
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry with the given options.
`:name` is always required.
"""
def start_link(opts) do
# 1. Pass the name to GenServer's init
server = Keyword.fetch!(opts, :name)
GenServer.start_link(__MODULE__, server, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
"""
def lookup(server, name) do
# 2. Lookup is now done directly in ETS, without accessing the server
case :ets.lookup(server, name) do
[{^name, pid}] -> {:ok, pid}
[] -> :error
end
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
@impl true
def init(table) do
# 3. We have replaced the names map by the ETS table
names = :ets.new(table, [:named_table, read_concurrency: true])
refs = %{}
{:ok, {names, refs}}
end
# 4. The previous handle_call callback for lookup was removed
@impl true
def handle_cast({:create, name}, {names, refs}) do
# 5. Read and write to the ETS table instead of the map
case lookup(names, name) do
{:ok, _pid} ->
{:noreply, {names, refs}}
:error ->
{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
:ets.insert(names, {name, pid})
{:noreply, {names, refs}}
end
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
# 6. Delete from the ETS table instead of the map
{name, refs} = Map.pop(refs, ref)
:ets.delete(names, name)
{:noreply, {names, refs}}
end
@impl true
def handle_info(_msg, state) do
{:noreply, state}
end
end
请注意,在我们进行更改之前,KV.Registry.lookup/2 会向服务器发送请求,但现在它直接从 ETS 表中读取,该表在所有进程之间共享。这就是我们正在实施的缓存机制背后的主要思想。
为了使缓存机制发挥作用,创建的 ETS 表需要具有访问权限 :protected(默认值),因此所有客户端都可以从中读取,而只有 KV.Registry 进程可以写入它。我们还在启动表时设置了 read_concurrency: true,针对并发读取操作的常见场景优化了表。
我们上面执行的更改破坏了我们的测试,因为注册表在启动时需要 :name 选项。此外,某些注册表操作(例如 lookup/2)需要将名称作为参数而不是 PID 给出,因此我们可以进行 ETS 表查找。让我们更改 test/kv/registry_test.exs 中的设置函数以修复这两个问题:
由于每个测试都有一个唯一的名称,我们使用测试名称来命名我们的注册表。这样,我们不再需要传递注册表 PID,而是通过测试名称来识别它。还请注意,我们将 start_supervised! 的结果分配给下划线 (_)。这个习语通常用于表示我们对 start_supervised! 的结果不感兴趣。
一旦我们更改设置,某些测试将继续失败。您甚至可能会注意到测试在运行之间不一致地通过和失败。例如,“生成存储容器”测试:
可能在这一行失败:
如果我们刚刚在上一行创建了 bucket,这一行怎么会失败呢?
失败的原因在于,出于教学目的,我们犯了两个错误:
我们过早地进行了优化(通过添加这个缓存层)
我们使用 cast/2(而我们应该使用 call/2)
竞争条件?
使用 Elixir 进行开发并不能让您的代码摆脱竞争条件。但是,Elixir 的抽象(默认情况下不共享任何内容)使发现竞争条件的根本原因变得更加容易。
在我们的测试中,操作和我们可以在 ETS 表中观察到此更改的时间之间存在延迟。以下是我们预期会发生的情况:
1.我们调用 KV.Registry.create(registry, "shopping")
2.注册表创建存储桶并更新缓存表
3.我们使用 KV.Registry.lookup(registry, "shopping") 从表中访问信息
4.上面的命令返回 {:ok, bucket}
但是,由于 KV.Registry.create/2 是一个强制转换操作,因此该命令将在我们实际写入表之前返回!换句话说,发生了以下情况:
1.我们调用 KV.Registry.create(registry, "shopping")
2.我们使用 KV.Registry.lookup(registry, "shopping") 从表中访问信息
3.上面的命令返回 :error
4.注册表创建存储容器并更新缓存表
要修复故障,我们需要使用 call/2 而不是 cast/2 使 KV.Registry.create/2 同步。这将保证客户端仅在对表进行更改后才能继续。让我们回到 lib/kv/registry.ex 并更改函数及其回调,如下所示:
我们将回调从 handle_cast/2 更改为 handle_call/3,并将其更改为使用所创建存储容器的 PID 进行回复。一般来说,Elixir 开发人员更喜欢使用 call/2 而不是 cast/2,因为它也提供背压 — 您会阻塞直到收到回复。在不必要时使用 cast/2 也可以被视为过早优化。
让我们再次运行测试。不过这次,我们将传递 --trace 选项:
当您的测试出现死锁或存在竞争条件时,--trace 选项非常有用,因为它会同步运行所有测试(async: true 无效)并显示有关每个测试的详细信息。如果您多次运行测试,您可能会看到此间歇性故障:
根据失败消息,我们预期存储容器不再存在于表中,但它仍然存在!这个问题与我们刚刚解决的问题相反:虽然以前创建存储容器的命令和更新表之间存在延迟,但现在存储容器进程终止和其条目从表中删除之间存在延迟。由于这是一种竞争条件,您可能无法在您的机器上重现它,但它确实存在。
上次我们通过将异步操作(强制转换)替换为同步调用来修复竞争条件。不幸的是,我们用于接收 :DOWN 消息并从 ETS 表中删除条目的 handle_info/2 回调没有同步等效项。这次,我们需要找到一种方法来保证注册表已处理存储容器进程终止时发送的 :DOWN 通知。
一个简单的方法是在执行存储容器查找之前向注册表发送同步请求。 Agent.stop/2 操作是同步的,仅在存储容器进程终止后返回。因此,一旦 Agent.stop/2 返回,注册表就收到了 :DOWN 消息,但可能尚未处理该消息。为了保证 :DOWN 消息的处理,我们可以执行同步请求。由于消息是按顺序处理的,因此一旦注册表回复同步请求,则 :DOWN 消息肯定已经处理完毕。
让我们通过在 test/kv/registry_test.exs 的两个“删除”测试中的 Agent.stop/2 之后创建一个“虚假”存储容器(即同步请求)来实现此目的:
我们的测试现在应该(总是)通过!
这就是我们的优化章节。我们使用 ETS 作为缓存机制,其中读取可以从任何进程发生,但写入仍然通过单个进程序列化。更重要的是,我们还了解到,一旦可以异步读取数据,我们就需要注意它可能引入的竞争条件。
在实践中,如果您发现自己需要动态进程的注册表,则应使用 Elixir 提供的 Registry 模块。它提供的功能类似于我们使用 GenServer + :ets 构建的功能,同时还能够同时执行写入和读取。它已经过基准测试,即使在具有 40 个核心的机器上也可以扩展到所有核心。
接下来,让我们讨论外部和内部依赖关系以及 Mix 如何帮助我们管理大型代码库。