500 行代码或更少
持续集成系统

马里尼·达斯

马里尼·达斯是一位软件工程师,她热衷于快速开发(但安全!),并解决跨职能问题。她在 Mozilla 担任工具工程师,目前正在 Twitch 磨练自己的技能。在 Twitter 或她的 博客 上关注马里尼。

什么是持续集成系统?

在开发软件时,我们希望能够验证我们的新功能或错误修复是安全的并且按预期工作。我们通过对代码运行测试来做到这一点。有时,开发人员会在本地运行测试以验证他们的更改是安全的,但开发人员可能没有时间在软件运行的每个系统上测试他们的代码。此外,随着越来越多的测试被添加,运行它们所需的时间,即使只是在本地,也变得不那么可行。正因为如此,持续集成系统应运而生。

持续集成 (CI) 系统是专门用于测试新代码的系统。在提交到代码存储库后,持续集成系统有责任验证此提交不会破坏任何测试。为此,系统必须能够获取新的更改,运行测试并报告其结果。与任何其他系统一样,它也应该是故障容忍的。这意味着如果系统中的任何部分发生故障,它应该能够恢复并从该点继续。

此测试系统还应该能够很好地处理负载,以便在提交速度快于测试运行速度的情况下,我们可以在合理的时间内获得测试结果。我们可以通过分配和并行化测试工作来实现这一点。本项目将演示一个小型、基础的分布式持续集成系统,该系统旨在可扩展。

项目局限性和说明

本项目使用 Git 作为需要测试的代码的存储库。只使用标准的源代码管理调用,因此,如果您不熟悉 Git,但熟悉其他版本控制系统 (VCS),例如 svn 或 Mercurial,您仍然可以继续阅读。

由于代码长度和 unittest 的限制,我简化了测试发现。我们只会运行位于存储库中名为 tests 的目录中的测试。

持续集成系统监视一个主存储库,该存储库通常托管在 Web 服务器上,而不是 CI 的文件系统本地。在我们示例的情况下,我们将使用本地存储库而不是远程存储库。

持续集成系统不需要以固定、规则的计划运行。您还可以让它们每隔几次提交运行一次,或每次提交运行一次。在我们示例的情况下,CI 系统将定期运行。这意味着如果它被设置为每五秒检查一次更改,它将在五秒后针对最近进行的提交运行测试。它不会测试该时间段内进行的每次提交,只会测试最近的一次提交。

此 CI 系统旨在定期检查存储库中的更改。在现实世界的 CI 系统中,您还可以让存储库观察者收到托管存储库的通知。例如,GitHub 提供“提交后挂钩”,它们向 URL 发送通知。按照此模型,存储库观察者将由托管在该 URL 的 Web 服务器调用以响应该通知。由于在本地建模这一点很复杂,因此我们使用观察者模型,其中存储库观察者将检查更改,而不是接收通知。

CI 系统还具有报告器方面,其中测试运行器将其结果报告给一个组件,该组件使人们能够看到它们,也许是在网页上。为简单起见,本项目收集测试结果并将它们存储为文件,这些文件存储在调度程序进程的本地文件系统中。

请注意,此 CI 系统使用的体系结构只是众多可能性之一。选择这种方法是为了将我们的案例研究简化为三个主要组件。

简介

持续集成系统的基本结构由三个组件组成:观察者、测试作业调度程序和测试运行器。观察者监视存储库。当它注意到已进行提交时,它会通知作业调度程序。作业调度程序然后找到一个测试运行器并为其提供要测试的提交号。

构建 CI 系统的方法有很多。我们可以让观察者、调度程序和运行器成为同一台机器上的同一进程。这种方法非常有限,因为它没有负载处理,因此,如果向存储库添加的更改多于 CI 系统能够处理的更改,将积累大量的积压。这种方法也根本不具有容错性;如果运行它的计算机发生故障或出现停电,则没有备用系统,因此不会运行任何测试。理想的系统应该是能够处理所有请求的测试作业,并且会尽力在机器出现故障时进行补偿。

