nodejs模块之 redis作业/任务队列模块 bee-queue

nodejs模块,redis作业/任务队列模块,bee-queue

Git URL:

Git Clone代码到本地:

Subversion代码到本地:

一个简单、快速、健壮的Node.js 作业/任务(job/task) 队列。由Redis支持。

  • 简单:~1000 LOC,最小依赖性。
  • 快速:通过最小化Redis和网络开销来最大化吞吐量。 基准测试。
  • 健壮:考虑并发性。原子性和失败的设计;接近完整的代码覆盖。

简介

蜂群队列旨在为分布式工作池供电,并且构建了短而实时的工作。 web服务器可以对作业进行排队,等待辅助进程完成它,并在HTTP请求中返回它的结果。 缩放就像运行更多的工人一样简单。

由于 Mixmax folks,蜜蜂队列再次被定期地维护在

的工作流程,支持优先级和可以重复作业,但通常都是针对任务优先级和可以重复作业的。 蜜蜂队列可以处理较长的后台作业,但它们不是的主要焦点。

安装

你还需要 2.8 + * 运行在某个地方。

注意:由于 * <3.2的问题,我们已经注意到一些作业被延迟,因此推荐使用 Redis 3.2 +。

目录

动机

celery 用于 python,Resque用于 ruby,但是 Kueox已经存在,所以它们的性能也很好,因此我们还需要存在

简而言之,我们需要混合和 MATCH 处理好的事情,我们还需要挤出更多的性能。 还有一个长版本,详细说明。

蜜蜂队列是将多头和健壮性结合起来,使kue能够将事件送回作业创建者,然后将重点放在minimizing代码质量,并通过严格的代码质量测试和测试标准。 它会破坏特性的宽度,因这里当然,Kue或者be可以能更适合于( 请参见捐赠。)。

Bull和Kue做得很好,应该有很多的信用。 bee借用了两种想法,而公牛在初始开发中是一个特别宝贵的参考。

为什么蜜蜂?

蜜蜂队列就像蜜蜂,因为它:

  • 很简单简单
  • 速度快( 蜜蜂可以飞 20英里) !
  • 在鲜花( 服务器) 之间携带花粉( 邮件)
  • worker bees”"

基准测试

resultschart

通过each库,这些基本基准在每个库运行 10,000作业,在不同的并发级别,在 Amazon AWS EC2 m4.large. 上运行,每个组合都有 36个运行,每个组合都有 3个运行,。 原始数据收集和代码在基准文件夹中可用。

网络接口

查看 Arena web界面来管理作业和检查队列运行状况。

概述

创建队列

队列对象是这个库所做的一切的起点。 为了制作一个,我们只需要给它一个 NAME,通常指示它将处理的工作类型:

队列非常轻量级- – 连接到agavi的唯一重要开销是,如果需要处理不同类型的作业,请实例化:

这里,传递一个 settings 对象来指定备用的Redis主机,并指出这个队列将只添加任务的( 不处理它们)。 有关更多选项,请参见队列设置

创建作业

作业使用 Queue.createJob(data) 创建,它返回一个作业对象存储任意的data

作业有一个用于配置作业的链接 API,以及 .save([cb]) 方法将作业保存到tmodel中并进行处理:

job方法的save 除了调用可选回调之外还返回一个 Promise。

可以使用命令 .setId(id).retries(n).backoff(strategy, delayFactor).delayUntil(date|timestamp).timeout(ms) 来配置每个作业,用于设置选项。

稍后可以使用 Queue#getJob插件从Redis检索作业,但是大多数用例不需要这样做,而且可以使用作业和队列事件( ) 来完成。

处理作业

要开始处理作业,请调用 Queue.process 并提供处理程序函数:

处理函数可以返回 Promise,而不是调用提供的回调。 这样可以直观地使用 async/await:

处理程序函数得到了需要处理的作业,包括创建作业时的job.data。 然后,通过返回 Promise 或者调用 done 回调来传递结果。 有关处理程序的更多信息,请参见 Queue#process

.process 只能被每个 Queue 实例调用一次,但我们可以按照我们喜欢的实例进行处理。 从这个角度,我们可以很容易地做一个工作的机器人池,并且在处理我们的工作时花费他们的生活。

.process 还可以采用并发参数。 如果作业花费的时间大部分只是等待外部资源,你可能希望每个处理器实例一次最多只能处理 10个:

进度报告

处理程序可以发送进度报告,这些报告将作为事件接收到原始作业实例中:

