Python通过zookeeper实现分布式服务代码解析

借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.

这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.

zk_server.py

import threading
import json
import socket
import sys
from kazoo.client import KazooClient

# TCP服务端绑定端口开启监听,同时将自己注册到zk
class ZKServer(object):
  def __init__(self, host, port):
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.host = host
    self.port = port
    self.sock.bind((host, port))
    self.zk = None

  def serve(self):
    """
    开始服务,每次获取得到一个信息,都新建一个线程处理
    """
    self.sock.listen(128)
    self.register_zk()
    print("开始监听")
    while True:
      conn, addr = self.sock.accept()
      print("建立链接%s" % str(addr))
      t = threading.Thread(target=self.handle, args=(conn, addr))
      t.start()

  # 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束
  def handle(self, conn, addr):
    while True:
      data=conn.recv(1024)
      if not data or data.decode('utf-8') == 'exit':
        break
      print(data.decode('utf-8'))
    conn.close()
    print('My work is done!!!')

  # 将自己注册到zk,临时节点,所以连接不能中断
  def register_zk(self):
    """
    注册到zookeeper
    """
    self.zk = KazooClient(hosts='127.0.0.1:2181')
    self.zk.start()
    self.zk.ensure_path('/rpc') # 创建根节点
    value = json.dumps({'host': self.host, 'port': self.port})
    # 创建服务子节点
    self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)

if __name__ == '__main__':
  if len(sys.argv) < 3:
    print("usage:python server.py [host] [port]")
    exit(1)
  host = sys.argv[1]
  port = sys.argv[2]
  server = ZKServer(host, int(port))
  server.serve()

zk_client.py

import random
import sys
import time
import json
import socket

from kazoo.client import KazooClient

# 客户端连接zk,并从zk获取可用的服务器列表
class ZKClient(object):
  def __init__(self):
    self._zk = KazooClient(hosts='127.0.0.1:2181')
    self._zk.start()
    self._get_servers()

  def _get_servers(self, event=None):
    """
    从zookeeper获取服务器地址信息列表
    """
    servers = self._zk.get_children('/rpc', watch=self._get_servers)
    # print(servers)
    self._servers = []
    for server in servers:
      data = self._zk.get('/rpc/' + server)[0]
      if data:
        addr = json.loads(data.decode())
        self._servers.append(addr)

  def _get_server(self):
    """
    随机选出一个可用的服务器
    """
    return random.choice(self._servers)

  def get_connection(self):
    """
    提供一个可用的tcp连接
    """
    sock = None
    while True:
      server = self._get_server()
      print('server:%s' % server)
      try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((server['host'], server['port']))
      except ConnectionRefusedError:
        time.sleep(1)
        continue
      else:
        break
    return sock
if __name__ == '__main__':
  # 模拟多个客户端批量生成任务,推送给服务器执行
  client = ZKClient()
  for i in range(40):
    sock = client.get_connection()
    sock.send(bytes(str(i), encoding='utf8'))
    sock.close()
    time.sleep(1)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间: 2020-07-22

windows系统搭建zookeeper服务器的教程

安装&配置 在apache的官方网站提供了好多镜像下载地址,然后找到对应的版本 下载地址: http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz Windows下安装 把下载的zookeeper的文件解压到指定目录 C:\ZK\zookeeper-3.4.14> 修改conf下增加一个zoo.cfg 内容如下: # The number of milliseconds of each

Netty + ZooKeeper 实现简单的服务注册与发现

一. 背景 最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理. 二. Netty 的使用 在接收到派单任务之后,通过 Netty 推送到指定门店相关的设备.在我们的系统中 Netty 实现了消息推送.长连接以及心跳机制. 2.1 Netty Server 端: 每个 Netty 服务端通过 ConcurrentHashMap 保存了客户端的 clientId 以及它连接的 SocketChannel. 服务器端向客户端发送消息时,只要获取

基于 ZooKeeper 搭建 Hadoop 高可用集群 的教程图解

