500 行或更少
流水车间调度器

克里斯蒂安·穆斯

克里斯蒂安·穆斯博士MERS 团队麻省理工学院 CSAIL 的研究员。他对各种主题感兴趣,包括人工智能、数据驱动项目、映射、图论和数据可视化,以及凯尔特音乐、雕刻、足球和咖啡。

流水车间调度器

流水车间调度 是运筹学中最具挑战性和研究最充分的问题之一。像许多具有挑战性的优化问题一样,对于实际规模的问题,找到最佳解决方案是不可能的。在本章中,我们将考虑流水车间调度求解器的实现,该求解器使用一种称为局部搜索的技术。局部搜索允许我们在无法找到最佳解决方案的情况下找到一个“相当好”的解决方案。求解器将尝试在给定时间内找到问题的新的解决方案,并通过返回找到的最佳解决方案来结束。

局部搜索背后的理念是通过考虑可能略微更好的类似解决方案来启发式地改进现有解决方案。求解器使用各种策略来 (1) 尝试找到类似的解决方案,以及 (2) 选择一个有希望的解决方案作为下一个探索对象。该实现是用 Python 编写的,没有外部要求。通过利用 Python 一些鲜为人知的功能,求解器根据哪些策略效果好,在求解过程中动态地改变其搜索策略。

首先,我们将提供有关流水车间调度问题和局部搜索技术的背景材料。然后我们将详细介绍通用求解器代码以及我们使用的各种启发式算法和邻域选择策略。接下来,我们将考虑求解器用来将所有内容联系在一起的动态策略选择。最后,我们将总结该项目,并总结在实现过程中学到的一些经验教训。

背景

流水车间调度

流水车间调度问题是一个优化问题,我们必须确定作业中各种任务的处理时间,以便安排这些任务以最大限度地减少完成作业所需的总时间。例如,一家汽车制造商有一个装配线,汽车的每个部件都在不同的机器上依次完成。不同的订单可能具有自定义要求,例如,使车身喷漆的任务从一辆车到另一辆车有所不同。在我们的示例中,每辆车都是一个新的作业,而汽车的每个部件都称为任务。每个作业都将具有相同的任务完成顺序。

流水车间调度的目标是最小化处理完所有作业中所有任务所需的总时间。(通常,这个总时间被称为完工时间。)这个问题有很多应用,但与优化生产设施最相关。

每个流水车间问题都包含 \(n\) 台机器和 \(m\) 个作业。在我们的汽车示例中,将有 \(n\) 个工作站来处理汽车,并且总共需要制造 \(m\) 辆汽车。每个作业都由正好 \(n\) 个任务组成,我们可以假设作业的第 \(i\) 个任务必须使用机器 \(i\),并且需要预先确定的处理时间:\(p(j,i)\) 是作业 \(j\) 的第 \(i\) 个任务的处理时间。此外,任何给定作业的任务顺序应遵循可用机器的顺序;对于给定的作业,任务 \(i\) 必须在任务 \(i+1\) 开始之前完成。在我们的汽车示例中,我们不想在框架组装之前开始喷漆。最后的限制是,不能在同一台机器上同时处理两个任务。

由于作业内任务的顺序是预先确定的,因此流水车间调度问题的解决方案可以用作业的排列来表示。作业在机器上的处理顺序对每台机器都是相同的,并且给定一个排列,机器 \(i\) 上作业 \(j\) 的任务被安排为以下两种可能性中较晚者

  1. 机器 \(i\) 上作业 \(j-1\) 的任务完成时间(即同一机器上的最新任务),或

  2. 机器 \(i-1\) 上作业 \(j\) 的任务完成时间(即同一作业上的最新任务)

由于我们选择这两个值的较大值,因此机器 \(i\) 或作业 \(j\) 将产生闲置时间。我们最终想要最小化这种闲置时间,因为它会导致总完工时间变大。

由于问题的形式简单,任何作业的排列都是有效的解决方案,最佳解决方案将对应于某些排列。因此,我们通过改变作业的排列并测量相应的完工时间来寻找改进的解决方案。在下文中,我们将作业的排列称为候选

