详解Redis用链表实现消息队列

前言

Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。

链表实现消息队列

Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout, 当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。

用法如下:

charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
//阻塞在这里
/* ---------------------------------------------------- */
//当我在另一个客户端lpush一个元素之后,客户端输出为
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s)//阻塞的时间

当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。

先看下有关数据结构,以及server和client有关属性

//阻塞状态
typedef struct blockingState {
 /* Generic fields. */
 mstime_t timeout;  /* 超时时间 */
 /* REDIS_BLOCK_LIST */
 dict *keys;    /* The keys we are waiting to terminate a blocking
        * operation such as BLPOP. Otherwise NULL. */
 robj *target;   /* The key that should receive the element,
        * for BRPOPLPUSH. */
 /* REDIS_BLOCK_WAIT */
 int numreplicas;  /* Number of replicas we are waiting for ACK. */
 long long reploffset; /* Replication offset to reach. */
} blockingState;
//继续列表
typedef struct readyList {
 redisDb *db;//就绪键所在的数据库
 robj *key;//就绪键
} readyList;
//客户端有关属性
typedef struct redisClient {
 int btype;    /* Type of blocking op if REDIS_BLOCKED. */
 blockingState bpop;  /* blocking state */
}
//服务器有关属性
struct redisServer {
  /* Blocked clients */
 unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
 list *unblocked_clients; /* list of clients to unblock before next loop */
 list *ready_keys;  /* List of readyList structures for BLPOP & co */
}
//数据库有关属性
typedef struct redisDb {
  //keys->redisCLient映射
  dict *blocking_keys;  /* Keys with clients waiting for data (BLPOP) */
 dict *ready_keys;   /* Blocked keys that received a PUSH */
}redisDB

必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为

void brpopCommand(redisClient *c) {
 blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 mstime_t timeout;
 int j;
 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
  != REDIS_OK) return;//将超时时间保存在timeout中
 for (j = 1; j < c->argc-1; j++) {
  o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表
  if (o != NULL) {//如果不为空
   if (o->type != REDIS_LIST) {//不是链表类型
    addReply(c,shared.wrongtypeerr);//报错
    return;
   } else {
    if (listTypeLength(o) != 0) {//链表不为空
     /* Non empty list, this is like a non normal [LR]POP. */
     char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
     robj *value = listTypePop(o,where);//从链表中pop出一个元素
     redisAssert(value != NULL);
     //给客户端发送pop出来的元素信息
     addReplyMultiBulkLen(c,2);
     addReplyBulk(c,c->argv[j]);
     addReplyBulk(c,value);
     decrRefCount(value);
     notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
          c->argv[j],c->db->id);
     if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表
      dbDelete(c->db,c->argv[j]);
      notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
           c->argv[j],c->db->id);
     }
     /* 省略一部分 */
    }
   }
  }
 }
  /* 如果链表为空,则阻塞客户端 */
  blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0,但是只能输出第一个有元素的链表。如果list1没有元素,而list2有元素,则输出list2的元素;如果两个都有元素,则输出list1的元素;如果都没有元素,则等待其中某个链表插入一个元素,之后在2返回。最后调用blockForyKeys阻塞

void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
 dictEntry *de;
 list *l;
 int j;
 c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性
 c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空
 if (target != NULL) incrRefCount(target);//不为空,增加引用计数
 for (j = 0; j < numkeys; j++) {
  /* 将阻塞的key存入c.bpop.keys字典中 */
  if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  incrRefCount(keys[j]);
  /* And in the other "side", to map keys -> clients */
  //将阻塞的key和客户端添加进c->db->blocking_keys
  de = dictFind(c->db->blocking_keys,keys[j]);
  if (de == NULL) {
   int retval;
   /* For every key we take a list of clients blocked for it */
   l = listCreate();
   retval = dictAdd(c->db->blocking_keys,keys[j],l);
   incrRefCount(keys[j]);
   redisAssertWithInfo(c,keys[j],retval == DICT_OK);
  } else {
   l = dictGetVal(de);
  }
  listAddNodeTail(l,c);//添加到阻塞键的客户点链表中
 }
 blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志
}