.process 一样,这些 progress 事件跨多个进程或者服务器工作;作业实例将接收进程事件,无论处理发生什么情况。 注意这种机制依赖于 pub/sub,因此会给每个额外的工作线程带来额外的开销。

作业和队列事件

蜜蜂队列对象发出的事件种类有三种: 队列本地事件队列PubSub事件,以及作业事件。 链接的API参考部分提供了更完整的。

上面演示的进度报告通过作业事件进行。 作业还发出 succeeded 事件,我们在打开示例中看到了这些事件,failedretrying 事件。

队列PubSub事件与作业事件直接对应: job succeededjob retryingjob failedjob progress。 这些事件从所有队列实例和队列中的所有作业激发。

在所有队列实例上都包括 readyerror,以及与正在发送的PubSub事件对应的worker队列上的succeededretryingfailed

在跨进程重启时,作业事件变得不可靠,因为对关联作业对象的队列引用将丢失。 队列级事件可以能更可以靠,但作业事件更方便,如HTTP请求在进程重启失去状态时更方便。

停止作业

蜜蜂队列尝试提供 “至少一次交付”服务。 任何排队的作业至少应处理一次,如果工作人员崩溃,或者不能确认完成作业。

要实现这一功能,员工会定期 phone home home每个工作,只是”我还在处理这个,我还没有停止,所以你不需要重试它。” checkStalledJobs method方法找到工作 workers,并在工作id中加入一个 stalled 事件,然后由另一个工作者将它的enqueues。

API参考

死信队列

设置

默认队列设置为:

settings 字段包括:

  • prefix: 字符串,默认的bq。如果 bq: 名称空间是不可以用的,不可以用或者问题的。
  • stallInterval: 数字,毫秒,员工必须报告他们没有停止的窗口的长度。 更高的值将减少 rtc/网络开销,但是如果工作人员停止,则在重试时会花费更长时间。 更高的值也会导致在失速检测期间误报的概率较低。
  • 如果延迟任务将在窗口中继续运行,队列将再次检查该窗口没有丢失任何作业,该窗口将继续检查。
  • delayedDebounce: 数字,ms,为了避免不必要的多个工作,队列可能会延迟单个作业。
  • redis: 对象或者字符串,指定如何连接到 Redis。 有关完整的选项集,请参阅 redis.createClient()
    • host: 字符串,Redis主机。
    • port: 数字,Redis端口。
    • socket: 字符串,用于代替主机和端口的Redis套接字。

    注意,这也可以是 node_redis RedisClient 实例,在这种情况下,蜜蜂队列将在它的上发出正常的命令。 如果启用的话,它将 duplicate() 用于阻塞命令和PubSub订阅。 这是高级用法

  • isWorker: boolean。如果这里队列不处理作业,则禁用。
  • getEvents: boolean。如果这里队列不需要接收作业事件,则禁用。
  • sendEvents: boolean。如果这里工作线程不需要将作业事件发送回其他队列,则禁用。
  • storeJobs: 布尔值。如果这里工具不需要将事件与特定的Job 实例关联,则禁用。 这通常提高了内存的使用率,因为对于许多用例来说,作业的存储是不必要的。
  • ensureScripts: 布尔值。确保在运行任何命令之前在agavi中存在Lua脚本。
  • activateDelayedJobs: 布尔值一旦通过了 delayUntil 时间戳,就激活延迟的作业。 注意,必须在延迟重试策略( fixedexponential ) 中至少启用一个 Queue 实例( 这将在计算延迟后重新激活它们)。
  • removeOnSuccess: 布尔值。允许这里工作人员自动将它的成功完成的作业从tmodel中删除,以便使内存使用。
  • removeOnFailure: 布尔值。允许这里工作人员自动将它的失败的作业从tmodel中删除,以便使内存使用。 这不会删除被设置为重试的作业,除非它们在重试时失败。
  • quitCommandClient: boolean。在调用 Queue#close 时是否对redis命令客户端( 客户端发送正常操作的时间超过) 进行 QUIT 处理。 对于正常使用,默认为 true,如果为 redis 选项提供了现有 RedisClient 对象,则为 false
  • redisScanCount: 数字。用于设置 Queue#getJobs 和失败作业类型中使用的SSCAN Redis命令的值。

属性

  • name: 字符串,传递给构造函数的NAME。
  • keyPrefix: 字符串,用于与这里队列关联的所有Redis键的前缀。
  • jobs: 将当前跟踪的作业的( 启用 storeJobsgetEvents 时) 关联起来的Map
  • paused: boolean,是否暂停队列实例。 如果队列正在关闭,则只有 true。
  • settings: 对象,在传递的和默认值之间确定的设置

