aiomysql是一个从asyncio(PEP-3156/tulip)框架访问MySQL数据库的库。它依赖并重用PyMySQL的大部分部分。aiomysql试图成为一个很棒的aiopg库,并保留相同的api、外观和感觉。
在内部aimysql是PyMySQL的副本,底层io调用切换到async,基本上是等待并在适当的位置添加async def coroutine。从aiopg移植的sqlalchemy支持。
安装模块
pip3 install aiomysql
简单示例
import asyncio
import aiomysql
loop = asyncio.get_event_loop()
async def test_example():
conn = await aiomysql.connect(host='127.0.0.1', port=3306,
user='root', password='', db='mysql',
loop=loop)
cur = await conn.cursor()
await cur.execute("SELECT Host,User FROM user")
print(cur.description)
r = await cur.fetchall()
print(r)
await cur.close()
conn.close()
loop.run_until_complete(test_example())
环境说明
操作系统:centos 7.6
mysql版本:5.7
数据库名:test
数据库默认编码:utf8mb4
具体表结构以及数据,请参考链接:
https://www.cnblogs.com/xiao987334176/p/12721498.html
这里面有2个表
单次执行
执行select和update
#!/usr/bin/env python3
# coding: utf-8
"""
mysql 异步版本
"""
import traceback
import logging
import aiomysql
import asyncio
import time
logobj = logging.getLogger('mysql')
class Pmysql:
def __init__(self):
self.coon = None
self.pool = None
async def initpool(self):
try:
logobj.debug("will connect mysql~")
__pool = await aiomysql.create_pool(
minsize=5, # 连接池最小值
maxsize=10, # 连接池最大值
host='192.168.31.230',
port=3306,
user='root',
password='abcd1234',
db='test',
autocommit=True, # 自动提交模式
)
return __pool
except:
logobj.error('connect error.', exc_info=True)
async def getCurosr(self):
conn = await self.pool.acquire()
# 返回字典格式
cur = await conn.cursor(aiomysql.DictCursor)
return conn, cur
async def query(self, query, param=None):
"""
查询操作
:param query: sql语句
:param param: 参数
:return:
"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
return await cur.fetchall()
except:
logobj.error(traceback.format_exc())
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
async def execute(self, query, param=None):
"""
增删改 操作
:param query: sql语句
:param param: 参数
:return:
"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
if cur.rowcount == 0:
return False
else:
return True
except:
logobj.error(traceback.format_exc())
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
async def getAmysqlobj():
mysqlobj = Pmysql()
pool = await mysqlobj.initpool()
mysqlobj.pool = pool
return mysqlobj
async def test_select():
mysqlobj = await getAmysqlobj()
# UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
exeRtn = await mysqlobj.query("select * from users")
# print("查询结果",exeRtn)
return exeRtn
async def test_update():
mysqlobj = await getAmysqlobj()
# UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
exeRtn = await mysqlobj.execute("update users set username='xiao1' where id='1'")
# print("exeRtn", exeRtn, type(exeRtn))
if exeRtn:
# print('操作成功')
return '操作成功'
else:
# print('操作失败')
return '操作失败'
async def main(): # 调用方
tasks = [test_select(), test_update()] # 把所有任务添加到task中
done, pending = await asyncio.wait(tasks) # 子生成器
for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result()
# print('协程无序返回值:'+r.result())
print(r.result())
if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
try:
loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
finally:
loop.close() # 结束事件循环
print('所有IO任务总耗时%.5f秒' % float(time.time() - start))
执行输出:
操作成功
[{'id': 1, 'username': 'xiao', 'password': '123', 'phone': '12345678910', 'email': '123@qq.com', 'create_time': datetime.datetime(2020, 4, 10, 1, 22, 7)}]
所有IO任务总耗时0.03948秒
批量插入
批量插入使用executemany
插入3万条数据
#!/usr/bin/env python3
# coding: utf-8
import time
import asyncio
import aiomysql
start = time.time()
loop = asyncio.get_event_loop()
async def test_example():
conn = await aiomysql.connect(host='192.168.31.230', port=3306,
user='root', password='abcd1234',
db='test', loop=loop)
# create default cursor
cursor = await conn.cursor()
# execute sql query
data = []
for i in range(1,30000):
data.append(('xiao%s'%i, '123', '12345678910', '123@qq.com', '2020-04-10 01:22:07'),)
stmt = "INSERT INTO users (username,password,phone,email,create_time) VALUES(%s,%s,%s,%s,%s);"
await cursor.executemany(stmt, data)
await conn.commit()
# detach cursor from connection
await cursor.close()
# close connection
conn.close()
loop.run_until_complete(test_example())
print('所有IO任务总耗时%.5f秒' % float(time.time() - start))
执行输出:
所有IO任务总耗时11.96885秒
本文参考链接:
https://www.cnblogs.com/ygy1997/p/11753335.html