blockClient函数只是简单的设置客户端属性,如下

void blockClient(redisClient *c, int btype) {
 c->flags |= REDIS_BLOCKED;//设置标志
 c->btype = btype;//阻塞操作类型
 server.bpop_blocked_clients++;
}

由于这个函数之后,brpop命令执行函数就结束了,由于没有给客户端发送消息,所以客户端就阻塞在read调用中。那么如何解开客户端的阻塞了?

插入一个元素解阻塞

任何指令的执行函数都是在processCommand函数中调用call函数,然后在call函数中调用命令执行函数,lpush也一样。当执行完lpush之后,此时链表不为空,回到processCommand调用中,执行以下语句

if (listLength(server.ready_keys))
   handleClientsBlockedOnLists();

这两行代码是先检查server.ready_keys是否为空,如果不为空,说明已经有一些就绪的链表,此时可以判断是否有客户端阻塞在这个键值上,如果有,则唤醒;现在问题又来了,这个server.ready_keys在哪更新链表了?

原来是在dbAdd函数中,当往数据库中添加的值类型为REDIS-LIST时,这时就要调用signalListAsReady函数将链表指针添加进server.ready_keys:

//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
 sds copy = sdsdup(key->ptr);
 int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库
 redisAssertWithInfo(NULL,key,retval == REDIS_OK);
 //判断是否为链表类型,如果是,调用有链表已经ready函数
 if (val->type == REDIS_LIST) signalListAsReady(db, key);
 if (server.cluster_enabled) slotToKeyAdd(key);
 }
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
 readyList *rl;
 /* 没有客户端阻塞在这个键上,则直接返回. */
 if (dictFind(db->blocking_keys,key) == NULL) return;
 /* 这个键已近被唤醒了,所以没必要重新入队 */
 if (dictFind(db->ready_keys,key) != NULL) return;
 /* Ok, 除了上述两情况,把这个键放入server.ready_keys */
 rl = zmalloc(sizeof(*rl));
 rl->key = key;
 rl->db = db;
 incrRefCount(key);
 listAddNodeTail(server.ready_keys,rl);//添加链表末尾
 /* We also add the key in the db->ready_keys dictionary in order
  * to avoid adding it multiple times into a list with a simple O(1)
  * check. */
 incrRefCount(key);
 //同时将这个阻塞键放入db->ready_keys
 redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

OK,这时server.ready_keys上已经有就绪键了,这时就调用processCommand函数中的handleClientsBlockedOnLists()函数来处理阻塞客户端,在这个函数中,

