500 行代码或更少
用 Python 编写的 Python 解释器

Allison Kaptur

Allison 是 Dropbox 的一名工程师,她在那里帮助维护世界上最大的 Python 客户端网络之一。在加入 Dropbox 之前,她曾在 Recurse Center 担任协调员,Recurse Center 是一个位于纽约的程序员作家疗养院。她曾在北美 PyCon 上谈论 Python 内部机制,并且喜欢奇怪的错误。她的博客地址是 akaptur.com.

(本章也有 简体中文 版本).

介绍

Byterun 是用 Python 实现的 Python 解释器。通过我对 Byterun 的工作,我惊讶地发现 Python 解释器的基本结构很容易适应 500 行的代码限制。本章将介绍解释器的结构,并为您提供足够的背景知识来进一步探索它。目标不是解释关于解释器的所有内容——就像编程和计算机科学的许多有趣领域一样,您可能需要花费数年时间才能深入理解这个主题。

Byterun 由 Ned Batchelder 和我编写,并基于 Paul Swartz 的工作。它的结构类似于 Python 的主要实现 CPython,因此了解 Byterun 将有助于您了解一般的解释器,特别是 CPython 解释器。(如果您不知道自己使用的是哪个 Python,它很可能是 CPython。)尽管长度很短,但 Byterun 能够运行大多数简单的 Python 程序1.

Python 解释器

在我们开始之前,让我们缩小一下我们所说的“Python 解释器”的范围。在讨论 Python 时,“解释器”一词可以以多种不同的方式使用。有时解释器指的是 Python REPL,即您在命令行中键入 python 时获得的交互式提示。有时人们会将“Python 解释器”与“Python”几乎等同起来,以讨论从头到尾执行 Python 代码。在本章中,“解释器”具有更狭义的含义:它是执行 Python 程序过程中的最后一步。

在解释器接管之前,Python 执行了另外三个步骤:词法分析、语法分析和编译。这些步骤共同将程序员的源代码从文本行转换为结构化的代码对象,其中包含解释器可以理解的指令。解释器的任务是接收这些代码对象,并遵循这些指令。

您可能很惊讶地听到编译是执行 Python 代码的一个步骤。Python 通常被称为像 Ruby 或 Perl 这样的“解释型”语言,而不是像 C 或 Rust 这样的“编译型”语言。然而,这种术语不像看起来那样精确。大多数解释型语言,包括 Python,都涉及编译步骤。Python 被称为“解释型”语言的原因是编译步骤执行的工作相对较少(而解释器执行的工作相对较多),与编译型语言相比。正如我们将在本章后面看到的那样,Python 编译器对程序行为的了解远远少于 C 编译器。

Python 的 Python 解释器

Byterun 是用 Python 编写的 Python 解释器。这可能让您感到奇怪,但它并不比用 C 编写 C 编译器更奇怪。(事实上,广泛使用的 C 编译器 gcc 是用 C 编写的。)您可以用几乎任何语言编写 Python 解释器。

用 Python 编写 Python 解释器既有优点也有缺点。最大的缺点是速度:通过 Byterun 执行代码的速度远低于在 CPython 中执行代码的速度,在 CPython 中,解释器是用 C 编写并经过仔细优化。然而,Byterun 最初被设计为一个学习练习,所以速度对我们来说并不重要。使用 Python 最大的优势是我们可以更容易地只实现解释器,而不需要实现 Python 运行时的其他部分,特别是对象系统。例如,Byterun 在需要创建类时可以回退到“真实”Python。另一个优点是 Byterun 很容易理解,部分原因是它是用一种高级语言(Python!)编写的,许多人发现它很容易阅读。(我们还在 Byterun 中排除了解释器优化——再次将清晰度和简洁性置于速度之上。)

构建解释器

在我们开始查看 Byterun 的代码之前,我们需要了解一些关于解释器结构的更高层次的背景信息。Python 解释器是如何工作的?

Python 解释器是一个虚拟机,这意味着它是一个模拟物理计算机的软件。这个特殊的虚拟机是一个堆栈机:它操作几个堆栈来执行操作(与寄存器机形成对比,寄存器机写入和读取特定的内存位置)。

Python 解释器是一个字节码解释器:它的输入是称为字节码的指令集。当您编写 Python 时,词法分析器、语法分析器和编译器会生成解释器要操作的代码对象。每个代码对象包含一组要执行的指令——即字节码——以及解释器需要的其他信息。字节码是 Python 代码的中间表示:它以解释器可以理解的方式表达了您编写的源代码。它类似于汇编语言作为 C 代码和硬件之间的中间表示。

一个微型解释器

为了使这一点具体化,让我们从一个非常小的解释器开始。这个解释器只能添加数字,它只理解三个指令。它可以执行的所有代码都由这些指令的不同组合组成。这三个指令是:

由于我们本章不关心词法分析器、语法分析器和编译器,因此指令集是如何生成的并不重要。您可以想象编写 7 + 5 并让编译器发出这三个指令的组合。或者,如果您有合适的编译器,您可以编写 Lisp 语法,它被转换为相同的指令组合。解释器并不关心。重要的是我们的解释器得到了指令的格式良好的排列。

假设

7 + 5

产生以下指令集

what_to_execute = {
    "instructions": [("LOAD_VALUE", 0),  # the first number
                     ("LOAD_VALUE", 1),  # the second number
                     ("ADD_TWO_VALUES", None),
                     ("PRINT_ANSWER", None)],
    "numbers": [7, 5] }

Python 解释器是一个堆栈机,因此它必须操作堆栈来添加两个数字 (图 12.1.) 解释器将从执行第一个指令 LOAD_VALUE 开始,并将第一个数字压入堆栈。接下来,它将第二个数字压入堆栈。对于第三个指令 ADD_TWO_VALUES,它将从堆栈中弹出两个数字,将它们加在一起,并将结果压入堆栈。最后,它将从堆栈中弹出答案并打印出来。

