基于Redis实现普通队列和延时队列的两种方式与问题

普通队列

基于Redis实现的普通队列,可以用于各种异步任务的处理,如短信发送,代码如下

投递任务脚本,只需要通过Lpush将任务添加到列表

<?php

$key = 'key1';
$i = 0;
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);

while (true) {
    $i += 1;
    echo $value = '投递任务:' . $i;
    $redis->Lpush($key, $value);
    usleep(50000); //微妙
}

消费任务脚本,通过brPop阻塞获取任务

<?php

$key = 'key1';
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);

while (true) {
    $arr = $redis->brPop([$key], 3);  //获取一条
    if (!empty($arr)) {
        echo "获得分配的任务:{$arr[1]}\r\n";
    }
}

由于brPop提供阻塞获取的功能,所以在消费任务速度大于投递速度的情况下,可以认为是实时异步处理

延时队列

投递任务脚本

<?php

$key = 'key';
$score = time();
$i = 0;
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
while (true) {
    $i += 1;
    echo $value = '投递任务:' . $i;
    $redis->zAdd($key, $score, $value); //有序集合的成员是唯一的,但分数(score)却可以重复。
    usleep(50000); //微妙
}

消费任务脚本

<?php

$key = 'key';
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);

while (true) {
    $arr = $redis->zRange($key, 0, 0, true);  //获取一条

    if (empty($arr) or $arr[key($arr)] > time()) {  //不存在或时间未到,等待并进入下一个循环
        usleep(500000); //微妙
        continue;
    }

    $value = key($arr);
    if ($redis->zRem($key, $value)) {
        echo "获得分配的任务:{$value}\r\n";
    } else {
        echo "任务被其他进程抢占,进入下一个循环\r\n";
    }
}

 使用Redis和两种方式的优缺点

两种方式都是基于Redis所以优点基本也是Redis自身的优点,简单,稳定,快速,支持分布式,但是基于Redis的延时队列始终需要配置一个轮询时间,所以从某个角度来讲并不是实时消费,但是可以通过单独的消费主进程来投递任务到子进程的方式和减少轮询时间来优化,类似Nginx的任务处理流程.

关于消费端,如果使用php实现只适合处理普通的队列,用来异步发下短信等等, laravel框架其实已经有类似的集成,可以直接使用, 对于延时队列如果想高效处理还是需要使用go等对多线程或多进程支持友好的语言来处理

正常情况下在使用时还需要考虑开发和维护成本,比如异步短信,对于普通项目而言可能使用laravel或其他框架自身实现的Redis队列功能或插件反而更合适,不需要单独维护一套代码,但是对于并发任务较大的项目肯定是不适用的,此时消费端只能使用Go等语言

发表评论

电子邮件地址不会被公开。 必填项已用*标注