详解Python实现多进程异步事件驱动引擎

本文介绍了详解Python实现多进程异步事件驱动引擎,分享给大家,具体如下:

多进程异步事件驱动逻辑

逻辑

code

# -*- coding: utf-8 -*-

'''
author:    Jimmy
contact:   234390130@qq.com
file:     eventEngine.py
time:     2017/8/25 上午10:06
description: 多进程异步事件驱动引擎

'''

__author__ = 'Jimmy'

from multiprocessing import Process, Queue

class EventEngine(object):
  # 初始化事件事件驱动引擎
  def __init__(self):
    #保存事件列表
    self.__eventQueue = Queue()
    #引擎开关
    self.__active = False
    #事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
    self.__handlers = {}
    #保存事件处理进程池
    self.__processPool = []
    #事件引擎主进程
    self.__mainProcess = Process(target=self.__run)

  #执行事件循环
  def __run(self):
    while self.__active:
      #事件队列非空
      if not self.__eventQueue.empty():
        #获取队列中的事件 超时1秒
        event = self.__eventQueue.get(block=True ,timeout=1)
        #执行事件
        self.__process(event)
      else:
        # print('无任何事件')
        pass

  #执行事件
  def __process(self, event):
    if event.type in self.__handlers:
      for handler in self.__handlers[event.type]:
        #开一个进程去异步处理
        p = Process(target=handler, args=(event, ))
        #保存到进程池
        self.__processPool.append(p)
        p.start()

  #开启事件引擎
  def start(self):
    self.__active = True
    self.__mainProcess.start()

  #暂停事件引擎
  def stop(self):
    """停止"""
    # 将事件管理器设为停止
    self.__active = False
    # 等待事件处理进程退出
    for p in self.__processPool:
      p.join()
    self.__mainProcess.join()

  #终止事件引擎
  def terminate(self):
    self.__active = False
    #终止所有事件处理进程
    for p in self.__processPool:
      p.terminate()
    self.__mainProcess.join()

  #注册事件
  def register(self, type, handler):
    """注册事件处理函数监听"""
    # 尝试获取该事件类型对应的处理函数列表,若无则创建
    try:
      handlerList = self.__handlers[type]
    except KeyError:
      handlerList = []
      self.__handlers[type] = handlerList

    # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
    if handler not in handlerList:
      handlerList.append(handler)

  def unregister(self, type, handler):
    """注销事件处理函数监听"""
    # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
    try:
      handlerList = self.__handlers[type]

      # 如果该函数存在于列表中,则移除
      if handler in handlerList:
        handlerList.remove(handler)

      # 如果函数列表为空,则从引擎中移除该事件类型
      if not handlerList:
        del self.__handlers[type]
    except KeyError:
      pass

  def sendEvent(self, event):
    #发送事件 像队列里存入事件
    self.__eventQueue.put(event)

class Event(object):
  #事件对象
  def __init__(self, type =None):
    self.type = type
    self.dict = {}

#测试
if __name__ == '__main__':
  import time
  EVENT_ARTICAL = "Event_Artical"

  # 事件源 公众号
  class PublicAccounts:
    def __init__(self, eventManager):
      self.__eventManager = eventManager

    def writeNewArtical(self):
      # 事件对象,写了新文章
      event = Event(EVENT_ARTICAL)
      event.dict["artical"] = u'如何写出更优雅的代码\n'
      # 发送事件
      self.__eventManager.sendEvent(event)
      print(u'公众号发送新文章\n')

  # 监听器 订阅者
  class ListenerTypeOne:
    def __init__(self, username):
      self.__username = username

    # 监听器的处理函数 读文章
    def ReadArtical(self, event):
      print(u'%s 收到新文章' % self.__username)
      print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))

  class ListenerTypeTwo:
    def __init__(self, username):
      self.__username = username

    # 监听器的处理函数 读文章
    def ReadArtical(self, event):
      print(u'%s 收到新文章 睡3秒再看' % self.__username)
      time.sleep(3)
      print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))

  def test():
    listner1 = ListenerTypeOne("thinkroom") # 订阅者1
    listner2 = ListenerTypeTwo("steve") # 订阅者2

    ee = EventEngine()

    # 绑定事件和监听器响应函数(新文章)
    ee.register(EVENT_ARTICAL, listner1.ReadArtical)
    ee.register(EVENT_ARTICAL, listner2.ReadArtical)
    for i in range(0, 20):
      listner3 = ListenerTypeOne("Jimmy") # 订阅者X
      ee.register(EVENT_ARTICAL, listner3.ReadArtical)

    ee.start()

    #发送事件
    publicAcc = PublicAccounts(ee)
    publicAcc.writeNewArtical()

  test()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间: 2017-08-24