队列本地事件

已经就绪

不是听这个事件,考虑调用 Queue#ready([cb]),它返回一一一个队列准备好后解决的诺求。 如果队列已经就绪,那么承诺就已经解决了。

队列已经连接到 Redis,并确保脚本是高速缓存。 如果在未缓存脚本的情况下,可以在未缓存脚本的情况下进行缓存,你可以在不检查该事件的情况下。

错误

任何Redis错误都是从队列发出的。 注意,这里事件不会为失败的作业发出。

成功

这里队列已经成功处理 Job。 如果定义了 result,则处理程序称为 done(null, result)

重试

队列已经处理 Job,但报告了一个故障,并已经被加入队列以进行其他尝试。 job.options.retries 已经减少,堆栈跟踪( 或者错误信息) 已经添加到它的job.options.stacktraces array 中。

失败

这里队列已经处理 Job,但它的处理程序报告了失败,方法是拒绝它的返回的承诺,或者通过调用 done(err)

已经停止

这里队列检测到作业暂停失败。 注意,这可能与处理作业并最终停止的实例不同;相反,它是队列的实例,它是的实例 detect stalled的任务。

队列PubSub事件

这些事件都是由某些工作者队列( 启用了 sendEvents ) 报告的,并作为 Redis pub/子消息发送回任何队列,以便对它们进行。 这意味着监听这些事件实际上是对队列中所有员工的所有活动的监视。

如果事件的jobId 用于该队列实例创建的作业,则该作业对象将发出相应的作业事件事件( )。

请注意,队列datacontext事件传递 jobId,但没有引用,因为该作业最初由它的他某个队列创建。 只有在创建作业的进程中发出作业事件事件,并且从作业对象本身发出。

作业成功

某些工作线程已经成功处理作业 jobId。 如果定义了 result,则处理程序称为 done(null, result)

作业重试

某些工作线程已经处理作业 jobId,但报告了故障,并已经将它的排队以进行其他尝试。

作业失败

某些工作线程已经处理 Job,但它的处理程序报告 done(err) 失败。

作业进度

某些工人的进程作业 jobId,并发送了一个进度报告 %。

队列延迟作业激活

Queue 将不会激活延迟作业,除非 activateDelayedJobs 被设置为 true

作业激活的及时性通过 Queue 上的delayedDebounce 设置来控制。 这里设置定义一个窗口,用于对延迟的作业进行分组。 如果将来三个作业队列为 10,10.5和 12s,则 1000delayedDebounce 将导致第二个作业在第二个作业时间激活。

Queue 上的nearTermWindow 设置决定了 Queue 在试图激活任何已经运行的延迟作业之前应等待的最大持续时间。 这里设置是控制在 earlierDelayed 事件交付时与发布 Queue的死亡相结合的网络故障。

方法

队列( 名称,[settings] )

用于实例化新队列;打开与Redis的连接。

Queue#createJob ( 数据)

返回一个新的作业对象( 与关联用户数据关联)。

Queue#getJob ( jobId,[cb] )

jobId 查找作业。 如果 getEventsstoreJobs 是 true,返回的作业将发出事件。

请注意这个方法;大多数潜在的使用将更好地由已经有作业实例的作业事件提供。 当 storeJobs 设置为 true 时,使用这里方法可能会增加内存使用情况,因为每个队列维护所有相关作业的表。

Queue#getJobs ( 类型,页,[cb] )

按队列类型查找作业。 查找 waitingactive 或者 delayed 类型的作业时,应该使用 startend 属性配置 page,以指定要返回的作业索引范围。 类型 failedsucceeded的作业将返回大小为 page['size']的队列的任意子集。 注意:这是因为失败和成功的作业类型由一个Redis集表示,它不维护作业排序。

注意 page 属性的大值可能会导致Redis服务器上的过量负载。

Queue#process (。[concurrency],处理程序( 作业,完成) )

使用提供的处理程序函数开始处理作业。

process 方法只应被调用一次,并且不应该在 isWorker的false 中调用。

可选的concurrency 参数设置这里处理器同时激活的作业的最大数目。 默认值为 1.

处理程序函数应该是:

  • 返回最终解析或者拒绝的Promise,或者
  • 一次调用 done
    • 使用 done(err) 指示作业失败
    • 使用 done() 或者 done(null, result) 表示作业成功
      • result 必须是json序列化( 对于 JSON.stringify )
  • 永远不要使用事件循环( 很长一段时间)。 如果这样做的话,失败检测可能会认为工作停止了,当它真正阻塞事件循环时。

返回。如果处理程序返回一个 Promise,则将忽略对 done 回调的调用。

