详解Redis用链表实现消息队列

前言

Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。

链表实现消息队列

Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout, 当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。

用法如下:

charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
//阻塞在这里
/* ---------------------------------------------------- */
//当我在另一个客户端lpush一个元素之后,客户端输出为
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s)//阻塞的时间

当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。

先看下有关数据结构,以及server和client有关属性

//阻塞状态
typedef struct blockingState {
 /* Generic fields. */
 mstime_t timeout;  /* 超时时间 */
 /* REDIS_BLOCK_LIST */
 dict *keys;    /* The keys we are waiting to terminate a blocking
        * operation such as BLPOP. Otherwise NULL. */
 robj *target;   /* The key that should receive the element,
        * for BRPOPLPUSH. */
 /* REDIS_BLOCK_WAIT */
 int numreplicas;  /* Number of replicas we are waiting for ACK. */
 long long reploffset; /* Replication offset to reach. */
} blockingState;
//继续列表
typedef struct readyList {
 redisDb *db;//就绪键所在的数据库
 robj *key;//就绪键
} readyList;
//客户端有关属性
typedef struct redisClient {
 int btype;    /* Type of blocking op if REDIS_BLOCKED. */
 blockingState bpop;  /* blocking state */
}
//服务器有关属性
struct redisServer {
  /* Blocked clients */
 unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
 list *unblocked_clients; /* list of clients to unblock before next loop */
 list *ready_keys;  /* List of readyList structures for BLPOP & co */
}
//数据库有关属性
typedef struct redisDb {
  //keys->redisCLient映射
  dict *blocking_keys;  /* Keys with clients waiting for data (BLPOP) */
 dict *ready_keys;   /* Blocked keys that received a PUSH */
}redisDB

必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为

void brpopCommand(redisClient *c) {
 blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 mstime_t timeout;
 int j;
 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
  != REDIS_OK) return;//将超时时间保存在timeout中
 for (j = 1; j < c->argc-1; j++) {
  o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表
  if (o != NULL) {//如果不为空
   if (o->type != REDIS_LIST) {//不是链表类型
    addReply(c,shared.wrongtypeerr);//报错
    return;
   } else {
    if (listTypeLength(o) != 0) {//链表不为空
     /* Non empty list, this is like a non normal [LR]POP. */
     char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
     robj *value = listTypePop(o,where);//从链表中pop出一个元素
     redisAssert(value != NULL);
     //给客户端发送pop出来的元素信息
     addReplyMultiBulkLen(c,2);
     addReplyBulk(c,c->argv[j]);
     addReplyBulk(c,value);
     decrRefCount(value);
     notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
          c->argv[j],c->db->id);
     if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表
      dbDelete(c->db,c->argv[j]);
      notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
           c->argv[j],c->db->id);
     }
     /* 省略一部分 */
    }
   }
  }
 }
  /* 如果链表为空,则阻塞客户端 */
  blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0,但是只能输出第一个有元素的链表。如果list1没有元素,而list2有元素,则输出list2的元素;如果两个都有元素,则输出list1的元素;如果都没有元素,则等待其中某个链表插入一个元素,之后在2返回。最后调用blockForyKeys阻塞

void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
 dictEntry *de;
 list *l;
 int j;
 c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性
 c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空
 if (target != NULL) incrRefCount(target);//不为空,增加引用计数
 for (j = 0; j < numkeys; j++) {
  /* 将阻塞的key存入c.bpop.keys字典中 */
  if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  incrRefCount(keys[j]);
  /* And in the other "side", to map keys -> clients */
  //将阻塞的key和客户端添加进c->db->blocking_keys
  de = dictFind(c->db->blocking_keys,keys[j]);
  if (de == NULL) {
   int retval;
   /* For every key we take a list of clients blocked for it */
   l = listCreate();
   retval = dictAdd(c->db->blocking_keys,keys[j],l);
   incrRefCount(keys[j]);
   redisAssertWithInfo(c,keys[j],retval == DICT_OK);
  } else {
   l = dictGetVal(de);
  }
  listAddNodeTail(l,c);//添加到阻塞键的客户点链表中
 }
 blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志
}

blockClient函数只是简单的设置客户端属性,如下

void blockClient(redisClient *c, int btype) {
 c->flags |= REDIS_BLOCKED;//设置标志
 c->btype = btype;//阻塞操作类型
 server.bpop_blocked_clients++;
}

由于这个函数之后,brpop命令执行函数就结束了,由于没有给客户端发送消息,所以客户端就阻塞在read调用中。那么如何解开客户端的阻塞了?