为了构建一个容错且负载均衡的 CI 系统,在本项目中,这些组件中的每一个都是自己的进程。这将使每个进程独立于其他进程,并让我们运行每个进程的多个实例。当您需要同时运行多个测试作业时,这很有用。然后,我们可以并行生成多个测试运行器,使我们能够运行所需的所有作业,并防止我们积累排队的测试积压。

在本项目中,这些组件不仅作为单独的进程运行,而且还通过套接字进行通信,这将使我们能够在单独的联网机器上运行每个进程。每个组件都会被分配一个唯一的主机/端口地址,每个进程可以通过在分配的地址上发布消息来与其他进程进行通信。

这种设计将使我们能够通过启用分布式体系结构来动态处理硬件故障。我们可以让观察者在某一台机器上运行,测试作业调度程序在另一台机器上运行,测试运行器在另一台机器上运行,它们都可以通过网络互相通信。如果这些机器中的任何一台出现故障,我们可以安排一台新机器在网络上启动,因此系统变得故障安全。

本项目不包括自动恢复代码,因为这取决于您的分布式系统的体系结构,但在现实世界中,CI 系统是在这样的分布式环境中运行的,因此它们可以具有故障转移冗余(即,如果一个进程运行的机器之一失效,我们可以回退到备用机器)。

出于本项目的目的,这些进程中的每一个都将本地手动启动不同的本地端口。

本项目中的文件

本项目包含每个组件的 Python 文件:存储库观察者 (repo_observer.py)、测试作业调度程序 (dispatcher.py) 和测试运行器 (test_runner.py)。这三个进程中的每一个都使用套接字互相通信,由于用于传输信息 的代码被它们共享,因此有一个 helpers.py 文件包含它,因此每个进程都从这里导入通信函数,而不是在文件中进行重复。

这些进程还使用 bash 脚本文件。这些脚本文件用于以比不断使用 Python 的操作系统级模块(如 os 和 subprocess)更简单的方式执行 bash 和 git 命令。

最后,有一个 tests 目录,其中包含 CI 系统将运行的两个示例测试。一个测试将通过,另一个测试将失败。

初始设置

虽然此 CI 系统已准备好在一个分布式系统中工作,但让我们首先在同一台计算机上本地运行所有内容,以便我们能够了解 CI 系统的工作原理,而无需增加遇到网络相关问题的风险。如果您希望在分布式环境中运行它,您可以在自己的机器上运行每个组件。

持续集成系统通过检测代码存储库中的更改来运行测试,因此,首先,我们需要设置 CI 系统将监视的存储库。

让我们将它命名为 test_repo

$ mkdir test_repo 
$ cd test_repo 
$ git init

这将是我们的主存储库。开发人员在此处签入他们的代码,因此我们的 CI 应该拉取此存储库并检查提交,然后运行测试。检查新提交的事物是存储库观察者。

存储库观察者通过检查提交来工作,因此我们需要在主存储库中至少进行一次提交。让我们提交我们的示例测试,以便我们有一些测试可以运行。

将 tests 文件夹从这个代码库复制到 test_repo 并提交它

$ cp -r /this/directory/tests /path/to/test_repo/ 
$ cd /path/to/test\_repo 
$ git add tests/ 
$ git commit -m ”add tests”

现在您在主存储库中有一个提交。

存储库观察者组件将需要其自己的代码克隆,以便它可以检测到何时进行新的提交。让我们创建主存储库的克隆,并将其命名为 test_repo_clone_obs

$ git clone /path/to/test_repo test_repo_clone_obs

测试运行器也将需要其自己的代码克隆,以便它可以签出存储库中的特定提交并运行测试。让我们创建主存储库的另一个克隆,并将其命名为 test_repo_clone_runner

$ git clone /path/to/test_repo test_repo_clone_runner

组件

