PHP7生产环境队列Beanstalkd用法详解

应用场景

为什么要用呢,有什么好处?这应该放在最开头说,一件东西你只有了解它是干什么的,适合干什么,才能更好的与自己的项目相结合,用到哪里学到哪里,学了不用等于不会,我们平时就应该多考虑一些这样的问题:自己做个什么项目功能能跟 xx 技术相结合呢?这个 xx 技术放在这种业务场景下行不行呢?而不是 “学了这个 xx 技术能干嘛呢,公司现在也没有用这个的呀,学了也没用啊”,带着这样心情去学习 xx 技术,肯定很痛苦。

队列大家都知道是将一些耗时的操作先不去做,先埋点,再异步去处理,这样对一些发邮件发短信之类的耗时操作,用户是感觉不到的,因为埋点结束,操作也就结束了,消费队列都是在服务器上做的。主要应用在短信或邮件通知,访问第三方接口订阅消息,商城的一些秒杀活动,都可以结合队列来完成。

Beanstalkd 介绍

Beanstalkd 是一个高性能,轻量级的分布式内存队列,C 代码,典型的类 Memcached 设计,协议和使用方式都是同样的风格,所以使用过 memcached 的用户会觉得 Beanstalkd 似曾相识。

beanstalkd 的最初设计意图是在高并发的网络请求下,通过异步执行耗时较多的请求,及时返回结果,减少请求的响应延迟。

Ubuntu 安装

sudo apt-get install beanstalkd

配置文件

vim /etc/default/beanstalkd

查看状态

service beanstalkd status
# 命令回显 #
root@:/www/server/php/72/etc# service beanstalkd status
● beanstalkd.service - Simple, fast work queue
  Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled)
  Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago
   Docs: man:beanstalkd(1)
 Main PID: 7033 (beanstalkd)
  Tasks: 1 (limit: 4634)
  CGroup: /system.slice/beanstalkd.service
      └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd
Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.

配置连通性 + 持久化

ip 用 0.0.0.0 允许所有连接,靠配置安全组或防火墙去约束连接,放开 -b 参数 (默认没有持久化),内存的队列消息可以落地到硬盘 binlog 实现持久化,断电可重新读取队列消息。

vim /etc/default/beanstalkd
BEANSTALKD_LISTEN_ADDR=0.0.0.0
BEANSTALKD_LISTEN_PORT=11300
BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

beanstalkd 任务状态

状态 注释
delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态

管理工具

亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个 web 的。

命令行:https://github.com/src-d/beanstool

web 界面:https://github.com/ptrofimov/beanstalk_console

编程语言客户端

PHP 客户端

https://packagist.org/packages/pda/pheanstalk

composer require pda/pheanstalk

写入 job

<?php
//创建队列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
$jobData = [
  'email' => '123456@163.com',
  'message' => 'Hello World !!',
  'dtime' => date('Y-m-d H:i:s'),
];
$pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );

消费 job

<?php
ini_set('default_socket_timeout', 86400*7);
ini_set( 'memory_limit', '256M' );
// 消费队列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
while ( true )
{
  // 获取队列信息, reserve 阻塞获取
  $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
  if ( $job !== false )
  {
    $data = $job->getData();
    /* TODO 逻辑操作 */
    /* 处理完成,删除 job */
    $pheanstalk->delete( $job );
  }
}

default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。

关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。

就是这样的代码,供参考:

public function watchJob()
{
  $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
  if ( $job !== false )
  {
    $job_data = $job->getData();
    $this->subscribe( $job_data );
    $this->pheanstalk->delete( $job );
    /* 继续 Watch 下一个 job */
    $this->watchJob();
  }
  else
  {
    $this->log->error( 'reserve false', 'reserve false' );
  }
}

监控 beanstalkd 状态

<?php
//监控服务状态
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump( $isAlive );

可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。

一些相关命令

查看 beanstalkd 服务内存占用

top -u beanstalkd

后台运行 consumer 脚本

nohup php googlehome_subscribe.php &

查看 consumer 脚本运行时间

ps -A -opid,stime,etime,args | grep consumer.php

手工重启 consumer 脚本

ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9
nohup php googlehome_subscribe.php &

一些总结

php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。

一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。

try
{
  /* TODO: 逻辑操作 */
}
catch ( ClientException $e )
{
  $results['mid']  = $this->mid;
  $results['code']  = $e->getResponse()->getStatusCode();
  $results['reason'] = $e->getResponse()->getReasonPhrase();
  $this->log->error( 'properties-changed ClientException', $results );
}
catch ( ServerException $e )
{
  $results['mid']  = $this->mid;
  $results['code']  = $e->getResponse()->getStatusCode();
  $results['reason'] = $e->getResponse()->getReasonPhrase();
  $this->log->error( 'properties-changed ServerException', $results );
}
catch ( ConnectException $e )
{
  $results['mid'] = $this->mid;
  $this->log->error( 'properties-changed ConnectException', $results );
}

job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。

以上就是PHP7生产环境队列Beanstalkd用法详解的详细内容,更多关于PHP7中Beanstalkd正确用法的资料请关注我们其它相关文章!

时间: 2020-05-18

关于finalize机制和引用、引用队列的用法详解

