Python confluent kafka客户端配置kerberos认证流程详解

kafka的认证方式一般有如下3种:

1.SASL/GSSAPI 从版本0.9.0.0开始支持

2.SASL/PLAIN 从版本0.10.0.0开始支持

3.SASL/SCRAM-SHA-256 以及 SASL/SCRAM-SHA-512 从版本0.10.2.0开始支持

其中第一种SASL/GSSAPI的认证就是kerberos认证,对于java来说有原生的支持,但是对于python来说配置稍微麻烦一些,下面说一下具体的配置过程,confluent kafka模块底层依赖于librdkafka,这是使用c编写的高性能的kafka客户端库,有好多语言的库都是依赖于这个,所以GSSAPI接口的开启也需要在librdkafka编译的时候支持

librdkafka源码:https://github.com/edenhill/librdkafka

编译之前需要先安装必要的开发包,否则相关的接口编译不进去

首先是openssl库,使用yum安装为:yum -y install openssl openssl-devel,编译openssl只能支持默认的PLAIN还有SCRAM这两种机制,无法支持GSSAPI的机制,还需要编译libsasl2依赖,yum安装命令如下:

yum install cyrus-sasl-gssapi cyrus-sasl-devel

在ubuntu下使用命令:apt-get install libsasl2-modules-gssapi-mit libsasl2-dev安装libsasl2开发包

然后确认一下是否有zlib库,这个是方便对kafka消息压缩使用的,一般都会存在,安装命令:yum install zlib-devel,如果需要更高的性能可以手动编译安装zstd并且启用压缩,这里不再详细叙述

上面的库都安装成功就可以开始编译librdkafka源码了,这里源码包为:librdkafka-1.2.1.tar.gz,安装命令如下:

# 解压包
tar -xvzf librdkafka-1.2.1.tar.gz
cd librdkafka-1.2.1
# 编译源码
./configure
make
make install

上面注意一下在执行命令./configure之后,根据输出确认libssl以及libsasl2是否被开启,如下:

这里libssl以及libsasl2都显示ok说明是可以的,现在SSL和SASL SCRAM以及SASL GSSAPI都已经支持了,执行configure阶段没指定prefix则默认安装位置为/usr/local,动态库位置就为:/usr/local/lib,需要将这个目录添加到动态库连接列表中,比如加到/etc/ld.so.conf,保存后执行ldconfig生效

最后可以编译和安装python的confluent kafka模块,这里安装的版本是1.2.0,安装之后可以运行下面的代码测试:

#!/usr/bin/env python3
# coding=utf-8
from confluent_kafka import Producer

def delivery_report(err, msg):
  """ Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush(). """
  if err is not None:
    print('Message delivery failed: {}'.format(err))
  else:
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

if __name__ == '__main__':
  producer_conf = {
    "bootstrap.servers": '192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
    "security.protocol": 'sasl_plaintext',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.keytab': '/opt/user.keytab',
    'sasl.kerberos.principal': 'kafkauser',
  }
  p = Producer(producer_conf)

  p.poll(0)
  p.produce('testTopic', 'confluent kafka test'.encode('utf-8'),
    callback=delivery_report)

  p.flush()
  print('done')

如果生产消息正常就配置成功了,使用GSSAPI只需要配置security.protocol以及keytab的路径即可,其他的认证参数比如用户名和密码在不同的认证机制下配置,更多的配置参数参考文档:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

librdkafka SASL认证的详细配置流程参考:https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间: 2020-10-12

在python环境下运用kafka对数据进行实时传输的方法

背景: 为了满足各个平台间数据的传输,以及能确保历史性和实时性.先选用kafka作为不同平台数据传输的中转站,来满足我们对跨平台数据发送与接收的需要. kafka简介: Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外ka

python消费kafka数据批量插入到es的方法

1.es的批量插入 这是为了方便后期配置的更改,把配置信息放在logging.conf中 用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2 from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg&

python读取Kafka实例

1. 新建.py文件 # pip install kafka-python from kafka import KafkaConsumer import setting conf = setting.luyang_kafka_setting consumer = KafkaConsumer(bootstrap_servers=conf['host'], group_id=conf['groupid']) print('consumer start to consuming...') consum

深入了解如何基于Python读写Kafka

这篇文章主要介绍了深入了解如何基于Python读写Kafka,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本篇会给出如何使用python来读写kafka, 包含生产者和消费者. 以下使用kafka-python客户端 生产者 爬虫大多时候作为消息的发送端, 在消息发出去后最好能记录消息被发送到了哪个分区, offset是多少, 这些记录在很多情况下可以帮助快速定位问题, 所以需要在send方法后加入callback函数, 包括成功和失败的处理

python操作kafka实践的示例代码

1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

python3实现从kafka获取数据,并解析为json格式,写入到mysql中