详解python之多进程和进程池(Processing库)

环境:win7+python2.7 一直想学习多进程或多线程,但之前只是单纯看一点基础知识还有简单的介绍,无法理解怎么去应用,直到前段时间看了github的一个爬虫项目涉及到多进程,多线程相关内容,一边看一边百度相关知识点,现在把一些相关知识点和一些应用写下来做个记录. 首先说下什么是进程:进程是程序在计算机上的一次执行活动,当运行一个程序的时候,就启动了一个进程.而进程又分为系统进程和用户进程.只要是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身;而所有由你

详解python中的线程

Python中创建线程有两种方式:函数或者用类来创建线程对象. 函数式:调用 _thread 模块中的start_new_thread()函数来产生新线程. 类:创建threading.Thread的子类来包装一个线程对象. 1.线程的创建 1.1 通过thread类直接创建 import threading import time def foo(n): time.sleep(n) print("foo func:",n) def bar(n): time.sleep(n) prin

详解Python的爬虫框架 Scrapy

网络爬虫,是在网上进行数据抓取的程序,使用它能够抓取特定网页的HTML数据.虽然我们利用一些库开发一个爬虫程序,但是使用框架可以大大提高效率,缩短开发时间.Scrapy是一个使用Python编写的,轻量级的,简单轻巧,并且使用起来非常的方便. 一.概述 下图显示了Scrapy的大体架构,其中包含了它的主要组件及系统的数据处理流程(绿色箭头所示).下面就来一个个解释每个组件的作用及数据的处理过程(注:图片来自互联网). 二.组件 1.Scrapy Engine(Scrapy引擎) Scrapy引擎

详解Python自建logging模块

简单使用 最开始,我们用最短的代码体验一下logging的基本功能. import logging logger = logging.getLogger() logging.basicConfig() logger.setLevel('DEBUG') logger.debug('logsomething') #输出 out>>DEBG:root:logsomething 第一步,通过logging.getLogger函数,获取一个loger对象,但这个对象暂时是无法使用的. 第二步,loggi

详解python中asyncio模块

一直对asyncio这个库比较感兴趣,毕竟这是官网也非常推荐的一个实现高并发的一个模块,python也是在python 3.4中引入了协程的概念.也通过这次整理更加深刻理解这个模块的使用 asyncio 是干什么的? 异步网络操作并发协程 python3.0时代,标准库里的异步网络模块:select(非常底层) python3.0时代,第三方异步网络库:Tornado python3.4时代,asyncio:支持TCP,子进程 现在的asyncio,有了很多的模块已经在支持:aiohttp,ai

详解python中GPU版本的opencv常用方法介绍

引言 本篇是以python的视角介绍相关的函数还有自我使用中的一些问题,本想在这篇之前总结一下opencv编译的全过程,但遇到了太多坑,暂时不太想回看做过的笔记,所以这里主要总结python下GPU版本的opencv. 主要函数说明 threshold():二值化,但要指定设定阈值 blendLinear():两幅图片的线形混合 calcHist() createBoxFilter ():创建一个规范化的2D框过滤器 canny边缘检测 createGaussianFilter():创建一个Ga

详解Python IO口多路复用

什么是IO 多路复用呢? 我一个SocketServer有500个链接连过来了,我想让500个链接都是并发的,每一个链接都需要操作IO,但是单线程下IO都是串行的,我实现多路的,看起来像是并发的效果,这就是多路复用! 概念说明: 在进行解释之前,首先要说明几个概念: - 用户空间和内核空间 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方).操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权

详解python里使用正则表达式的分组命名方式

详解python里使用正则表达式的分组命名方式 分组匹配的模式,可以通过groups()来全部访问匹配的元组,也可以通过group()函数来按分组方式来访问,但是这里只能通过数字索引来访问,如果某一天产品经理需要修改需求,让你在它们之中添加一个分组,这样一来,就会导致匹配的数组的索引的变化,作为开发人员的你,必须得一行一行代码地修改.因此聪明的开发人员又想到一个好方法,把这些分组进行命名,只需要对名称进行访问分组,不通过索引来访问了,就可以避免这个问题.那么怎么样来命名呢?可以采用(?P<nam

详解Python import方法引入模块的实例

详解Python import方法引入模块的实例 在Python用import或者from-import或者from-import-as-来导入相应的模块,作用和使用方法与C语言的include头文件类似.其实就是引入某些成熟的函数库和成熟的方法,避免重复造轮子,提高开发速度. python的import方法可以引入系统的模块,也可以引入我们自己写好的共用模块,这点和PHP非常相似,但是它们的具体细节还不是很一样.因为php是在引入的时候指明引入文件的具体路径,而python中不能够写文件路径进