插入一个元素解阻塞

任何指令的执行函数都是在processCommand函数中调用call函数,然后在call函数中调用命令执行函数,lpush也一样。当执行完lpush之后,此时链表不为空,回到processCommand调用中,执行以下语句

if (listLength(server.ready_keys))
   handleClientsBlockedOnLists();

这两行代码是先检查server.ready_keys是否为空,如果不为空,说明已经有一些就绪的链表,此时可以判断是否有客户端阻塞在这个键值上,如果有,则唤醒;现在问题又来了,这个server.ready_keys在哪更新链表了?

原来是在dbAdd函数中,当往数据库中添加的值类型为REDIS-LIST时,这时就要调用signalListAsReady函数将链表指针添加进server.ready_keys:

//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
 sds copy = sdsdup(key->ptr);
 int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库
 redisAssertWithInfo(NULL,key,retval == REDIS_OK);
 //判断是否为链表类型,如果是,调用有链表已经ready函数
 if (val->type == REDIS_LIST) signalListAsReady(db, key);
 if (server.cluster_enabled) slotToKeyAdd(key);
 }
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
 readyList *rl;
 /* 没有客户端阻塞在这个键上,则直接返回. */
 if (dictFind(db->blocking_keys,key) == NULL) return;
 /* 这个键已近被唤醒了,所以没必要重新入队 */
 if (dictFind(db->ready_keys,key) != NULL) return;
 /* Ok, 除了上述两情况,把这个键放入server.ready_keys */
 rl = zmalloc(sizeof(*rl));
 rl->key = key;
 rl->db = db;
 incrRefCount(key);
 listAddNodeTail(server.ready_keys,rl);//添加链表末尾
 /* We also add the key in the db->ready_keys dictionary in order
  * to avoid adding it multiple times into a list with a simple O(1)
  * check. */
 incrRefCount(key);
 //同时将这个阻塞键放入db->ready_keys
 redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

OK,这时server.ready_keys上已经有就绪键了,这时就调用processCommand函数中的handleClientsBlockedOnLists()函数来处理阻塞客户端,在这个函数中,

void handleClientsBlockedOnLists(void) {
 while(listLength(server.ready_keys) != 0) {
  list *l;
  /* 将server.ready_keys赋给一个新的list,再将server.ready_keys清空 */
  l = server.ready_keys;
  server.ready_keys = listCreate();
  /* 迭代每一个就绪的每一个readyList */
  while(listLength(l) != 0) {
   listNode *ln = listFirst(l);//获取第一个就绪readyList
   readyList *rl = ln->value;
   /* 从rl所属的数据库中删除rl */
   dictDelete(rl->db->ready_keys,rl->key);
   /* 查询rl所属的数据库查找rl->key ,给阻塞客户端回复rl->key链表中的第一个元素*/
   robj *o = lookupKeyWrite(rl->db,rl->key);
   if (o != NULL && o->type == REDIS_LIST) {
    dictEntry *de;
    /* 在rl->db->blocking_keys查找阻塞在rl->key的客户端链表 */
    de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
     list *clients = dictGetVal(de);//转换为客户端链表
     int numclients = listLength(clients);
     while(numclients--) {//给每个客户端发送消息
      listNode *clientnode = listFirst(clients);
      redisClient *receiver = clientnode->value;//阻塞的客户端
      robj *dstkey = receiver->bpop.target;//brpoplpush命令目的链表
      int where = (receiver->lastcmd &&
          receiver->lastcmd->proc == blpopCommand) ?
         REDIS_HEAD : REDIS_TAIL;//获取取出的方向
      robj *value = listTypePop(o,where);//取出就绪链表的元素
      if (value) {
       /* Protect receiver->bpop.target, that will be
        * freed by the next unblockClient()
        * call. */
       if (dstkey) incrRefCount(dstkey);
       unblockClient(receiver);//设置客户端为非阻塞状态
       if (serveClientBlockedOnList(receiver,
        rl->key,dstkey,rl->db,value,
        where) == REDIS_ERR)
       {
        /* If we failed serving the client we need
         * to also undo the POP operation. */
         listTypePush(o,value,where);
       }//给客户端回复链表中的元素内容
       if (dstkey) decrRefCount(dstkey);
       decrRefCount(value);
      } else {
       break;
      }
     }
    }
    //如果链表为空,则从数据库中删除
    if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
    /* We don't call signalModifiedKey() as it was already called
     * when an element was pushed on the list. */
   }
   /* 回收rl */
   decrRefCount(rl->key);
   zfree(rl);
   listDelNode(l,ln);
  }
  listRelease(l); /* We have the new list on place at this point. */
 }
}

