基于python实现操作redis及消息队列

操作 redis

import redis
redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)
redis= redis.Redis(connection_pool=redisPool)

redis.set('key','values')
redis.get('com')
redis.append('keys','values')
redis.delete('keys')

print(redis.getset('name','Mike')) #赋值name为Mike并返回上一次的value
print(redis.mget(['name','age']))  #输出name键和age键的value
print(redis.setnx('newname','james')) #如果键值不存在,则赋值
print(redis.mset({'name1':'smith','name2':'curry'})) #批量赋值
print(redis.msetnx({'name3':'ltf','name4':'lsq'}))  #不存在才批量赋值
print(redis.incr('age',1))  #age对应的value 加1
print(redis.decr('age',5))  #age对应的value 减5
print(redis.append('name4','is a sb'))  #在name4的value后追加 is a sb 返回字符串长度
print(redis.substr('name',1,4))  #截取键 name

print(redis.sadd('tags','Book','Tea','Coffee')) #返回集合长度 3
print(redis.srem('tags','Book')) #返回删除的数据个数
print(redis.spop('tags'))  #随机删除并返回该元素
print(redis.smove('tags','tags1','Coffee'))
print(redis.scard('tags')) # 获取tags集合的元素个数
print(redis.sismember('tags', 'Book')) # 判断Book是否在tags的集合中
print(redis.sinter('tags', 'tags1')) # 返回集合tags和集合tags1的交集
print(redis.sunion('tags', 'tags1')) # 返回集合tags和集合tags1的并集
print(redis.sdiff('tags', 'tags1')) # 返回集合tags和集合tags1的差集
print(redis.smembers('tags')) # 返回集合tags的所有元素

print(redis.hset('price','cake',5)) # 向键名为price的散列表添加映射关系,返回1 即添加的映射个数
print(redis.hsetnx('price','book',6)) # 向键名为price的散列表添加映射关系,返回1 即添加的映射个数
print(redis.hget('price', 'cake')) # 获取键名为cake的值 返回5
print(redis.hmset('price',{'banana':2,'apple':3,'pear':6,'orange':7}))  #批量添加映射
print(redis.hmget('price', ['apple', 'orange'])) # 查询apple和orange的值 输出 b'3',b'7'
print(redis.hincrby('price','apple',3))  #apple映射加3 为6
print(redis.hexists('price', 'banana')) # 在price中banana是否存在 返回True
print(redis.hdel('price','banana'))  #从price中删除banana 返回1
print(redis.hlen('price')) # 输出price的长度
print(redis.hkeys('price')) # 输出所有的映射键名
print(redis.hvals('price')) # 输出所有的映射键值
print(redis.hgetall('price')) # 输出所有的映射键对

print(redis.rpush('list',1,2,3)) #向键名为list的列表尾部添加1,2,3 返回长度
print(redis.lpush('list',0))  #向键名为list的列表头部添加0 返回长度
print(redis.llen('list'))  #返回列表的长度
print(redis.lrange('list',1,3)) #返回起始索引为1 终止索引为3的索引范围对应的列表
print(redis.lindex('list',1))  #返回索引为1的元素-value
print(redis.lset('list',1,5)) #将list的列表索引为1的重新赋值为5
print(redis.lpop('list')) #删除list第一个元素
print(redis.rpop('list'))  #删除list最后一个元素
print(redis.blpop('list'))  #删除list第一个元素
print(redis.brpop('list'))  #删除最后一个元素
print(redis.rpoplpush('list','list1'))  #删除list的尾元素并将其添加到list1的头部

消息队列使用例子

import redis
import json
redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)
client = redis.Redis(connection_pool=redisPool)

# 顺序插入五条数据到redis队列,sort参数是用来验证弹出的顺序
while True:
  num = 0
  for i in range(0, 100):
    num = num + 1
    # params info
    params_dict = {"name": f"test {num}", "sort":num}

    client.rpush("test", json.dumps(params_dict))

  # 查看目标队列数据
  result = client.lrange("test", 0, 100)
  print(result)
  import time
  time.sleep(10)
import redis
import time
import multiprocessing
import time
import os
import random

redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)
client = redis.Redis(connection_pool=redisPool)

def test1(msg):
  t_start = time.time()
  print("%s开始执行,进程号为%d" % (msg, os.getpid()))
  time.sleep(random.random() * 2)
  t_stop = time.time()
  print("%s执行完成,耗时%.2f" % (msg, t_stop - t_start))

