一个执行MySQL常规读写操作的Python脚本

应用层面对数据库的操作集中在读和写上,具体可以细分为以下三个常规需求。

①写(DDL、DML):不需要实际返回值(非程序退出码的查询结果集)

②读(DQL):需要返回查询结果集(或None)

③读(DQL):虽然也关注查询结果,但只关注查询结果的布尔(bool)性,即判断是否存在。

本脚本主要就是为了实现MySQL / MariaDB这三方面的操作需求,同时做了一些额外但常规的封装,具体文件定义如下。

├── baseutil.py
    base function utility
├── log.py
    log logging handler
└── mdbutil.py
    MySQL/MariaDB handler utility

baseutil.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-06-06
program       : *_* base function utility *_*
"""


def str_dict_key(_dict):
    """
    Return a new dict whose bytes keys are decoded to str.

    Keys of type ``bytes``/``bytearray`` are decoded as UTF-8; any other key
    (for example one that already is a ``str``) is carried over unchanged
    instead of failing on a missing ``decode`` attribute.
    :param _dict: mapping to convert; it is not modified.
    :return: new dict with the same values and normalised keys.
    """
    converted = {}
    for key, value in _dict.items():
        if isinstance(key, (bytes, bytearray)):
            key = key.decode("utf-8")
        converted[key] = value
    return converted


def str_dict_value(_dict):
    """
    Convert the values of ``_dict`` to str, in place.

    Values that expose ``decode`` (bytes-like) are decoded as UTF-8, every
    other truthy value goes through ``str()``.  Falsy values (``None``, ``0``,
    empty strings / bytes ...) are deliberately left untouched.
    :param _dict: dict to update in place.
    :return: None.
    """
    for key, value in _dict.items():
        # Only existing keys are re-assigned, so mutating while iterating is safe.
        if not value:
            continue
        try:
            _dict[key] = value.decode("utf-8")
        except AttributeError:
            _dict[key] = str(value)


def combine_lines_str(multi_line_str: str) -> str:
    """
    Collapse a multi-line string into a single line.

    Empty lines are dropped; every remaining line is left-stripped and followed
    by one space, so a non-empty result always ends with a trailing space.
    :param multi_line_str: text possibly containing ``\\n`` separators.
    :return: the single-line text.
    """
    return ''.join(
        line.lstrip() + ' '
        for line in multi_line_str.split('\n')
        if line
    )

log.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-03-20
program       : *_* log logging handler *_*
"""

import sys
import logging
import functools


class LOG(object):
    """
    Log logging definition.

    Thin holder of the arguments later handed to ``logging.basicConfig()``.
    """

    def __init__(self, level=logging.INFO, stream=sys.stdout, filename=None, filemode='a',
                 datefmt="%Y-%m-%d %H:%M:%S",
                 format="%(asctime)s\t%(levelname)s\t< Module: %(module)s, Function: %(funcName)s >\t%(message)s",
                 **kwargs):
        """
        LOG init.
        :param level: arg pass to standard library logging.basicConfig().
        :param stream: arg pass to standard library logging.basicConfig().
        :param filename: arg pass to standard library logging.basicConfig().
        :param filemode: arg pass to standard library logging.basicConfig().
        :param datefmt: arg pass to standard library logging.basicConfig().
        :param format: arg pass to standard library logging.basicConfig().
        :param kwargs: arg pass to standard library logging.basicConfig().
        """
        self.level = level
        self.stream = stream
        self.filename = filename
        self.filemode = filemode
        self.datefmt = datefmt
        self.format = format
        self.kwargs = kwargs

    def logger(self, name=__name__):
        """
        Logger object generates.

        ``logging.basicConfig()`` rejects ``stream`` together with ``filename``
        (and either of them together with ``handlers``), so only one output
        target is forwarded: explicit ``handlers`` from ``kwargs`` win, then
        ``filename`` (with ``filemode``), otherwise ``stream``.
        :param name: Logger name(parameters pass to standard library logging.getLogger()).
        :return: Logger object.
        """
        args = {
            "level": self.level,
            "datefmt": self.datefmt,
            "format": self.format,
        }
        if "handlers" not in self.kwargs:
            if self.filename:
                args["filename"] = self.filename
                args["filemode"] = self.filemode
            else:
                args["stream"] = self.stream
        logging.basicConfig(**args, **self.kwargs)
        return logging.getLogger(name)