存储库观察者 (repo_observer.py)

存储库观察者监视存储库,并在看到新的提交时通知调度程序。为了与所有版本控制系统(因为并非所有 VCS 都具有内置通知系统)一起工作,此存储库观察者被编写为定期检查存储库中的新提交,而不是依赖 VCS 通知它已进行更改。

观察者将定期轮询存储库,并在看到更改时,会告诉调度程序针对其运行测试的最新提交 ID。观察者通过在其存储库中找到当前提交 ID,然后更新存储库,最后,它找到最新的提交 ID 并进行比较来检查新提交。出于本示例的目的,观察者只会针对最新的提交调度测试。这意味着如果在定期检查之间进行了两次提交,观察者只会针对最新的提交运行测试。通常,CI 系统会检测自上次测试提交以来的所有提交,并为每个新提交调度测试运行器,但我为了简化,修改了这一假设。

观察者必须知道要观察哪个存储库。我们之前在 /path/to/test_repo_clone_obs 中创建了存储库的克隆。观察者将使用此克隆来检测更改。为了让存储库观察者使用此克隆,我们在调用 repo_observer.py 文件时将路径传递给它。存储库观察者将使用此克隆从主存储库中拉取。

我们还必须为观察者提供调度程序的地址,以便观察者可以向其发送消息。当您启动存储库观察者时,您可以使用 --dispatcher-server 命令行参数传入调度程序的服务器地址。如果您没有传入,它将假设默认地址为 localhost:8888

def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " \
                        "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

一旦调用了存储库观察者文件,它就会启动 poll() 函数。此函数解析命令行参数,然后启动一个无限的 while 循环。while 循环用于定期检查存储库中的更改。它所做的第一件事是调用 update_repo.sh Bash 脚本 1.

    while True:
        try:
            # call the bash script that will update the repo and check
            # for changes. If there's a change, it will drop a .commit_id file
            # with the latest commit in the current working directory
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. " +
                            "Reason: %s" % e.output)

update_repo.sh 文件用于识别任何新的提交并让存储库观察者知道。它通过记录我们目前知道的提交 ID,然后拉取存储库,最后检查最新的提交 ID 来做到这一点。如果它们匹配,则不会进行任何更改,因此存储库观察者无需执行任何操作,但如果提交 ID 有所不同,那么我们就知道已进行新的提交。在这种情况下,update_repo.sh 将创建一个名为 .commit_id 的文件,其中存储着最新的提交 ID。

以下是update_repo.sh脚本的逐步分解。首先,脚本会调用run_or_fail.sh文件,该文件提供了我们所有shell脚本使用的run_or_fail辅助方法。该方法用于运行给定的命令,或在出现错误时给出错误消息。

#!/bin/bash

source run_or_fail.sh 

接下来,脚本尝试删除名为.commit_id的文件。由于updaterepo.shrepo_observer.py文件无限次调用,如果之前有新的提交,则会创建.commit_id文件,但它包含我们已经测试过的提交。因此,我们希望删除该文件,只有在发现新的提交时才创建新的文件。

bash rm -f .commit_id 

在删除文件(如果存在)之后,它会验证我们正在观察的存储库是否存在,然后将其重置为最新的提交,以防出现任何导致其不同步的情况。

run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD

然后,它调用git log并解析输出,查找最新的提交ID。

COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

然后,它拉取存储库,获取任何最近的更改,然后获取最新的提交ID。

run_or_fail "Could not pull from repository" git pull
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

最后,如果提交ID与之前的ID不匹配,那么我们知道有新的提交需要检查,因此脚本将最新的提交ID存储在.commit_id文件中。

# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
  popd 1> /dev/null
  echo $NEW_COMMIT_ID > .commit_id
fi

