云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

python连接mysql工具类(线程池版)

jxf315 2025-02-04 14:32:28 教程文章 29 ℃

此 demo 是结合线程池链接数据库操作,线程池不是越多越好,有时候多了反而降低性能

一般情况下 推荐线程池设置如下:
    核心线程数
        - CPU密集型:CPU核心数 * 2
        - I/O密集型:CUP核心数的 1.5 到 2 倍
    最大链接
        -根据实际情况调整,最好是读取内存配置,便于实时调整

1.首先安装模块

pip install pymysql
pip install DBUtils

2.创建工具类包utils,再创建MySQLUtil工具类

import pymysql
from dbutils.pooled_db import PooledDB

# 安装模块: pip install DBUtils

"""
    注意线程不是越多越好,有时线程太多反而影响性能,根据节点情况设置
    一般情况下 推荐线程池设置如下:
        核心线程数
            - CPU密集型:CPU核心数 * 2
            - I/O密集型:CUP核心数的 1.5 到 2 倍
        最大链接
            -根据实际情况调整,最好是读取内存配置,便于实时调整
"""
## MySQL工具类(线程池版本)
class MySQLPoolUtil:
    def __init__(self, host="localhost", port=3306, user=None, passwd=None, db=None, mincached=0, maxconnections=50,
                 charset="utf8"):
        self.__pool = PooledDB(
            host=host,
            port=port,
            user=user,
            password=passwd,
            database=db,
            # 使用链接数据库的模块
            creator=pymysql,
            # 初始化时,链接池中至少创建的链接,0表示不创建
            mincached=mincached,
            # 连接池允许的最大连接数,0和None表示不限制连接数
            maxconnections=maxconnections,
            # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            blocking=True,
            charset=charset,
        )
        self.__conn = self.__pool.connection()
        # 默认查询的结果以元祖返回
        # self.__cursor = self.__conn.cursor()
        # cursor=pymysql.cursors.DictCursor 将查询结果以字典的形式返回
        self.__cursor = self.__conn.cursor(cursor=pymysql.cursors.DictCursor)

    def __del__(self):
        self.__cursor.close()
        self.__conn.close()

        # 获取连接

    def get_conn(self):
        return self.__conn

    # 获取游标
    def get_cursor(self, cursor=None):
        return self.__conn.cursor(cursor)

    # 获取MySQL版本
    def get_version(self, args=None):
        self.__cursor.execute("SELECT VERSION()", args)
        version = self.__cursor.fetchone()
        print("MySQL Version : %s" % version)
        return version

    # 查询所有数据库
    def list_databases(self, args=None):
        self.__cursor.execute("SHOW DATABASES", args)
        dbs = []
        for db in self.__cursor.fetchall():
            dbs.append(list(db.values())[0])
        return dbs

    # 查询所有表
    def list_tables(self, args=None):
        self.__cursor.execute("SHOW TABLES", args)
        tables = []
        for item in self.__cursor.fetchall():
            tables.append(list(item.values())[0])
        return tables

    # 查询所有表的元数据信息
    def list_table_metadata(self, args=None):
        sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"
        self.__cursor.execute(sql, args)
        return self.__cursor.fetchall()

    # 查询表字段的元数据信息
    def table_metadata(self, db, table, args=None):
        db = "'" + db + "'"
        table = "'" + table + "'"
        sql = """SELECT column_name,column_type,ordinal_position,column_comment,column_default
            FROM information_schema.COLUMNS
            WHERE table_schema = %s AND table_name = %s;
            """ % (db, table)
        self.__cursor.execute(sql, args)
        return self.__cursor.fetchall()

    # 获取表字段信息
    def get_table_fields(self, db, table, args=None):
        sql = 'SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE table_schema="' + db + '" AND table_name="' + table + '"'
        self.__cursor.execute(sql, args)
        fields = []
        for field in self.__cursor.fetchall():
            fields.append(list(field.values())[0])
        return fields

    # 获取SQL执行结果
    def execute(self, sql, args=None):
        self.__cursor.execute(sql, args)
        return self.__cursor.fetchall()

    # 查询单条数据
    def select_one(self, sql, *args):
        self.__cursor.execute(sql, args)
        result = self.__cursor.fetchone()
        return result

    # 查询多条数据 - 带参数
    def select_all(self, sql, args=None):
        self.__cursor.execute(sql, args)
        result = self.__cursor.fetchall()
        return result

    # 查询多条数据 - 无参
    def select(self, sql):
        self.__cursor.execute(sql)
        result = self.__cursor.fetchall()
        return result

    # 查询多条数据
    def select_table(self, tablename, database, orderby=""):
        self.__cursor.execute(
            "SELECT COLUMN_NAME FROM information_schema.columns WHERE table_schema = '" + database + "' AND table_name = '" + tablename + "' order by ORDINAL_POSITION")
        columns_result = self.__cursor.fetchall()
        columns = []
        for crt in columns_result:
            columns.append(crt["COLUMN_NAME"])

        self.__cursor.execute("SELECT * from " + tablename + " " + orderby)
        result = self.__cursor.fetchall()
        data = []
        # 输出查询结果
        for row in result:
            record = {}
            for i, column in enumerate(columns):
                record[column] = row[column]
            data.append(record)
        return data

    # 查询多条数据
    def select_columns(self, sql, columns):
        self.__cursor.execute(sql)
        result = self.__cursor.fetchall()
        data = []
        # 输出查询结果
        for row in result:
            record = {}
            for i, column in enumerate(columns):
                record[column] = row[i]
            data.append(record)
        return data

    # 插入单条数据
    def insert_one(self, sql, args):
        self.execute(sql, args)

    # 插入多条批量插入
    def insert_all(self, sql, datas):
        try:
            self.__cursor.executemany(sql, datas)
            self.__conn.commit()
            return {'result': True, 'id': int(self.__cursor.lastrowid)}
        except Exception as err:
            self.__conn.rollback()
            return {'result': False, 'err': err}

    # 更新数据
    def update_one(self, sql, args):
        self.execute(sql, args, isCommit=True)

    # 删除数据
    def delete_one(self, sql, *args):
        self.execute(sql, args, isCommit=True)

    # 执行:param isCommit是否需要提交
    def execute(self, sql, args=None, isCommit=True):
        if isCommit:
            try:
                self.__cursor.execute(sql, args)
                self.__conn.commit()
            except Exception as e:
                print(f"sql:{sql} ,params:{args} 执行异常, error message: {e}")
                self.__conn.rollback()
        else:
            self.__cursor.execute(sql, args)
            self.__conn.commit()


