网站首页 > 教程文章 正文
此 demo 是结合线程池链接数据库操作,线程池不是越多越好,有时候多了反而降低性能
一般情况下 推荐线程池设置如下:
核心线程数
- CPU密集型:CPU核心数 * 2
- I/O密集型:CUP核心数的 1.5 到 2 倍
最大链接
-根据实际情况调整,最好是读取内存配置,便于实时调整
1.首先安装模块
pip install pymysql
pip install DBUtils
2.创建工具类包utils,再创建MySQLUtil工具类
import pymysql
from dbutils.pooled_db import PooledDB
# 安装模块: pip install DBUtils
"""
注意线程不是越多越好,有时线程太多反而影响性能,根据节点情况设置
一般情况下 推荐线程池设置如下:
核心线程数
- CPU密集型:CPU核心数 * 2
- I/O密集型:CUP核心数的 1.5 到 2 倍
最大链接
-根据实际情况调整,最好是读取内存配置,便于实时调整
"""
## MySQL工具类(线程池版本)
class MySQLPoolUtil:
def __init__(self, host="localhost", port=3306, user=None, passwd=None, db=None, mincached=0, maxconnections=50,
charset="utf8"):
self.__pool = PooledDB(
host=host,
port=port,
user=user,
password=passwd,
database=db,
# 使用链接数据库的模块
creator=pymysql,
# 初始化时,链接池中至少创建的链接,0表示不创建
mincached=mincached,
# 连接池允许的最大连接数,0和None表示不限制连接数
maxconnections=maxconnections,
# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
blocking=True,
charset=charset,
)
self.__conn = self.__pool.connection()
# 默认查询的结果以元祖返回
# self.__cursor = self.__conn.cursor()
# cursor=pymysql.cursors.DictCursor 将查询结果以字典的形式返回
self.__cursor = self.__conn.cursor(cursor=pymysql.cursors.DictCursor)
def __del__(self):
self.__cursor.close()
self.__conn.close()
# 获取连接
def get_conn(self):
return self.__conn
# 获取游标
def get_cursor(self, cursor=None):
return self.__conn.cursor(cursor)
# 获取MySQL版本
def get_version(self, args=None):
self.__cursor.execute("SELECT VERSION()", args)
version = self.__cursor.fetchone()
print("MySQL Version : %s" % version)
return version
# 查询所有数据库
def list_databases(self, args=None):
self.__cursor.execute("SHOW DATABASES", args)
dbs = []
for db in self.__cursor.fetchall():
dbs.append(list(db.values())[0])
return dbs
# 查询所有表
def list_tables(self, args=None):
self.__cursor.execute("SHOW TABLES", args)
tables = []
for item in self.__cursor.fetchall():
tables.append(list(item.values())[0])
return tables
# 查询所有表的元数据信息
def list_table_metadata(self, args=None):
sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"
self.__cursor.execute(sql, args)
return self.__cursor.fetchall()
# 查询表字段的元数据信息
def table_metadata(self, db, table, args=None):
db = "'" + db + "'"
table = "'" + table + "'"
sql = """SELECT column_name,column_type,ordinal_position,column_comment,column_default
FROM information_schema.COLUMNS
WHERE table_schema = %s AND table_name = %s;
""" % (db, table)
self.__cursor.execute(sql, args)
return self.__cursor.fetchall()
# 获取表字段信息
def get_table_fields(self, db, table, args=None):
sql = 'SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE table_schema="' + db + '" AND table_name="' + table + '"'
self.__cursor.execute(sql, args)
fields = []
for field in self.__cursor.fetchall():
fields.append(list(field.values())[0])
return fields
# 获取SQL执行结果
def execute(self, sql, args=None):
self.__cursor.execute(sql, args)
return self.__cursor.fetchall()
# 查询单条数据
def select_one(self, sql, *args):
self.__cursor.execute(sql, args)
result = self.__cursor.fetchone()
return result
# 查询多条数据 - 带参数
def select_all(self, sql, args=None):
self.__cursor.execute(sql, args)
result = self.__cursor.fetchall()
return result
# 查询多条数据 - 无参
def select(self, sql):
self.__cursor.execute(sql)
result = self.__cursor.fetchall()
return result
# 查询多条数据
def select_table(self, tablename, database, orderby=""):
self.__cursor.execute(
"SELECT COLUMN_NAME FROM information_schema.columns WHERE table_schema = '" + database + "' AND table_name = '" + tablename + "' order by ORDINAL_POSITION")
columns_result = self.__cursor.fetchall()
columns = []
for crt in columns_result:
columns.append(crt["COLUMN_NAME"])
self.__cursor.execute("SELECT * from " + tablename + " " + orderby)
result = self.__cursor.fetchall()
data = []
# 输出查询结果
for row in result:
record = {}
for i, column in enumerate(columns):
record[column] = row[column]
data.append(record)
return data
# 查询多条数据
def select_columns(self, sql, columns):
self.__cursor.execute(sql)
result = self.__cursor.fetchall()
data = []
# 输出查询结果
for row in result:
record = {}
for i, column in enumerate(columns):
record[column] = row[i]
data.append(record)
return data
# 插入单条数据
def insert_one(self, sql, args):
self.execute(sql, args)
# 插入多条批量插入
def insert_all(self, sql, datas):
try:
self.__cursor.executemany(sql, datas)
self.__conn.commit()
return {'result': True, 'id': int(self.__cursor.lastrowid)}
except Exception as err:
self.__conn.rollback()
return {'result': False, 'err': err}
# 更新数据
def update_one(self, sql, args):
self.execute(sql, args, isCommit=True)
# 删除数据
def delete_one(self, sql, *args):
self.execute(sql, args, isCommit=True)
# 执行:param isCommit是否需要提交
def execute(self, sql, args=None, isCommit=True):
if isCommit:
try:
self.__cursor.execute(sql, args)
self.__conn.commit()
except Exception as e:
print(f"sql:{sql} ,params:{args} 执行异常, error message: {e}")
self.__conn.rollback()
else:
self.__cursor.execute(sql, args)
self.__conn.commit()
3.测试工具类
from utils.MySQLPoolUtil import MySQLPoolUtil
mySqlPoolUtil = MySQLPoolUtil(host="127.0.0.1", port=3306, user="root", passwd="123456", db="rick-db")
## 查询 mysql 版本
# mySqlPoolUtil.get_version()
## 查询所有的数据库
# dbs = mySqlPoolUtil.list_databases()
# print(dbs)
## 查询当前库下所有的表
# table_list = mySqlPoolUtil.list_tables()
# print('table_list:', table_list)
## 获取表元数据信息
# all_metadatas = mySqlPoolUtil.list_table_metadata("sys_user")
# print(table_metadata)
## 获取指定库指定表元数据信息
# table_metadatas = mySqlPoolUtil.table_metadata("rick-db", "sys_user")
# print(table_metadatas)
## 获取指定表字段信息
# result = mySqlPoolUtil.get_table_fields("rick-db", "sys_user")
# for filed in result:
# print(filed)
## 查询数据
# sql = "select * from sys_user"
# result = mySqlPoolUtil.select_all(sql)
# for item in result:
# print(item)
# 插入操作
# sql = "insert into sys_user(user_name,nick_name,pass_word) values (%s, %s, %s)"
# data = ['rick', 'rick', '123456']
# mySqlPoolUtil.insert_one(sql, data)
# 更新操作
sql = "update sys_user set nick_name=%s where nick_name=%s"
data = ['tom~~', 'rick']
mySqlPoolUtil.update_one(sql, data)
猜你喜欢
- 2025-02-04 肇庆创汇取得内开内倒 90 度隐形铰链上连接件专利,让窗户关闭更贴合
- 2025-02-04 中建五局取得空间立体悬吊廊道专利,提高空中廊道安全性
- 2025-02-04 成都弗格森液压机电取得一种锁紧结构专利,增加连接强度且小尺寸实现受力最大
- 2025-02-04 构建中东最大边缘云网络,白山云助力全球企业布局中东、持续增长
- 2025-02-04 深观察丨“连美国都没保证把军费增加到这个水平”
- 2025-02-04 文耀中华|《最大法宝》(文耀中华|第一集《高擎旗帜》)
- 2025-02-04 天坛韶乐连古今(天坛神乐署中和韶乐)
- 2025-02-04 连清川、刘勃对谈:历史上的曹操是怎样成为奸雄的?|活动回顾
- 2025-02-04 新春走基层丨海拔3000米的30多年接续守护
- 2025-02-04 全球最大票务系统背后的“最强大脑”
- 05-11阿里开源MySQL中间件Canal快速入门
- 05-11MyBatis插件开发实战:手写一个分页插件
- 05-11Flask数据库——SQLAlchemy
- 05-11MySQL 到 Hazelcast Cloud 实时数据同步实操分享
- 05-11sqlmap 详解
- 05-11一篇文章让你学会Elasticsearch中的查询
- 05-11Mysql性能优化这5点你知道吗?简单却容易被初学者忽略!
- 05-11Spring Boot 实现 MySQL 读写分离技术
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- md5 sha1 (32)
- mybatis plus page (35)
- semaphore 使用详解 (32)
- update from 语句 (32)
- vue @scroll (38)
- 堆栈区别 (33)
- 在线子域名爆破 (32)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)