def raise_log(raised_except, msg=''):
    """
    Raise exception by hand to escape error exit caused by built-in [raise].

    The exception is raised and immediately caught again, so that it can be
    logged (with traceback) through the root logger without propagating.
    :param raised_except: Exception class; ``None`` falls back to ``Exception``.
    :param msg: Exception feedback message.
    :return: None. Output Exception of [raised_except].
    """
    if raised_except is None:
        # ``raise None(msg)`` would itself blow up with a TypeError.
        raised_except = Exception
    try:
        raise raised_except(msg)
    except raised_except as E:
        logging.exception(E)


def log(logger=None, exc_msg='', if_exit=False, exit_msg='', result_check=False, check_except=None, check_msg=''):
    """
    Log logging decorator function.

    Ordinary exceptions (``Exception`` subclasses) raised by the decorated
    function are logged and suppressed; the wrapper then returns ``None``.
    ``SystemExit`` / ``KeyboardInterrupt`` are not intercepted, so a deliberate
    ``sys.exit()`` (from the function or from ``if_exit``) really exits.
    :param logger: Logger object(see also logging.getLogger()).
    :param exc_msg: extra message to return when exceptions catch.
    :param if_exit: Boolean.Whether to exit or not when catching exception.
    :param exit_msg: extra message to return when exceptions catch and exit.
    :param result_check: Boolean.Whether or not to check Python's False status result of [func]'s return.
    :param check_except: Exception object to raise when [result_check].
    :param check_msg: Exception feedback message to return when [result_check].
    :return: decorated function [func]'s return.
    """
    if not logger:
        logger = LOG().logger()

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = None
            try:
                result = func(*args, **kwargs)
            except Exception:
                logger.exception(exc_msg)
                if if_exit:
                    sys.exit(exit_msg)
            # No ``return`` inside a ``finally`` here: that would silently
            # discard the SystemExit raised just above.
            if result_check and not result:
                raise_log(check_except, check_msg)
            return result

        return wrapper

    return decorator


if __name__ == "__main__":
    LOG_Test = LOG
    # LOG().logger().info("INFO:Come...")
    # LOG().logger().error("ERROR:Come...")

    # @log()
    # def get(a, b):
    #     return a / b

    # get(a=1, b=0)

