Python脚本:MySQL表写入MongoDB


     近日工作中接到一个工单,是将现有的MySQL表写入MongoDB供前端使用。于是就写了一个Python脚本。趁着周末再修修补补完善一下,以后就可以满足很多基本的工作需要了。脚本具体如下。

# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
author      : 蛙鳜鸡鹳狸猿
create_time : 2017年 03月 19日 星期日 05:55:05 CST
program     : *_* Read table from MySQL and Write to MongoDB collection *_*
"""
import re
import time
import pymongo
import MySQLdbclass MySQL2Mongo:"""Self test successfully under Python2.7.12"""def __init__(self, ms_con, ms_db, ms_tb, mg_con, mg_db, mg_cl, default_key=True, ids_=None, add_tag=False):"""Arguments set for data transformation job.Of main are database of MySQL and MongoDB.Default parameters are free for choosing.:param ms_con: stringconnection of MySQL:param ms_db: stringdatabase of MySQL data reads from:param ms_tb: stringtable of MySQL data reads from:param mg_con: stringconnection of MongoDB:param mg_db: stringdatabase of MongoDB data write to:param mg_cl: stringcollection of MongoDB data write to:param default_key: bool, default : Truewhether to add and use MongoDB default "_id" (it is very necessary if not MySQL table owns an unique column):param ids_: string, default : Nonean MySQL table column uses to substitute as MongoDB collection "_id"(if not "default_key" an "ids_" is needed):param add_tag: bool, default : Falsewhether to add an timestamp column named "ts" as tag for each MongoDB collection documents when write"""self.ms_con      = ms_conself.ms_db       = ms_dbself.ms_tb       = ms_tbself.mg_con      = mg_conself.mg_db       = mg_dbself.mg_cl       = mg_clself.default_key = default_keyself.ids_        = ids_self.add_tag     = add_tagdef __repr__(self):return "<...{0} ^^^ {1}...>".format(self.ms_con, self.mg_con)def intnull(self, values):"""……\(^o^)/ Avoid TypeError of built_in int() used for None \(^o^)/……:param values: any which can transfer to built_in int() and None for a plus:return: int values"""if values:values = int(values)return valuesdef getkeys(self, column):"""……o(>﹏<)o Just do a friendly check whether MySQL column "ids_" can be as the role of MongoDB "_id" or not o(>﹏<)o……:param column: MySQL table column:return: key of the column"""sql = """SELECT c.COLUMN_KEYFROM information_schema.`COLUMNS` cWHERE c.COLUMN_NAME = '{0}'AND c.TABLE_NAME = '{1}'AND c.TABLE_SCHEMA = '{2}';""".format(column, self.ms_tb, self.ms_db)cur_ = self.ms_con.cursor()cur_.execute(sql)data = cur_.fetchone()return data[0]def gettype(self, column):"""……o(>﹏<)o Just do a friendly check whether a MySQL column is needed to int() or not o(>﹏<)o……:param column: MySQL table column:return: type of the column"""sql = """SELECT c.COLUMN_TYPEFROM information_schema.`COLUMNS` cWHERE c.COLUMN_NAME = '{0}'AND c.TABLE_NAME = '{1}'AND c.TABLE_SCHEMA = '{2}';""".format(column, self.ms_tb, self.ms_db)cur_ = self.ms_con.cursor()cur_.execute(sql)data = cur_.fetchone()return data[0]def getitem(self):"""Do some judgement and get an original "key : value" relationship of MySQL table:return: an stored dict {column : record} of MySQL table"""sql = """SELECT c.COLUMN_NAME, c.ORDINAL_POSITION - 1 AS ORDINAL_POSITIONFROM information_schema.`COLUMNS` cWHERE c.TABLE_NAME = '{0}'AND c.TABLE_SCHEMA = '{1}';""".format(self.ms_tb, self.ms_db)cur_ = self.ms_con.cursor()cur_.execute(sql)res = cur_.fetchall()DIC = {}if   self.default_key == True:if self.ids_:print "Notice: argument set for \"ids_\"", "\033[1;31;40m", self.ids_, "\033[0m", "will be ignored"for row in res:typ = self.gettype(column=row[0])if re.findall(r"\Dt\(\d", typ):DIC[row[0]] = "self.intnull(row[{0}])".format(int(row[1]))else:DIC[row[0]] = "row[{0}]".format(int(row[1]))elif self.default_key == False:if   self.ids_ == None:raise TypeError, "Argument \"ids_\" is not Given"elif self.ids_ != None:try:key = self.getkeys(column=self.ids_)if   key != "PRI" and key != "UNI":print "Warning: ununique MySQL column", "\033[1;31;40m", self.ids_, "\033[0m", "may not use for MongoDB \"_id\""elif key == "PRI" or  key == "UNI":print "Get unique MySQL column", "\033[1;31;40m", self.ids_, "\033[0m", "use for MongoDB \"_id\""except Exception, e:print(e)for row in res:typ = self.gettype(column=row[0])if re.findall(r"\Dt\(\d", typ):if row[0] == self.ids_:DIC["_id"] = "self.intnull(row[{0}])".format(int(row[1]))else:DIC[row[0]] = "self.intnull(row[{0}])".format(int(row[1]))else:if row[0] == self.ids_:DIC["_id"] = "row[{0}]".format(int(row[1]))else:DIC[row[0]] = "row[{0}]".format(int(row[1]))return DICdef getdata(self):"""Read MySQL table data:return: tuples of MySQL table columns data"""sql = "SELECT * FROM {0}.{1};".format(self.ms_db, self.ms_tb)cur_ = self.ms_con.cursor()cur_.execute(sql)data = cur_.fetchall()return datadef wridata(self):"""Write MongoDB collection data:return: None"""if self.getdata():dire = "self.mg_con.{0}.{1}.insert(DATA)".format(self.mg_db, self.mg_cl)ITEM = self.getitem()for row in self.getdata():DATA = ITEM.copy()for i, j in DATA.items():DATA[(i)] = eval(j)if self.add_tag:DATA["ts"] = time.strftime("%Y-%m-%d %X")exec direself.ms_con.close()self.mg_con.close()# self test
if __name__ == "__main__":MySQLDB = {"host": "localhost","user": "root","passwd": "520","port": 3306,"db": "information_schema","charset": "UTF8"}con_mysql = MySQLdb.connect(**MySQLDB)MongoDB = {"host": "localhost","port": 27017}con_mongo = pymongo.MongoClient(**MongoDB)STT = MySQL2Mongo(ms_con=con_mysql, ms_db="information_schema", ms_tb="TABLES",mg_con=con_mongo, mg_db="information_schema", mg_cl="TABLES")print(STT)STT.wridata()

     除了MySQL表数据的完整迁移以为,还做了一些细节处理,主要如下。

①主键的设定和选择

②数值字段的正常写入

③自定义写入标识(时间戳)

     不同数据库之间的数据迁移工作,还是蛮有意思的。用各种数据库自带的导出导入工具可能会更简单方便一点。之后用到了再写其他脚本吧。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部