void handleClientsBlockedOnLists(void) {
 while(listLength(server.ready_keys) != 0) {
  list *l;
  /* 将server.ready_keys赋给一个新的list,再将server.ready_keys清空 */
  l = server.ready_keys;
  server.ready_keys = listCreate();
  /* 迭代每一个就绪的每一个readyList */
  while(listLength(l) != 0) {
   listNode *ln = listFirst(l);//获取第一个就绪readyList
   readyList *rl = ln->value;
   /* 从rl所属的数据库中删除rl */
   dictDelete(rl->db->ready_keys,rl->key);
   /* 查询rl所属的数据库查找rl->key ,给阻塞客户端回复rl->key链表中的第一个元素*/
   robj *o = lookupKeyWrite(rl->db,rl->key);
   if (o != NULL && o->type == REDIS_LIST) {
    dictEntry *de;
    /* 在rl->db->blocking_keys查找阻塞在rl->key的客户端链表 */
    de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
     list *clients = dictGetVal(de);//转换为客户端链表
     int numclients = listLength(clients);
     while(numclients--) {//给每个客户端发送消息
      listNode *clientnode = listFirst(clients);
      redisClient *receiver = clientnode->value;//阻塞的客户端
      robj *dstkey = receiver->bpop.target;//brpoplpush命令目的链表
      int where = (receiver->lastcmd &&
          receiver->lastcmd->proc == blpopCommand) ?
         REDIS_HEAD : REDIS_TAIL;//获取取出的方向
      robj *value = listTypePop(o,where);//取出就绪链表的元素
      if (value) {
       /* Protect receiver->bpop.target, that will be
        * freed by the next unblockClient()
        * call. */
       if (dstkey) incrRefCount(dstkey);
       unblockClient(receiver);//设置客户端为非阻塞状态
       if (serveClientBlockedOnList(receiver,
        rl->key,dstkey,rl->db,value,
        where) == REDIS_ERR)
       {
        /* If we failed serving the client we need
         * to also undo the POP operation. */
         listTypePush(o,value,where);
       }//给客户端回复链表中的元素内容
       if (dstkey) decrRefCount(dstkey);
       decrRefCount(value);
      } else {
       break;
      }
     }
    }
    //如果链表为空,则从数据库中删除
    if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
    /* We don't call signalModifiedKey() as it was already called
     * when an element was pushed on the list. */
   }
   /* 回收rl */
   decrRefCount(rl->key);
   zfree(rl);
   listDelNode(l,ln);
  }
  listRelease(l); /* We have the new list on place at this point. */
 }
}

从这个源码可知,如果有两个客户端,同时阻塞在一个链表上面,那么如果链表插入一个元素之后,只有先阻塞的那个客户端收到消息,后面阻塞的那个客户端继续阻塞,这也是先阻塞先服务的思想。handleClientsBlockedOnLists函数调用了unblockClient(receiver) ,该函数功能为接触客户端阻塞标志,以及找到db阻塞在key上的客户端链表,并将接触阻塞的客户端从链表删除。然后调用serveClientBlockOnList给客户端回复刚在链表插入的元素。

int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
 robj *argv[3];
 if (dstkey == NULL) {
  /* Propagate the [LR]POP operation. */
  argv[0] = (where == REDIS_HEAD) ? shared.lpop :
           shared.rpop;
  argv[1] = key;
  propagate((where == REDIS_HEAD) ?
   server.lpopCommand : server.rpopCommand,
   db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
  /* BRPOP/BLPOP */
  addReplyMultiBulkLen(receiver,2);
  addReplyBulk(receiver,key);
  addReplyBulk(receiver,value);
 } else {
  /* BRPOPLPUSH */
   /* 省略 */
 }
}

propagate函数主要是将命令信息发送给aof和slave。函数中省略部分是brpoplpush list list1 0命令的目的链表list1非空时,将从list链表pop出来的元素插入list1中。当给客户端发送消息之后,客户端就从read函数调用中返回,变为不阻塞。

通过超时时间解阻塞

如果链表一直没有数据插入,那么客户端将会一直阻塞下去,这肯定是不行的,所以brpop还支持超时阻塞,即阻塞时间超过一定值之后,服务器返回一个空值,这样客户端就解脱阻塞了。

对于时间超时,都放在了100ms执行一次的时间事件中;超时解脱阻塞函数也在serverCron中;在serverCron->clientsCron->clientsCronHandleTimeout

int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
 time_t now = now_ms/1000;
 //..........
 else if (c->flags & REDIS_BLOCKED) {
  /* Blocked OPS timeout is handled with milliseconds resolution.
   * However note that the actual resolution is limited by
   * server.hz. */
  if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
   /* Handle blocking operation specific timeout. */
   replyToBlockedClientTimedOut(c);
   unblockClient(c);
  }
 }
 //.............

把这个函数不相干的代码删除,主要部分先判断这个客户端是否阻塞,如果是,超时时间是否到期,如果是,则调用replyToBlockedClientTimedOut给客户端回复一个空回复,以及接触客户端阻塞。