mdbutil.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-06-09
program       : *_* MySQL/MariaDB handler utility *_*
"""

import sys
import typing

import baseutil
import log

logger = log.LOG().logger()

# c_flag records whether the C extension module could be imported.
c_flag = False
try:
    import _mysql_connector
except ImportError:
    # ImportError (not only ModuleNotFoundError) so that an installed but
    # unloadable extension also triggers the fallback.
    logger.warning('Import "_mysql_connector" failed, fall back to strictly use "mysql.connector"...')
else:
    c_flag = True

import mysql.connector

DBException = _mysql_connector.MySQLInterfaceError if c_flag else mysql.connector.errors.Error

# Errors handled by the execute helpers.  When the C extension is importable a
# caller may still pass use_c_api=False, so both error families must be caught.
_DB_ERRORS = (
    (_mysql_connector.MySQLInterfaceError, mysql.connector.errors.Error)
    if c_flag
    else (mysql.connector.errors.Error,)
)


def generate_insert(table, items: dict, database='', is_escape_string=False, con=None, charset="utf-8") -> str:
    """
    Generate INSERT statement of SQL DML.

    Only items whose value is ``0`` or truthy are emitted; ``None`` and empty
    values are skipped.  Every emitted value is wrapped in single quotes.
    :param table: destination table name.
    :param items: mapping of column name to value.
    :param database: optional database name used to qualify the table.
    :param is_escape_string: escape each value with ``con.escape_string()``;
        requires the C extension (``c_flag``) and a connection in ``con``.
    :param con: connection providing ``escape_string()``.
    :param charset: codec used to decode the escaped bytes.
    :return: the INSERT statement, terminated by ``;``.
    """
    if database:
        destination = "`{database}`.`{table}`".format(database=database, table=table)
    else:
        destination = "`{table}`".format(table=table)

    columns = []
    values = []
    for column, data in items.items():
        if not (data == 0 or data):
            continue
        if is_escape_string:
            assert c_flag and con, 'escaping needs "_mysql_connector" and a connection'
            data = con.escape_string(str(data)).decode(charset)
        columns.append("`{key}`".format(key=column))
        values.append("'{value}'".format(value=data))

    # Assemble in one step: re-running str.format() over the finished SQL
    # would choke on any "{" or "}" contained in a column name or value.
    return "INSERT INTO " + destination + " (" + ", ".join(columns) + ") VALUES (" + ", ".join(values) + ");"


def generate_where(items: dict, alias: str = '') -> str:
    """
    Generate WHERE statement of SQL.

    :param items: mapping of column name to a raw condition fragment
        (operator plus operand, e.g. ``"= 1"``); fragments are AND-ed.
    :param alias: optional table alias prefixed to every column.
    :return: the WHERE clause, or an empty string when ``items`` is empty.
    """
    prefix = alias + '.' if alias else ''
    conditions = [
        "{alias}`{column}` {value}".format(alias=prefix, column=column, value=value)
        for column, value in items.items()
    ]
    if not conditions:
        return ''
    return "WHERE " + " AND ".join(conditions)


def generate_group_by(columns, alias: str = '') -> str:
    """
    Generate GROUP BY statement of SQL.

    :param columns: iterable of column names.
    :param alias: optional table alias prefixed to every column.
    :return: the GROUP BY clause, or an empty string when ``columns`` is empty.
    """
    prefix = alias + '.' if alias else ''
    parts = ["{alias}`{column}`".format(alias=prefix, column=column) for column in columns]
    if not parts:
        return ''
    return "GROUP BY " + ", ".join(parts)


def get_con(use_c_api=True, charset="utf8", use_unicode=False, autocommit=False, **kwargs):
    """
    Get MySQL connection.

    :param use_c_api: prefer a raw ``_mysql_connector.MySQL`` connection; only
        honoured when the C extension was importable (``c_flag``).
    :param charset: character set applied to the connection.
    :param use_unicode: forwarded to the connection.
    :param autocommit: forwarded to the connection.
    :param kwargs: connection arguments (host, port, user, password, ...),
        forwarded to ``connect()``.
    :return: a ``_mysql_connector.MySQL`` object, or the result of
        ``mysql.connector.connect()`` on the fallback path.
    """
    if use_c_api and c_flag:
        con = _mysql_connector.MySQL()
        con.connect(**kwargs)
        con.set_character_set(charset)
        con.use_unicode(use_unicode)
        con.autocommit(autocommit)
        con.query("SET NAMES {charset};".format(charset=charset))
        con.query("SET CHARACTER SET {charset};".format(charset=charset))
        con.query("SET character_set_connection={charset};".format(charset=charset))
        con.commit()
        return con
    if use_c_api and not c_flag:
        logger.warning('Get "_mysql_connector" failed, fall back to strictly use "mysql.connector"...')
    return mysql.connector.connect(charset=charset, use_unicode=use_unicode, autocommit=autocommit, **kwargs)


def _run_query(con, cur, sql, use_c_api):
    """Send ``sql`` through the connection (C API) or through the cursor."""
    if use_c_api:
        con.query(sql)
    else:
        cur.execute(sql)


def _release(con, cur, sql, use_c_api, is_close, is_info, free_result):
    """Optionally free the result and close cursor / connection, then log the SQL."""
    if is_close:
        if free_result:
            con.free_result()
        if not use_c_api:
            cur.close()
        con.close()
    if is_info:
        logger.info(baseutil.combine_lines_str(sql))


def _stringify_row(row):
    """Stringify a row dict's values in place and decode bytes keys, if any."""
    baseutil.str_dict_value(row)
    # Keys that already are str must not be sent through a bytes-only decode.
    if any(isinstance(key, bytes) for key in row):
        return baseutil.str_dict_key(row)
    return row


@log.log(logger=logger)
def execute_sql_quiet(con, sql, use_c_api=True, is_commit=True, is_close=True, is_exit=False,
                      is_raise=True, is_info=True):
    """
    Execute SQL(DDL, DML, DCL etc) in quiet mode(with no return).

    The call is wrapped by ``log.log``; how exceptions leaving this function
    are treated (logged / suppressed) is decided by that decorator.
    :param con: connection as returned by ``get_con()``.
    :param sql: statement to execute.
    :param use_c_api: ``con`` is a raw C-extension connection (no cursor used).
    :param is_commit: commit after a successful execution.
    :param is_close: close cursor (if any) and connection afterwards.
    :param is_exit: call ``sys.exit()`` with the error on a database error.
    :param is_raise: re-raise the database error (after rollback).
    :param is_info: log the executed SQL (collapsed to one line).
    :return: None.
    """
    cur = None if use_c_api else con.cursor()
    try:
        _run_query(con, cur, sql, use_c_api)
    except _DB_ERRORS as E:
        con.rollback()
        if is_exit:
            sys.exit(E)
        if is_raise:
            raise
    else:
        if is_commit:
            con.commit()
    finally:
        _release(con, cur, sql, use_c_api, is_close, is_info, free_result=False)