Figure 12.1 - A stack machine

图 12.1 - 堆栈机

LOAD_VALUE 指令告诉解释器将一个数字压入堆栈,但指令本身并没有指定哪个数字。每个指令都需要一个额外的信息,告诉解释器在哪里找到要加载的数字。因此,我们的指令集有两个部分:指令本身,以及指令需要的常量列表。(在 Python 中,我们所说的“指令”是字节码,“要执行的内容”对象是代码对象)。

为什么不直接将数字放在指令中?想象一下,如果我们要将字符串加在一起,而不是数字。我们不想将字符串塞进指令中,因为它们可能任意大。这种设计也意味着我们只需要每个对象的副本,因此,例如,要添加 7 + 7,“数字”可以是 [7]

您可能想知道为什么除了 ADD_TWO_VALUES 之外的指令还需要存在。事实上,对于添加两个数字的简单情况来说,这个例子有点牵强。然而,这条指令是构建更复杂程序的基石。例如,仅仅通过我们目前定义的指令,我们就可以在给定这些指令的正确集合的情况下,将三个值加在一起——或者将任意数量的值加在一起。堆栈提供了一种干净的方式来跟踪解释器的状态,并且随着我们不断前进,它将支持更多复杂的操作。

现在让我们开始编写解释器本身。解释器对象有一个堆栈,我们将用列表来表示它。该对象还具有一个方法,用于描述如何执行每个指令。例如,对于 LOAD_VALUE,解释器将该值压入堆栈。

class Interpreter:
    def __init__(self):
        self.stack = []

    def LOAD_VALUE(self, number):
        self.stack.append(number)

    def PRINT_ANSWER(self):
        answer = self.stack.pop()
        print(answer)

    def ADD_TWO_VALUES(self):
        first_num = self.stack.pop()
        second_num = self.stack.pop()
        total = first_num + second_num
        self.stack.append(total)

这三个函数实现了我们解释器理解的三个指令。解释器还需要最后一个部分:一个将所有内容联系在一起并实际执行它的方法。这个方法 run_code 以上面定义的 what_to_execute 字典作为参数。它循环遍历每个指令,处理该指令的参数(如果有的话),然后在解释器对象上调用相应的方法。

    def run_code(self, what_to_execute):
        instructions = what_to_execute["instructions"]
        numbers = what_to_execute["numbers"]
        for each_step in instructions:
            instruction, argument = each_step
            if instruction == "LOAD_VALUE":
                number = numbers[argument]
                self.LOAD_VALUE(number)
            elif instruction == "ADD_TWO_VALUES":
                self.ADD_TWO_VALUES()
            elif instruction == "PRINT_ANSWER":
                self.PRINT_ANSWER()

为了测试它,我们可以创建一个对象的实例,然后用上面定义的添加 7 + 5 的指令集调用 run_code 方法。

    interpreter = Interpreter()
    interpreter.run_code(what_to_execute)

果然,它打印出答案:12。

虽然这个解释器非常有限,但这个过程与真实 Python 解释器添加数字的方式几乎完全相同。即使在这个小的例子中,也需要注意一些事情。

首先,有些指令需要参数。在真实的 Python 字节码中,大约有一半的指令有参数。参数与指令一起打包,就像我们示例中一样。请注意,指令的参数与调用的方法的参数不同。

其次,请注意,ADD_TWO_VALUES 的指令不需要任何参数。相反,要添加的值是从解释器的堆栈中弹出的。这是基于堆栈的解释器的定义特征。

请记住,给定有效的指令集,在不对我们的解释器进行任何更改的情况下,我们可以一次添加多个数字。考虑下面的指令集。您期望发生什么?如果您有一个友好的编译器,您可以编写什么代码来生成这个指令集?

    what_to_execute = {
        "instructions": [("LOAD_VALUE", 0),
                         ("LOAD_VALUE", 1),
                         ("ADD_TWO_VALUES", None),
                         ("LOAD_VALUE", 2),
                         ("ADD_TWO_VALUES", None),
                         ("PRINT_ANSWER", None)],
        "numbers": [7, 5, 8] }

在这一点上,我们可以开始看到这种结构的可扩展性:我们可以添加解释器对象上的方法,这些方法描述了更多操作(只要我们有一个编译器来提供给我们格式良好的指令集)。

变量

接下来,让我们将变量添加到我们的解释器中。变量需要一个用于存储变量值的指令 STORE_NAME;一个用于检索它的指令 LOAD_NAME;以及一个将变量名映射到值的映射。现在,我们将忽略命名空间和作用域,因此我们可以将变量映射存储在解释器对象本身中。最后,我们必须确保 what_to_execute 除了常量列表之外,还有一个变量名列表。

>>> def s():
...     a = 1
...     b = 2
...     print(a + b)
# a friendly compiler transforms `s` into:
    what_to_execute = {
        "instructions": [("LOAD_VALUE", 0),
                         ("STORE_NAME", 0),
                         ("LOAD_VALUE", 1),
                         ("STORE_NAME", 1),
                         ("LOAD_NAME", 0),
                         ("LOAD_NAME", 1),
                         ("ADD_TWO_VALUES", None),
                         ("PRINT_ANSWER", None)],
        "numbers": [1, 2],
        "names":   ["a", "b"] }

我们的新实现如下。为了跟踪哪些名称绑定到哪些值,我们将在 __init__ 方法中添加一个 environment 字典。我们还将添加 STORE_NAMELOAD_NAME。这些方法首先查找要查询的变量名,然后使用字典来存储或检索其值。

指令的参数现在可以有两种不同的含义:它们可以是“数字”列表中的索引,也可以是“名称”列表中的索引。解释器通过检查正在执行的指令来判断应该使用哪种含义。我们将把这种逻辑(以及指令到其参数含义的映射)分解到一个单独的方法中。

