开源应用程序架构(卷 1)
Riak 和 Erlang/OTP

Francesco Cesarini、Andy Gross 和 Justin Sheehy

Riak 是一款分布式、容错的开源数据库,它展示了如何使用 Erlang/OTP 构建大型系统。在很大程度上得益于 Erlang 对大规模可扩展分布式系统的支持,Riak 提供了数据库中不常见的特性,例如高可用性和容量和吞吐量的线性可扩展性。

Erlang/OTP 为开发像 Riak 这样的系统提供了理想的平台,因为它开箱即用地提供了节点间通信、消息队列、故障检测器和客户端-服务器抽象。更重要的是,Erlang 中最常用的模式已在库模块中实现,通常称为 OTP 行为。它们包含用于并发和错误处理的通用代码框架,简化了并发编程并保护开发人员免受许多常见的陷阱。行为由主管监控,主管本身也是一种行为,并分组在主管树中。主管树被打包在一个应用程序中,创建了 Erlang 程序的构建块。

像 Riak 这样的完整 Erlang 系统是一组松散耦合的应用程序,它们相互交互。其中一些应用程序是由开发人员编写的,一些是标准 Erlang/OTP 发行版的一部分,另一些可能是其他开源组件。它们按顺序由从应用程序和版本列表生成的引导脚本加载和启动。

启动的发布中包含的应用程序不同。在标准 Erlang 发行版中,引导文件将启动KernelStdLib(标准库)应用程序。在某些安装中,还会启动SASL(系统体系结构支持库)应用程序。SASL 包含发布和软件升级工具以及日志记录功能。Riak 没什么不同,只是它也会启动 Riak 特定的应用程序以及它们的运行时依赖项,包括KernelStdLibSASL。Riak 的完整且可运行的构建实际上嵌入这些 Erlang/OTP 发行版的标准元素,并在命令行上调用riak start 时同时启动它们。Riak 包含许多复杂的应用程序,因此本章不应被解释为完整指南。它应该被视为 OTP 的介绍,其中使用了来自 Riak 源代码的示例。出于演示目的,图和示例已被缩短和简化。

15.1. Erlang 简介

Erlang 是一种并发函数式编程语言,它编译成字节码并在虚拟机中运行。程序由相互调用的函数组成,通常会导致副作用,例如进程间消息传递、I/O 和数据库操作。Erlang 变量是单赋值的,即一旦它们被赋予了值,就不能再更新了。该语言广泛使用模式匹配,如下面的阶乘示例所示

-module(factorial).
-export([fac/1]).
fac(0) -> 1;
fac(N) when N>0 ->
   Prev = fac(N-1),
   N*Prev.

这里,第一个子句给出零的阶乘,第二个子句给出正数的阶乘。每个子句的主体是表达式序列,主体中的最后一个表达式是该子句的结果。用负数调用函数会导致运行时错误,因为没有子句匹配。不处理这种情况是非防御性编程的一个例子,这种做法在 Erlang 中是鼓励的。

在模块内,函数按通常的方式调用;在模块外,模块名称作为前缀,例如factorial:fac(3)。可以定义具有相同名称但参数数量不同的函数——这称为它们的元数。在factorial 模块的导出指令中,元数为一的fac 函数用fac/1 表示。

Erlang 支持元组(也称为乘积类型)和列表。元组用花括号括起来,例如{ok,37}。在元组中,我们通过位置访问元素。记录是另一种数据类型;它们允许我们存储固定数量的元素,然后通过名称访问和操作这些元素。我们使用-record(state, {id, msg_list=[]}). 定义一个记录。要创建一个实例,我们使用表达式Var = #state{id=1},并使用Var#state.id 检查其内容。对于可变数量的元素,我们使用方括号定义的列表,例如{[}23,34{]}{[}X|Xs{]} 表示法匹配一个非空列表,其中头部为X,尾部为Xs。以小写字母开头的标识符表示原子,它们只是代表自身;元组{ok,37} 中的ok 是原子的一个例子。以这种方式使用的原子通常用于区分不同类型的函数结果:除了ok 结果之外,可能还有{error, "Error String"} 形式的结果。

Erlang 系统中的进程在独立的内存中并发运行,并通过消息传递相互通信。进程可以用于大量应用程序,包括数据库网关、协议栈处理程序以及管理来自其他进程的跟踪消息的日志记录。虽然这些进程处理不同的请求,但这些请求的处理方式会有相似之处。

由于进程只存在于虚拟机中,因此单个 VM 可以同时运行数百万个进程,这是 Riak 大量利用的功能。例如,对数据库的每个请求——读取、写入和删除——都被建模为一个单独的进程,这是一种在大多数操作系统级线程实现中不可能实现的方法。