让我们考虑一个简单的示例,包含两个作业和两台机器。第一个作业包含任务 \(\mathbf{A}\)\(\mathbf{B}\),分别需要 1 分钟和 2 分钟才能完成。第二个作业包含任务 \(\mathbf{C}\)\(\mathbf{D}\),分别需要 2 分钟和 1 分钟才能完成。回想一下,\(\mathbf{A}\) 必须在 \(\mathbf{B}\) 之前,而 \(\mathbf{C}\) 必须在 \(\mathbf{D}\) 之前。由于有两个作业,因此我们只有两个排列需要考虑。如果我们将作业 2 排在作业 1 之前,则完工时间为 5 (图 9.1);另一方面,如果我们将作业 1 排在作业 2 之前,则完工时间仅为 4 (图 9.2).

Figure 9.1 - Flow Shop Example 1

图 9.1 - 流水车间示例 1

Figure 9.2 - Flow Shop Example 2

图 9.2 - 流水车间示例 2

请注意,没有任何空间可以将任何任务提前。良好排列的指导原则是最小化任何机器没有任务可处理的时间。

局部搜索是一种解决优化问题的策略,当最佳解决方案难以计算时。直观地说,它从一个看起来相当好的解决方案移动到另一个看起来更好的解决方案。与其将每个可能的解决方案都视为下一个需要重点关注的候选者,我们定义了所谓的邻域:被认为与当前解决方案类似的解决方案集。由于作业的任何排列都是有效的解决方案,我们可以将任何围绕作业进行重新排列的机制视为局部搜索过程(这实际上是我们下面要做的)。

为了正式使用局部搜索,我们必须回答一些问题

  1. 我们应该从哪个解决方案开始?
  2. 给定一个解决方案,我们应该考虑哪些相邻的解决方案?
  3. 给定一组候选邻居,我们应该考虑将哪个邻居作为下一个邻居?

以下三个部分将依次回答这些问题。

通用求解器

在本节中,我们将提供流水车间调度器的通用框架。首先,我们拥有必要的 Python 导入和求解器的设置

import sys, os, time, random

from functools import partial
from collections import namedtuple
from itertools import product

import neighbourhood as neigh
import heuristics as heur

##############
## Settings ##
##############
TIME_LIMIT = 300.0 # Time (in seconds) to run the solver
TIME_INCREMENT = 13.0 # Time (in seconds) in between heuristic measurements
DEBUG_SWITCH = False # Displays intermediate heuristic info when True
MAX_LNS_NEIGHBOURHOODS = 1000 # Maximum number of neighbours to explore in LNS

有两种设置需要进一步解释。TIME_INCREMENT 设置将用作动态策略选择的一部分,而 MAX_LNS_NEIGHBOURHOODS 设置将用作邻域选择策略的一部分。下面将对两者进行更详细的描述。

这些设置可以作为命令行参数公开给用户,但在这一阶段,我们改为将输入数据作为程序的参数提供。输入问题(来自 Taillard 基准集的问题)假定采用流水车间调度的标准格式。以下代码用作求解器文件的 __main__ 方法,并根据输入到程序的参数数量调用相应的函数

if __name__ == '__main__':

    if len(sys.argv) == 2:
        data = parse_problem(sys.argv[1], 0)
    elif len(sys.argv) == 3:
        data = parse_problem(sys.argv[1], int(sys.argv[2]))
    else:
        print "\nUsage: python flow.py <Taillard problem file> [<instance number>]\n"
        sys.exit(0)

    (perm, ms) = solve(data)
    print_solution(data, perm)

我们将在稍后描述 Taillard 问题文件的解析。(这些文件可以在 网上获取。)

solve 方法期望 data 变量是包含每个作业活动持续时间的整数列表。solve 方法首先初始化一个全局策略集(将在下面描述)。关键是我们使用 strat_* 变量来维护每个策略的统计信息。这有助于在求解过程中动态地选择策略。