class Interpreter:
    def __init__(self):
        self.stack = []
        self.environment = {}

    def STORE_NAME(self, name):
        val = self.stack.pop()
        self.environment[name] = val

    def LOAD_NAME(self, name):
        val = self.environment[name]
        self.stack.append(val)

    def parse_argument(self, instruction, argument, what_to_execute):
        """ Understand what the argument to each instruction means."""
        numbers = ["LOAD_VALUE"]
        names = ["LOAD_NAME", "STORE_NAME"]

        if instruction in numbers:
            argument = what_to_execute["numbers"][argument]
        elif instruction in names:
            argument = what_to_execute["names"][argument]

        return argument

    def run_code(self, what_to_execute):
        instructions = what_to_execute["instructions"]
        for each_step in instructions:
            instruction, argument = each_step
            argument = self.parse_argument(instruction, argument, what_to_execute)

            if instruction == "LOAD_VALUE":
                self.LOAD_VALUE(argument)
            elif instruction == "ADD_TWO_VALUES":
                self.ADD_TWO_VALUES()
            elif instruction == "PRINT_ANSWER":
                self.PRINT_ANSWER()
            elif instruction == "STORE_NAME":
                self.STORE_NAME(argument)
            elif instruction == "LOAD_NAME":
                self.LOAD_NAME(argument)

即使只有五个指令,run_code方法也开始变得乏味。如果我们保持这种结构,就需要为每个指令添加一个if语句的分支。这里,我们可以利用Python的动态方法查找功能。我们将始终定义一个名为FOO的方法来执行名为FOO的指令,因此我们可以使用Python的getattr函数动态查找方法,而不是使用大的if语句。run_code方法看起来像这样

    def execute(self, what_to_execute):
        instructions = what_to_execute["instructions"]
        for each_step in instructions:
            instruction, argument = each_step
            argument = self.parse_argument(instruction, argument, what_to_execute)
            bytecode_method = getattr(self, instruction)
            if argument is None:
                bytecode_method()
            else:
                bytecode_method(argument)

真实的Python字节码

在这一点上,我们将放弃我们玩具指令集,转而使用真正的Python字节码。字节码的结构类似于我们玩具解释器的冗长指令集,只是它使用一个字节而不是一个长名称来标识每个指令。为了理解这种结构,我们将逐步介绍一个简短函数的字节码。考虑以下示例

>>> def cond():
...     x = 3
...     if x < 5:
...         return 'yes'
...     else:
...         return 'no'
...

Python在运行时公开了许多内部信息,我们可以直接从REPL访问它们。对于函数对象condcond.__code__是与其关联的代码对象,而cond.__code__.co_code则是字节码。当你编写Python代码时,几乎没有理由直接使用这些属性,但它们确实允许我们进行各种操作——以及查看内部信息以便理解它们。

>>> cond.__code__.co_code  # the bytecode as raw bytes
b'd\x01\x00}\x00\x00|\x00\x00d\x02\x00k\x00\x00r\x16\x00d\x03\x00Sd\x04\x00Sd\x00
   \x00S'
>>> list(cond.__code__.co_code)  # the bytecode as numbers
[100, 1, 0, 125, 0, 0, 124, 0, 0, 100, 2, 0, 107, 0, 0, 114, 22, 0, 100, 3, 0, 83, 
 100, 4, 0, 83, 100, 0, 0, 83]

当我们只打印字节码时,它看起来难以理解——我们只能知道它是一系列字节。幸运的是,有一个强大的工具可以帮助我们理解它:Python标准库中的dis模块。

dis是一个字节码反汇编器。反汇编器将为机器编写的低级代码(如汇编代码或字节码)以人类可读的方式打印出来。当我们运行dis.dis时,它将输出它传递的字节码的解释。

>>> dis.dis(cond)
  2           0 LOAD_CONST               1 (3)
              3 STORE_FAST               0 (x)

  3           6 LOAD_FAST                0 (x)
              9 LOAD_CONST               2 (5)
             12 COMPARE_OP               0 (<)
             15 POP_JUMP_IF_FALSE       22

  4          18 LOAD_CONST               3 ('yes')
             21 RETURN_VALUE

  6     >>   22 LOAD_CONST               4 ('no')
             25 RETURN_VALUE
             26 LOAD_CONST               0 (None)
             29 RETURN_VALUE

这一切意味着什么?让我们以第一个指令LOAD_CONST为例。第一列中的数字(2)显示了我们Python源代码中的行号。第二列是字节码中的索引,告诉我们LOAD_CONST指令出现在位置零。第三列是指令本身,映射到其人类可读的名称。第四列(如果有)是该指令的参数。第五列(如果有)是关于参数含义的提示。

考虑此字节码的前几个字节:[100, 1, 0, 125, 0, 0]。这六个字节表示两个指令及其参数。我们可以使用dis.opname(一个字节到可理解字符串的映射)来找出100和125映射到哪些指令

>>> dis.opname[100]
'LOAD_CONST'
>>> dis.opname[125]
'STORE_FAST'

第二个和第三个字节——1, 0——是LOAD_CONST的参数,而第五个和第六个字节——0, 0——是STORE_FAST的参数。就像在我们的玩具示例中一样,LOAD_CONST需要知道在哪里找到要加载的常量,而STORE_FAST需要找到要存储的名称。(Python的LOAD_CONST与我们的玩具解释器的LOAD_VALUE相同,而LOAD_FASTLOAD_NAME相同。)因此,这六个字节表示代码的第一行,x = 3。(为什么每个参数使用两个字节?如果Python使用一个字节而不是两个字节来定位常量和名称,那么你只能将256个名称/常量与单个代码对象关联。使用两个字节,你可以拥有高达256的平方,也就是65,536个。)

