python关联mysql数据库

1、创建utils包

(1)在utils中创建links.py脚本

import pymysql.cursors
from sqlalchemy import create_engine# 清洗前数据库连接urlBEFORE_MYSQL_URL = "mysql+pymysql://root:123456@localhost:3306/before_test?charset=utf8mb4"
mysql_con_before = create_engine(BEFORE_MYSQL_URL)# 清洗后数据库连接urlAFTER_MYSQL_URL = "mysql+pymysql://root:123456@localhost:3306/after_test?charset=utf8mb4"
mysql_con_after = create_engine(AFTER_MYSQL_URL)# 清洗前数据库连接字典BEFORE_LINK_DICT = {'host': 'localhost','port': 3306,  # MySQL默认端口'user': 'root',  # mysql默认用户名'password': '123456','db': 'before_test',  # 数据库'charset': 'utf8mb4','cursorclass': pymysql.cursors.DictCursor
}# 清洗后数据库连接字典
AFTER_LINK_DICT = {'host': 'localhost','port': 3306,  # MySQL默认端口'user': 'root',  # mysql默认用户名'password': '123456','db': 'after_test',  # 数据库'charset': 'utf8mb4','cursorclass': pymysql.cursors.DictCursor
}

(2)在utils中创建sqltools.py脚本

import pymysql.cursors
import pymysql
import pandas as pd# 连接mysql数据库
from sqlalchemy.orm import sessionmakerfrom utils.links import BEFORE_LINK_DICT, AFTER_LINK_DICT, mysql_con_before, mysql_con_afterdef pd_query(sql, link_type='before'):"""创建连接:param sql: sql语句:param link_type: 连接类型(before-清洗前,after-清洗后):return: df数据"""if link_type == 'after':con = pymysql.connect(**AFTER_LINK_DICT)else:con = pymysql.connect(**BEFORE_LINK_DICT)try:with con.cursor() as cursor:cursor.execute(sql)result = cursor.fetchall()finally:con.close()return pd.DataFrame(result)def execute_mysql(sql, link_type='before'):"""创建插入sql语句(update、drop、alter....):param sql: sql语句:param link_type: 连接类型(before-清洗前,after-清洗后):return: df数据"""if link_type == 'after':engine = mysql_con_beforeelse:engine = mysql_con_aftersession_factory = sessionmaker(bind=engine)session = session_factory()results = session.execute(sql)return results

得到util结构如下:

2、测试

创建test.py文件

from utils.sqltools import pd_query, execute_mysql
from utils.links import mysql_con_after# 获取清洗后test_after.con数据表的数据
sql = """select * from con limit 10"""
data = pd_query(sql, 'before')
# 将data插入test_after.sss数据表中,append-添加,replace-替换
data.to_sql('sss', con=mysql_con_after, if_exists='append', index=False)
# 为test_after.sss数据表创建索引
execute_mysql("ALTER TABLE 表名 ADD INDEX 索引名(`需要添加索引列`)", link_type='before')


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部