update_repo.shrepo_observer.py中完成运行时,存储库观察器会检查.commit_id文件是否存在。如果文件确实存在,那么我们知道有新的提交,我们需要通知调度器,以便它可以启动测试。存储库观察器会通过连接到调度器服务器并发送“status”请求来检查调度器服务器的状态,以确保服务器没有问题,并且确保它已准备好接受指令。

        if os.path.isfile(".commit_id"):
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)

如果它响应“OK”,那么存储库观察器会打开.commit_id文件,读取最新的提交ID,并使用dispatch:<commit ID>请求将该ID发送到调度器。然后,它将休眠五秒钟并重复此过程。如果在此过程中出现任何错误,我们也会在五秒钟后重试。

            if response == "OK":
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" %
                    response)
                print "dispatched!"
            else:
                raise Exception("Could not dispatch the test: %s" %
                response)
        time.sleep(5)

存储库观察器将永远重复此过程,直到你通过KeyboardInterrupt(Ctrl+c)或发送终止信号来终止进程。

调度器 (dispatcher.py)

调度器是一个独立的服务,用于委派测试任务。它监听来自测试运行器和存储库观察器的请求的端口。它允许测试运行器注册自己,并在从存储库观察器获得提交ID时,它会将一个测试运行器分配到新的提交上。它还会优雅地处理测试运行器出现的任何问题,并在出现问题时将提交ID重新分配给新的测试运行器。

当执行dispatch.py时,会调用serve函数。首先,它会解析允许你指定调度器主机和端口的参数。

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

这将启动调度器服务器,以及另外两个线程。一个线程运行runner_checker函数,另一个线程运行redistribute函数。

    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print `serving on %s:%s` % (args.host, int(args.port))

    ...

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl+C or Cmd+C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()

runner_checker函数定期 ping 每个已注册的测试运行器,以确保它们仍然响应。如果它们变得无响应,那么该运行器将从池中移除,并且其提交ID将被分配到下一个可用的运行器。该函数将提交ID记录在pending_commits变量中。

    def runner_checker(server):
        def manage_commit_lists(runner):
            for commit, assigned_runner in server.dispatched_commits.iteritems():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)
        while not server.dead:
            time.sleep(1)
            for runner in server.runners:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"],
                                                   int(runner["port"]),
                                                   "ping")
                    if response != "pong":
                        print "removing runner %s" % runner
                        manage_commit_lists(runner)
                except socket.error as e:
                    manage_commit_lists(runner)

redistribute函数用于分配记录在pending_commits中的提交ID。当redistribute运行时,它会检查pending_commits中是否有任何提交ID。如果有,它会使用提交ID调用dispatch_tests函数。

    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print "running redistribute"
                print server.pending_commits
                dispatch_tests(server, commit)
                time.sleep(5)

dispatch_tests函数用于从已注册运行器池中找到一个可用的测试运行器。如果有一个可用,它会向其发送一个带有提交ID的runtest消息。如果当前没有可用的运行器,它会等待两秒钟并重复此过程。分配完成后,它会在dispatched_commits变量中记录哪个测试运行器正在测试哪个提交ID。如果提交ID在pending_commits变量中,dispatch_tests将将其删除,因为它已经成功重新分配。

def dispatch_tests(server, commit_id):
    # NOTE: usually we don't run this forever
    while True:
        print "trying to dispatch to runners"
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print "adding id %s" % commit_id
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)

调度器服务器使用SocketServer模块,这是一个非常简单的服务器,是标准库的一部分。SocketServer模块中有四种基本服务器类型:TCPUDPUnixStreamServerUnixDatagramServer。我们将使用基于 TCP 的套接字服务器,以便我们可以在服务器之间确保连续、有序的数据流,因为 UDP 不保证这一点。

SocketServer提供的默认TCPServer一次只能处理一个请求,因此它无法处理调度器正在与一个连接(例如来自测试运行器的连接)通信,然后又进来一个新的连接(例如来自存储库观察器的连接)的情况。如果发生这种情况,存储库观察器必须等待第一个连接完成并断开连接,才能获得服务。这对我们来说并不理想,因为调度器服务器必须能够直接快速地与所有测试运行器和存储库观察器通信。

