异步redis队列实现 数据入库的方法

业务需求

app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送

出问题之前业务逻辑:

php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据

该用户已经存在 和新增的id

入另一种详情表

问题所在:

当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致CPU 暴增到88% 并且居高不下

优化思路:

1、异步队列处理

2、redis 过滤(就是只处理当天第一次请求)

3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)

4、拼接插入的以及新增的如详细表中

解决办法:

1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中

首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值

<?php
/**
 * Created by haiyong.
 * User: jia
 * Date: 2017/9/18
 * Time: 20:06
 */
namespace App\Http\Controllers\App;

use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;

class OtherAppController extends Controller{

 /**
  * app应用统计接口
  * @param Request $request
  * @return string
  */
 public function appTotal(Request $request)
 {
  // //历史数据入库
  //$redis = Redis::connection('web_active');
  // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
  // $str = '';
  // foreach ($app_name as $key => $val) {
  //  $str.= "{$val} {$key} ";
  // }
  // $redis->hmset('app_name', $app_name);
  // echo $str;exit;
  $result = $request->input('res');
  $list = json_decode($result, true);
  if (empty ($list) || !is_array($list)) {
   return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);
  }
  $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
  $data['time'] = date('Y-m-d');
  $redis_key = 'log_app:'.$data['time'];
  //redis 过滤
  $redis = Redis::connection('web_active');
  //redis 键值过期设置
  if (empty($redis->exists($redis_key))) {
   $redis->hset($redis_key, 1, 'start');
   $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));
  }
  //值确定
  if ($redis->hexists($redis_key, $data['uid'])) {
   return json_encode(['result' => 'SUCCESS']);
  } else {
   //推入队列
   $redis->hset($redis_key, $data['uid'], $result);
   $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);
   return json_encode(['result' => 'SUCCESS']);
  }
 }
}

2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出

mget 获取该用户的app id 不存在就会返回null

通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;

class AppTotal extends Command
{
 /**
  * The name and signature of the console command.
  *
  * @var string
  */
 protected $signature = 'AppTotal:run';

 /**
  * The console command description.
  *
  * @var string
  */
 protected $description = 'Command description';

 /**
  * Create a new command instance.
  *
  * @return void
  */
 public function __construct()
 {
  parent::__construct();
 }

 /**
  * Execute the console command.
  *
  * @return mixed
  */
 public function handle()
 {
   //历史数据入库
  // $redis = Redis::connection('web_active');
  // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
  // $redis->hmset('app_name', $app_name);
  // exit;
   while(1) {
   $redis = Redis::connection('web_active');
   //队列名称
   $res = $redis->lpop('log_app_list');
   //开关按钮
   $lock = $redis->get('log_app_lock');
   if (!empty($res)) {
    list($date,$uid) = explode(':',$res);
    $result = $redis->hget('log_app:'.$date, $uid);
    if (!empty($result)) {
      $table_name = 'app_total'.date('Ym');
      $list = json_decode($result, true);
      $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
      $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
      $data['device'] = isset($list['device']) ? $list['device'] : '' ;
      $data['appList'] = isset($list['list']) ? $list['list'] : '' ;
      //数据去重 flip比unique更节约性能
      $data['appList'] = array_flip($data['appList']);
      $data['appList'] = array_flip($data['appList']);
      $data['time'] = date('Y-m-d');
      //app应用过滤
      $app_res = $redis->hmget('app_name', $data['appList']);
      //新增加app数组
      $new_app = [];
      //mysql 入库数组
      $mysql_new_app = [];
   //获取当前redis 自增指针
   $total = $redis->get('app_name_total');
   foreach ($app_res as $key =>& $val) {
    if (is_null($val)) {
     $total += 1;
     $new_app[$data['appList'][$key]] = $total;
     $val = $total;
     array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);
    }
   }
   if (count($new_app)){
    $str = "INSERT IGNORE INTO app_set_name (id,appName) values";
    foreach ($new_app as $key => $val) {
    $str.= "(".$val.",'".$key."'),";
    }
    $str = trim($str, ',');
    //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
    $mysql_res = DB::connection('phpLog')->statement($str);
    if ($mysql_res) {
     // 设置redis 指针
     $redis->set('app_name_total', $total);
     // redis 数据入库
     $redis->hmset('app_name', $new_app);
    }
  }
    // 详情数据入库
    $data['appList'] = implode(',', $app_res);
      //app统计入库
      DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList)
  values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
      //log 记录 当文件达到123MB的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志
      Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\n");
  } else {
   Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\n");
  }
   }
   //执行间隔
   sleep(1);
   //结束按钮
   if ($lock == 2) {
    exit;
   }
   //内存检测
   if(memory_get_usage()>1000*1024*1024){
    exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端
   }
  }
 }
}

3、执定 定时任务监控脚本执行情况

crontab -e

/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1

test.sh 内容 (查看执行命令返回的进程id 如果没有就执行命令开启)