@log.log(logger=logger)
def execute_sql_return(con, sql, use_c_api=True, dictionary=True, is_close=True, is_exit=False, is_raise=True,
                       is_info=True, **kwargs) -> typing.Generator:
    """
    Execute SQL(DQL) in return mode(with return).

    This is a generator: nothing runs until iteration starts, so the
    ``log.log`` wrapper only covers creation of the generator object and
    errors surface while the caller iterates.
    :param con: connection as returned by ``get_con()``.
    :param sql: query to execute.
    :param use_c_api: ``con`` is a raw C-extension connection (no cursor used).
    :param dictionary: yield each row as a dict with stringified values
        instead of the driver's plain row.
    :param is_close: free the result and close cursor (if any) and connection
        once the generator finishes.
    :param is_exit: call ``sys.exit()`` with the error on a database error.
    :param is_raise: re-raise the database error (after rollback).
    :param is_info: log the executed SQL (collapsed to one line).
    :param kwargs: extra arguments for ``con.cursor()`` (cursor path only).
    :return: generator over the result rows.
    """
    cur = None if use_c_api else con.cursor(dictionary=dictionary, **kwargs)
    try:
        _run_query(con, cur, sql, use_c_api)
    except _DB_ERRORS as E:
        con.rollback()
        if is_exit:
            sys.exit(E)
        if is_raise:
            raise
    else:
        if use_c_api:
            # Element 4 of each field tuple is used as the dict key
            # (presumably the column name -- confirm against the driver).
            column_list = [column[4] for column in con.fetch_fields()] if dictionary else []
            row_tuple = con.fetch_row()
            while row_tuple:
                if dictionary:
                    yield _stringify_row(dict(zip(column_list, row_tuple)))
                else:
                    yield row_tuple
                row_tuple = con.fetch_row()
        else:
            for row in cur:
                if dictionary:
                    yield _stringify_row(row)
                else:
                    yield row
    finally:
        _release(con, cur, sql, use_c_api, is_close, is_info, free_result=True)


@log.log(logger=logger)
def check_dql_existence(con, sql, use_c_api=True, is_exit=False, is_raise=True, is_close=True, is_info=True) -> bool:
    """
    Check whether SQL(DQL) query has result to return or not.

    Only the first row is fetched; its truthiness is the answer.
    :param con: connection as returned by ``get_con()``.
    :param sql: query to execute.
    :param use_c_api: ``con`` is a raw C-extension connection (no cursor used).
    :param is_exit: call ``sys.exit()`` with the error on a database error.
    :param is_raise: re-raise the database error (after rollback).
    :param is_close: free the result and close cursor (if any) and connection.
    :param is_info: log the executed SQL (collapsed to one line).
    :return: True when the query yields at least one row, else False
        (``None`` when a database error was handled without exit / raise).
    """
    cur = None if use_c_api else con.cursor(raw=True)
    try:
        if use_c_api:
            con.raw(True)
        _run_query(con, cur, sql, use_c_api)
    except _DB_ERRORS as E:
        con.rollback()
        if is_exit:
            sys.exit(E)
        if is_raise:
            raise
    else:
        first_row = con.fetch_row() if use_c_api else cur.fetchone()
        return bool(first_row)
    finally:
        _release(con, cur, sql, use_c_api, is_close, is_info, free_result=True)


if __name__ == "__main__":
    CON = {
        "con": {
            "host": "localhost",
            "port": 3306,
            "user": "root",
            "password": "1024",
            "database": "information_schema",
        },
        "charset": "utf8",
        "use_unicode": True,
        "autocommit": False,
    }
    con_c = get_con(**CON["con"])
    con_p = get_con(use_c_api=False, **CON["con"])

    items = {
        "col1": "= 1",
        "col2": "> 2",
    }
    print(generate_insert(table="tab", database="dbs", items=items))
    print(generate_where(items, "_tmp"))
    print(generate_group_by(items.keys(), "_tmp"))

    # print(execute_sql_quiet(con_c, "CREATE DATABASE `mdbutil`;"))
    # print(execute_sql_quiet(con_p, "DROP DATABASE `mdbutil`;", use_c_api=False))

    # sql_t = 'SELECT `TABLE_NAME`, `CREATE_TIME` FROM `information_schema`.`TABLES` LIMIT 2;'
    sql_f = 'SELECT `TABLE_NAME`, `CREATE_TIME` FROM `information_schema`.`TABLES` WHERE `TABLE_NAME` = "NULL" LIMIT 2;'
    print(check_dql_existence(con_p, sql_f, use_c_api=False))
    # print(check_dql_existence(con_c, sql_t))

    # for r in execute_sql_return(con_p, sql_t, use_c_api=False, is_info=False, dictionary=False):
    #     print(r)
    # for r in execute_sql_return(con_c, sql_t, use_c_api=True, is_info=False, dictionary=False):
    #     print(r)

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部