async with lock - возможности
Не совсем разобрался с конструкций
async with lock: для python.
Мне нужно сделать так что одномоментно выполнялась только одна функция для работы с базой данных. То есть было монополное обращение к базе. Функций для работ с базой несколько.
asyncio scheduler.add_job() запускает по расписанию N функций doWork1, doWork2, doWork3 и т.д., которые в свою очередь вызывают функции для работы с базой данных (query1, query2, query3). Вопрос может ли async with lock помочь с ограничением на одномоментное исполнение только одной функции для работы с базой данных? Если да, то как? По идее для этой задачи нужно использовать очередь, но возможно и lock может помочь упростить реализацию.
Ответы (3 шт):
а в чем проблема проверить на подобном примере? видно, что сами функции выполняются параллельно, кроме их with lock блоков
import asyncio
async def t1():
print('t1 start')
# await lock.acquire()
async with lock:
print('t1 lock')
await asyncio.sleep(2)
print('t1 sleep')
# lock.release()
print('t1 release')
async def t2():
print('t2 start')
async with lock:
# await lock.acquire()
print('t2 lock')
await asyncio.sleep(2)
print('t2 sleep')
# lock.release()
print('t2 release')
lock = asyncio.Lock()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([loop.create_task(t1()), loop.create_task(t2()),]))
loop.close()
out
t1 start
t1 lock
t2 start
t1 sleep
t1 release
t2 lock
t2 sleep
t2 release
Ну видимо lock = asyncio.Lock() надо вынести в блок if __name__ == "__main__": и все получится
Спасибо vadim vaduxa Выкладываю рабочий вариант скрипта
import random
import asyncio
import aiosqlite as aiosql
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import uuid
import numpy as np
from datetime import datetime
async def dbConnect():
sqlConn = await aiosql.connect('test.db')
await sqlConn.execute("""
CREATE TABLE IF NOT EXISTS tbl(
id TEXT NOT NULL,
side TEXT NOT NULL,
price NUMERIC NOT NULL,
parent_id TEXT NULL,
buy_ids TEXT NULL
)""")
return sqlConn
async def check():
return (random.uniform(0, 1) < 0.05)
async def fetchPrice():
return random.uniform(0, 1)
async def sell():
pass
async def getOpenBuyOrders(sqlConn):
cursor = await sqlConn.execute(
"""SELECT id, price, parent_id
FROM tbl
WHERE side='buy'
AND parent_id IS NULL""")
return await cursor.fetchall()
async def saveBuyOrder(sqlConn, price):
buyId = uuid.uuid1()
await sqlConn.execute(f"INSERT INTO tbl(id, price, side) VALUES (?, ?, 'buy')", (buyId.hex, price))
await sqlConn.commit()
async def saveSellOrder(sqlConn, price, buyIDs):
sellId = uuid.uuid1()
await sqlConn.execute(f"INSERT INTO tbl (id, price, buy_ids, side) VALUES (?, ?, ?, 'sell')", (sellId.hex, price, str(buyIDs)))
buyIDString = "'"+("','".join(buyIDs*len(buyIDs)))+"'"
await sqlConn.execute(f"UPDATE tbl SET parent_id = '{sellId.hex}' WHERE id in ({buyIDString})")
await sqlConn.commit()
async def strategyBuy(sqlConn, i):
if(await check()):
currentPrice = await fetchPrice()
if not lock.locked():
async with lock:
global myState
openBuyOrders = await getOpenBuyOrders(sqlConn)
if(openBuyOrders):
avgPrice = np.mean([openBuyOrder[1]
for openBuyOrder in openBuyOrders])
if(avgPrice > (currentPrice * 1.05)):
await saveBuyOrder(sqlConn, currentPrice)
myState = myState + 1
print(
f"{datetime.now()} Bought1. Task: {i}, myState:{myState}")
else:
await saveBuyOrder(sqlConn, currentPrice)
myState = myState + 1
print(f"{datetime.now()} Bought2. Task: {i}, myState:{myState}")
return None
async def strategySell(sqlConn, i):
if(await check()):
currentPrice = await fetchPrice()
if not lock.locked():
async with lock:
openBuyOrders = await getOpenBuyOrders(sqlConn)
if(openBuyOrders):
avgPrice = np.mean([openBuyOrder[1]
for openBuyOrder in openBuyOrders])
if((avgPrice * 1.05) < currentPrice):
buyIDs = [openBuyOrder[0]
for openBuyOrder in openBuyOrders]
await saveSellOrder(sqlConn, currentPrice, buyIDs)
global myState
myState = myState + 1
print(
f"{datetime.now()} Sold. Task: {i}, myState:{myState}")
return None
async def main():
sqlConn = await dbConnect()
scheduler = AsyncIOScheduler()
for i in range(1000):
scheduler.add_job(strategyBuy, 'interval', seconds=20, args=[sqlConn, i],
start_date='2000-01-01 00:00:00', timezone='UTC')
scheduler.add_job(strategySell, 'interval', seconds=20, args=[sqlConn, i],
start_date='2000-01-01 00:00:00', timezone='UTC')
scheduler.start()
if __name__ == "__main__":
lock = asyncio.Lock()
myState = 0
loop = asyncio.new_event_loop()
task = loop.create_task(main())
loop.run_forever()