Python实现将MongoDB中的数据导入到MySQL

本文主要介绍了一个将 MongoDB 中的数据导入到 MySQL 中的 Python 工具类 MongoToMysql。该工具类实现了获取 MongoDB 数据类型、创建 MySQL 表结构以及将数据从 MongoDB 推送到 MySQL 等功能。

通过该工具类,用户可以轻松地将 MongoDB 中的数据导入到 MySQL 中,实现数据的转移和使用。

使用该工具类,用户需要传入相应的参数,包括 MongoDB 的连接信息,MySQL 的连接信息,以及表名、是否设置最大长度、批处理大小和表描述等信息。具体使用可以参考代码中的注释。

实现代码

import pymysql
from loguru import logger

class MongoToMysql:
    def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
                 mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
        self.mongo_host = mongo_host
        self.mongo_port = mongo_port
        self.mongo_db = mongo_db
        self.mongo_collection = mongo_collection
        self.mysql_host = mysql_host
        self.mysql_port = mysql_port
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.mysql_db = mysql_db
        self.table_name = table_name
        self.set_max_length = set_max_length
        self.batch_size = batch_size
        self.table_description = table_description
        self.data_types = self.get_mongo_data_types()
        self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
        self.push_data_to_mysql(self.batch_size)

    def get_mongo_data_types(self):
        logger.debug('正在获取mongo中字段的类型!')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        data_types = {}
        for field in collection.find_one().keys():
            data_types[field] = type(collection.find_one()[field]).__name__
        return data_types

    def check_mysql_table_exists(self):
        logger.debug('检查是否存在该表,有则删之!')
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def get_max_length(self, field):
        logger.debug(f'正在获取字段 {field} 最大长度......')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        max_length = 0
        for item in collection.find({},{field:1,'_id':0}):
            value = item.get(field)
            if isinstance(value, str) and len(value) > max_length:
                max_length = len(value)
        return max_length

    def create_mysql_table(self, data_types,set_max_length,table_description):
        logger.debug('正在mysql中创建表结构!')
        self.check_mysql_table_exists()
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        if self.table_name:
            table_name = self.table_name
        else:
            table_name = self.mongo_collection
        fields = []
        for field, data_type in data_types.items():
            if data_type == 'int':
                fields.append(f"{field} INT")
            elif data_type == 'float':
                fields.append(f"{field} FLOAT")
            elif data_type == 'bool':
                fields.append(f"{field} BOOLEAN")
            else:
                if set_max_length:
                    fields.append(f"{field} TEXT)")
                else:
                    max_length = self.get_max_length(field)
                    fields.append(f"{field} VARCHAR({max_length + 200})")
        fields_str = ','.join(fields)
        sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def push_data_to_mysql(self, batch_size=10000):
        logger.debug('--- 正在准备从mongo中每次推送10000条数据到mysql ----')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        if self.table_name:
            table_name = self.table_name
        else:
            table_name = self.mongo_collection
        # table_name = self.mongo_collection
        data = []
        count = 0
        for item in collection.find():
            count += 1
            row = []
            for field, data_type in self.data_types.items():
                value = item.get(field)
                if value is None:
                    row.append(None)
                elif data_type == 'int':
                    row.append(str(item.get(field, 0)))
                elif data_type == 'float':
                    row.append(str(item.get(field, 0.0)))
                elif data_type == 'bool':
                    row.append(str(item.get(field, False)))
                else:
                    row.append(str(item.get(field, '')))
            data.append(row)
            if count % batch_size == 0:
                placeholders = ','.join(['%s'] * len(data[0]))
                sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
                cursor.executemany(sql, data)
                conn.commit()
                data = []
                logger.debug(f'--- 已完成推送:{count} 条数据! ----')
        if data:
            placeholders = ','.join(['%s'] * len(data[0]))
            sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
            cursor.executemany(sql, data)
            conn.commit()
            logger.debug(f'--- 已完成推送:{count} 条数据! ----')
        conn.close()

if __name__ == '__main__':
    """MySQL"""
    mongo_host = '127.0.0.1'
    mongo_port = 27017
    mongo_db = 'db_name'
    mongo_collection = 'collection_name'

    """MongoDB"""
    mysql_host = '127.0.0.1'
    mysql_port = 3306
    mysql_user = 'root'
    mysql_password = '123456'
    mysql_db = 'mysql_db'

    table_description = ''  # 表描述

    mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
                                  mysql_user, mysql_password, mysql_db,table_description=table_description)

    #
    # table_name = None  # 默认为None 则MySQL中的表名跟Mongo保持一致
    # set_max_length = False # 默认为False 根据mongo中字段最大长度 加200 来设置字段的VARCHART长度 , 否则定义TEXT类型
    # batch_size = 10000   # 控制每次插入数据量的大小
    # table_description = '' # 表描述
    # mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
    #                               mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)