条件语句和循环

到目前为止,解释器只是通过逐个执行指令来执行代码。这是一个问题;通常,我们希望多次执行某些指令,或者在某些条件下跳过它们。为了允许我们在代码中编写循环和if语句,解释器必须能够在指令集中跳转。从某种意义上说,Python使用字节码中的GOTO语句来处理循环和条件语句!再次查看函数cond的反汇编

>>> dis.dis(cond)
  2           0 LOAD_CONST               1 (3)
              3 STORE_FAST               0 (x)

  3           6 LOAD_FAST                0 (x)
              9 LOAD_CONST               2 (5)
             12 COMPARE_OP               0 (<)
             15 POP_JUMP_IF_FALSE       22

  4          18 LOAD_CONST               3 ('yes')
             21 RETURN_VALUE

  6     >>   22 LOAD_CONST               4 ('no')
             25 RETURN_VALUE
             26 LOAD_CONST               0 (None)
             29 RETURN_VALUE

代码第3行的条件语句if x < 5编译成四个指令:LOAD_FASTLOAD_CONSTCOMPARE_OPPOP_JUMP_IF_FALSEx < 5生成代码来加载x,加载5,并将这两个值进行比较。POP_JUMP_IF_FALSE指令负责实现if。此指令将从解释器的栈中弹出顶部的值。如果该值为真,则不会发生任何事情。(该值可以是“真值”——它不必是文字True对象。)如果该值为假,则解释器将跳到另一个指令。

要到达的指令称为跳转目标,它作为POP_JUMP指令的参数提供。这里,跳转目标是22。索引为22的指令是第6行的LOAD_CONST。(dis>>标记跳转目标。)如果x < 5的结果为假,则解释器将直接跳转到第6行(return "no"),跳过第4行(return "yes")。因此,解释器使用跳转指令有选择地跳过指令集的一部分。

Python循环也依赖于跳转。在下面的字节码中,请注意,第4行的while x < 5生成的字节码与if x < 10几乎相同。在这两种情况下,都会计算比较,然后POP_JUMP_IF_FALSE控制接下来执行的指令。在第4行末尾(循环体的末尾),指令JUMP_ABSOLUTE始终将解释器发送回循环顶部的指令9。当x < 5变为假时,POP_JUMP_IF_FALSE将解释器跳过循环末尾,到达指令34。

>>> def loop():
...      x = 1
...      while x < 5:
...          x = x + 1
...      return x
...
>>> dis.dis(loop)
  2           0 LOAD_CONST               1 (1)
              3 STORE_FAST               0 (x)

  3           6 SETUP_LOOP              26 (to 35)
        >>    9 LOAD_FAST                0 (x)
             12 LOAD_CONST               2 (5)
             15 COMPARE_OP               0 (<)
             18 POP_JUMP_IF_FALSE       34

  4          21 LOAD_FAST                0 (x)
             24 LOAD_CONST               1 (1)
             27 BINARY_ADD
             28 STORE_FAST               0 (x)
             31 JUMP_ABSOLUTE            9
        >>   34 POP_BLOCK

  5     >>   35 LOAD_FAST                0 (x)
             38 RETURN_VALUE

探索字节码

我建议你尝试在你编写的函数上运行dis.dis。一些可以探索的问题

框架

到目前为止,我们已经了解到Python虚拟机是一个栈机器。它在指令中步进和跳转,在栈上压入和弹出值。然而,我们的心理模型中还有一些空白。在上面的示例中,最后一个指令是RETURN_VALUE,它对应于代码中的return语句。但是指令返回到哪里呢?

为了回答这个问题,我们必须增加一层复杂性:框架。框架是代码块的信息和上下文的集合。框架在你的Python代码执行时被创建和销毁。每个函数的调用都对应一个框架——因此,虽然每个框架都与一个代码对象相关联,但一个代码对象可以有多个框架。如果你有一个函数递归调用自身十次,那么你将有十一个框架——每个递归级别一个,以及你从模块开始时的一个。通常,每个Python程序中的每个作用域都有一个框架。例如,每个模块、每个函数调用和每个类定义都有一个框架。

框架存在于调用栈上,这是与我们之前讨论的完全不同的栈。(调用栈是你最熟悉的栈——你在异常的回溯中看到了它。每个以“File 'program.py', line 10”开头的回溯行都对应于调用栈上的一个框架。)我们一直在检查的栈——解释器在执行字节码时操作的栈——我们称之为数据栈。还有一个第三个栈,称为块栈。块用于某些类型的控制流,尤其是循环和异常处理。调用栈上的每个框架都有自己的数据栈和块栈。

让我们用一个例子来说明这一点。假设Python解释器目前正在执行下面标记为3的行。解释器正在调用foo的中间,而foo又正在调用bar。该图显示了框架、块栈和数据栈的调用栈的示意图。(此代码像REPL会话一样编写,因此我们首先定义了所需的函数。)在我们感兴趣的时刻,解释器正在执行foo()(位于底部),然后进入foo的主体,然后进入bar

>>> def bar(y):
...     z = y + 3     # <--- (3) ... and the interpreter is here.
...     return z
...
>>> def foo():
...     a = 1
...     b = 2
...     return a + bar(b) # <--- (2) ... which is returning a call to bar ...
...
>>> foo()             # <--- (1) We're in the middle of a call to foo ...
3
Figure 12.2 - The call stack

图12.2 - 调用栈

在这一点上,解释器正在函数调用bar的中间。调用栈上有三个框架:一个用于模块级别,一个用于函数foo,一个用于bar (图12.2.) 一旦bar返回,与其关联的框架就会从调用栈中弹出并丢弃。

字节码指令RETURN_VALUE告诉解释器在框架之间传递一个值。首先,它将从调用栈顶部的框架的数据栈中弹出顶部的值。然后,它将整个框架从调用栈中弹出并丢弃。最后,该值被压入到下一个框架的数据栈中。