从这个源码可知,如果有两个客户端,同时阻塞在一个链表上面,那么如果链表插入一个元素之后,只有先阻塞的那个客户端收到消息,后面阻塞的那个客户端继续阻塞,这也是先阻塞先服务的思想。handleClientsBlockedOnLists函数调用了unblockClient(receiver) ,该函数功能为接触客户端阻塞标志,以及找到db阻塞在key上的客户端链表,并将接触阻塞的客户端从链表删除。然后调用serveClientBlockOnList给客户端回复刚在链表插入的元素。

int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
 robj *argv[3];
 if (dstkey == NULL) {
  /* Propagate the [LR]POP operation. */
  argv[0] = (where == REDIS_HEAD) ? shared.lpop :
           shared.rpop;
  argv[1] = key;
  propagate((where == REDIS_HEAD) ?
   server.lpopCommand : server.rpopCommand,
   db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
  /* BRPOP/BLPOP */
  addReplyMultiBulkLen(receiver,2);
  addReplyBulk(receiver,key);
  addReplyBulk(receiver,value);
 } else {
  /* BRPOPLPUSH */
   /* 省略 */
 }
}

propagate函数主要是将命令信息发送给aof和slave。函数中省略部分是brpoplpush list list1 0命令的目的链表list1非空时,将从list链表pop出来的元素插入list1中。当给客户端发送消息之后,客户端就从read函数调用中返回,变为不阻塞。

通过超时时间解阻塞

如果链表一直没有数据插入,那么客户端将会一直阻塞下去,这肯定是不行的,所以brpop还支持超时阻塞,即阻塞时间超过一定值之后,服务器返回一个空值,这样客户端就解脱阻塞了。

对于时间超时,都放在了100ms执行一次的时间事件中;超时解脱阻塞函数也在serverCron中;在serverCron->clientsCron->clientsCronHandleTimeout

int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
 time_t now = now_ms/1000;
 //..........
 else if (c->flags & REDIS_BLOCKED) {
  /* Blocked OPS timeout is handled with milliseconds resolution.
   * However note that the actual resolution is limited by
   * server.hz. */
  if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
   /* Handle blocking operation specific timeout. */
   replyToBlockedClientTimedOut(c);
   unblockClient(c);
  }
 }
 //.............

把这个函数不相干的代码删除,主要部分先判断这个客户端是否阻塞,如果是,超时时间是否到期,如果是,则调用replyToBlockedClientTimedOut给客户端回复一个空回复,以及接触客户端阻塞。

总结

链表消息队列实现暂时分析到这了,大家都学会了吗?希望这篇文章给大家能带来一定的帮助,如果有疑问可以留言交流。

时间: 2016-09-05

redis实现简单队列

在工作中,时常会有用到队列的场景,比较常见的用rabbitMQ这些专业的组件,官网地址是:http://www.rabbitmq.com,重要的是官方有.net的客户端,但是如果对rabbitMQ不熟悉的话,建议使用第三方封装好的 EasyNetQ,rabbitMQ比较适合对安全性,稳定性要求较高的地方,但有时我们也会有对这方面要求不是很高的场景,比如:文章阅读数,实时性要求不是很高的地方,所以我想到了用redis来做队列. redis 的List结构本身就是一个链表 (双向链表),所以符合我们

Redis实现分布式队列浅析