def solve(data):
    """Solves an instance of the flow shop scheduling problem"""

    # We initialize the strategies here to avoid cyclic import issues
    initialize_strategies()
    global STRATEGIES

    # Record the following for each strategy:
    #  improvements: The amount a solution was improved by this strategy
    #  time_spent: The amount of time spent on the strategy
    #  weights: The weights that correspond to how good a strategy is
    #  usage: The number of times we use a strategy
    strat_improvements = {strategy: 0 for strategy in STRATEGIES}
    strat_time_spent = {strategy: 0 for strategy in STRATEGIES}
    strat_weights = {strategy: 1 for strategy in STRATEGIES}
    strat_usage = {strategy: 0 for strategy in STRATEGIES}

流水车间调度问题的一个吸引人的特征是,每个排列都是有效的解决方案,至少有一个将具有最佳完工时间(尽管许多将具有糟糕的完工时间)。谢天谢地,这使我们在从一个排列转到另一个排列时可以免去检查我们是否停留在可行解决方案空间内——一切都可行!

但是,为了在排列空间中启动局部搜索,我们必须有一个初始排列。为了保持简单,我们通过随机地打乱作业列表来播种我们的局部搜索

    # Start with a random permutation of the jobs
    perm = range(len(data))
    random.shuffle(perm)

接下来,我们初始化允许我们跟踪到目前为止找到的最佳排列的变量,以及用于提供输出的计时信息。

    # Keep track of the best solution
    best_make = makespan(data, perm)
    best_perm = perm
    res = best_make

    # Maintain statistics and timing for the iterations
    iteration = 0
    time_limit = time.time() + TIME_LIMIT
    time_last_switch = time.time()

    time_delta = TIME_LIMIT / 10
    checkpoint = time.time() + time_delta
    percent_complete = 10

    print "\nSolving..."

由于这是一个局部搜索求解器,因此只要时间限制没有达到,我们就简单地继续尝试改进解决方案。我们提供指示求解器进度并跟踪我们计算的迭代次数的输出

    while time.time() < time_limit:

        if time.time() > checkpoint:
            print " %d %%" % percent_complete
            percent_complete += 10
            checkpoint += time_delta

        iteration += 1