当我和Ned Batchelder在开发Byterun时,我们的实现中存在一个严重的错误。我们没有在每个框架上使用一个数据栈,而是在整个虚拟机上只有一个数据栈。我们有几十个测试,由我们通过Byterun和真正的Python解释器运行的Python代码片段组成,以确保这两个解释器都发生相同的事情。几乎所有这些测试都通过了。我们唯一无法正常运行的是生成器。最后,更仔细地阅读CPython代码后,我们意识到了错误2。将数据栈移动到每个框架上解决了这个问题。

回过头来看这个错误,我惊讶于Python中只有很少的操作依赖于每个框架拥有不同的数据栈。Python解释器中的几乎所有操作都会仔细清理数据栈,因此框架共享同一个栈并不重要。在上面的示例中,当bar完成执行时,它将清空其数据栈。即使foo共享同一个栈,这些值也会更低。然而,对于生成器来说,一个关键特性是能够暂停一个框架,返回到另一个框架,然后稍后返回到生成器框架,并使其处于与你离开时完全相同的状态。

Byterun

现在我们已经了解了足够的关于Python解释器的上下文信息,可以开始检查Byterun了。

Byterun中有四种类型的对象

VirtualMachine

每次程序运行时,只会创建一个VirtualMachine实例,因为我们只有一个 Python 解释器。VirtualMachine在帧之间传递时存储调用堆栈、异常状态和返回值。执行代码的入口点是run_code方法,该方法接受一个编译后的代码对象作为参数。它首先设置并运行一个帧。这个帧可能会创建其他帧;调用堆栈将在程序执行时增大和缩小。当第一个帧最终返回时,执行结束。

class VirtualMachineError(Exception):
    pass

class VirtualMachine(object):
    def __init__(self):
        self.frames = []   # The call stack of frames.
        self.frame = None  # The current frame.
        self.return_value = None
        self.last_exception = None

    def run_code(self, code, global_names=None, local_names=None):
        """ An entry point to execute code using the virtual machine."""
        frame = self.make_frame(code, global_names=global_names, 
                                local_names=local_names)
        self.run_frame(frame)

Frame

接下来,我们将编写Frame对象。帧是一个没有方法的属性集合。如上所述,这些属性包括编译器创建的代码对象;局部、全局和内置命名空间;对之前帧的引用;数据堆栈;块堆栈;以及执行的最后一条指令。(我们必须做一些额外的工作才能获得内置命名空间,因为 Python 在不同的模块中对这个命名空间的处理方式不同;这个细节对虚拟机来说并不重要。)

class Frame(object):
    def __init__(self, code_obj, global_names, local_names, prev_frame):
        self.code_obj = code_obj
        self.global_names = global_names
        self.local_names = local_names
        self.prev_frame = prev_frame
        self.stack = []
        if prev_frame:
            self.builtin_names = prev_frame.builtin_names
        else:
            self.builtin_names = local_names['__builtins__']
            if hasattr(self.builtin_names, '__dict__'):
                self.builtin_names = self.builtin_names.__dict__

        self.last_instruction = 0
        self.block_stack = []

接下来,我们将向虚拟机添加帧操作。帧有三个辅助函数:一个用于创建新帧(它负责为新帧整理命名空间),以及分别用于将帧推入和弹出帧堆栈的两个函数。第四个函数run_frame完成了执行帧的主要工作。我们很快就会回到这个问题。

class VirtualMachine(object):
    [... snip ...]

    # Frame manipulation
    def make_frame(self, code, callargs={}, global_names=None, local_names=None):
        if global_names is not None and local_names is not None:
            local_names = global_names
        elif self.frames:
            global_names = self.frame.global_names
            local_names = {}
        else:
            global_names = local_names = {
                '__builtins__': __builtins__,
                '__name__': '__main__',
                '__doc__': None,
                '__package__': None,
            }
        local_names.update(callargs)
        frame = Frame(code, global_names, local_names, self.frame)
        return frame

    def push_frame(self, frame):
        self.frames.append(frame)
        self.frame = frame

    def pop_frame(self):
        self.frames.pop()
        if self.frames:
            self.frame = self.frames[-1]
        else:
            self.frame = None

    def run_frame(self):
        pass
        # we'll come back to this shortly

Function

Function对象的实现有点复杂,大多数细节对于理解解释器并不重要。要注意的重要一点是,调用函数(调用__call__方法)会创建一个新的Frame对象并开始运行它。

class Function(object):
    """
    Create a realistic function object, defining the things the interpreter expects.
    """
    __slots__ = [
        'func_code', 'func_name', 'func_defaults', 'func_globals',
        'func_locals', 'func_dict', 'func_closure',
        '__name__', '__dict__', '__doc__',
        '_vm', '_func',
    ]

    def __init__(self, name, code, globs, defaults, closure, vm):
        """You don't need to follow this closely to understand the interpreter."""
        self._vm = vm
        self.func_code = code
        self.func_name = self.__name__ = name or code.co_name
        self.func_defaults = tuple(defaults)
        self.func_globals = globs
        self.func_locals = self._vm.frame.f_locals
        self.__dict__ = {}
        self.func_closure = closure
        self.__doc__ = code.co_consts[0] if code.co_consts else None

        # Sometimes, we need a real Python function.  This is for that.
        kw = {
            'argdefs': self.func_defaults,
        }
        if closure:
            kw['closure'] = tuple(make_cell(0) for _ in closure)
        self._func = types.FunctionType(code, globs, **kw)

    def __call__(self, *args, **kwargs):
        """When calling a Function, make a new frame and run it."""
        callargs = inspect.getcallargs(self._func, *args, **kwargs)
        # Use callargs to provide a mapping of arguments: values to pass into the new 
        # frame.
        frame = self._vm.make_frame(
            self.func_code, callargs, self.func_globals, {}
        )
        return self._vm.run_frame(frame)

