vue-element-admin+flask实现数据查询项目

本文分享一个使用vue-element-admin+flask实现的一个数据查询项目,

填写数据库连接信息和查询语句,即可展示查询到的数据。

 

前提:已下载vue-element-admin并编译成功

前端

1、添加路由

src-router-index.js添加路由

{path: '/idata',component: Layout,redirect: '/idata/index',hidden: false,children: [{path: 'index',component: () => import('@/views/idata/index'),name: 'idata',meta: { title: 'iData设置', icon: 'list', noCache: true }}]},

2、添加页面元素

在src-views下新建idata目录,目录下新建index.vue

3、添加请求

在src-api新建idata.js

import request from '@/utils/request'export function doConnect (data) {return request({url: '/api/test',method: 'post',data})
}export function exec (data) {return request({url: '/api/idata',method: 'post',data})
}

----拿到ret结果,根据结果来显示默认按钮/成功按钮。
----如何展示查询到的数据呢? 前端实际有一个table,动态读取结果字段显示。
----查询数据时,前端一直报错,显示error,控制台显示uncanght promise报错。
查了一会,才发现是后端返回的ret没有code这个字段,加上就可以了。
----如何将textraea输入框宽度增加
----弹框消息
  this.$message({
          message: '连接测试成功!',
          type: 'success'
        })
----前端方法里的response.data实际是取的返回结果data这个key的值
---- this.$refs 用来给标签或者组件添加属性。需要这个标签或者组件中有ref=
 <el-form  :inline="true"  ref="formData" :rules="rules" :model="formData"
label-width="70px">
----控制台可以调试前端代码
-----form加:rules="rules" 并且在return里写上规则,并且每个el-form-item要有prop,可以实现
校验必填 其实在el-form这个组件有写对于表单的校验
-----加了校验后报错 Cannot read properties of undefined (reading 'validate')
传参需要加引号submit('formName')。
 this.$refs[formName].validate 这一行是对的

 

后端

这里只贴主要代码,有时间再继续贴。

1、添加路由

app.py添加

 app.route('/api/idata', methods=['POST'])(idata)app.route('/api/test',methods=['POST'])(onconnect)

2、添加函数

controller下data.py

import json
from flask import request, current_app as app
from constant import SQLTYPE, DBKEYTYPE
from lib.dbpool import DBPooldef idata():return query(request.json)def onconnect():data=request.jsonconn_info = warp_db_conn_info(data)print(conn_info)db_conn = DBPool.get_conn(**conn_info)ret = {"success": False,"msg": None,'code':0}if not db_conn:ret['msg'] = '连接目标DB失败,请确认连接信息是否正确'ret['code']=-1else:ret['msg'] = '连接成功'ret['success']=Trueret['code']=0return retdef warp_db_conn_info(data):d = {}keys = ['db_host', 'db_port', 'db_name', 'db_user', 'db_passwd', 'db_charset']for key in keys:if key in data and data[key]:d[key] = data[key].strip() if isinstance(data[key], (str, bytes)) else data[key]return ddef query(data):"""根据DB和SQL信息执行远程查询并返回结果:param data:     {"db_host": "x.x.x.x","db_port": 3306,"db_name": "test","db_user": "root","db_passwd": "root","db_charset": "utf8","sql": "select * from tbl_key_info where id=:id limit 1","param": {"id": 100110},"key": "all"}:return: {"success": True,"data": [{"id": 100110, "xxx", "xxx"}],"type": "SELECT","msg": ""}"""app.logger.info(f'query data {json.dumps(data)}')ret = {"success": False,"data": [],"type": None,"msg": None,"code":0}if not data:return retconn_info = warp_db_conn_info(data)# print(conn_info)db_conn = DBPool.get_conn(**conn_info)if not db_conn:ret['msg'] = '连接目标DB失败,请确认连接信息是否正确'return retsql = data.get('sql')if not sql:ret['msg'] = '查询语句不能为空'return ret# app.logger.info(f'查询语句: {sql}, 查询参数: {data.get("param")}')app.logger.info(f'查询语句: {sql}, 查询参数: {data.get("param")}')# rows = db_conn.query(sql, **data.get('param'))rows = db_conn.query(sql)results = []upper_sql = sql.upper().strip()if upper_sql.startswith(SQLTYPE.INSERT):sql_type = SQLTYPE.INSERTelif upper_sql.startswith(SQLTYPE.UPDATE):sql_type = SQLTYPE.UPDATEelif upper_sql.startswith(SQLTYPE.DELETE):sql_type = SQLTYPE.DELETEelif upper_sql.startswith(SQLTYPE.SELECT):sql_type = SQLTYPE.SELECTkey = data.get('key', DBKEYTYPE.FIRST)if key == 'all':results = rows.as_dict()else:  # 为了减少网络传输,默认只查询一行first = rows.first()results = [first.as_dict()] if first else []else:sql_type = SQLTYPE.UNKNOWNret.update({"success": True,"data": results,"type": sql_type,"msg": '','code':0})app.logger.info(f'查询结果: {results}')return ret

dbpool.py用来进行数据库连接

import records
from urllib import parseclass DBPool:db_pool = {}@staticmethoddef get_conn(db_host=None, db_name=None, db_port="3306", db_user='root',db_passwd='root', db_charset='utf8'):if db_host is None or db_name is None:raise ValueError("host and db_name can't be None.")key = DBPool.make_uniq_key(db_host, db_port, db_name)if key not in DBPool.db_pool:sql_str = DBPool.make_conn_str(db_host, db_name, db_port, db_user, db_passwd, db_charset)print(sql_str)DBPool.db_pool[key] = records.Database(sql_str, pool_pre_ping=True)try:db = DBPool.db_pool[key]db.query('''select 1''')except:db = Nonedel DBPool.db_pool[key]return db@staticmethoddef make_uniq_key(db_host, db_name, db_port):return f'{db_host}:{db_port}:{db_name}'@staticmethoddef make_conn_str(db_host, db_name, db_port, db_user, db_passwd, db_charset):return f'mysql+pymysql://{db_user}:{parse.quote_plus(db_passwd)}@{db_host}:{db_port}/{db_name}?charset={db_charset}'

今天晚了,改天有时间再详细讲解。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部