Redis是什么? Redis是一个简单的,高效的,分布式的,基于内存的缓存工具. 假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务. 简单,是Redis突出的特色. 简单可以保证核心功能的稳定和优异. redis的安装和配置 Linux系统下: apt-get install redis-server Windows下: 下载链接 下载安装msi文件就好了 配置主从同步 需要实现分布式队列,至少要有一个master(192.168.45.1)和一个slave(192

PHP基于Redis消息队列实现发布微博的方法

本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址  图形化管理界面 git clone [url]https://github.com/ErikDubbelboer/phpRedisAdmin.git[/url] cd phpRedisAdmin git clone [url]https://github.com/nrk/predis.git[/url] vendor 首先安装上述的Redis图形化管理

Redis 实现队列原理的实例详解

Redis 实现队列原理的实例详解 场景说明: ·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 ·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求 ·抢购场景,先入先出的模式 命令: rpush + blpop 或 lpush + brpop rpush : 往列表右侧推入数据 blpop : 客户端阻塞直到队列有值输出 简单队列: simple.php $stmt = $pdo->prepare('select id, c

PHP实现电商订单自动确认收货redis队列

一.场景 之前做的电商平台,用户在收到货之后,大部分都不会主动的点击确认收货,导致给商家结款的时候,商家各种投诉,于是就根据需求,要做一个订单在发货之后的x天自动确认收货.所谓的订单自动确认收货,就是在在特定的时间,执行一条update语句,改变订单的状态. 二.思路 最笨重的做法,通过linux后台定时任务,查询符合条件的订单,然后update.最理想情况下,如果每分钟都有需要update的订单,这种方式也还行.奈何平台太小,以及卖家发货时间大部分也是密集的,不会分散在24小时的每分钟.那么,

详解thinkphp+redis+队列的实现代码

1,安装Redis,根据自己的PHP版本安装对应的redis扩展(此步骤简单的描述一下) 1.1,安装 php_igbinary.dll,php_redis.dll扩展此处需要注意你的php版本如图: 1.2,php.ini文件新增 extension=php_igbinary.dll;extension=php_redis.dll两处扩展 ok此处已经完成第一步redis环境搭建完成看看phpinfo 项目中实际使用redis 2.1,第一步配置redis参数如下,redis安装的默认端口为6

redis 队列操作的例子(php)

入队操作 复制代码 代码如下: <?php $redis = new Redis(); $redis->connect('127.0.0.1',6379); while(True){ try{ $value = 'value_'.date('Y-m-d H:i:s'); $redis->LPUSH('key1',$value); sleep(rand()%3); echo $value."\n"; }catch(Exception $e){ echo $e->g

php中使用redis队列操作实例代码

例1,入队操作: 复制代码 代码如下: <?php$redis = new Redis();$redis->connect('127.0.0.1',6379);while(True){  try{    $value = 'value_'.date('Y-m-d H:i:s');    $redis->LPUSH('key1',$value);    sleep(rand()%3);    echo $value."\n";  }catch(Exception $e)

Python的Flask框架应用调用Redis队列数据的方法

任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大小小的请求给服务器.有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情. 更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做.从事异步任务的工具有很多.

php针对cookie操作的队列操作类实例

本文实例讲述了php针对cookie操作的队列操作类.分享给大家供大家参考.具体分析如下: 这里包括了从简单的cookie操作(增加,删除,修改)到我们的cookie队列操作类的操作,对此感兴趣的朋友可以参考一下. 一.PHP 的COOKIE cookie 是一种在远程浏览器端储存数据并以此来跟踪和识别用户的机制. PHP 在http 协议的头信息里发送cookie,因此 setcookie() 函数必须在其它信息被输出到浏览器前调用,这和对  header() 函数的限制类似. 设置cooki

PHP针对redis常用操作实例详解

本文实例讲述了PHP针对redis常用操作.分享给大家供大家参考,具体如下: /*1.Connection*/ $redis = new Redis(); $redis->connect('127.0.0.1',6379,1);//短链接,本地host,端口为6379,超过1秒放弃链接 $redis->open('127.0.0.1',6379,1);//短链接(同上) $redis->pconnect('127.0.0.1',6379,1);//长链接,本地host,端口为6379,超

JQuery中Ajax的操作完整例子

Java软件开发中,后台中我们可以通过各种框架,像SSH等进行对代码的封装,方便我们对Java代码的编写,例如,Struts,SpringMVC对从前台到action的流程进行封装控制,使我们只需要进行一些简单配置就可以实现:而spring进行了对各种对象的管理进行封装,提供了AOP编程的方式,大大方便了我们:而hibernate和IBatis则是对JDBC代码进行封装,不需要我们每次都写那些重复而繁杂的JDBC代码. 前台呢,对于页面一些效果,验证等,我们都是通过JavaScript语言进行完

PHP实现负载均衡session共享redis缓存操作示例

本文实例讲述了PHP实现负载均衡session共享redis缓存操作.分享给大家供大家参考,具体如下: 1.首先先创建html表单页面 <meta chatset='utf-8'> <center> <form action="se.php" method="post"> <table> <tr> <td>帐号:</td> <td><input type="

django之状态保持-使用redis存储session的例子

关于redis安装,pip install django-redis-sessions,按照提示进行安装相关的服务端 和客户端. django版本1.8.2, Python版本2.7.12 1 进入虚拟环境h1 workon h1 2 创建一个项目test应用booktest django-admin startproject test 3 创建应用booktest 进入项目test目录,创建应用booktest,这个booktest应用目录和manage.py在 同级目录下 python ma