def make_cell(value):
    """Create a real Python closure and grab a cell."""
    # Thanks to Alex Gaynor for help with this bit of twistiness.
    fn = (lambda x: lambda: x)(value)
    return fn.__closure__[0]

接下来,回到VirtualMachine对象,我们将为数据堆栈操作添加一些辅助方法。操作堆栈的字节码始终操作当前帧的数据堆栈。这将使我们对POP_TOPLOAD_FAST以及所有其他涉及堆栈的指令的实现更加易读。

class VirtualMachine(object):
    [... snip ...]

    # Data stack manipulation
    def top(self):
        return self.frame.stack[-1]

    def pop(self):
        return self.frame.stack.pop()

    def push(self, *vals):
        self.frame.stack.extend(vals)

    def popn(self, n):
        """Pop a number of values from the value stack.
        A list of `n` values is returned, the deepest value first.
        """
        if n:
            ret = self.frame.stack[-n:]
            self.frame.stack[-n:] = []
            return ret
        else:
            return []

在开始运行帧之前,我们还需要两个方法。

第一个是parse_byte_and_args,它接收一个字节码,检查它是否有参数,如果有参数则解析参数。此方法还会更新帧的属性last_instruction,它引用执行的最后一条指令。一条指令如果沒有参数,则长度为一个字节,如果有参数,则长度为三个字节;后两个字节是参数。每个指令参数的含义取决于它是什么指令。例如,如上所述,对于POP_JUMP_IF_FALSE,指令的参数是跳转目标。对于BUILD_LIST,它是列表中的元素数量。对于LOAD_CONST,它是在常量列表中的索引。

一些指令使用简单的数字作为参数。对于其他指令,虚拟机必须做一些工作来发现参数的含义。标准库中的dis模块公开了一个速查表,解释了参数的含义,这使得我们的代码更加简洁。例如,列表dis.hasname告诉我们,LOAD_NAMEIMPORT_NAMELOAD_GLOBAL以及其他九条指令的参数具有相同的含义:对于这些指令,参数表示代码对象上的名称列表中的索引。

class VirtualMachine(object):
    [... snip ...]

    def parse_byte_and_args(self):
        f = self.frame
        opoffset = f.last_instruction
        byteCode = f.code_obj.co_code[opoffset]
        f.last_instruction += 1
        byte_name = dis.opname[byteCode]
        if byteCode >= dis.HAVE_ARGUMENT:
            # index into the bytecode
            arg = f.code_obj.co_code[f.last_instruction:f.last_instruction+2]  
            f.last_instruction += 2   # advance the instruction pointer
            arg_val = arg[0] + (arg[1] * 256)
            if byteCode in dis.hasconst:   # Look up a constant
                arg = f.code_obj.co_consts[arg_val]
            elif byteCode in dis.hasname:  # Look up a name
                arg = f.code_obj.co_names[arg_val]
            elif byteCode in dis.haslocal: # Look up a local name
                arg = f.code_obj.co_varnames[arg_val]
            elif byteCode in dis.hasjrel:  # Calculate a relative jump
                arg = f.last_instruction + arg_val
            else:
                arg = arg_val
            argument = [arg]
        else:
            argument = []

        return byte_name, argument

下一个方法是dispatch,它查找给定指令的操作并执行它们。在 CPython 解释器中,这个分派是通过一个跨越 1,500 行的巨大 switch 语句完成的!幸运的是,由于我们正在编写 Python,所以我们可以更加简洁。我们将为每个字节名称定义一个方法,然后使用getattr查找它。就像上面的玩具解释器一样,如果我们的指令名为FOO_BAR,那么对应的函数将命名为byte_FOO_BAR。目前,我们将这些方法的内容作为一个黑盒。每个字节码方法将返回None或一个字符串,称为why,它是在某些情况下解释器需要的额外状态。这些单独的指令方法的返回值仅用作解释器状态的内部指标,不要将它们与执行帧的返回值混淆。

class VirtualMachine(object):
    [... snip ...]

    def dispatch(self, byte_name, argument):
        """ Dispatch by bytename to the corresponding methods.
        Exceptions are caught and set on the virtual machine."""

        # When later unwinding the block stack,
        # we need to keep track of why we are doing it.
        why = None
        try:
            bytecode_fn = getattr(self, 'byte_%s' % byte_name, None)
            if bytecode_fn is None:
                if byte_name.startswith('UNARY_'):
                    self.unaryOperator(byte_name[6:])
                elif byte_name.startswith('BINARY_'):
                    self.binaryOperator(byte_name[7:])
                else:
                    raise VirtualMachineError(
                        "unsupported bytecode type: %s" % byte_name
                    )
            else:
                why = bytecode_fn(*argument)
        except:
            # deal with exceptions encountered while executing the op.
            self.last_exception = sys.exc_info()[:2] + (None,)
            why = 'exception'

        return why

    def run_frame(self, frame):
        """Run a frame until it returns (somehow).
        Exceptions are raised, the return value is returned.
        """
        self.push_frame(frame)
        while True:
            byte_name, arguments = self.parse_byte_and_args()

            why = self.dispatch(byte_name, arguments)

            # Deal with any block management we need to do
            while why and frame.block_stack:
                why = self.manage_block_stack(why)

            if why:
                break

        self.pop_frame()

        if why == 'exception':
            exc, val, tb = self.last_exception
            e = exc(val)
            e.__traceback__ = tb
            raise e

        return self.return_value

Block

在我们实现每个字节码指令的方法之前,我们将简要讨论块。块用于某些类型的流程控制,特别是异常处理和循环。块负责确保在操作完成时数据堆栈处于适当的状态。例如,在循环中,一个特殊的迭代器对象在循环运行时保留在堆栈上,但在循环结束后被弹出。解释器必须跟踪循环是正在继续还是已经结束。