3.测试工具类

from utils.MySQLPoolUtil import MySQLPoolUtil

mySqlPoolUtil = MySQLPoolUtil(host="127.0.0.1", port=3306, user="root", passwd="123456", db="rick-db")
## 查询 mysql 版本
# mySqlPoolUtil.get_version()

## 查询所有的数据库
# dbs = mySqlPoolUtil.list_databases()
# print(dbs)

## 查询当前库下所有的表
# table_list = mySqlPoolUtil.list_tables()
# print('table_list:', table_list)

## 获取表元数据信息
# all_metadatas = mySqlPoolUtil.list_table_metadata("sys_user")
# print(table_metadata)

## 获取指定库指定表元数据信息
# table_metadatas = mySqlPoolUtil.table_metadata("rick-db", "sys_user")
# print(table_metadatas)

## 获取指定表字段信息
# result = mySqlPoolUtil.get_table_fields("rick-db", "sys_user")
# for filed in result:
#     print(filed)

## 查询数据
# sql = "select * from sys_user"
# result = mySqlPoolUtil.select_all(sql)
# for item in result:
#     print(item)

# 插入操作
# sql = "insert into sys_user(user_name,nick_name,pass_word) values (%s, %s, %s)"
# data = ['rick', 'rick', '123456']
# mySqlPoolUtil.insert_one(sql, data)

# 更新操作
sql = "update sys_user set nick_name=%s where nick_name=%s"
data = ['tom~~', 'rick']
mySqlPoolUtil.update_one(sql, data)
最近发表
标签列表