下面我们将描述如何选择策略,但现在知道策略提供一个 neighbourhood 函数和一个 heuristic 函数就足够了。前者为我们提供了一组下一个候选者来考虑,而后者则从该集合中选择最佳候选者。从这些函数中,我们得到了一个新的排列(perm)和一个新的完工时间结果(res

        # Heuristically choose the best strategy
        strategy = pick_strategy(STRATEGIES, strat_weights)

        old_val = res
        old_time = time.time()

        # Use the current strategy's heuristic to pick the next permutation from
        # the set of candidates generated by the strategy's neighbourhood
        candidates = strategy.neighbourhood(data, perm)
        perm = strategy.heuristic(data, candidates)
        res = makespan(data, perm)

计算完工时间的代码非常简单:我们可以通过评估最后作业完成的时间来从排列中计算它。我们将看到 compile_solution 是如何工作的,但现在知道返回一个二维数组,并且 [-1][-1] 处的元素对应于调度中最后作业的开始时间就足够了

def makespan(data, perm):
    """Computes the makespan of the provided solution"""
    return compile_solution(data, perm)[-1][-1] + data[perm[-1]][-1]

为了帮助选择策略,我们对 (1) 策略改进解决方案的程度、(2) 策略花费在计算信息上的时间,以及 (3) 使用策略的次数保持统计信息。如果我们偶然发现了一个更好的解决方案,我们也会更新最佳排列的变量

        # Record the statistics on how the strategy did
        strat_improvements[strategy] += res - old_val
        strat_time_spent[strategy] += time.time() - old_time
        strat_usage[strategy] += 1

        if res < best_make:
            best_make = res
            best_perm = perm[:]

在定期间隔内,将更新策略使用情况的统计信息。为了提高可读性,我们删除了相关的代码段,并在下面详细介绍了该代码。作为最后一步,一旦 while 循环完成(即时间限制达到),我们将输出有关求解过程的一些统计信息,并返回最佳排列及其完工时间

    print " %d %%\n" % percent_complete
    print "\nWent through %d iterations." % iteration

    print "\n(usage) Strategy:"
    results = sorted([(strat_weights[STRATEGIES[i]], i)
                      for i in range(len(STRATEGIES))], reverse=True)
    for (w, i) in results:
        print "(%d) \t%s" % (strat_usage[STRATEGIES[i]], STRATEGIES[i].name)

    return (best_perm, best_make)

解析问题

作为解析过程的输入,我们提供包含输入的文件名和要使用的示例编号。(每个文件包含多个实例。)

def parse_problem(filename, k=1):
    """Parse the kth instance of a Taillard problem file

    The Taillard problem files are a standard benchmark set for the problem
    of flow shop scheduling. 

    print "\nParsing..."

我们从读取文件并识别每个问题实例之间的分隔行开始

    with open(filename, 'r') as f:
        # Identify the string that separates instances
        problem_line = ('/number of jobs, number of machines, initial seed, '
                        'upper bound and lower bound :/')

        # Strip spaces and newline characters from every line
        lines = map(str.strip, f.readlines())

为了使定位正确的实例变得更容易,我们假设行将以 '/' 字符分隔。这使我们能够基于出现在每个实例顶部的公共字符串来拆分文件,并在第一行的开头添加 '/' 字符,使下面的字符串处理能够根据我们选择的实例正确地工作,无论我们选择哪个实例。我们还会检测到给定的实例编号是否超出文件中找到的实例集合的范围。

        # We prep the first line for later
        lines[0] = '/' + lines[0]

        # We also know '/' does not appear in the files, so we can use it as
        #  a separator to find the right lines for the kth problem instance
        try:
            lines = '/'.join(lines).split(problem_line)[k].split('/')[2:]
        except IndexError:
            max_instances = len('/'.join(lines).split(problem_line)) - 1
            print "\nError: Instance must be within 1 and %d\n" % max_instances
            sys.exit(0)

我们直接解析数据,将每个任务的处理时间转换为整数并存储在列表中。最后,我们对数据进行压缩以反转行和列,使格式符合上述求解代码的预期。(data 中的每个项目都应该对应于特定作业。)

        # Split every line based on spaces and convert each item to an int
        data = [map(int, line.split()) for line in lines]

    # We return the zipped data to rotate the rows and columns, making each
    #  item in data the durations of tasks for a particular job
    return zip(*data)

编译解决方案

流水车间调度问题的解决方案包括每个作业中每个任务的精确时间安排。因为我们用作业排列隐式地表示解决方案,所以我们引入了compile_solution 函数来将排列转换为精确时间。作为输入,该函数接收问题的相关数据(提供每个任务的持续时间)以及作业排列。

该函数首先初始化用于存储每个任务开始时间的数据结构,然后包含排列中第一个作业的任务。

def compile_solution(data, perm):
    """Compiles a scheduling on the machines given a permutation of jobs"""

    num_machines = len(data[0])

    # Note that using [[]] * m would be incorrect, as it would simply
    #  copy the same list m times (as opposed to creating m distinct lists).
    machine_times = [[] for _ in range(num_machines)]

    # Assign the initial job to the machines
    machine_times[0].append(0)
    for mach in range(1,num_machines):
        # Start the next task in the job when the previous finishes
        machine_times[mach].append(machine_times[mach-1][0] +
                                   data[perm[0]][mach-1])

然后,我们添加所有剩余作业的任务。作业中的第一个任务总是会在前一个作业中的第一个任务完成后立即开始。对于剩余任务,我们尽早安排作业:在同一作业中前一个任务的完成时间和同一台机器上前一个任务的完成时间中取最大值。

    # Assign the remaining jobs
    for i in range(1, len(perm)):

        # The first machine never contains any idle time
        job = perm[i]
        machine_times[0].append(machine_times[0][-1] + data[perm[i-1]][0])

        # For the remaining machines, the start time is the max of when the
        #  previous task in the job completed, or when the current machine
        #  completes the task for the previous job.
        for mach in range(1, num_machines):
            machine_times[mach].append(max(
                machine_times[mach-1][i] + data[perm[i]][mach-1],
                machine_times[mach][i-1] + data[perm[i-1]][mach]))

    return machine_times

打印解决方案

当求解过程完成后,程序将以紧凑的形式输出有关解决方案的信息。我们不会提供每个作业中每个任务的精确时间,而是输出以下信息

  1. 产生最佳完工时间的作业排列
  2. 排列计算出的完工时间
  3. 每台机器的开始时间、完成时间和空闲时间
  4. 每个作业的开始时间、完成时间和空闲时间

作业或机器的开始时间对应于作业中或机器上的第一个任务的开始。类似地,作业或机器的完成时间对应于作业中或机器上的最后一个任务的结束。空闲时间是特定作业或机器在任务之间的时间差。理想情况下,我们希望减少空闲时间,因为这将意味着总体处理时间也会减少。

编译解决方案的代码(即,计算每个任务的开始时间)已经讨论过,输出排列和完工时间是微不足道的

def print_solution(data, perm):
    """Prints statistics on the computed solution"""

    sol = compile_solution(data, perm)

    print "\nPermutation: %s\n" % str([i+1 for i in perm])

    print "Makespan: %d\n" % makespan(data, perm)

接下来,我们使用 Python 中的字符串格式化功能来打印每台机器和每个作业的开始、结束和空闲时间的表格。请注意,作业的空闲时间是从作业开始到完成的时间,减去作业中每个任务的处理时间之和。我们以类似的方式计算机器的空闲时间。

    row_format ="{:>15}" * 4
    print row_format.format('Machine', 'Start Time', 'Finish Time', 'Idle Time')
    for mach in range(len(data[0])):
        finish_time = sol[mach][-1] + data[perm[-1]][mach]
        idle_time = (finish_time - sol[mach][0]) - sum([job[mach] for job in data])
        print row_format.format(mach+1, sol[mach][0], finish_time, idle_time)

    results = []
    for i in range(len(data)):
        finish_time = sol[-1][i] + data[perm[i]][-1]
        idle_time = (finish_time - sol[0][i]) - sum([time for time in data[perm[i]]])
        results.append((perm[i]+1, sol[0][i], finish_time, idle_time))

    print "\n"
    print row_format.format('Job', 'Start Time', 'Finish Time', 'Idle Time')
    for r in sorted(results):
        print row_format.format(*r)

    print "\n\nNote: Idle time does not include initial or final wait time.\n"

邻域

局部搜索背后的思想是从一个解决方案局部移动到附近其他解决方案。我们将给定解决方案的邻域称为与其相邻的其他解决方案。在本节中,我们详细介绍了四个潜在的邻域,每个邻域的复杂度都在增加。

第一个邻域产生给定数量的随机排列。该邻域甚至没有考虑我们开始的解决方案,因此“邻域”这个词有点言过其实。然而,在搜索中加入一些随机性是良好的做法,因为它促进了对搜索空间的探索。

def neighbours_random(data, perm, num = 1):
    # Returns <num> random job permutations, including the current one
    candidates = [perm]
    for i in range(num):
        candidate = perm[:]
        random.shuffle(candidate)
        candidates.append(candidate)
    return candidates

对于下一个邻域,我们考虑在排列中交换任意两个作业。通过使用itertools 包中的combinations 函数,我们可以轻松地遍历每对索引,并创建一个对应于交换位于每个索引处的作业的新排列。从某种意义上说,该邻域创建了与我们开始的排列非常相似的排列。

def neighbours_swap(data, perm):
    # Returns the permutations corresponding to swapping every pair of jobs
    candidates = [perm]
    for (i,j) in combinations(range(len(perm)), 2):
        candidate = perm[:]
        candidate[i], candidate[j] = candidate[j], candidate[i]
        candidates.append(candidate)
    return candidates

我们考虑的下一个邻域使用针对问题本身的信息。我们找到空闲时间最长的作业,并考虑以所有可能的方式交换它们。我们接受一个值size,它是我们考虑的作业数量:size 个空闲时间最长的作业。该过程的第一步是计算排列中每个作业的空闲时间

def neighbours_idle(data, perm, size=4):
    # Returns the permutations of the <size> most idle jobs
    candidates = [perm]

    # Compute the idle time for each job
    sol = flow.compile_solution(data, perm)
    results = []

    for i in range(len(data)):
        finish_time = sol[-1][i] + data[perm[i]][-1]
        idle_time = (finish_time - sol[0][i]) - sum([t for t in data[perm[i]]])
        results.append((idle_time, i))

接下来,我们计算空闲时间最长的size 个作业的列表。

    # Take the <size> most idle jobs
    subset = [job_ind for (idle, job_ind) in reversed(sorted(results))][:size]

最后,我们通过考虑我们已识别出的空闲时间最长的作业的每个排列来构建邻域。为了找到排列,我们利用itertools 包中的permutations 函数。

    # Enumerate the permutations of the idle jobs
    for ordering in permutations(subset):
        candidate = perm[:]
        for i in range(len(ordering)):
            candidate[subset[i]] = perm[ordering[i]]
        candidates.append(candidate)

    return candidates

我们考虑的最后一个邻域通常被称为大邻域搜索 (LNS)。直观地说,LNS 通过考虑当前排列的小子集来隔离工作——找到子集作业的最佳排列可以为我们提供 LNS 邻域的单个候选者。通过对特定大小的几个(或所有)子集重复此过程,我们可以增加邻域中候选者的数量。我们通过MAX_LNS_NEIGHBOURHOODS 参数限制了被考虑的数量,因为邻居的数量可以非常快地增长。LNS 计算的第一步是使用itertools 包的combinations 函数计算我们将考虑交换的作业集的随机列表

def neighbours_LNS(data, perm, size = 2):
    # Returns the Large Neighbourhood Search neighbours
    candidates = [perm]

    # Bound the number of neighbourhoods in case there are too many jobs
    neighbourhoods = list(combinations(range(len(perm)), size))
    random.shuffle(neighbourhoods)

接下来,我们遍历子集以在每个子集中找到最佳的作业排列。我们已经在上面看到了遍历空闲时间最长的作业的所有排列的类似代码。这里的主要区别在于我们只记录子集的最佳排列,因为更大的邻域是通过为考虑的作业的每个子集选择一个排列来构建的。

    for subset in neighbourhoods[:flow.MAX_LNS_NEIGHBOURHOODS]:

        # Keep track of the best candidate for each neighbourhood
        best_make = flow.makespan(data, perm)
        best_perm = perm

        # Enumerate every permutation of the selected neighbourhood
        for ordering in permutations(subset):
            candidate = perm[:]
            for i in range(len(ordering)):
                candidate[subset[i]] = perm[ordering[i]]
            res = flow.makespan(data, candidate)
            if res < best_make:
                best_make = res
                best_perm = candidate

        # Record the best candidate as part of the larger neighbourhood
        candidates.append(best_perm)

    return candidates

如果我们将size 参数设置为等于作业的数量,那么将考虑每个排列并选择最佳排列。然而,在实践中,我们需要将子集的大小限制在 3 或 4 左右;任何更大的数字都会导致neighbours_LNS 函数花费过多的时间。

启发式算法

启发式算法从一组提供的候选者中返回单个候选排列。启发式算法还能够访问问题数据以评估哪些候选者可能更可取。

我们考虑的第一个启发式算法是heur_random。该启发式算法从列表中随机选择一个候选者,而不会评估哪个候选者可能更可取

def heur_random(data, candidates):
    # Returns a random candidate choice
    return random.choice(candidates)

下一个启发式算法heur_hillclimbing 使用了另一个极端。它不是随机选择一个候选者,而是选择具有最佳完工时间的候选者。请注意,列表scores 将包含(make,perm) 形式的元组,其中make 是排列perm 的完工时间值。对这样的列表进行排序会将具有最佳完工时间的元组放在列表的开头;从该元组中,我们返回排列。

def heur_hillclimbing(data, candidates):
    # Returns the best candidate in the list
    scores = [(flow.makespan(data, perm), perm) for perm in candidates]
    return sorted(scores)[0][1]

最后的启发式算法heur_random_hillclimbing 组合了上述随机和爬山启发式算法。在执行局部搜索时,您可能并不总是希望选择一个随机候选者,甚至是不选择最佳候选者。heur_random_hillclimbing 启发式算法通过以 0.5 的概率选择最佳候选者,然后以 0.25 的概率选择第二佳候选者,以此类推,返回一个“相当好”的解决方案。while 循环本质上在每次迭代时抛掷硬币,以查看是否应该继续增加索引(并限制列表的大小)。最终选择的索引对应于启发式算法选择的候选者。

def heur_random_hillclimbing(data, candidates):
    # Returns a candidate with probability proportional to its rank in sorted quality
    scores = [(flow.makespan(data, perm), perm) for perm in candidates]
    i = 0
    while (random.random() < 0.5) and (i < len(scores) - 1):
        i += 1
    return sorted(scores)[i][1]

因为完工时间是我们试图优化的标准,所以爬山法会将局部搜索过程引导到具有更好完工时间的解决方案。引入随机性允许我们探索邻域,而不是在每一步都盲目地走向看起来最好的解决方案。

动态策略选择

局部搜索良好排列的核心是使用特定的启发式算法和邻域函数从一个解决方案跳到另一个解决方案。我们如何在一组选项中选择一个?在实践中,在搜索过程中切换策略通常会有所帮助。我们使用的动态策略选择将在启发式算法和邻域函数的组合之间切换,尝试动态地切换到那些效果最好的策略。对我们来说,策略是启发式算法和邻域函数的特定配置(包括其参数值)。

首先,我们的代码构建了我们在求解过程中想要考虑的策略范围。在策略初始化中,我们使用functools 包中的partial 函数来部分分配每个邻域的参数。此外,我们构造一个启发式算法函数列表,最后我们使用乘积运算符将每个邻域和启发式算法函数的组合作为新的策略添加。

################
## Strategies ##
#################################################
## A strategy is a particular configuration
##  of neighbourhood generator (to compute
##  the next set of candidates) and heuristic
##  computation (to select the best candidate).
##

STRATEGIES = []

# Using a namedtuple is a little cleaner than using dictionaries.
#  E.g., strategy['name'] versus strategy.name
Strategy = namedtuple('Strategy', ['name', 'neighbourhood', 'heuristic'])

def initialize_strategies():

    global STRATEGIES

    # Define the neighbourhoods (and parameters) that we would like to use
    neighbourhoods = [
        ('Random Permutation', partial(neigh.neighbours_random, num=100)),
        ('Swapped Pairs', neigh.neighbours_swap),
        ('Large Neighbourhood Search (2)', partial(neigh.neighbours_LNS, size=2)),
        ('Large Neighbourhood Search (3)', partial(neigh.neighbours_LNS, size=3)),
        ('Idle Neighbourhood (3)', partial(neigh.neighbours_idle, size=3)),
        ('Idle Neighbourhood (4)', partial(neigh.neighbours_idle, size=4)),
        ('Idle Neighbourhood (5)', partial(neigh.neighbours_idle, size=5))
    ]

    # Define the heuristics that we would like to use
    heuristics = [
        ('Hill Climbing', heur.heur_hillclimbing),
        ('Random Selection', heur.heur_random),
        ('Biased Random Selection', heur.heur_random_hillclimbing)
    ]

    # Combine every neighbourhood and heuristic strategy
    for (n, h) in product(neighbourhoods, heuristics):
        STRATEGIES.append(Strategy("%s / %s" % (n[0], h[0]), n[1], h[1]))

定义了策略后,我们不一定想要在搜索过程中坚持使用单个选项。相反,我们随机选择其中一个策略,但根据策略的执行效果来加权选择。我们将在下面描述加权,但是对于pick_strategy 函数,我们只需要一个策略列表和一个相应的相对权重列表(任何数字都可以)。为了用给定的权重随机选择一个策略,我们在 0 到所有权重之和之间均匀地选择一个数字。随后,我们找到最小的索引 \(i\),使得所有权重的小于 \(i\) 的索引之和大于我们选择的随机数。这种技术有时被称为轮盘赌选择,它将为我们随机选择一个策略,并为具有更高权重的策略提供更大的机会。

def pick_strategy(strategies, weights):
    # Picks a random strategy based on its weight: roulette wheel selection
    #  Rather than selecting a strategy entirely at random, we bias the
    #  random selection towards strategies that have worked well in the
    #  past (according to the weight value).
    total = sum([weights[strategy] for strategy in strategies])
    pick = random.uniform(0, total)
    count = weights[strategies[0]]

    i = 0
    while pick > count:
        count += weights[strategies[i+1]]
        i += 1

    return strategies[i]

剩下的就是描述在搜索解决方案的过程中权重是如何增强的。这发生在求解器的主要 while 循环中,以定期的时间间隔(由TIME_INCREMENT 变量定义)发生


        # At regular intervals, switch the weighting on the strategies available.
        #  This way, the search can dynamically shift towards strategies that have
        #  proven more effective recently.
        if time.time() > time_last_switch + TIME_INCREMENT:

            time_last_switch = time.time()

回想一下,strat_improvements 存储了策略在过去所做出的所有改进的总和,而strat_time_spent 存储了策略在最后一个间隔内所花费的时间。我们通过每个策略所花费的总时间来归一化所做的改进,以获得衡量每个策略在最后一个间隔内执行效果的指标。由于策略可能没有机会运行,因此我们选择一个很短的时间作为默认值。

            # Normalize the improvements made by the time it takes to make them
            results = sorted([
                (float(strat_improvements[s]) / max(0.001, strat_time_spent[s]), s)
                for s in STRATEGIES])

现在我们有了对每个策略执行效果的排名,我们向最佳策略的权重添加 \(k\)(假设我们有 \(k\) 个策略),向次佳策略添加 \(k-1\),依此类推。每个策略的权重都会增加,列表中最差的策略的权重只会增加 1。

            # Boost the weight for the successful strategies
            for i in range(len(STRATEGIES)):
                strat_weights[results[i][1]] += len(STRATEGIES) - i

作为额外的措施,我们人为地提高了所有未使用的策略的权重。这样做是为了避免完全忘记一个策略。虽然一个策略可能在开始时看起来表现很差,但在搜索的后期它可能会非常有用。

                # Additionally boost the unused strategies to avoid starvation
                if results[i][0] == 0:
                    strat_weights[results[i][1]] += len(STRATEGIES)

最后,我们输出一些关于策略排名的信息(如果DEBUG_SWITCH 标志被设置),并为下一个间隔重置strat_improvementsstrat_time_spent 变量。

            if DEBUG_SWITCH:
                print "\nComputing another switch..."
                print "Best: %s (%d)" % (results[0][1].name, results[0][0])
                print "Worst: %s (%d)" % (results[-1][1].name, results[-1][0])
                print results
                print sorted([strat_weights[STRATEGIES[i]] 
                              for i in range(len(STRATEGIES))])

            strat_improvements = {strategy: 0 for strategy in STRATEGIES}
            strat_time_spent = {strategy: 0 for strategy in STRATEGIES}

讨论

在本章中,我们看到了可以用相对少量的代码来解决流水车间调度这个复杂的优化问题的可能性。找到流水车间这类大型优化问题的最佳解决方案可能很困难。在这种情况下,我们可以转向近似技术,例如局部搜索,来计算一个足够好的解决方案。通过局部搜索,我们可以从一个解决方案移动到另一个解决方案,目的是找到一个质量好的解决方案。

局部搜索背后的基本直觉可以应用于广泛的问题。我们专注于(1)从一个候选解决方案生成与问题相关的解决方案的邻域,以及(2)建立评估和比较解决方案的方法。有了这两个组成部分,我们可以利用局部搜索范式来找到有价值的解决方案,即使最佳选项计算起来过于困难。

我们没有使用任何单一的策略来解决问题,而是看到了如何动态选择策略并在求解过程中进行调整。这种简单而强大的技术赋予程序能够混合和匹配针对当前问题的部分策略的能力,并且它也意味着开发人员不必手工定制策略。