为了跟踪这部分额外信息,解释器设置了一个标志来指示它的状态。我们将这个标志实现为一个名为why的变量,它可以是None,也可以是字符串"continue""break""exception""return"之一。这表明应该对块堆栈和数据堆栈进行什么样的操作。回到迭代器示例,如果块堆栈的顶部是loop块,并且why代码是continue,那么迭代器对象应该保留在数据堆栈上,但是如果why代码是break,那么它应该被弹出。

块操作的具体细节相当复杂,我们不会在这方面花更多时间,但鼓励感兴趣的读者仔细阅读。

Block = collections.namedtuple("Block", "type, handler, stack_height")

class VirtualMachine(object):
    [... snip ...]

    # Block stack manipulation
    def push_block(self, b_type, handler=None):
        stack_height = len(self.frame.stack)
        self.frame.block_stack.append(Block(b_type, handler, stack_height))

    def pop_block(self):
        return self.frame.block_stack.pop()

    def unwind_block(self, block):
        """Unwind the values on the data stack corresponding to a given block."""
        if block.type == 'except-handler':
            # The exception itself is on the stack as type, value, and traceback.
            offset = 3  
        else:
            offset = 0

        while len(self.frame.stack) > block.level + offset:
            self.pop()

        if block.type == 'except-handler':
            traceback, value, exctype = self.popn(3)
            self.last_exception = exctype, value, traceback

    def manage_block_stack(self, why):
        """ """
        frame = self.frame
        block = frame.block_stack[-1]
        if block.type == 'loop' and why == 'continue':
            self.jump(self.return_value)
            why = None
            return why

        self.pop_block()
        self.unwind_block(block)

        if block.type == 'loop' and why == 'break':
            why = None
            self.jump(block.handler)
            return why

        if (block.type in ['setup-except', 'finally'] and why == 'exception'):
            self.push_block('except-handler')
            exctype, value, tb = self.last_exception
            self.push(tb, value, exctype)
            self.push(tb, value, exctype) # yes, twice
            why = None
            self.jump(block.handler)
            return why

        elif block.type == 'finally':
            if why in ('return', 'continue'):
                self.push(self.return_value)

            self.push(why)

            why = None
            self.jump(block.handler)
            return why
        return why

指令

剩下的就是实现数十个指令方法。实际的指令是解释器中最不有趣的部分,所以我们这里只展示了几个,但完整的实现可以在 GitHub 上找到。(这里包含了足够的指令来执行我们上面反汇编的所有代码示例。)

class VirtualMachine(object):
    [... snip ...]

    ## Stack manipulation

    def byte_LOAD_CONST(self, const):
        self.push(const)

    def byte_POP_TOP(self):
        self.pop()

    ## Names
    def byte_LOAD_NAME(self, name):
        frame = self.frame
        if name in frame.f_locals:
            val = frame.f_locals[name]
        elif name in frame.f_globals:
            val = frame.f_globals[name]
        elif name in frame.f_builtins:
            val = frame.f_builtins[name]
        else:
            raise NameError("name '%s' is not defined" % name)
        self.push(val)

    def byte_STORE_NAME(self, name):
        self.frame.f_locals[name] = self.pop()

    def byte_LOAD_FAST(self, name):
        if name in self.frame.f_locals:
            val = self.frame.f_locals[name]
        else:
            raise UnboundLocalError(
                "local variable '%s' referenced before assignment" % name
            )
        self.push(val)

    def byte_STORE_FAST(self, name):
        self.frame.f_locals[name] = self.pop()

    def byte_LOAD_GLOBAL(self, name):
        f = self.frame
        if name in f.f_globals:
            val = f.f_globals[name]
        elif name in f.f_builtins:
            val = f.f_builtins[name]
        else:
            raise NameError("global name '%s' is not defined" % name)
        self.push(val)

    ## Operators

    BINARY_OPERATORS = {
        'POWER':    pow,
        'MULTIPLY': operator.mul,
        'FLOOR_DIVIDE': operator.floordiv,
        'TRUE_DIVIDE':  operator.truediv,
        'MODULO':   operator.mod,
        'ADD':      operator.add,
        'SUBTRACT': operator.sub,
        'SUBSCR':   operator.getitem,
        'LSHIFT':   operator.lshift,
        'RSHIFT':   operator.rshift,
        'AND':      operator.and_,
        'XOR':      operator.xor,
        'OR':       operator.or_,
    }

    def binaryOperator(self, op):
        x, y = self.popn(2)
        self.push(self.BINARY_OPERATORS[op](x, y))

    COMPARE_OPERATORS = [
        operator.lt,
        operator.le,
        operator.eq,
        operator.ne,
        operator.gt,
        operator.ge,
        lambda x, y: x in y,
        lambda x, y: x not in y,
        lambda x, y: x is y,
        lambda x, y: x is not y,
        lambda x, y: issubclass(x, Exception) and issubclass(x, y),
    ]

    def byte_COMPARE_OP(self, opnum):
        x, y = self.popn(2)
        self.push(self.COMPARE_OPERATORS[opnum](x, y))

    ## Attributes and indexing

    def byte_LOAD_ATTR(self, attr):
        obj = self.pop()
        val = getattr(obj, attr)
        self.push(val)

    def byte_STORE_ATTR(self, name):
        val, obj = self.popn(2)
        setattr(obj, name, val)

    ## Building

    def byte_BUILD_LIST(self, count):
        elts = self.popn(count)
        self.push(elts)

    def byte_BUILD_MAP(self, size):
        self.push({})

    def byte_STORE_MAP(self):
        the_map, val, key = self.popn(3)
        the_map[key] = val
        self.push(the_map)

    def byte_LIST_APPEND(self, count):
        val = self.pop()
        the_list = self.frame.stack[-count] # peek
        the_list.append(val)

    ## Jumps

    def byte_JUMP_FORWARD(self, jump):
        self.jump(jump)

    def byte_JUMP_ABSOLUTE(self, jump):
        self.jump(jump)

    def byte_POP_JUMP_IF_TRUE(self, jump):
        val = self.pop()
        if val:
            self.jump(jump)

    def byte_POP_JUMP_IF_FALSE(self, jump):
        val = self.pop()
        if not val:
            self.jump(jump)

    ## Blocks

    def byte_SETUP_LOOP(self, dest):
        self.push_block('loop', dest)

    def byte_GET_ITER(self):
        self.push(iter(self.pop()))

    def byte_FOR_ITER(self, jump):
        iterobj = self.top()
        try:
            v = next(iterobj)
            self.push(v)
        except StopIteration:
            self.pop()
            self.jump(jump)

    def byte_BREAK_LOOP(self):
        return 'break'

    def byte_POP_BLOCK(self):
        self.pop_block()

    ## Functions

    def byte_MAKE_FUNCTION(self, argc):
        name = self.pop()
        code = self.pop()
        defaults = self.popn(argc)
        globs = self.frame.f_globals
        fn = Function(name, code, globs, defaults, None, self)
        self.push(fn)

    def byte_CALL_FUNCTION(self, arg):
        lenKw, lenPos = divmod(arg, 256) # KWargs not supported here
        posargs = self.popn(lenPos)

        func = self.pop()
        frame = self.frame
        retval = func(*posargs)
        self.push(retval)

    def byte_RETURN_VALUE(self):
        self.return_value = self.pop()
        return "return"

