Beanstalkd任务列队/消息列队

简介

Beanstalkd是一个轻量级高性能的任务列队系统,主要特性有优先级设置,持久化,分布式容错,超时控制

安装

sudo yum install beanstalkd

启动

sudo beanstalkd -d ./DIR

常用的启动参数

  • -d  指定binlog路径,用于断电后自动恢复
  • -f  同步时间 -f0为实时同步
  • -F  不同步 默认

使用简介

生产者通过put命令将一个job推动到一个指定的tube列队中,消费者从指定的tube列队中获取job通过通过reserve/release/bury/delete进行操作,改变job状态

如果有多个消费者在同一个tube不会收到相同的任务请求

发送任务

<?php
//发送任务
require_once 'Client.php';
//实例化beanstalk
$beanstalk = new \Beanstalk\Client();
$beanstalk->connect();
//选择使用的tube
$beanstalk->useTube('test');
//往tube中增加数据
$put = $beanstalk->put(
    23, // 任务的优先级.
    0,  // 不等待直接放到ready队列中.
    60, // 处理任务的时间.
    'hello, beanstalk'  // 任务内容
);
if (!$put) {
    exit('commit job fail');
}
$beanstalk->disconnect();

处理任务

<?php

require_once 'Client.php';
//实例化beanstalk
$beanstalk = new \Beanstalk\Client();
$beanstalk->connect();

//查看beanstalkd状态
//var_dump($beanstalk->stats());
//查看有多少个tube
//var_dump($beanstalk->listTubes());
$beanstalk->useTube('test');
//设置要监听的tube
$beanstalk->watch('test');
//取消对默认tube的监听,可以省略
$beanstalk->ignore('default');
//查看监听的tube列表
//var_dump($beanstalk->listTubesWatched());
//查看test的tube当前的状态
//var_dump($beanstalk->statsTube('test'));

while (true) {
//获取任务,此为阻塞获取,直到获取有用的任务为止
    $job = $beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk')
    //处理任务
    $result = ($job['body']);
    if ($result) {
        //删除任务
        $beanstalk->delete($job['id']);
    } else {
        //休眠任务
        $beanstalk->bury($job['id']);
    }
    //跳出无限循环
    if (file_exists('shutdown')) {
        file_put_contents('shutdown', 'beanstalkd在' . date('Y-m-d H:i:s') . '关闭');
        break;
    }
}
$beanstalk->disconnect();

 

tube

任务列队,存放统一类型的job

生产者

  • 通过put命令来提交一个job到tube列队

消费者

  • 通过reserver 获取一个job
  • 通过release 将该job重新放回列队
  • 通过bury 将一个job休眠,等到需要的时候再将job kick回列队
  • 通过delete将一个job删除,一般是处理完成后

job状态图

083138jz3t1ktu2gzckirc

参考

基于beanstalkd的消息队列实践

管理工具bstools

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注