总结

链表消息队列实现暂时分析到这了,大家都学会了吗?希望这篇文章给大家能带来一定的帮助,如果有疑问可以留言交流。

(0)

相关推荐

  • 详解thinkphp+redis+队列的实现代码

    1,安装Redis,根据自己的PHP版本安装对应的redis扩展(此步骤简单的描述一下) 1.1,安装 php_igbinary.dll,php_redis.dll扩展此处需要注意你的php版本如图: 1.2,php.ini文件新增 extension=php_igbinary.dll;extension=php_redis.dll两处扩展 ok此处已经完成第一步redis环境搭建完成看看phpinfo 项目中实际使用redis 2.1,第一步配置redis参数如下,redis安装的默认端口为6

  • Redis 实现队列原理的实例详解

    Redis 实现队列原理的实例详解 场景说明: ·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 ·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求 ·抢购场景,先入先出的模式 命令: rpush + blpop 或 lpush + brpop rpush : 往列表右侧推入数据 blpop : 客户端阻塞直到队列有值输出 简单队列: simple.php $stmt = $pdo->prepare('select id, c

  • PHP实现电商订单自动确认收货redis队列

    一.场景 之前做的电商平台,用户在收到货之后,大部分都不会主动的点击确认收货,导致给商家结款的时候,商家各种投诉,于是就根据需求,要做一个订单在发货之后的x天自动确认收货.所谓的订单自动确认收货,就是在在特定的时间,执行一条update语句,改变订单的状态. 二.思路 最笨重的做法,通过linux后台定时任务,查询符合条件的订单,然后update.最理想情况下,如果每分钟都有需要update的订单,这种方式也还行.奈何平台太小,以及卖家发货时间大部分也是密集的,不会分散在24小时的每分钟.那么,

  • Redis实现分布式队列浅析

    Redis是什么? Redis是一个简单的,高效的,分布式的,基于内存的缓存工具. 假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务. 简单,是Redis突出的特色. 简单可以保证核心功能的稳定和优异. redis的安装和配置 Linux系统下: apt-get install redis-server Windows下: 下载链接 下载安装msi文件就好了 配置主从同步 需要实现分布式队列,至少要有一个master(192.168.45.1)和一个slave(192

  • redis 队列操作的例子(php)

    入队操作 复制代码 代码如下: <?php $redis = new Redis(); $redis->connect('127.0.0.1',6379); while(True){ try{ $value = 'value_'.date('Y-m-d H:i:s'); $redis->LPUSH('key1',$value); sleep(rand()%3); echo $value."\n"; }catch(Exception $e){ echo $e->g

  • PHP基于Redis消息队列实现发布微博的方法

    本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址  图形化管理界面 git clone [url]https://github.com/ErikDubbelboer/phpRedisAdmin.git[/url] cd phpRedisAdmin git clone [url]https://github.com/nrk/predis.git[/url] vendor 首先安装上述的Redis图形化管理

  • redis实现简单队列

    在工作中,时常会有用到队列的场景,比较常见的用rabbitMQ这些专业的组件,官网地址是:http://www.rabbitmq.com,重要的是官方有.net的客户端,但是如果对rabbitMQ不熟悉的话,建议使用第三方封装好的 EasyNetQ,rabbitMQ比较适合对安全性,稳定性要求较高的地方,但有时我们也会有对这方面要求不是很高的场景,比如:文章阅读数,实时性要求不是很高的地方,所以我想到了用redis来做队列. redis 的List结构本身就是一个链表 (双向链表),所以符合我们

  • 详解Redis用链表实现消息队列

    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换.个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样. 链表实现消息队列 Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型.可以利用lpush和rpop来实现.但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源

  • 详解Python操作RabbitMQ服务器消息队列的远程结果返回

    先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4 RabbitMQ服务器 sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库 sudo pip install pika 远程结果返回 消息发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了,但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端. 处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队

  • 详解Redis的慢查询日志

    Redis慢查询日志帮助开发和运维人员定位系统存在的慢操作.慢查询日志就是系统在命令执行前后计算每条命令的执行时间,当超过预设阀值,就将这条命令的相关信息(慢查询ID,发生时间戳,耗时,命令的详细信息)记录下来. Redis客户端一条命令分为如下四部分执行: 需要注意的是,慢查询日志只是统计步骤3)执行命令的时间,所以慢查询并不代表客户端没有超时问题.需要注意的是,慢查询日志只是统计步骤3)执行命令的时间,所以慢查询并不代表客户端没有超时问题. 一.慢查询的配置参数: 慢查询的预设阀值 slow

  • 详解Redis命令和键_动力节点Java学院整理

    Redis命令用于在redis服务器上执行某些操作. 要在Redis服务器上运行的命令,需要一个Redis客户端. Redis客户端在Redis的包,这已经我们前面安装使用过了. 语法 Redis客户端的基本语法如下: $redis-cli 例子 下面举例说明如何使用Redis客户端. 要启动redis客户端,打开终端,输入命令Redis命令行:redis-cli.这将连接到本地服务器,现在就可以运行各种命令了. $redis-cli redis 127.0.0.1:6379> redis 12

  • 详解redis数据结构之sds

    详解redis数据结构之sds 字符串在redis中使用非常广泛,在redis中,所有的数据都保存在字典(Map)中,而字典的键就是字符串类型,并且对于很大一部分字典值数据也是又字符串组成的.以下是sds的具体存储结构: 从图中可以看出,sds的属性有三个:len.free和buf数组.这里len字段是用来保存sds字符串中所包含字符数目的,free字段则是用来保存buf数组中空余的部分的长度的,而buf数组则是实际用来保存字符串的.比如如下结构保存了"Hello World!"这个字

  • 详解redis数据结构之压缩列表

     详解redis数据结构之压缩列表 redis使用压缩列表作为列表键和哈希键的底层实现之一.当一个列表键只包含少量的列表项,并且每个列表项都是由小整数值或者是短字符串组成,那么redis就会使用压缩列表存储列表项:同理,当一个哈希表包含的键值对都是由小整数值或者是短字符串组成,并且存储的键值对数目不多时,redis也会使用压缩列表来存储哈希表.以下是压缩列表存储结构: zlbytes长度为4个字节,记录了整个压缩列表所占用的字节数 zltail长度为4个字节,记录了压缩列表起始位置到压缩列表尾节

  • 详解Redis 数据类型

    Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合). String(字符串) string 是 redis 最基本的类型,你可以理解成与 Memcached 一模一样的类型,一个 key 对应一个 value. string 类型是二进制安全的.意思是 redis 的 string 可以包含任何数据.比如jpg图片或者序列化的对象. string 类型是 Redis 最基本的数据类型,string 类

  • 详解redis是如何实现队列消息的ack

    前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列. 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整. 大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉:每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从li

  • 深入理解redis分布式锁和消息队列

    最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令. 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样的,需要一下更为简单的锁校验的时候,redis分布式锁的使用就足够满足了. redis的分布式锁其实就是基于setnx方法和redis对key可设置有效时间的功能来实现的.基本用法比较简单. public boolean tryLock(String loc

  • 详解redis在nodejs中的应用

    redis是一个性能非常好的内存数据库,部署在应用程序和mysql数据中间做缓存数据库,可以极大的提升应用程序的性能,这里简单介绍nodejs客户端操作redis的demo程序 redis里面总共可以存储5种数据类型,分别是字符串,列表.集合.三列.有序集合:这里将会对这5种数据类型的增删查改一一处理: 1.redis在mac上的安装: https://redis.io/download,当前我用的版本稳定版本是4.0.9,解压之后,进入redis-4.0.9目录,执行make && su

随机推荐

其他