进程由进程标识符(称为 PID)标识,但它们也可以在别名下注册;这仅应用于长寿命的“静态”进程。用别名注册进程允许其他进程在不知道其 PID 的情况下向它发送消息。使用内置函数 (BIF) spawn(Module, Function, Arguments) 创建进程。BIF 是集成在 VM 中的函数,用于执行在纯 Erlang 中不可能或执行速度慢的操作。spawn/3 BIF 以ModuleFunctionArguments 列表作为参数。调用返回新生成的进程的 PID,并作为副作用,创建一个新进程,该进程开始在模块中使用前面提到的参数执行该函数。

使用Pid ! Msg 将消息Msg 发送到进程 ID 为Pid 的进程。进程可以通过调用 BIFself 找出自己的 PID,然后可以将其发送到其他进程,供它们用来与原始进程通信。假设一个进程期望接收{ok, N}{error, Reason} 形式的消息。要处理这些消息,它使用 receive 语句

receive
   {ok, N} ->
      N+1;
   {error, _} ->
      0
end

这样做的结果是一个由模式匹配子句确定的数字。当变量的值在模式匹配中不需要时,可以使用下划线通配符,如上所示。

进程之间的消息传递是异步的,进程接收到的消息按到达顺序放置在进程的邮箱中。假设现在要执行上面的receive 表达式:如果邮箱中的第一个元素是{ok, N}{error, Reason},则将返回相应的結果。如果邮箱中的第一条消息不属于这种形式,则将其保留在邮箱中,并以类似的方式处理第二条消息。如果没有消息匹配,receive 将等待收到匹配的消息。

进程因两个原因而终止。如果不再有代码要执行,则称其以正常原因终止。如果进程遇到运行时错误,则称其以非正常原因终止。除非进程与其链接,否则进程终止不会影响其他进程。进程可以通过link(Pid) BIF 或在调用spawn_link(Module, Function, Arguments) 时相互链接。如果一个进程终止,它会向其链接集中的进程发送 EXIT 信号。如果终止原因是非正常的,进程会自行终止,并进一步传播 EXIT 信号。通过调用process_flag(trap_exit, true) BIF,进程可以将 EXIT 信号作为 Erlang 消息接收在其邮箱中,而不是终止。

Riak 使用 EXIT 信号来监控执行由请求驱动的有限状态机启动的非关键工作的辅助进程的运行状况。当这些辅助进程异常终止时,EXIT 信号允许父进程忽略错误或重新启动进程。

15.2. 进程骨架

我们之前介绍了进程遵循通用模式的概念,无论创建该进程的特定目的如何。首先,进程必须被生成,然后可以选择注册其别名。新生成的进程的第一个操作是初始化进程循环数据。循环数据通常是传递给spawn 内置函数的生成进程时的参数的结果。它的循环数据存储在一个我们称为进程状态的变量中。状态通常存储在一个记录中,它被传递给一个 receive-evaluate 函数,该函数运行一个循环,该循环接收消息、处理消息、更新状态,并将状态作为参数传递回一个尾递归调用。如果它处理的消息之一是stop 消息,则接收进程将清理自身,然后终止。

这是进程中一个反复出现的主题,无论分配给进程的任务是什么,它都会发生。考虑到这一点,让我们看看符合这种模式的进程之间的区别

因此,即使存在一个通用操作的骨架,这些操作也会辅之以与分配给进程的任务直接相关的特定操作。使用这个骨架作为模板,程序员可以创建充当服务器、有限状态机、事件处理程序和主管的 Erlang 进程。但是,与其每次都重新实现这些模式,不如将它们放在称为行为的库模块中。它们作为 OTP 中间件的一部分提供。

15.3. OTP 行为

致力于 Riak 的核心开发团队遍布近十个地理位置。如果没有非常紧密的协调和模板可供参考,结果将是不同的客户端/服务器实现,它们无法处理特殊的边界情况和并发相关的错误。可能没有统一的方式来处理客户端和服务器崩溃,或者保证来自请求的响应确实是响应,而不仅仅是符合内部消息协议的任何消息。

OTP 是一组 Erlang 库和设计原则,提供现成的工具来开发健壮的系统。许多这些模式和库以“行为”的形式提供。

OTP 行为通过提供实现最常见并发设计模式的库模块来解决这些问题。在幕后,无需程序员知晓,库模块确保以一致的方式处理错误和特殊情况。因此,OTP 行为提供了一组标准化的构建块,用于设计和构建工业级系统。