为了让调度器服务器处理同时连接,它使用ThreadingTCPServer自定义类,该类为默认的SocketServer添加了线程功能。这意味着,每当调度器接收到连接请求时,它都会为该连接创建一个新的进程。这允许调度器同时处理多个请求。

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = [] # Keeps track of test runner pool
    dead = False # Indicate to other threads that we are no longer running
    dispatched_commits = {} # Keeps track of commits we dispatched
    pending_commits = [] # Keeps track of commits we have yet to dispatch

调度器服务器通过为每个请求定义处理程序来工作。这是由DispatcherHandler类定义的,该类继承自SocketServerBaseRequestHandler。这个基类只需要我们定义handle函数,该函数将在每次请求连接时被调用。DispatcherHandler中定义的handle函数是我们的自定义处理程序,它将在每个连接上被调用。它查看传入的连接请求(self.request保存请求信息),并解析出它被请求执行什么命令。

class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our dispatcher.
    This will dispatch test runners against the incoming commit
    and handle their requests and test results
    """
    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024
    def handle(self):
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)

它处理四个命令:statusregisterdispatchresultsstatus用于检查调度器服务器是否已启动并运行。

        if command == "status":
            print "in status"
            self.request.sendall("OK")

为了让调度器执行任何有用的操作,它需要至少有一个已注册的测试运行器。当在主机:端口对上调用register时,它会将运行器的信息存储在一个列表中(附加到ThreadingTCPServer对象的运行器对象),以便它稍后在需要向其提供提交ID以运行测试时能够与其通信。

        elif command == "register":
            # Add this test runner to our pool
            print "register"
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port":port}
            self.server.runners.append(runner)
            self.request.sendall("OK")

dispatch由存储库观察器使用,用于将测试运行器分配到提交上。此命令的格式为dispatch:<commit ID>。调度器从此消息中解析出提交ID,并将其发送到测试运行器。

        elif command == "dispatch":
            print "going to dispatch"
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered")
            else:
                # The coordinator can trust us to dispatch the test
                self.request.sendall("OK")
                dispatch_tests(self.server, commit_id)

results由测试运行器用于报告已完成的测试运行的结果。此命令的格式为results:<commit ID>:<results 数据的长度(字节)>:<results><commit ID>用于标识测试运行针对哪个提交ID。<results 数据的长度(字节)>用于确定结果数据需要多大的缓冲区。最后,<results>保存实际的结果输出。

        elif command == "results":
            print "got test results"
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 is the number of ":" in the sent command
            remaining_buffer = self.BUF_SIZE - \
                (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK")

测试运行器 (test_runner.py)

测试运行器负责针对给定的提交ID运行测试并报告结果。它只与调度器服务器通信,调度器服务器负责向其提供要运行的提交ID,并且会接收测试结果。

当调用test_runner.py文件时,它会调用serve函数,该函数启动测试运行器服务器,并启动一个线程来运行dispatcher_checker函数。由于这个启动过程与repo_observer.pydispatcher.py中描述的过程非常相似,这里省略描述。

dispatcher_checker函数每五秒钟 ping 一次调度器服务器,以确保它仍然启动并运行。这对资源管理很重要。如果调度器宕机,那么测试运行器将关闭,因为它无法在没有调度器的情况下执行任何有意义的工作,因为没有调度器来分配工作或报告结果。

    def dispatcher_checker(server):
        while not server.dead:
            time.sleep(5)
            if (time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                                       server.dispatcher_server["host"],
                                       int(server.dispatcher_server["port"]),
                                       "status")
                    if response != "OK":
                        print "Dispatcher is no longer functional"
                        server.shutdown()
                        return
                except socket.error as e:
                    print "Can't communicate with dispatcher: %s" % e
                    server.shutdown()
                    return

测试运行器是一个ThreadingTCPServer,就像调度器服务器一样。它需要线程,因为调度器不仅会向其提供要运行的提交ID,而且调度器还会定期 ping 该运行器以验证它在运行测试时是否仍然处于活动状态。

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None # Holds the dispatcher server host/port information
    last_communication = None # Keeps track of last communication from dispatcher
    busy = False # Status flag
    dead = False # Status flag

通信流程从调度器请求运行器接受要运行的提交ID开始。如果测试运行器已准备好运行作业,它会向调度器服务器发送确认响应,然后调度器服务器会关闭连接。为了让测试运行器服务器同时运行测试并接受调度器的更多请求,它在新线程上启动请求的测试作业。

这意味着,当调度器服务器发出请求(在本例中为 ping)并希望收到响应时,它将在一个单独的线程上完成,而测试运行器则在其自己的线程上忙于运行测试。这允许测试运行器服务器同时处理多个任务。如果不用这种线程设计,可以让调度器服务器保持与每个测试运行器的连接,但这会增加调度器服务器的内存需求,并且容易受到网络问题的影响,例如意外断开的连接。

测试运行器服务器会响应调度器发出的两条消息。第一个是ping,调度器服务器使用它来验证运行器是否仍然处于活动状态。

class TestHandler(SocketServer.BaseRequestHandler):
    ...

    def handle(self):
        ....
        if command == "ping":
            print "pinged"
            self.server.last_communication = time.time()
            self.request.sendall("pong")

第二个是runtest,它接受runtest:<commit ID>形式的消息,用于在给定的提交上启动测试。当调用runtest时,测试运行器会检查它是否已经在运行测试,如果是,它会向调度器返回BUSY响应。如果可用,它会向服务器发送OK消息,将其状态设置为忙碌,并运行其run_tests函数。

        elif command == "runtest":
            print "got runtest command: am I busy? %s" % self.server.busy
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print "running"
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False

此函数调用shell脚本test_runner_script.sh,该脚本将存储库更新到给定的提交ID。一旦脚本返回,如果它成功更新了存储库,我们就会使用unittest运行测试并将结果收集到一个文件中。当测试完成运行后,测试运行器会读取结果文件,并将结果消息发送到调度器。

    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                        repo_folder, commit_id])
        print output
        # run the tests
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # give the dispatcher the results
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))

以下是test_runner_script.sh

#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

为了运行test_runner.py,你必须将其指向要针对其运行测试的存储库的克隆。在本例中,你可以使用之前创建的/path/to/test_repo test_repo_clone_runner克隆作为参数。默认情况下,test_runner.py将在本地主机上启动自己的服务器,使用 8900-9000 范围内的端口,并尝试连接到localhost:8888上的调度器服务器。你可以传递可选参数来更改这些值。--host--port参数用于指定运行测试运行器服务器的特定地址,而--dispatcher-server参数指定调度器的地址。

控制流程图

图 2.1 是此系统的概述图。此图假设所有三个文件 (repo_observer.pydispatcher.pytest_runner.py) 已经运行,并描述了每个进程在进行新的提交时采取的操作。

Figure 2.1 - Control Flow

图 2.1 - 控制流程

运行代码

我们可以使用三个不同的终端 shell 在本地运行这个简单的 CI 系统,每个 shell 用于一个进程。我们首先启动调度器,在端口 8888 上运行。

$ python dispatcher.py

在新 shell 中,我们启动测试运行器(以便它可以向调度器注册)。

$ python test_runner.py <path/to/test_repo_clone_runner>

测试运行器会给自己分配一个端口,该端口在 8900-9000 范围内。你可以根据需要运行任意数量的测试运行器。

最后,在另一个新的 shell 中,我们启动存储库观察器。

$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>

现在一切都设置好了,让我们触发一些测试!为此,我们需要进行新的提交。转到你的 master 存储库,并进行任意更改。

$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m"new file" new_file

然后,repo_observer.py 会意识到有一个新的提交,并通知调度器。你可以在各自的 shell 中看到输出,因此可以监控它们。调度器接收到测试结果后,会将它们存储在这个代码库中的 test_results/ 文件夹中,使用提交 ID 作为文件名。

错误处理

这个 CI 系统包含一些简单的错误处理。

如果你杀死了 test_runner.py 进程,dispatcher.py 会发现运行器不可用,并将其从池中移除。

你也可以杀死测试运行器,以模拟机器崩溃或网络故障。如果你这样做,调度器会意识到运行器宕机,并会将工作分配给池中可用的另一个测试运行器,或者等待新的测试运行器在池中注册。

如果你杀死了调度器,仓库观察器会发现它宕机,并抛出异常。测试运行器也会注意到,并关闭。

结论

通过将职责分离到各自的进程中,我们能够构建分布式持续集成系统的基础。通过进程之间使用套接字请求进行通信,我们能够将系统分布到多台机器上,从而帮助我们的系统更可靠和可扩展。

由于 CI 系统现在非常简单,你可以自己扩展它以使其更具功能性。以下是一些改进建议

每次提交的测试运行

当前系统会定期检查是否有新的提交要运行,并会运行最近的提交。这应该改进为测试每个提交。为此,你可以修改周期性检查器,以便为日志中上次测试提交和最新提交之间的每个提交调度测试运行。

更智能的测试运行器

如果测试运行器检测到调度器没有响应,它就会停止运行。即使测试运行器正在运行测试,也会发生这种情况!如果测试运行器在测试运行过程中等待一段时间(或者如果你不关心资源管理,则无限期地等待)调度器恢复联机,会更好。在这种情况下,如果调度器在测试运行器正在积极运行测试时宕机,测试运行器不会关闭,而是会完成测试并等待调度器恢复联机,并将结果报告给它。这将确保我们不会浪费测试运行器所做的任何努力,并且我们只会对每个提交运行一次测试。

真实报告

在真实的 CI 系统中,你将让测试结果报告给一个报告服务,该服务将收集结果,将它们发布到某个地方供人们查看,并在发生故障或其他值得注意的事件时通知一组相关方。你可以通过创建一个新的进程来获取报告结果来扩展我们简单的 CI 系统,而不是让调度器收集结果。这个新进程可以是一个 web 服务器(或可以连接到 web 服务器),它可以将结果发布到网上,并可以使用邮件服务器向订阅者发送有关任何测试失败的警报。

测试运行器管理器

现在,你必须手动启动 test_runner.py 文件以启动测试运行器。相反,你可以创建一个测试运行器管理器进程,该进程将评估来自调度器的测试请求的当前负载,并相应地调整活动测试运行器的数量。这个进程将接收 runtest 消息,并将为每个请求启动一个测试运行器进程,并在负载降低时杀死未使用的进程。

使用这些建议,你可以使这个简单的 CI 系统更加健壮和容错,并且你可以将其与其他系统集成,例如基于 web 的测试报告器。

如果你想了解持续集成系统能够达到的灵活性程度,我建议你看看 Jenkins,一个用 Java 编写的非常健壮的开源 CI 系统。它为你提供了一个基本的 CI 系统,你可以使用插件扩展它。你也可以访问其源代码 通过 GitHub。另一个推荐的项目是 Travis CI,它用 Ruby 编写,其源代码也可用 通过 GitHub

这是一个了解 CI 系统的工作原理以及如何自己构建 CI 系统的练习。你现在应该对构建可靠的分布式系统所需的知识有更深入的了解,你现在可以使用这些知识来开发更复杂的解决方案。

  1. 使用 Bash 是因为我们需要检查文件是否存在,创建文件,并使用 Git,而 shell 脚本是实现此目的最直接和最简单的方法。或者,你可以使用跨平台的 Python 包;例如,Python 的 os 内置模块可用于访问文件系统,而 GitPython 可用于访问 Git,但它们以更间接的方式执行操作。