动态类型:编译器不知道什么

你可能听说过 Python 是一种“动态”语言,特别是“动态类型”的语言。我们到目前为止所做的工作揭示了这个描述的一些内容。

在这个上下文中,“动态”的含义之一是,许多工作是在运行时完成的。我们之前看到,Python 编译器对代码的实际功能了解不多。例如,考虑下面的简短函数modmod接收两个参数,并返回第一个参数对第二个参数取模的结果。在字节码中,我们看到变量ab被加载,然后字节码BINARY_MODULO执行模运算本身。

>>> def mod(a, b):
...    return a % b
>>> dis.dis(mod)
  2           0 LOAD_FAST                0 (a)
              3 LOAD_FAST                1 (b)
              6 BINARY_MODULO
              7 RETURN_VALUE
>>> mod(19, 5)
4

计算 19 % 5 得出 4——这并不奇怪。如果我们用不同的参数调用它会发生什么?

>>> mod("by%sde", "teco")
'bytecode'

刚刚发生了什么?你可能之前在不同的上下文中见过这种语法

>>> print("by%sde" % "teco")
bytecode

使用符号%来格式化要打印的字符串意味着调用指令BINARY_MODULO。当指令执行时,这条指令对堆栈上的前两个值取模——无论它们是字符串、整数,还是你自己定义的类的实例。字节码是在编译函数时(实际上是在定义函数时)生成的,并且相同的字节码被用于不同类型的参数。

Python 编译器对字节码的影响了解相对较少。由解释器来确定BINARY_MODULO正在操作的对象的类型,并为该类型执行正确的事情。这就是为什么 Python 被描述为动态类型的原因:直到你实际运行它,你才知道这个函数的参数类型。相比之下,在静态类型语言中,程序员提前告诉编译器参数将是什么类型(或者编译器自己推断出来)。

编译器的无知是优化 Python 或静态分析 Python 的挑战之一——仅仅查看字节码,而不实际运行代码,你就不知道每条指令会做什么!实际上,你可以定义一个实现了__mod__方法的类,如果你的对象使用%,Python 就会调用那个方法。所以BINARY_MODULO可以运行任何代码!

只看下面的代码,第一次计算a % b似乎是浪费的。

def mod(a,b):
    a % b
    return a %b

不幸的是,对这段代码的静态分析——你可以在不运行它的情况下进行的分析——不能确定第一个a % b确实什么也没做。用%调用__mod__可能会写入文件,或者与程序的另一个部分交互,或者做 Python 中可能做的任何其他事情。当你不知道一个函数做了什么时,很难优化它!在 Russell Power 和 Alex Rubinsteyn 的一篇很棒的论文“我们能用多快的速度使 Python 解释执行?”中,他们指出:“在普遍缺乏类型信息的情况下,每条指令都必须被视为INVOKE_ARBITRARY_METHOD。”

结论

Byterun 是一个紧凑的 Python 解释器,比 CPython 更容易理解。Byterun 复制了 CPython 的主要结构细节:一个基于堆栈的解释器,它对称为字节码的指令集进行操作。它步进或跳转这些指令,将数据推入和弹出数据堆栈。解释器在调用和从函数和生成器中返回时创建、销毁和跳过帧。Byterun 也共享真实解释器的局限性:因为 Python 使用动态类型,所以解释器必须在运行时努力确定程序的正确行为。

我鼓励你反汇编你自己的程序,并使用 Byterun 运行它们。你很快就会遇到这个简短版本的 Byterun 没有实现的指令。完整的实现可以在 https://github.com/nedbat/byterun 找到——或者,通过仔细阅读真实 CPython 解释器的ceval.c,你可以自己实现它!

致谢

感谢 Ned Batchelder 发起这个项目并指导我的贡献,感谢 Michael Arntzenius 帮助我调试代码和编辑文稿,感谢 Leta Montopoli 的编辑,感谢整个 Recurse Center 社区提供的支持和兴趣。任何错误都是我自己的。

  1. 本章基于 Python 3.5 或更早版本生成的字节码,因为 Python 3.6 中的字节码规范有一些变化。

  2. 感谢 Michael Arntzenius 对这个错误的见解。