一.高可用简介 Hadoop 高可用 (High Availability) 分为 HDFS 高可用和 YARN 高可用,两者的实现基本类似,但 HDFS NameNode 对数据存储及其一致性的要求比 YARN ResourceManger 高得多,所以它的实现也更加复杂,故下面先进行讲解: 1.1 高可用整体架构 HDFS 高可用架构如下: 图片引用自: https://www.edureka.co/blog/how-to-set-up-hadoop-cluster-with-hdfs-hi

Zookeeper连接超时问题与拒绝连接的解决方案

今天在工作中突然遇到这个问题,开始郁闷得不行,查阅了很多资料才解决.话不多少先上图 ①解决连接超时问题 1:在Linux下输入命令ifconfig -a   看看是否ip地址输入错误 2:关闭Linux防火墙,输入  chkconfig iptables off 命令 ②解决拒绝连接问题 报错图忘截了,不好意思,还是直接说解决方案吧! 将前面的127.0.0.1删掉,输入:wq 命令保存就行了,原因是与输入的地址发生冲入,所以拒绝连接. 总结 以上就是这篇文章的全部内容了,希望本文的内容对大家的

zookeeper python接口实例详解

本文主要讲python支持zookeeper的接口库安装和使用.zk的python接口库有zkpython,还有kazoo,下面是zkpython,是基于zk的C库的python接口. zkpython安装 前提是zookeeper安装包已经在/usr/local/zookeeper下 cd /usr/local/zookeeper/src/c ./configure make make install wget --no-check-certificate http://pypi.python

Docker搭建Zookeeper&Kafka集群的实现