15.3.1. 简介

OTP 行为作为库模块提供,位于 stdlib 应用程序中,该应用程序是 Erlang/OTP 分发的一部分。程序员编写的特定代码放置在单独的模块中,并通过一组预定义的回调函数调用,这些回调函数针对每个行为进行标准化。此回调模块将包含提供所需功能所需的所有特定代码。

OTP 行为包括执行实际处理的 worker 进程和负责监控 worker 和其他 supervisor 的 supervisor。worker 行为(通常在图中表示为圆圈)包括服务器、事件处理程序和有限状态机。supervisor(在插图中表示为方框)监控其子进程(worker 和其他 supervisor),从而创建所谓的监督树。

[OTP Riak Supervision Tree]

图 15.1:OTP Riak 监督树

监督树打包成一个称为应用程序的行为。OTP 应用程序不仅是 Erlang 系统的构建块,也是打包可重用组件的一种方式。工业级系统(如 Riak)由一组松散耦合的、可能是分布式的应用程序组成。其中一些应用程序是标准 Erlang 分发的一部分,而另一些则是构成 Riak 特定功能的组成部分。

OTP 应用程序的示例包括 Corba ORB 或简单网络管理协议 (SNMP) 代理。OTP 应用程序是一个可重用组件,它将库模块与 supervisor 和 worker 进程打包在一起。从现在开始,当我们提到应用程序时,我们指的是 OTP 应用程序。

行为模块包含每个给定行为类型的通用代码。虽然可以实现自己的行为模块,但这样做很少见,因为 Erlang/OTP 分发附带的模块将满足您代码中使用的大多数设计模式。行为模块中提供的通用功能包括以下操作:

循环数据是一个变量,它将包含行为在调用之间需要存储的数据。调用后,将返回循环数据的更新变体。此更新的循环数据(通常称为新的循环数据)作为参数传递给下一次调用。循环数据也常被称为行为状态。

要包含在回调模块中以供通用服务器应用程序提供所需特定行为的功能包括以下内容:

15.3.2. 通用服务器

实现客户端/服务器行为的通用服务器定义在 gen_server 行为中,该行为是标准库应用程序的一部分。在解释通用服务器时,我们将使用来自 riak_core 应用程序的 riak_core_node_watcher.erl 模块。它是一个服务器,用于跟踪和报告 Riak 集群中的哪些子服务和节点可用。模块头和指令如下:

-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
         services/1,nodes/1,avsn/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
                bcast_mod={gen_server, abcast}}).

我们可以很容易地通过 -behavior(gen_server). 指令识别通用服务器。此指令用于编译器,以确保所有回调函数都被正确导出。记录状态用于服务器循环数据中。

15.3.3. 启动您的服务器

使用 gen_server 行为,您将使用 gen_server:startgen_server:start_link 函数,而不是使用 spawnspawn_link BIF。spawnstart 之间的主要区别在于调用的同步性质。使用 start 而不是 spawn 使启动 worker 进程更加确定性,并防止不可预见的数据竞争,因为在 worker 初始化之前,调用将不会返回 worker 的 PID。您可以使用以下任一方式调用这些函数:

gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)

ServerName 是一个格式为 {local, Name}{global, Name} 的元组,表示进程别名的本地或全局 Name(如果要注册它)。全局名称允许服务器在分布式 Erlang 节点的集群中透明地访问。如果您不想注册进程,而是使用其 PID 引用它,则省略该参数并使用 start_link/3start/3 函数调用。CallbackModule 是放置特定回调函数的模块的名称,Arguments 是一个有效的 Erlang 术语,传递给 init/1 回调函数,而 Options 是一个列表,允许您设置内存管理标志 fullsweep_afterheapsize,以及其他跟踪和调试标志。

在我们的示例中,我们调用 start_link/4,使用 ?MODULE 宏调用将进程注册为与回调模块相同的名称。此宏在编译代码时由预处理器扩展为定义它的模块的名称。始终建议将您的行为命名为与其实现的回调模块相同的别名。我们不传递任何参数,因此只发送空列表。选项列表保持为空

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

start_linkstart 函数之间的明显区别在于 start_link 与其父进程(通常是 supervisor)链接,而 start 不链接。这需要特别说明,因为它是 OTP 行为负责与 supervisor 链接。当从 shell 测试行为时,start 函数通常使用,因为导致 shell 进程崩溃的类型错误不会影响行为。startstart_link 函数的所有变体都返回 {ok, Pid}