#!/bin/bash
alive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`
if [ ! $alive ]
then
 /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
fi

记得授权哦 chmod +x test.sh

笔者用的laravel 框架 将命令激活丢入后台

执行命令

 /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &

完事直接 ctrl -c 结束就行 命令以在后台运行 可以用shell 中的命令查看进程id

这样就实现队列异步入库

还有很多问题需要优化!!大致功能已经实现!!!!!!

优化完成后cpu

以上这篇异步redis队列实现 数据入库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

时间: 2019-10-08

Redis 实现队列原理的实例详解

Redis 实现队列原理的实例详解 场景说明: ·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 ·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求 ·抢购场景,先入先出的模式 命令: rpush + blpop 或 lpush + brpop rpush : 往列表右侧推入数据 blpop : 客户端阻塞直到队列有值输出 简单队列: simple.php $stmt = $pdo->prepare('select id, c

详解thinkphp+redis+队列的实现代码

1,安装Redis,根据自己的PHP版本安装对应的redis扩展(此步骤简单的描述一下) 1.1,安装 php_igbinary.dll,php_redis.dll扩展此处需要注意你的php版本如图: 1.2,php.ini文件新增 extension=php_igbinary.dll;extension=php_redis.dll两处扩展 ok此处已经完成第一步redis环境搭建完成看看phpinfo 项目中实际使用redis 2.1,第一步配置redis参数如下,redis安装的默认端口为6

redis实现简单队列

在工作中,时常会有用到队列的场景,比较常见的用rabbitMQ这些专业的组件,官网地址是:http://www.rabbitmq.com,重要的是官方有.net的客户端,但是如果对rabbitMQ不熟悉的话,建议使用第三方封装好的 EasyNetQ,rabbitMQ比较适合对安全性,稳定性要求较高的地方,但有时我们也会有对这方面要求不是很高的场景,比如:文章阅读数,实时性要求不是很高的地方,所以我想到了用redis来做队列. redis 的List结构本身就是一个链表 (双向链表),所以符合我们

redis中队列消息实现应用解耦的方法

消息队列的场景 我们都知道,消息是在两台计算机之间传送的数据单位,这个"消息"可以非常简单,例如只包含文本字符串,也可以更复杂,可能包含嵌入对象.而所谓的"消息队列"是在消息的传输过程中保存消息的容器.在web程序中,可能我们需要将用户的请求数据更新或者添加到数据库中,但是在高炳发的情况下,虽然作为用户的我们不知道后台是什么原因,但是依旧会抱怨或者吐槽这个程序反应缓慢,比如在过去的几年里,你有没有吐槽过12306抢票很难?反应很慢?有没有在使用某个程序的时候收到&q

Python 抓取数据存储到Redis中的操作

redis是一个key-value存储结构.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set 有序集合)和hash(哈希类型),数据存储如下图分析 为了分别为ID存入多个键值对,此次仅对Hash数据进行操作,例子如下 import os,sys import requests import bs4 import redis #连接Redis r = redis.Redis(host='127

详解redis是如何实现队列消息的ack

前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列. 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整. 大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉:每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从li

使用 Redis 流实现消息队列的代码

在介绍了 Redis 流的基本功能之后, 现在是时候使用这些功能来构建一些实际的应用了. 消息队列作为流的典型应用之一, 具有非常好的示范性, 因此我们将使用 Redis 流的相关功能构建一个消息队列应用, 这个消息队列跟我们之前使用其他 Redis 数据结构构建的消息队列具有相似的功能. 代码清单 10-1 展示了一个具有基本功能的消息队列实现: 代码最开头的是几个转换函数, 它们负责对程序的相关输入输出进行转换和格式化: MessageQueue 类用于实现消息队列, 它的添加消息.移除消息

golang实现redis的延时消息队列功能示例

前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

Redis中5种数据结构的使用场景介绍

一.redis 数据结构使用场景 原来看过 redisbook 这本书,对 redis 的基本功能都已经熟悉了,从上周开始看 redis 的源码.目前目标是吃透 redis 的数据结构.我们都知道,在 redis 中一共有5种数据结构,那每种数据结构的使用场景都是什么呢? String--字符串 Hash--字典 List--列表 Set--集合 Sorted Set--有序集合 下面我们就来简单说明一下它们各自的使用场景: 1. String--字符串 String 数据结构是简单的 key-

Laravel 4.2 中队列服务(queue)使用感受

这半个月,我参与重写了一个微信公众号后端系统,首次使用了laravel 4.2,以及laravel引以为傲的队列服务(queue). 由于整个系统涉及到多端交互,又有大量语音传输.处理的业务,我们在一些地方发现响应时间过长.之前的系统基于node.js和mongoDB,由于node天生就是异步,有守护进程,所以并没有出现过这个问题,而这次重写必然要引入异步流程了.Queue进入了我们的视线. 根据这一页几乎还全是英文的"中文文档" ,laravel恰好在4.2版本中刚刚引入了redis

python 监听salt job状态,并任务数据推送到redis中的方法

salt分发后,主动将已完成的任务数据推送到redis中,使用redis的生产者模式,进行消息传送 #coding=utf-8 import fnmatch,json,logging import salt.config import salt.utils.event from salt.utils.redis import RedisPool import sys,os,datetime,random import multiprocessing,threading from joi.util

Redis中一些最常见的面试问题总结

前言 经过长达一周的奔波和面试,电话面试,回首今天终于成功的入职了,总共面试了大概10家公司,包括阿里,京东,IBM等等,京东技术过了,学历因为非统招就被pass了,阿里面了2次电话面试就没下文了,估计是我当时最后提问题的时候减分了吧,其他的也有一些offer,不是不想去,就是了无音讯了,眼看年关将近,也由不得我挑挑拣拣了,就直接进了我现在这家公司,主要是感觉公司人不错,薪水这方面也就没有计较太多.好了,书归正文,今天小编就大家送上我精心准备的关于Redis方面的面试题,希望可以帮到还在求职路上

redis中事务机制及乐观锁的实现

Redis事务机制 在MySQL等其他数据库中,事务表示的是一组动作,这组动作要么全部执行,要么全部不执行. Redis目前对事物的支持相对简单.Redis只能保证一个client发起的事务中的命令可以连续的执行,而中间不会插入其他的client命令.当一个client在一个链接中发出multi命令时,这个链接会进入一个事务上下文,该连接后续的命令不会立即执行,而是先放到一个队列中,当执行exec命令时,redis会顺序的执行队列中的所有命令. Multi 开启事务: 127.0.0.1:637