项目需求:将kafka解析来的日志获取到数据库的变更记录,按照订单的级别和订单明细级别写入数据库,一条订单的所有信息包括各种维度信息均保存在一条json中,写入mysql5.7中. 配置信息: [Global] kafka_server=xxxxxxxxxxx:9092 kafka_topic=mes consumer_group=test100 passwd = tracking port = 3306 host = xxxxxxxxxx user = track schema = track

kafka-python 获取topic lag值方式

说真,这个问题看上去很简单,但"得益"与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码. 直接上代码吧 from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的o

python每5分钟从kafka中提取数据的例子

我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz

python如何在列表、字典中筛选数据

python如何在列表.字典中筛选数据? 实际问题有哪些? 1.过滤掉列表[3,9,-1,10.-2......] 中负数 2.筛选出字典 {'li_ming':90,'xiao_hong':60,'li_kang':95,'bei_men':98} 中值高于90的项 3.筛选出集合{3,9,-1,10.-2......]中能被3整除的数 问题1如何解决? 最普通方法: #!/usr/bin/python3 def filter_l(data): res = [] for i in data:

Python实现将MySQL数据库表中的数据导出生成csv格式文件的方法

本文实例讲述了Python实现将MySQL数据库表中的数据导出生成csv格式文件的方法.分享给大家供大家参考,具体如下: #!/usr/bin/env python # -*- coding:utf-8 -*- """ Purpose: 生成日汇总对账文件 Created: 2015/4/27 Modified:2015/5/1 @author: guoyJoe """ #导入模块 import MySQLdb import time impor

使用python对多个txt文件中的数据进行筛选的方法

一.问题描述 筛选出多个txt文件中需要的数据 二.数据准备 这是我自己建立的要处理的文件,里面是随意写的一些数字和字母 三.程序编写 import os def eachFile(filepath): pathDir =os.listdir(filepath) #遍历文件夹中的text return pathDir def readfile(name): fopen=open(name,'r') for lines in fopen.readlines(): #按行读取text中的内容 lin

Python使用pymysql从MySQL数据库中读出数据的方法

python3.x已经不支持mysqldb了,支持的是pymysql 使用pandas读取MySQL数据时,使用sqlalchemy,出现No module named 'MySQLdb'错误. 安装:打开Windows PowerShell,输入pip3 install PyMySQL即可 import pymysql.cursors import pymysql import pandas as pd #连接配置信息 config = { 'host':'127.0.0.1', 'port'

php录入页面中动态从数据库中提取数据的实现

摘要:用php制作动态web页面时,在提交服务器之前,让php根据用户在当前页面上录入的某字段的值立即从数据库中取出相关的其它字段的值并显示到当前页面上,是php程序开发中的难点.本文以一个具体实例详细介绍了怎样将两个html内嵌式语言php和javascript巧妙结合起来,解决这个难点的具体方法. 关键词:php.动态.html. 现在的网站已经从以前提供静态信息的形式发展到交互式的提供动态信息业务的方式.web的信息服务形式可以概括为两点:向客户提供信息:记录客户提交的信息.要提供这两种服

Python cookbook(数据结构与算法)筛选及提取序列中元素的方法

本文实例讲述了Python筛选及提取序列中元素的方法.分享给大家供大家参考,具体如下: 问题:提取出序列中的值或者根据某些标准对序列做删减 解决方案:列表推导式.生成器表达式.使用内建的filter()函数 1.列表推导式方法:存在一个潜在的缺点,如果输入数据非常大可能会产生一个庞大的结果,考虑到该问题,建议选择生成器表达式 # Examples of different ways to filter data mylist = [1, 4, -5, 10, -7, 2, 3, -1] prin

Python编程实现从字典中提取子集的方法分析

本文实例讲述了Python编程实现从字典中提取子集的方法.分享给大家供大家参考,具体如下: 首先我们会想到使用字典推导式(dictionary comprehension)来解决这个问题,例如以下场景: prices={'ACME':45.23,'APPLE':666,'IBM':343,'HPQ':33,'FB':10} #选出价格大于 200 的 gt200={key:value for key,value in prices.items() if value > 200} print(gt

Python实现从log日志中提取ip的方法【正则提取】

本文实例讲述了Python实现从log日志中提取ip的方法.分享给大家供大家参考,具体如下: log日志内容如下(myjob.log): 124.90.53.68 - - [05/Feb/2018 11:37:07] "GET /favicon.ico HTTP/1.1" 404 - 61.148.245.145 - - [05/Feb/2018 12:37:44] "GET / HTTP/1.1" 200 - 61.148.245.145 - - [05/Feb/

Python cookbook(数据结构与算法)从字典中提取子集的方法示例

本文实例讲述了Python从字典中提取子集的方法.分享给大家供大家参考,具体如下: 问题:想创建一个字典,其本身是另一个字典的子集 解决方案:利用字典推导式(dictionary comprehension)可轻松解决 # example of extracting a subset from a dictionary from pprint import pprint prices = { 'ACME': 45.23, 'AAPL': 612.78, 'IBM': 205.55, 'HPQ':