C++有析构函数这个东西,能够很好地在对象销毁前做一些释放外部资源的工作,但是java没有.Object.finalize()提供了与析构函数类似的机制,但是它不安全.会导致严重的内存消耗和性能降低,应该避免使用.best practice是:像java类库的IO流.数据库连接.socket一样,提供显示的资源释放接口,程序员使用完这些资源后,必须要显示释放.所以可以忘记Object.finalize()的存在.JVM启动的时候,会创建一个Finalizer线程来支持finalize方法的执行.

java中栈和队列的实现和API的用法(详解)

在java中要实现栈和队列,需要用到java集合的相关知识,特别是Stack.LinkedList等相关集合类型. 一.栈的实现 栈的实现,有两个方法:一个是用java本身的集合类型Stack类型:另一个是借用LinkedList来间接实现Stack. 1.Stack实现 直接用Stack来实现非常方便,常用的api函数如下: boolean        isEmpty() // 判断当前栈是否为空 synchronized E        peek() //获得当前栈顶元素 synchro

脚手架vue-cli工程webpack的基本用法详解

webpack的打包依赖于它的一个重要配置文件webpack.config.js,在这个配置文件中就可以指定所有在源代码编译过程中的工作了,就一个配置就可以与冗长的Gruntfile或者Gulpfile说再见了. 一个完整的工程项目中的webpack的配置远远没有这么简单,随着工程的构建要求的增加,webpack.config.js内的配置项目也会随之增加,webpack还有许许多多的选项提供给我们进行灵活配置,它只是一个构建工具,我们只需要了解在Vue项目中它基本能为我们做到的工作.最小化的配

OGNL表达式基本语法与用法详解

一.OGNL中的#.%和$符号 #.%和$符号在OGNL表达式中经常出现,而这三种符号也是开发者不容易掌握和理解的部分.在这里我们简单介绍它们的相应用途. 1.#符号的三种用法 1)访问非根对象属性,例如示例中的#session.msg表达式,由于Struts 2中值栈被视为根对象,所以访问其他非根对象时,需要加#前缀.实际上,#相当于ActionContext. getContext():#session.msg表达式相当于ActionContext.getContext().getSessi

Python 中 Virtualenv 和 pip 的简单用法详解

本文介绍了Python 中 Virtualenv 和 pip 的简单用法详解,分享给大家,具体如下: 0X00 安装环境 我们在 Python 开发和学习过程中需要用到各种库,然后在各个不同的项目和作品里可能用的版本还不一样,正因为有这种问题的存在才催生了virtualenv的诞生.virtualenv 可以在电脑上创建一个虚拟环境,可以针对每一个项目创建一个虚拟环境,这样就不用担心各个不同的项目用不同版本的库的时候出现的冲突了. 下面的内容只适用于 Linux/OSX,未经 Windows 环

php 中的closure用法详解

Closure,匿名函数,是php5.3的时候引入的,又称为Anonymous functions.字面意思也就是没有定义名字的函数.比如以下代码(文件名是do.php) <?php function A() { return 100; }; function B(Closure $callback) { return $callback(); } $a = B(A()); print_r($a);//输出:Fatal error: Uncaught TypeError: Argument 1

Linux中 sed 和 awk的用法详解

sed用法: sed是一个很好的文件处理工具,本身是一个管道命令,主要是以行为单位进行处理,可以将数据行进行替换.删除.新增.选取等特定工作,下面先了解一下sed的用法 sed命令行格式为: sed [-nefri] 'command' 输入文本 常用选项: -n∶使用安静(silent)模式.在一般 sed 的用法中,所有来自 STDIN的资料一般都会被列出到萤幕上.但如果加上 -n 参数后,则只有经过sed 特殊处理的那一行(或者动作)才会被列出来. -e∶直接在指令列模式上进行 sed 的

Mac OS X 下有关Android adb用法详解

Mac OS X 下有关Android adb用法详解 一.什么是adb? ADB的全称是Android Debug Bridge,用来调试Android程序的,白话点就是debug工具! 位置:一般下载Android的SDK时候在platform-tools中有adb程序.  二.在mac上配置adb命令环境 1. 运行命令 cd $home 进入到用户home目录 2. 创建 .bash_profile文件 :touch .bash_profile 打开文件命令: open -e .bash

C++中stack、queue、vector的用法详解

一.栈(stack) 引入头文件 #include<stack> 常用的方法 empty() 堆栈为空则返回真 pop() 移除栈顶元素 push() 在栈顶增加元素 size() 返回栈中元素数目 top() 返回栈顶元素 3.实例代码 #include<iostream> #include<stack> using namespace std; int main(){ //创建栈 s stack<int> s; //将元素压入栈 for(int i=0;

PHP设计模式之适配器模式定义与用法详解

本文实例讲述了PHP设计模式之适配器模式定义与用法.分享给大家供大家参考,具体如下: 适配器很容易理解, 大多数人家庭都有手机转接器, 用来为移动电话充电,这就是一种适配器. 如果只有USB接头, 就无法将移动电话插到标准插座上. 实际上, 必须使用一个适配器, 一端接USB插头, 一端接插座. 当然, 你可以拿出电气工具,改装USB连接头, 或者重新安装插座, 不过这样会带来很多额外的工作, 而且可能会把连接头或插座弄坏. 所以, 最可取的方法就是找一个适配器. 软件开发也是如此. 类适配器模