Queue#checkStalledJobs ( [interval],[cb] )

检查似乎是失败的作业,因这里需要重试,然后重新排列它们。

如果检查由提供的参数确定,将会发生什么情况:

  • cb: cb 被调用
  • interval: 超时设置为在 interval ms中再次调用该方法
  • cbinterval: 设置了超时,然后调用 cb

当工作线程开始处理时,蜜蜂队列自动调用这里方法,因此它将检查工作进程是否重新启动。 你还应该使用一个间隔参数进行自己的调用,以使检查反复发生;请参见罩下的,以获得解释的解释。

当作业停顿到重试时的最大延迟大约是 stallInterval + interval,因此,如果不需要调用 checkStalledJobs,则将延迟设置为 minimize,将 interval 设置为相同或者稍短。 根据工作时间,每 0.5秒检查一次良好的系统平均频率,这取决于你的工作时间。 更大的部署,或者处理更高的CPU方差的部署可能需要更高的间隔。

如果你想在每次检查结果中得到结果,你必须提供一个回调,如果你想要的结果是第一次检查,那么返回的Promise 只能是一次。 如果指定了 interval 并且没有 cb,那么在第一次检查之后遇到的错误将作为 error 事件发出。

Queue#checkHealth ( [cb] )

检查队列的”健康状况”。 返回解析为每个状态(。waitingactivesucceededfaileddelayed ) 中的作业数的诺求,以及 newestJob 中最新的作业 in ( 如果使用默认标识行为)。 通过将 waitingactive 计数合并在一起,你可以定期查询 newestJob ID来估计作业创建吞吐量,并可以推断作业处理吞吐量。

Queue#close ( [cb] )

关闭队列到Redis的连接。 幂等。

推荐的Pattern 用于正常关闭你的工作线程:

Queue#removeJob ( jobId,[cb] )

jobId 删除作业。 幂等。

如果作业当前正在被另一个worker处理,这可能会产生意外的副作用,所以只有当你知道该作业是安全的时才使用这里方法。

返回在它的上调用的Queue 实例。

Queue#destroy ( [cb] )

删除属于这里队列( 请参见引擎盖下的 。)的所有Redis密钥。 幂等。

不用说,这应该非常小心。

返回已经删除的密钥数。

作业

属性

  • id: 字符串,每个作业的唯一作业 ID。 在 .save 回拨之前未填充。 可以用 Job#setId 覆盖。
  • data: 对象;与作业关联的用户数据。 它应该:
    • 为json可以序列化( 对于 JSON.stringify )
    • 从不用于传递大数据块( 100kB+ )
    • 理想情况下尽可能小( 1kB或者更少)
  • options: 蜜蜂队列用来存储超时。重试。堆栈跟踪等的对象。
    • 不直接修改;使用作业方法。
  • Queue: 负责这里作业实例的队列。 这是:
    • 调用 createJob 来完成作业的队列,
    • 运行 getJob 以从redis获取作业的队列,或者
    • 调用 process 来处理它的队列
  • progress: 数字,0和 100之间的进度,由 reportProgress 报告。

作业事件

这些都是像队列PubSub事件的pub/子事件,当 getEvents 为 false 时禁用。

成功

作业已经成功如果定义了 result,则调用名为 done(null, result)的处理程序。

重试

作业已经失败,但它正在自动排队以进行其他尝试。 job.options.retries 已经相应减少。

失败

作业已经失败,并且未被重试。

进程

作业已经发送了一个进度报告 %。

方法

可以使用命令 .setId(id).retries(n).backoff(strategy, delayFactor).delayUntil(date|timestamp).timeout(ms) 来配置每个作业。

Job#setId ( id )

显式设置作业的ID。 如果具有给定标识的作业已经存在,则不会创建作业,并且 job.id 将被设置为 null。 这里方法可以用于通过传递资源的ID来对每个外部资源运行一次。 例如通过将作业ID设置为用户的ID,你可以只运行一次用户的安装作业。

避免传递数字作业 ID,因为它可能与自动生成的标识冲突。

Job#retries ( n )

设置作业在失败时自动重试的次数。

存储在 job.options.retries 中,每次重试作业时都减少。

默认为 0.

Job#backoff ( 策略,delayFactor )

在处理重试时设置回退策略。

这里设置以 {strategy, delay} 形式存储在 job.options.backoff 中。

默认为 'immediate'

Job#delayUntil ( date|timestamp )

延迟作业直到指定日期/时间戳通过。 有关控制延迟作业激活的信息,请参阅 Queue 设置部分。