startstart_link 函数将生成一个新进程,该进程将调用 CallbackModule 中的 init(Arguments) 回调函数,并提供 Argumentsinit 函数必须初始化服务器的 LoopData,并且必须返回一个格式为 {ok, LoopData} 的元组。LoopData 包含将传递给回调函数的循环数据的第一个实例。如果您想存储传递给 init 函数的一些参数,您将在 LoopData 变量中执行此操作。Riak 节点监视器服务器中的 LoopData 是使用类型为 state 的记录调用 schedule_broadcast/1 的结果,其中字段设置为默认值

init([]) ->

    %% Watch for node up/down events
    net_kernel:monitor_nodes(true),

    %% Setup ETS table to track node status
    ets:new(?MODULE, [protected, named_table]),

    {ok, schedule_broadcast(#state{})}.

虽然 supervisor 进程可能调用 start_link/4 函数,但另一个进程调用 init/1 回调函数:刚刚生成的进程。由于此服务器的目的是通知、记录和广播 Riak 中子服务的可用性,因此初始化请求 Erlang 运行时通知它此类事件,并设置一个表来存储此信息。这需要在初始化期间完成,因为如果该结构尚不存在,则对服务器的所有调用都将失败。只执行必要的操作,并最大程度地减少 init 函数中的操作,因为对 init 的调用是同步调用,它会阻止所有其他序列化进程启动,直到它返回为止。

15.3.4. 传递消息

如果您想向服务器发送同步消息,请使用 gen_server:call/2 函数。异步调用使用 gen_server:cast/2 函数进行。让我们从 Riak 的服务 API 中获取两个函数开始;我们将在稍后提供其余代码。它们由客户端进程调用,并导致向与回调模块相同名称注册的服务器进程发送同步消息。请注意,应在客户端进行发送到服务器的数据的验证。如果客户端发送不正确的信息,服务器应终止。

service_up(Id, Pid) ->
    gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) ->
    gen_server:call(?MODULE, {service_down, Id}).

收到消息后,gen_server 进程会调用 handle_call/3 回调函数,以按发送顺序处理消息

handle_call({service_up, Id, Pid}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:add_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Setup a monitor for the Pid representing this service
    Mref = erlang:monitor(process, Pid),
    erlang:put(Mref, Id),
    erlang:put(Id, Mref),

    %% Update our local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:del_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Update local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

请注意回调函数的返回值。元组包含控制原子 reply,告诉 gen_server 通用代码元组的第二个元素(在这两种情况下都是原子 ok)是发送回客户端的回复。元组的第三个元素是新的 State,它在服务器的新迭代中作为第三个参数传递给 handle_call/3 函数;在这两种情况下,它都更新为反映新的可用服务集。参数 _From 是一个元组,包含唯一的邮件引用和客户端进程标识符。该元组作为一个整体在库函数中使用,我们将在本章中不讨论这些函数。在大多数情况下,您不需要它。

gen_server 库模块内置了许多机制和保护措施,它们在幕后运行。如果您的客户端向您的服务器发送同步消息,并且您在五秒钟内没有收到响应,则执行 call/2 函数的进程将被终止。您可以使用 gen_server:call(Name, Message, Timeout) 覆盖此操作,其中 Timeout 是以毫秒为单位的值或原子 infinity

超时机制最初是为了防止死锁而设置的,确保意外互相调用的服务器在默认超时后被终止。崩溃报告将被记录,并有望导致错误被调试和修复。大多数应用程序在五秒钟的超时下都能正常运行,但在负载非常重的情况下,您可能需要微调该值,甚至可能使用 infinity;此选择取决于应用程序。Erlang/OTP 中的所有关键代码都使用 infinity。Riak 中的各个地方对超时使用不同的值:infinity 在内部耦合的部分之间很常见,而 Timeout 是根据用户传递的参数设置的,在客户端代码与 Riak 交谈的情况下,该代码指定了操作应该被允许超时。

使用gen_server:call/2函数时,还有一些其他安全措施需要考虑,例如向不存在的服务器发送消息以及服务器在发送回复之前崩溃的情况。在这两种情况下,调用进程都将终止。在原始 Erlang 中,发送在接收子句中从未匹配模式的消息是一个错误,会导致内存泄漏。Riak 使用两种不同的策略来缓解这种情况,这两种策略都涉及“万能”匹配子句。在消息可能由用户发起的那些地方,未匹配的消息可能会被静默丢弃。在那些消息只能来自 Riak 内部的地方,它代表一个错误,因此将用于触发错误警报的内部崩溃报告,重新启动接收它的工作进程。

发送异步消息的工作方式类似。消息被异步发送到通用服务器,并在handle_cast/2回调函数中处理。该函数必须返回格式为{reply, NewState}的元组。当我们对服务器的请求不感兴趣并且不担心生成超过服务器所能消费的消息时,会使用异步调用。在那些我们对响应不感兴趣但想等待消息被处理后再发送下一个请求的情况下,我们会使用gen_server:call/2,在回复中返回原子ok。想象一个进程以比 Riak 可以消费的更快的速度生成数据库条目。通过使用异步调用,我们有可能会填满进程邮箱,导致节点内存不足。Riak 使用同步gen_server调用的消息序列化属性来调节负载,只有在处理完上一个请求后才会处理下一个请求。这种方法消除了对更复杂节流代码的需求:除了启用并发之外,gen_server进程还可以用来引入序列化点。

15.3.5. 停止服务器

如何停止服务器?在您的handle_call/3handle_cast/2回调函数中,您不是返回{reply, Reply, NewState}{noreply, NewState},而是返回{stop, Reason, Reply, NewState}{stop, Reason, NewState},分别。一些东西必须触发这个返回值,通常是发送到服务器的停止消息。收到包含ReasonState的停止元组后,通用代码将执行terminate(Reason, State)回调。

terminate函数是插入清理服务器State和系统使用的任何其他持久数据所需的代码的自然位置。在我们的示例中,我们会向我们的对等节点发送最后一条消息,以便他们知道这个节点监视器不再处于运行状态并进行监视。在这个示例中,变量State包含一个带有字段statuspeers的记录。

terminate(_Reason, State) ->
    %% Let our peers know that we are shutting down
    broadcast(State#state.peers, State#state { status = down }).

将行为回调用作库函数并从程序的其他部分调用它们是一种非常糟糕的做法。例如,您永远不应该从另一个模块调用riak_core_node_watcher:init(Args)来检索初始循环数据。此类检索应通过对服务器的同步调用来完成。对行为回调函数的调用应仅来自行为库模块,作为系统中发生的事件的结果,而不是由用户直接调用。

15.4. 其他工作者行为

可以使用这些相同的思想实现许多其他工作者行为。

15.4.1. 有限状态机

有限状态机 (FSM) 在gen_fsm行为模块中实现,是在电信系统 (Erlang 最初发明的领域) 中实现协议栈的关键组件。状态被定义为以状态命名的回调函数,这些回调函数返回一个元组,其中包含下一个State和更新的循环数据。您可以同步和异步地向这些状态发送事件。有限状态机回调模块还应导出标准回调函数,例如initterminatehandle_info

当然,有限状态机并不局限于电信。在 Riak 中,它们用于请求处理程序。当客户端发出诸如getputdelete之类的请求时,监听该请求的进程将生成一个实现相应gen_fsm行为的进程。例如,riak_kv_get_fsm负责处理get请求,检索数据并将其发送到客户端进程。FSM 进程将通过各种状态,因为它确定向哪些节点请求数据,因为它向这些节点发送消息,以及因为它接收响应中的数据、错误或超时。

15.4.2. 事件处理程序

事件处理程序和管理器是gen_event库模块中实现的另一种行为。其理念是创建一个接收特定类型事件的集中点。事件可以同步和异步地发送,并在收到事件时应用一组预定义的操作。对事件的可能响应包括将它们记录到文件、以 SMS 的形式发出警报或收集统计信息。每个操作都在单独的回调模块中定义,并具有自己的循环数据,在调用之间保留。可以为每个特定事件管理器添加、删除或更新处理程序。因此,在实践中,对于每个事件管理器,可能存在许多回调模块,并且这些回调模块的不同实例可能存在于不同的管理器中。事件处理程序包括接收警报、实时跟踪数据、设备相关事件或简单日志的进程。

Riak 中gen_event行为的用途之一是管理对“环事件”的订阅,即对 Riak 集群的成员资格或分区分配的更改。Riak 节点上的进程可以在riak_core_ring_events的一个实例中注册一个函数,该实例实现了gen_event行为。每当管理该节点环的中心进程更改整个集群的成员资格记录时,它就会触发一个事件,导致每个回调模块调用注册的函数。以这种方式,Riak 的各个部分可以轻松地对 Riak 最重要的数据结构之一的更改做出响应,而无需增加该结构的中心管理的复杂性。

大多数常见的并发和通信模式都由我们刚刚讨论的三个主要行为来处理:gen_servergen_fsmgen_event。但是,在大型系统中,随着时间的推移,一些特定于应用程序的模式会显现出来,这些模式需要创建新的行为。Riak 包含一个这样的行为,riak_core_vnode,它形式化了虚拟节点的实现方式。虚拟节点是 Riak 中的主要存储抽象,为请求驱动的 FSM 提供了一个统一的键值存储接口。回调模块的接口使用behavior_info/1函数指定,如下所示

behavior_info(callbacks) ->
    [{init,1},
     {handle_command,3},
     {handoff_starting,2},
     {handoff_cancelled,1},
     {handoff_finished,2},
     {handle_handoff_command,3},
     {handle_handoff_data,2},
     {encode_handoff_item,2},
     {is_empty,1},
     {terminate,2},
     {delete,1}];

上面的示例展示了来自riak_core_vnodebehavior_info/1函数。{CallbackFunction, Arity}元组列表定义了回调模块必须遵循的约定。具体的虚拟节点实现必须导出这些函数,否则编译器将发出警告。实现您自己的 OTP 行为相对简单。除了定义您的回调函数外,使用proc_libsys模块,您还需要使用特定函数启动它们,处理系统消息并监视父进程,以防父进程终止。

15.5. 监督者

监督者行为的任务是监视其子进程,并根据一些预先配置的规则,在它们终止时采取行动。子进程包括监督者和工作进程。这使 Riak 代码库能够专注于正确的情况,这使监督者能够以一致的方式在整个系统中处理软件错误、损坏的数据或系统错误。在 Erlang 世界中,这种非防御性编程方法通常被称为“让它崩溃”策略。构成监督树的子进程可以包括监督者和工作进程。工作进程是 OTP 行为,包括gen_fsmgen_servergen_event。Riak 团队无需处理边界错误情况,因此可以处理更小的代码库。这个代码库,由于使用了行为,从一开始就更小,因为它只处理特定代码。Riak 与大多数 Erlang 应用程序一样,有一个顶级监督者,还有用于具有相关责任的进程组的子监督者。例如,Riak 的虚拟节点、TCP 套接字监听器和查询响应管理器。

15.5.1. 监督者回调函数

为了演示监督者行为是如何实现的,我们将使用riak_core_sup.erl模块。Riak 核心监督者是 Riak 核心应用程序的顶级监督者。它启动一组静态工作者和监督者,以及处理节点 RESTful API 的 HTTP 和 HTTPS 绑定(在应用程序特定配置文件中定义)的动态数量的工作者。与gen_servers类似,所有监督者回调模块都必须包含-behavior(supervisor).指令。它们使用startstart_link函数启动,这些函数采用可选的ServerNameCallBackModule和一个Argument,该Argument传递给init/1回调函数。

查看riak_core_sup.erl模块中前几行代码,除了行为指令和稍后将要描述的宏之外,我们还注意到start_link/3函数

-module(riak_core_sup).
-behavior(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

启动监督者将导致生成一个新进程,并且init/1回调函数将在回调模块riak_core_sup.erl中被调用。ServerName是格式为{local, Name}{global, Name}的元组,其中Name是监督者的注册名称。在我们的示例中,注册名称和回调模块都是原子riak_core_sup,源自?MODULE宏。我们将空列表作为参数传递给init/1,将其视为一个空值。init函数是唯一的监督者回调函数。它必须返回一个格式为

{ok,  {SupervisorSpecification, ChildSpecificationList}}

的元组,其中SupervisorSpecification是一个包含有关如何处理进程崩溃和重启的信息的 3 元组{RestartStrategy, AllowedRestarts, MaxSeconds}RestartStrategy是三个配置参数之一,用于确定在异常终止时行为的兄弟姐妹如何受到影响

AllowedRestarts表示在监督者自身(及其子进程)终止之前,监督者子进程在MaxSeconds内可以终止多少次。当一个进程终止时,它会向其监督者发送一个 EXIT 信号,该监督者根据其重启策略相应地处理终止。监督者在达到最大允许重启次数后终止,以确保无法在此级别解决的循环重启和其他问题被升级。很可能问题出现在另一个子树中的一个进程中,这允许接收升级的监督者终止受影响的子树并将其重启。

查看riak_core_sup.erl模块中init/1回调函数的最后一行,我们注意到这个特定的监督者采用的是一对一策略,这意味着进程彼此独立。监督者将在自身重启之前最多允许十次重启。

ChildSpecificationList指定了监督者必须启动和监控的子进程,以及有关如何终止和重启它们的信息。它由以下格式的元组列表组成

{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}

Id是特定监督者的唯一标识符。ModuleFunctionArguments是导出函数,其结果是调用start_link函数,返回格式为{ok, Pid}的元组。Restart策略规定了根据进程的终止类型会发生什么,它可以是

Shutdown是一个以毫秒为单位的值,它指的是当因重启或关闭而终止时,行为在terminate函数中允许执行的时间。原子infinity也可以使用,但对于除了监督者以外的行为,强烈不建议使用。Type要么是原子worker,指的是通用服务器、事件处理程序和有限状态机,要么是原子supervisor。它们与ModuleList(一个实现行为的模块列表)一起用于在运行时软件升级过程中控制和挂起进程。只有现有的或用户实现的行为才能成为子规范列表的一部分,因此被包含在监督树中。

有了这些知识,我们现在应该能够制定重启策略,该策略基于通用架构定义进程间依赖关系、容错阈值和升级过程。我们还应该能够理解riak_core_sup.erl模块的init/1示例中发生了什么。首先,研究CHILD宏。它使用回调模块名作为Id为一个子进程创建子规范,使其成为永久进程,并给它 5 秒的关闭时间。不同的子进程类型可以是工作进程或监督者。看看这个例子,看看你能从中发现什么

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

init([]) ->
    RiakWebs = case lists:flatten(riak_core_web:bindings(http),
                                  riak_core_web:bindings(https)) of
                   [] ->
                       %% check for old settings, in case app.config
                       %% was not updated
                       riak_core_web:old_binding();
                   Binding ->
                       Binding
               end,

    Children =
                 [?CHILD(riak_core_vnode_sup, supervisor),
                  ?CHILD(riak_core_handoff_manager, worker),
                  ?CHILD(riak_core_handoff_listener, worker),
                  ?CHILD(riak_core_ring_events, worker),
                  ?CHILD(riak_core_ring_manager, worker),
                  ?CHILD(riak_core_node_watcher_events, worker),
                  ?CHILD(riak_core_node_watcher, worker),
                  ?CHILD(riak_core_gossip, worker) |
                  RiakWebs
                 ],
    {ok, {{one_for_one, 10, 10}, Children}}.

这个监督者启动的大多数Children是静态定义的工作进程(或者在vnode_sup的情况下,是一个监督者)。例外是RiakWebs部分,它根据 Riak 配置文件的 HTTP 部分动态定义。

除了库应用程序之外,每个 OTP 应用程序,包括 Riak 中的应用程序,都将拥有自己的监督树。在 Riak 中,各种顶级应用程序在 Erlang 节点中运行,例如用于分布式系统算法的riak_core、用于键值存储语义的riak_kv、用于 HTTP 的webmachine等等。我们展示了riak_core下的扩展树,以演示正在进行的多级监督。这种结构的众多好处之一是,给定的子系统可能会崩溃(由于错误、环境问题或有意操作),并且只有该子树最初会被终止。

监督者将重启所需的进程,整体系统将不会受到影响。在实践中,我们已经看到这种方法对 Riak 很有效。用户可能发现如何使虚拟节点崩溃,但它只会由riak_core_vnode_sup重启。如果他们设法使它崩溃,riak_core监督者将重启它,并将终止传播到顶级监督者。这种故障隔离和恢复机制允许 Riak(以及 Erlang)开发人员直接构建弹性系统。

当一个大型工业用户创建了一个非常恶劣的环境以找出几个数据库系统中的每一个将在哪里崩溃时,监督模型的价值得到了体现。这种环境随机地产生了巨大的流量和故障条件的突发情况。当 Riak 即使在最糟糕的这种情况下也没有停止运行时,他们感到困惑。当然,在幕后,他们能够通过多种方式使单个进程或子系统崩溃,但监督者将清理并重启这些进程,每次都使整个系统恢复正常工作状态。

15.5.2. 应用程序

我们之前介绍的application行为用于将 Erlang 模块和资源打包成可重用的组件。在 OTP 中,有两种类型的应用程序。最常见的形式称为正常应用程序,它将启动一个监督树和所有相关的静态工作进程。库应用程序(例如作为 Erlang 分发的一部分提供的标准库)包含库模块,但不启动监督树。这并不是说代码可能不包含进程或监督树。它只是意味着它们是在属于另一个应用程序的监督树的一部分启动的。

Erlang 系统将由一组松散耦合的应用程序组成。有些是由开发人员编写的,有些是作为开源软件提供的,另一些是 Erlang/OTP 分发的一部分。Erlang 运行时系统及其工具平等地对待所有应用程序,无论它们是否是 Erlang 分发的一部分。

15.6. Riak 中的复制和通信

Riak 是为大规模的极端可靠性和可用性而设计的,并受到 Amazon 的 Dynamo 存储系统 [DHJ+07] 的启发。Dynamo 和 Riak 的架构结合了分布式哈希表 (DHT) 和传统数据库的方面。Riak 和 Dynamo 都使用的两种关键技术是用于副本放置的一致哈希和用于共享公共状态的八卦协议

一致哈希要求系统中的所有节点都了解彼此,并了解每个节点拥有哪些分区。此分配数据可以在集中管理的配置文件中维护,但在大型配置中,这变得极其困难。另一种选择是使用中央配置服务器,但这会在系统中引入单点故障。相反,Riak 使用八卦协议在整个系统中传播集群成员资格和分区所有权数据。

八卦协议,也称为流行病协议,的工作方式与它们的名字一样。当系统中的一个节点希望更改一段共享数据时,它会在其本地数据副本上进行更改,并将更新后的数据散布到随机的同级节点。接收到更新后,节点将接收到的更改与本地状态合并,并再次散布到另一个随机的同级节点。

当 Riak 集群启动时,所有节点都必须配置相同的分区计数。一致哈希环然后被分区计数划分,每个间隔被本地存储为{HashRange, Owner}对。集群中的第一个节点只需声明所有分区。当新节点加入集群时,它会联系现有节点以获取其{HashRange, Owner}对列表。然后它会声明 (分区计数)/(节点数量) 对,更新其本地状态以反映其新的所有权。然后,更新后的所有权信息会被散布到同级节点。然后,这种更新后的状态使用上述算法在整个集群中传播。

通过使用八卦协议,Riak 避免了以集中配置服务器的形式引入单点故障,从而减轻了系统操作员维护关键集群配置数据的负担。然后,任何节点都可以使用系统中散布的分区分配数据来路由请求。当八卦协议和一致哈希一起使用时,它们使 Riak 能够作为一个真正去中心化的系统运行,这对部署和操作大规模系统具有重要意义。

15.7. 结论和经验教训

大多数程序员认为,更小、更简单的代码库不仅更易于维护,而且往往包含更少的错误。通过在集群中使用 Erlang 的基本分布式原语进行通信,Riak 可以从一个基本健全的异步消息传递层开始,并构建自己的协议,而不必担心底层实现。随着 Riak 发展成为一个成熟的系统,其网络通信的某些方面从使用 Erlang 的内置分布式系统(转向直接操作 TCP 套接字)转移,而其他方面仍然适合包含的原语。通过从 Erlang 的本机消息传递开始,Riak 团队能够非常快速地构建整个系统。这些原语干净且清晰,以至于后来仍然很容易替换一些在生产环境中证明不是最佳选择的地方。

此外,由于 Erlang 消息传递的性质和 Erlang VM 的轻量级核心,用户可以很容易地在 1 台机器上运行 12 个节点,也可以在 12 台机器上运行 12 个节点。与更重量级的消息传递和集群机制相比,这使得开发和测试变得更加容易。由于 Riak 的本质上是分布式的,这尤其宝贵。从历史上看,大多数分布式系统在单个开发人员的笔记本电脑上很难以“开发模式”运行。因此,开发人员经常最终在他们完整系统的子集环境中测试他们的代码,这种环境的行为截然不同。由于许多节点的 Riak 集群可以很容易地在单个笔记本电脑上运行,而不会过度占用资源或进行复杂的配置,因此开发过程可以更容易地生成准备好用于生产部署的代码。

使用 Erlang/OTP 监督者使 Riak 在面对子组件崩溃时更加弹性。Riak 更进一步;受这种行为的启发,Riak 集群也能够在整个节点崩溃并从系统中消失时轻松保持运行。这可能会导致有时令人惊讶的弹性水平。其中一个例子是,一家大型企业正在对各种数据库进行压力测试,并故意使它们崩溃以观察它们的边缘条件。当他们到达 Riak 时,他们感到困惑。每次他们找到一种方法(通过操作系统级别的操作、错误的 IPC 等)使 Riak 的子系统崩溃时,他们都会看到性能短暂下降,然后系统恢复正常行为。这是对精心设计的“让它崩溃”方法的直接结果。Riak 正在按需干净地重启这些子系统中的每一个,整个系统只是继续运行。这种经验恰恰表明了 Erlang/OTP 构建程序的方法所带来的弹性。

15.7.1. 致谢

本章基于 Francesco Cesarini 和 Simon Thompson 在 2009 年于布达佩斯和科马尔诺举行的中欧函数式编程学校上的讲义笔记。英国肯特大学的 Simon Thompson 作出了主要贡献。特别感谢所有审稿人,他们在本章写作的不同阶段提供了宝贵的反馈。