while True:
  number = client.llen('test')
  print("现在的队列任务 条数是 ", number)
  p = 100
  if number > p-1:
    print("-----start-----")
    a = []
    for i in range(p):
      result = client.lpop("test")
      a.append(result)
    print("每10条读取一次", a)
    po = multiprocessing.Pool(p)
    for i in range(0, p):
      # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
      # 每次循环将会用空闲出来的子进程去调用目标
      po.apply_async(test1, (a[i],))
    po.close() # 关闭进程池,关闭后po不再接收新的请求
    po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end-----")
    time.sleep(2)
  elif number < p and number > 0:
    print("-----start-----")
    a = []
    for i in range(number):
      a = []
      result = client.lpop("test")
      a.append(result)
    print("小于10条的 读取一次 ", a)
    po = multiprocessing.Pool(number)
    for i in a:
      # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
      # 每次循环将会用空闲出来的子进程去调用目标
      po.apply_async(test1, (a,))

    po.close() # 关闭进程池,关闭后po不再接收新的请求
    po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end-----")
    time.sleep(2)
  elif number == 0:
    print("没有任务需要处理")
    time.sleep(2)
  else:
    time.sleep(2)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间: 2020-08-27

Python进程间通信Queue消息队列用法分析

本文实例讲述了Python进程间通信Queue消息队列用法.分享给大家供大家参考,具体如下: 进程间通信-Queue Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信. 1. Queue的使用 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示下Queue的工作原理: 代码如下: #coding=utf-8 from multiprocessing import Queue #初始化一个

Python RabbitMQ消息队列实现rpc

上个项目中用到了ActiveMQ,只是简单应用,安装完成后直接是用就可以了.由于新项目中一些硬件的限制,需要把消息队列换成RabbitMQ. RabbitMQ中的几种模式和机制比ActiveMQ多多了,根据业务需要,使用RPC实现功能,其中踩过的一些坑,有必要记录一下了. 上代码,目录结构分为 c_server.c_client.c_hanlder: c_server: #!/usr/bin/env python # -*- coding:utf-8 -*- import pika import

Python的消息队列包SnakeMQ使用初探

一.关于snakemq的官方介绍 SnakeMQ的GitHub项目页:https://github.com/dsiroky/snakemq 1.纯python实现,跨平台 2.自动重连接 3.可靠发送--可配置的消息方式与消息超时方式 4.持久化/临时 两种队列 5.支持异步 -- poll() 6.symmetrical -- 单个TCP连接可用于双工通讯 7.多数据库支持 -- SQLite.MongoDB-- 8.brokerless - 类似ZeroMQ的实现原理 9.扩展模块:RPC,

详解Python操作RabbitMQ服务器消息队列的远程结果返回

先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4 RabbitMQ服务器 sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库 sudo pip install pika 远程结果返回 消息发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了,但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端. 处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队

Python中线程的MQ消息队列实现以及消息队列的优点解析

"消息队列"是在消息的传输过程中保存消息的容器.消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它.相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 #!/usr/bin/env python import Queue import threading import

Python高级编程之消息队列(Queue)与进程池(Pool)实例详解

本文实例讲述了Python高级编程之消息队列(Queue)与进程池(Pool).分享给大家供大家参考,具体如下: Queue消息队列 1.创建 import multiprocessing queue = multiprocessing.Queue(队列长度) 2.方法 方法 描述 put 变量名.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) put_nowait 变量名.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报

python实现RabbitMQ的消息队列的示例代码

最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

利用Python学习RabbitMQ消息队列

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

使用PHP访问RabbitMQ消息队列的方法示例

本文实例讲述了使用PHP访问RabbitMQ消息队列的方法.分享给大家供大家参考,具体如下: 扩展安装 PHP访问RabbitMQ实际使用的是AMQP协议,所以我们只要安装epel库中的php-pecl-amqp这个包即可 rpm -ivh http://mirror.neu.edu.cn/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm yum install php-pecl-amqp 交换建立 <?php $connection = new

.NetCore利用BlockingCollection实现简易消息队列

消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa. 我们用BlockingCollection来实现简单的消息队列. BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现.与其他Concurrent集合一样,每次Add或Take元素,都会导致对集合的lock.只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类. MSDN中的示例用法: using (BlockingColle

Python操作RabbitMQ服务器实现消息队列的路由功能

Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server),这里我们来看一下MQ相关的路由功能. 路由键的实现 比如有一个需要给所有接收端发送消息的场景,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,要怎么办呢?这种情况下就要用到路由键了. 路由键的工作原理:每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键.发送

利用Python操作消息队列RabbitMQ的方法教程

前言 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发

Spring学习笔记3之消息队列(rabbitmq)发送邮件功能

rabbitmq简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ. 本节的内容是用户注册时,将邮