最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯..主要是懒). 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用的笔记本,所以每次连接网络IP都会改变,还要总是修改配置文件的,过于繁琐,不方便测试.(通过Docker虚拟网络的方式可以避免此问题,当时实验的时候没有了解到) Docker 安装 如果已经安装Docker请忽略此步骤 Docker支持以下的CentOS版本: CentOS 7 (64

viper配置框架的介绍支持zookeeper的读取和监听

viper作为配置框架,其功能非常的强大,我们没有理由不去了解一下.我们先看官网对它的功能简介: viper是完整配置解决方案,他可以处理所有类型和格式的配置文件,他有如下功能: 设置默认配置 支持读取 JSON TOML YAML HCL 和 Java 属性配置文件 监听配置文件变化,实时读取读取配置文件内容 读取环境变量值 读取远程配置系统 (etcd Consul) 和监控配置变化 读取命令 Flag 值 读取 buffer 值 读取确切值 乍一看,未免有相见恨晚之感,可仔细一想,不免脑袋

Zookeeper未授权访问测试问题

前言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等. zookeeper 未授权访问是指安装部署之后默认情况下不需要任何身份验证,从而导致 zookeeper 被远程利用,导致大量服务级别的信息泄露. 默认使用端口:2181.2182. 探测Zookeeper服务开放 如使用nmap探测某个目标

Redis未授权访问配合SSH key文件利用详解

前言 Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,并提供多种语言的API. Redis 未授权访问的问题是一直存在的问题,知道创宇安全研究团队历史上也做过相关的应急,今日,又出现 Redis 未授权访问配合 SSH key 文件被利用的情况,导致一大批 Redis 服务器被黑,今天我们来简要的分析下. 一.漏洞概述 Redis 默认情况下,会绑定在 0.0.0.0:6379,这样将会将 Redis 服务暴露到公网上,如果在没有

关于Redis未授权访问漏洞利用的介绍与修复建议

前言 本文主要给大家介绍了关于Redis未授权访问漏洞利用的相关内容,文中对该漏洞进行了详细,并给出了相对应的修复/安全建议,下面话不多说了,来一起看看详细的介绍吧. 一.漏洞介绍 Redis 默认情况下,会绑定在 0.0.0.0:6379,这样将会将 Redis 服务暴露到公网上,如果在没有开启认证的情况下,可以导致任意用户在可以访问目标服务器的情况下未授权访问 Redis 以及读取 Redis 的数据.攻击者在未授权访问 Redis 的情况下可以利用 Redis 的相关方法,可以成功在 Re

windows 2008 iis 提示401未授权 由于凭据无效,访问被拒绝

401 - 未授权: 由于凭据无效,访问被拒绝.您无权使用所提供的凭据查看此目录或页面. 按照IIS默认向导配置完毕,访问时就是这种错误. 解决方案: 点开身份验证 改为启用就OK了 重启一下IIS. 如果你上在办法没有解决可参考 1.打开"IIS信息服务管理器"-->选择你发布的网站-->选择功能视图中的"身份验证"-->右键匿名身份验证,选择"编辑",选择"特定用户IUSR":2.右键要发布的网站文件夹,

python脚本实现Redis未授权批量提权

前言 本文主要给大家介绍了关于redis未授权批量提权的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 安装依赖 sudo easy_install redis 使用 redis python hackredis.py usage: hackredis.py [-h] [-l IPLIST] [-p PORT] [-r ID_RSAFILE] [-sp SSH_PORT] For Example: -----------------------------------

基于Docker的MongoDB实现授权访问的方法

基于Docker部署一个数据库实例通常比直接在服务器上安装数据库还要简单,Gevin在开发环境中经常使用基于docker的数据库服务,docker也渐渐成为Gevin在Linux上安装MongoDB的首选方式,由于MongoDB默认是不用通过认证就能直接连接的,出于安全考虑,在公网上部署MongoDB时,务必设置authentication机制,以避免类似 "黑客赎金" 问题的发生. 那么,基于Docker拉起的MongoDB,如何实现通过用户名密码访问指定数据库呢?方法很简单,但前提

Android实现授权访问网页的方法

本文实例讲述了Android授权访问网页的实现方法,即使用Webview显示OAuth Version 2.a ImplicitGrant方式授权的页,但是对于移动终端不建议使用Authorize code grant方式授权. 具体功能代码如下所示: import android.annotation.SuppressLint; import android.app.Activity; import android.content.Intent; import android.graphics

servlet+jsp实现过滤器 防止用户未登录访问

我们可能经常会用到这一功能,比如有时,我们不希望用户没有进行登录访问后台的操作页面,而且这样的非法访问会让系统极为的不安全,所以我们常常需要进行登录才授权访问其它页面,否则只会出现登录页面,当然我的思路: 一种是在jsp页面进行session的判断,如果不存在该用户的session,就跳转到登录页面,否则执行jsp页面代码,但是你会发现这样做逻辑也简单,但是非常麻烦,如果有很多个jsp,那么就要写多个判断. 另一种则是利用过滤器,访问页面时都进行过滤验证,如果存在该用户session,则访问该页

利用JWT如何实现对API的授权访问详解

什么是JWT JWT(JSON Web Token)是一个开放标准(RFC 7519),它定义了一种紧凑且独立的方式,可以在各个系统之间用JSON作为对象安全地传输信息,并且可以保证所传输的信息不会被篡改. 「JWT」由三部分构成 信息头:指定了使用的签名算法 声明部分:其中也可以包含超时时间 基于指定的算法生成的签名 通过这三部分信息,API 服务端可以根据「JWT」信息头和声明部分的信息重新生成签名.之所以可以这样做,是因为生成签名需要的秘钥存放在服务器端. jwtauth.New("HS2

利用Pyhton中的requests包进行网页访问测试的方法

为了测试一组网页是否能够访问,采取python中的requests包进行批量的访问测试,并输出访问结果. 一.requests包的安装 打开命令行(win+r输入cmd启动); 打开pythion安装目录下的Python\Python36-32\Scripts,将其中的pip文件拖动到命令行中; 在之后输入install requests命令; 二.访问方法 import requests fin = open('urls.txt', 'r') fout = open('result.txt',