默认值为enqueueing立即处理作业。

Job#timeout ( ms )

以毫秒为单位设置作业运行时间超时;如果作业函数的处理程序花费比调用 done 时间更长的时间,则工作线程假定作业失败并将它的报告为此类任务。

默认为无超时。

Job#save ( [cb] )

保存作业,将它的排队以进行处理。 在回调激发( 以及相关的承诺) 之后,job.id 将被填充。

Job#reportProgress ( n )

当在处理程序函数中调用时报告作业进程。 导致发出 progress 事件。 不会将进度保持在,但将它的存储在 job.progress 上,如果它的他 Queue s 启用了 storeJobsgetEvents,则 progress 将结束。

Job#remove ( [cb] )

从队列中删除作业。 幂等。

如果作业当前正在被另一个worker处理,这可能会产生意外的副作用,所以只有当你知道该作业是安全的时才使用这里方法。

要注意,这种方法会调用带有作业id的 Queue#removeJob 插件,因此如果你没有内存中的作业,那么使用就更有效了,而不是首先使用,而不是先获取工作。

返回在它的上调用的Job 实例。

默认值

lib/defaults.js 中队列 settings的默认值。 更改该文件将更改蜜蜂队列行为的默认。

在引擎盖下

每个队列都使用以下Redis密钥:

  • bq:name:id: 整数,递增以确定下一个作业 ID。
  • bq:name:jobs: 从作业ID到包含它的数据和选项的JSON字符串的哈希。
  • bq:name:waiting: 等待处理的作业的id列表。
  • bq:name:active: 当前正在处理的IDs作业列表。
  • bq:name:succeeded: 已经成功完成的作业的id。
  • bq:name:failed: 失败的作业的id集。
  • bq:name:delayed: 与延迟作业对应的有序id集- 这个集合将延迟时间戳映射到 id。
  • bq:name:stalling: 在这里间隔内没有’已经签入’的作业的一组 id。
  • bq:name:stallBlock: 在这里间隔内没有’已经签入’的作业的一组 id。
  • bq:name:events: 员工发送作业结果的pub/子频道。
  • bq:name:earlierDelayed: 在所有它的他作业之前添加新的延迟作业时,创建作业的脚本将发布该作业的时间戳。

to是非轮询,因这里空闲工作人员在排队时立即听到接收作业。 这是由 brpoplpush 提供的,它用于将作业从等待列表移到活动列表。 蜜蜂队列一般遵循”可靠队列”Pattern 描述这里的

isWorker 设置创建一个专用于 brpoplpush的额外Redis连接。 如果启用 getEvents 或者 activateDelayedJobs,则另一连接用于接收 pub/子事件。 因此,如果不需要这些设置,则应该禁用这些设置。

失速设置是最新失速间隔开始时的活动列表的快照。 每隔一段时间,员工从停机集中删除作业 id,所以在间隔结束时,所有的id都会丢失他们的窗口,需要重新运行它们。 当 checkStalledJobs 运行时,它将在失速集( 到等待列表) 中保留的所有作业,然后获取活动列表的快照并将它的存储在阻塞集合中。

蜂群队列要求用户自己启动重复检查,因为如果我们自动执行,系统中的每个队列实例都会执行检查。 从所有实例进行检查比只检查一两个实例更有效,并且提供了更弱的保证。 例如在10个进程上运行的checkStalledJobs 间隔将平均每 500检查一次,但仅保证每一次检查一次ms毫秒。 每 1000ms 个实例检查两个实例也会平均每 500次检查一次,但在整个时间范围内分布比较好,并且会保证每小时检查一次 ms。 尽管这个检查不贵,但是不会很有用,避免无用的工作是本库的主要要点。

欢迎请求请求,只需确保 npm test 通过。 要进行重大更改,请先打开讨论。

一些重要的非功能包括:

  • 工作跟踪:Kue执行这里操作。
  • 所有员工暂停简历:Bull。
  • 作业优先级:多个队列在简单情况下完成任务,但Kue具有一流的支持。 ox提供了多个队列的包装器。

它的中有些可以能是值得增加的;如果你有兴趣使用或者帮助实现它们,请注释 !

你需要一个本地的redis服务器来运行测试。 注意,运行测试可能会删除 bq:test-*-*:* 形式的某些键。

承接各种网站开发与修改、爬虫、数据采集分析、小程序等任务

Html+Css+JS+PHP+Nodejs+Python

专治网站各种不服

一起探讨,互相学习,共同进步!有事儿您说话。

This entry was posted in NodeJS, redis and tagged , , by 织梦先生. Bookmark the permalink.