Python如何快速实现分布式任务

深入读了读python的官方文档,发觉Python自带的multiprocessing模块有很多预制的接口可以方便的实现多个主机之间的通讯,进而实现典型的生产者-消费者模式的分布式任务架构。

之前,为了在Python中实现生产者-消费者模式,往往就会选择一个额外的队列系统,比如rabbitMQ之类。此外,你有可能还要设计一套任务对象的序列化方式以便塞入队列。如果没有队列的支持,那不排除有些同学不得不从socket服务器做起,直接跟TCP/IP打起交道来。

其实multiprocessing.managers中有个BaseManager就为开发者提供了这样一个快速接口。

我们假定的场景是1个生产者(producer.py)+8个消费者(worker.py)的系统,还有一个中央节点负责协调(server.py)实现如下:

server.py

from multiprocessing.managers import BaseManager
import Queue

queue = Queue.Queue() #初始化一个Q,用于消息传递
class QueueManager(BaseManager):
  pass

QueueManager.register('get_queue', callable=lambda:queue) # 在系统中发布get_queue这个业务

if __name__ == '__main__':
  m = QueueManager(address=('10.239.85.193', 50000),authkey='abr' )
 # 监听所有10.239.85.193的50000口
  s = m.get_server()
  s.serve_forever()

worker.py

from multiprocessing.managers import BaseManager
from multiprocessing import Pool

class QueueManager(BaseManager):
 pass

QueueManager.register('get_queue') 

def feb(i): #经典的'山羊增殖'
  if i < 2: return 1
  if i < 5 : return feb(i-1) + feb(i-2)
  return feb(i-1) + feb(i-2) - feb(i-5)

def worker(i):
  m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
#连接server
  m.connect()
  while True:
    queue = m.get_queue()
# 获取Q
   c = queue.get()
 print feb(c)

if __name__ == '__main__':

  p = Pool(8) # 分进程启动8个worker
  p.map(worker, range(8))
producer.py

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
  pass
QueueManager.register('get_queue')

if __name__ == '__main__':
 m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
 m.connect()
 i = 0
 while True:
   queue = m.get_queue()
   queue.put(48)

   i+=1

系统会直接将Queue() 对象中的数据直接封装后通过TCP 50000端口在主机之间传递。不过需要注意的是,由于authkey的缘故,各个节点要求python的版本一致。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间: 2017-07-05

python 如何快速找出两个电子表中数据的差异

最近刚接触python,找点小任务来练练手,希望自己在实践中不断的锻炼自己解决问题的能力. 公司里会有这样的场景:有一张电子表格的内容由两三个部门或者更多的部门用到,这些员工会在维护这些表格中不定期的跟新一些自己部门的数据,时间久了,大家的数据就开始打架了,非常不利于管理.怎样快速找到两个或者多个电子表格中数据的差异呢? 解决办法: 1. Excel自带的方法(有兴趣的自行百度) 2. python 写一个小脚本 #!/usr/bin/env python # -*- coding: utf-8

Python实现快速多线程ping的方法

本文实例讲述了Python实现快速多线程ping的方法.分享给大家供大家参考.具体如下: #!/usr/bin/python #_*_coding:utf-8_*_ # ''' 名称:快速多线程ping程序 开发:gyhong gyh9711 日期:20:51 2011-04-25 ''' import pexpect import datetime from threading import Thread host=["192.168.1.1","192.168.1.123

教你用Python脚本快速为iOS10生成图标和截屏

简介 这两天更新完Xcode8之后发现Xcode对图标的要求又有了变化,之前用的一个小应用"IconKit"还没赶上节奏,已经不能满足Xcode8的要求了. 于是就想起来用Python自己做个脚本来生成图标. 其实这个脚本很早就写了,现在为了适应iOS10,就修改完善下,并且放到了 GitHub . 可以看看效果图: 代码: #encoding=utf-8 #by 不灭的小灯灯 #create date 2016/5/22 #update 2016/9/21 #support iOS

Python如何快速上手? 快速掌握一门新语言的方法

那么Python如何快速上手?找来了一篇广受好评的新语言学习方法介绍,供大家参考. 听说,你决定要为你的 "技能树" 再添加一门特定的编程语言.那该怎么办呢? 在这篇文章中,作者提出了 12 项关于学习技术的建议.记住每个人学习的方式都不一样.其中一些可能对你十分有用,而其他的则可能无法满足你的需求.如果你开始担心一个策略,请尝试另一个策略并看看它哪里适合你. 1. 将其与类似的语言进行比较.当你首次观看有关该语言的第一个教程或阅读代码时,请尝试猜测该语言的每个部分将会做什么,并检查你

1 行 Python 代码快速实现 FTP 服务器

摘要: 当你想快速共享一个目录的时候,这是特别有用的,只需要1行代码即可实现. 当你想快速共享一个目录的时候,这是特别有用的,只需要1行代码即可实现. FTP 服务器,在此之前我都是使用Linux的vsftpd软件包来搭建FTP服务器的,现在发现了利用pyftpdlib可以更加简单的方法即可实现FTP服务器的功能. 环境要求 Python 2.7 Windows / Linux 环境搭建 一行代码实现FTP服务器 通过Python的-m选项作为一个简单的独立服务器来运行,当你想快速共享一个目录的

python字典快速保存于读取的方法

在使用python编程过程中,我们往往需要借助字典来提高编程效率.同时为了调试方便,我们希望将某些变量保存为中间文件. 例如,在协同过滤算法中,相似性的训练结果可以保存为中间文件,方便调试.python对字典的保存与读取可以借助于json方便的实现. #保存 dict_name = {1:{1:2,3:4},2:{3:4,4:5}} f = open('temp.txt','w') f.write(str(dict_name)) f.close() #读取 f = open('temp.txt'

Python多进程入门、分布式进程数据共享实例详解

本文实例讲述了Python多进程入门.分布式进程数据共享.分享给大家供大家参考,具体如下: python多进程入门 https://docs.python.org/3/library/multiprocessing.html 1.先来个简单的 # coding: utf-8 from multiprocessing import Process # 定义函数 def addUser(): print("addUser") if __name__ == "__main__&qu

transform python环境快速配置方法

经常在数据开发中需要搞udf,最近发现transform更加方便易用,但是经常会涉及到集群python版本不一.包不全或者部分机器上没有安装python. 所以咱们需要快速的进行环境配置. 因为mac自带安装好的python,所以就不讲怎么安装了.可以去官网下个: https://www.python.org/downloads/source/ 1.安装虚拟环境工具: 执行:pip install virtualenv,如果没有pip的话可以google一把,自行安装 2.创建虚拟环境: 新建一

Python实现的redis分布式锁功能示例

本文实例讲述了Python实现的redis分布式锁功能.分享给大家供大家参考,具体如下: #!/usr/bin/env python # coding=utf-8 import time import redis class RedisLock(object): def __init__(self, key): self.rdcon = redis.Redis(host='', port=6379, password="", db=1) self._lock = 0 self.lock