代码使用了 PyMongo、PyMySQL 和 Loguru 等 Python 库,并封装了一个 MongoToMysql 类。在类的初始化时,会自动获取 MongoDB 中字段的类型,并根据字段类型创建 MySQL 表结构。在将数据从 MongoDB 推送到 MySQL 时,还可以控制每次插入数据量的大小,以避免一次性插入大量数据造成系统崩溃或性能下降。

需要注意的是,在创建 MySQL 表结构时,如果用户选择了设置最大长度,则会创建 TEXT 类型的字段,否则会根据 MongoDB 中字段的最大长度加上200来设置 VARCHAR 类型的字段长度。

总之,本文介绍的 MongoToMysql 工具类非常方便实用,对于需要将 MongoDB 数据迁移到 MySQL 的用户来说,是一种很好的解决方案。

到此这篇关于Python实现将MongoDB中的数据导入到MySQL的文章就介绍到这了,更多相关Python MongoDB数据导入到MySQL内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python中MySQL数据迁移到MongoDB脚本的方法

    MongoDB简介 MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写.旨在为 WEB 应用提供可扩展的高性能数据存储解决方案. MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的. MongoDB是一个文档数据库,在存储小文件方面存在天然优势.随着业务求的变化,需要将线上MySQL数据库中的行记录,导入到MongoDB中文档记录. 一.场景:线上MySQL数据库某表迁移到MongoDB,字段无变化. 二.Python

  • python连接MySQL、MongoDB、Redis、memcache等数据库的方法

    用Python写脚本也有一段时间了,经常操作数据库(MySQL),现在就整理下对各类数据库的操作,如后面有新的参数会补进来,慢慢完善. 一,python 操作 MySQL:详情见:[apt-get install python-mysqldb] 复制代码 代码如下: #!/bin/env python# -*- encoding: utf-8 -*-#-------------------------------------------------------------------------

  • Python实现将数据框数据写入mongodb及mysql数据库的方法

    本文实例讲述了Python实现将数据框数据写入mongodb及mysql数据库的方法.分享给大家供大家参考,具体如下: 主要内容: 1.数据框数据写入mongdb方法 2.数据框数据写入mysql方法 为了以后不重复造轮子,这里总结下,如何把数据框数据写入mysql和mongodb的方法记录下来,省得翻来翻去.下面记录的都是精华. 写入mongodb代码片段(使用pymongo库): ##########################写入mongodb 数据库#################

  • Python操作MySQL MongoDB Oracle三大数据库深入对比

    目录 1. Python操作Oracle数据库 2. Python操作MySQL数据库 3. Python操作MongoDB数据库 作为数据分析师,掌握一门数据库语言,是很有必要的. 今天黄同学就带着大家学习两个关系型数据库MySQL.Oracle,了解一个非关系数据库MongoDB. 1. Python操作Oracle数据库 这一部分的难点在于:环境配置有点繁琐.不用担心,我为大家写了一篇关于Oracle环境配置的文章. Python操作Oracle使用的是cx_Oracle库.需要我们使用如

  • Java将excel中的数据导入到mysql中

    我们在实际工作中的一些时候会需要将excel中的数据导入数据库,如果你的数据量成百上千甚至更多,相信一点点ctrlc.ctrlv也不是办法,这里我们以mysql数据库为例,将excel中的数据存入数据库. 我的思路是:先将excel中的数据取出来,再把数据传入数据库,操作excel需要jxl.jar,操作数据库可以用最基本的jdbc,需要mysql-connector-java-5.0.8-bin.jar这个jar包. 下面我们先看一下excel截图: 再来看最后的效果图: 下面贴出整个代码:

  • Python 结构化字符串中提取数据详情

    目录 前言 从结构化字符串中提取数据 字符串解析 前言 在许多自动化任务中,我们都需要从已知格式结构化的输入文本中提取相关信息.例如,我们可能需要在一段电影评论数据中提取观影时间.电影名.评分等信息,以便存储后进行进一步分析.在本节中,我们将以提取电影评论数据信息为例讲解如何从结构化字符串中提取数据. 从结构化字符串中提取数据 假设我们具有以下结构的电影评分数据,我们需要解析存储观影时间.电影名.评分等信息: [<Timestamp>] - MOVIE ID: <movie id>

  • Python读取excel文件中的数据,绘制折线图及散点图

    目录 一.导包 二.绘制简单折线 三.pandas操作Excel的行列 四.pandas处理Excel数据成为字典 五.绘制简单折线图 六.绘制简单散点图 一.导包 import pandas as pd import matplotlib.pyplot as plt 二.绘制简单折线 数据:有一个Excel文件lemon.xlsx,有两个表单,表单名分别为:Python 以及student. Python的表单数据如下所示: student的表单数据如下所示:  1.在利用pandas模块进行

  • 将sqlite3中数据导入到mysql中的实战教程

    前言 sqlite3只小巧轻便,但是并不支持并发访问,当网站并发量较大时候,数据库请求队列边长,有可能导致队列末尾去数据库操作超时,从而操作失败.因此需要切换到支持并发访问的数据库.切换数据库需要将老的数据导出,再导入到新的数据库中,但是sqlite3和mysql的数据库并不完全兼容,需要做部分调整才能正常导入到mysql中.我最近工作中就遇到了这个问题. 最近一个项目中使用magenetico抓取磁力链接,由于它使用的是sqlite3, 文件会越来越大,而且不支持分布式:所以需要将其改造成My

  • asp.net中EXCEL数据导入到数据库的方法

    本文实例讲述了asp.net中EXCEL数据导入到数据库的方法.分享给大家供大家参考.具体分析如下: excel是办公中非常常用的一个办公表格了,但我们在开发中通常会需要直接把excel数据快速导入到数据库中了,这里整理了一个asp.net中EXCEL数据导入到数据库的例子供各位参考学习. 注意:EXCEL中的第一行不能导入. 下面是源码:IntoExcel.aspx: 复制代码 代码如下: <%@ Page  AutoEventWireup="true" CodeFile=&q

  • Python使用xlrd模块操作Excel数据导入的方法

    本文实例讲述了Python使用xlrd模块操作Excel数据导入的方法.分享给大家供大家参考.具体分析如下: xlrd是一个基于python的可以读取excel文件的产品.和pyExcelerator相比,xlrd的主要特点在于读的功能比较强大,提供了表单行数.列数.单元格数据类型等pyExcelrator无法提供的详细信息,使得开发人员无须了解表单的具体结构也能对表单中的数据进行正确的分析转换. 但是xlrd仅仅提供了读取excel文件的功能,不能像pyExcelrator那样生成excel文

  • asp实现excel中的数据导入数据库

    asp实现excel中的数据导入数据库 <% Response.CodePage=65001%> <% Response.Charset="UTF-8" %> <% wenjian = request.Form("select") '获取文件扩展名 ext = FileExec(wenjian) '判断文件扩展名 if ext <> "xls" then response.Write("<

  • Python实现将SQLite中的数据直接输出为CVS的方法示例

    本文实例讲述了Python实现将SQLite中的数据直接输出为CVS的方法.分享给大家供大家参考,具体如下: 对于SQLite来说,目前查看还是比较麻烦,所以就像把SQLite中的数据直接转成Excel中能查看的数据,这样也好在Excel中做进一步分数据处理或分析,如前面文章中介绍的<使用Python程序抓取新浪在国内的所有IP>.从网上找到了一个将SQLite转成CVS的方法,贴在这里,供需要的朋友使用: import sqlite3 import csv, codecs, cStringI

  • 如何把ACCESS的数据导入到Mysql中

    如何把ACCESS的数据导入到Mysql中  www.Alltips.Com 2001-10-6  极限技术网 在建设网站的过程中,经常要处理一些数据的导入及导出.在Mysql数据库中,有两种方法来处理数据的导出(一般).   1. 使用select * from table_name into outfile "file_name";   2. 使用mysqldump实用程序   下面我们来举例说明:   假设我们的数据库中有一个库为samp_db,一个表为samp_table.现在

  • Python解析Excle文件中的数据方法

    在公司里面,人力资源部每到发工资的时候就会头疼,如果公司内部有100多号员工,那么发完工资后需要给员工发送工资条的话,那么就需要截图如下图, 但是在公司的薪水保密协议不允许公开所有人的薪水,因此我们需要一个一个的发,现在我们给张三发一下薪资条 如果我们给1000人发的话,我们每个人都截图两次,面上的标题和线面的数据两栏,那么这个工程是比较大的.这个工作是循环的,死板的,那么我们就需要使用程序来解决这个问题. #coding=utf-8 import xlrd data = xlrd.open_w

随机推荐