Beanstalkd任务列队/消息列队

简介

Beanstalkd是一个轻量级高性能的任务列队系统,主要特性有优先级设置,持久化,分布式容错,超时控制

安装

sudo yum install beanstalkd

启动

sudo beanstalkd -d ./DIR

常用的启动参数

  • -d  指定binlog路径,用于断电后自动恢复
  • -f  同步时间 -f0为实时同步
  • -F  不同步 默认

使用简介

生产者通过put命令将一个job推动到一个指定的tube列队中,消费者从指定的tube列队中获取job通过通过reserve/release/bury/delete进行操作,改变job状态

如果有多个消费者在同一个tube不会收到相同的任务请求

发送任务

<?php
//发送任务
require_once 'Client.php';
//实例化beanstalk
beanstalk = new \Beanstalk\Client();beanstalk->connect();
//选择使用的tube
beanstalk->useTube('test');
//往tube中增加数据put = beanstalk->put(
    23, // 任务的优先级.
    0,  // 不等待直接放到ready队列中.
    60, // 处理任务的时间.
    'hello, beanstalk'  // 任务内容
);
if (!put) {
    exit('commit job fail');
}
$beanstalk->disconnect();

处理任务

<?php

require_once 'Client.php';
//实例化beanstalk
beanstalk = new \Beanstalk\Client();beanstalk->connect();

//查看beanstalkd状态
//var_dump(beanstalk->stats());
//查看有多少个tube
//var_dump(beanstalk->listTubes());
beanstalk->useTube('test');
//设置要监听的tubebeanstalk->watch('test');
//取消对默认tube的监听,可以省略
beanstalk->ignore('default');
//查看监听的tube列表
//var_dump(beanstalk->listTubesWatched());
//查看test的tube当前的状态
//var_dump(beanstalk->statsTube('test'));

while (true) {
//获取任务,此为阻塞获取,直到获取有用的任务为止job = beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk')
    //处理任务result = (job['body']);
    if (result) {
        //删除任务
        beanstalk->delete(job['id']);
    } else {
        //休眠任务
        beanstalk->bury(job['id']);
    }
    //跳出无限循环
    if (file_exists('shutdown')) {
        file_put_contents('shutdown', 'beanstalkd在' . date('Y-m-d H:i:s') . '关闭');
        break;
    }
}
$beanstalk->disconnect();

 

tube

任务列队,存放统一类型的job

生产者

  • 通过put命令来提交一个job到tube列队

消费者

  • 通过reserver 获取一个job
  • 通过release 将该job重新放回列队
  • 通过bury 将一个job休眠,等到需要的时候再将job kick回列队
  • 通过delete将一个job删除,一般是处理完成后

job状态图

083138jz3t1ktu2gzckirc

参考

基于beanstalkd的消息队列实践

管理工具bstools

 

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注