Асинхронный подход к бд python работает хуже, чем синхронный
Следующий вопрос по поводу асинхронной работы API: Есть функция API, которая отдает все устройства по конкретному пользователю:
api_methods.py
async def unpacking_plc(data):
json_answer = {}
for i in data:
try:
id_device = i[0]
json_answer[id_device] = {'id_device': id_device, 'number_control_cabinet': i[1],
'center_lat': i[2],
'center_lon': i[3], 'angle': i[4], 'IMEI': i[5], 'status_device': i[6],
'outflow_mm': i[7], 'control_status_panel': i[8], 'outflow_precent': i[9],
'power': i[10], 'press': i[11], 'temp': i[12], 'online': i[13],
'analog_inputs': i[14],
'job_mod': i[15], 'outflow_hour': i[16], 'uiMovedCmd': i[17],
'xOnWaterSupply': i[18],
'udiMinOutflowAtMaxSpeed': i[19], 'uiTimeOneCircle': i[20],
'radius': i[21],
'type_device': i[22], 'start_angle': i[23], 'end_angle': i[24],
'power_error_galileosky': i[25], 'control_cabinet_id': i[26],
'IndicateLeft': i[27],
'IndicateRight': i[28], 'IndicateStop': i[29], 'stop_angle_status': i[30],
'stop_angle_value': i[31], 'protocol_type': i[32], 'plc_id': i[33],
'last_update_data': i[34], 'time_to_stop': i[35],
'sector_angle': i[36],
'start_angle_fill': i[37], 'irrigation_start_angle': i[38],
'start_gen_st': i[39],
'status_water_gun1': i[40], 'data_water_gun1': i[41],
'status_water_gun2': i[42],
'data_water_gun2': i[43], 'lite_version': i[44], 'stop_plc': i[45],
'him': i[46],
'press_2': i[47], 'consumption': i[48], 'target_pressure': i[49],
'start_program_barrier': i[50], 'end_program_barrier': i[51],
'enable_barrier_start': i[52], 'last_update': i[53],
'position_list': i[54]}
except Exception as e:
print(f'{e} \n{traceback.format_exc()}')
return json_answer
async def get_plc(request):
try:
start_time = datetime.datetime.now().timestamp()
response_obj = {}
device_list = []
try:
id_user = request.rel_url.query['id_user']
except KeyError:
response_obj = {'status': 'failed', 'reason': 'Invalid query params'}
return web.Response(text=json.dumps(response_obj), status=400)
try:
# Запрос в базу данных
sql_data = await select_device(int(id_user))
except ValueError:
response_obj = {'status': 'failed', 'reason': 'Invalid variable data type'}
return web.Response(text=json.dumps(response_obj), status=400)
# Распаковка в json
json_data = await unpacking_plc(sql_data)
for device in json_data:
start = 0
message_status = ""
uiMoveCmd = 0
xOnWaterSupply = 0
status_protect = 0
protect_message = ""
uiMoveCmd_protect = 0
xOnWaterSupply_protect = 0
analog_status = {'protect': False, 'cabel': False,
'periphery': False, 'right': False,
'left': False, 'water': False}
# Состояния онлайн
last_update = json_data[device]['last_update']
time_m = datetime.datetime.now().timestamp() - last_update.timestamp()
if time_m >= 320:
status_ping = 0
else:
status_ping = 1
if json_data[device]['job_mod'] == 'control_only':
uiMoveCmd = json_data[device]['uiMovedCmd']
xOnWaterSupply = json_data[device]['xOnWaterSupply']
status_protect = 'false'
outflow_precent = json_data[device]['outflow_precent'] / 100
IndicateLeft = json_data[device]['IndicateLeft']
IndicateRight = json_data[device]['IndicateRight']
IndicateStop = json_data[device]['IndicateStop']
status_plc_mask = json_data[device]['status_device']
stop_angle_status = json_data[device]['stop_angle_status']
stop_angle_value = json_data[device]['stop_angle_value']
last_update_data = json_data[device]['last_update_data'].strftime("%d.%m.%Y %H:%M:%S")
time_to_stop = json_data[device]['time_to_stop']
sector_fill = json_data[device]['sector_angle']
start_angle_fill = json_data[device]['start_angle_fill']
irrigation_start_angle = json_data[device]['irrigation_start_angle']
start_gen_st = json_data[device]['start_gen_st']
status_water_gun1 = json_data[device]['status_water_gun1']
data_water_gun1 = json_data[device]['data_water_gun1']
status_water_gun2 = json_data[device]['status_water_gun2']
data_water_gun2 = json_data[device]['data_water_gun2']
stat_2 = status_plc_mask.split(',')
protocol_type = json_data[device]['protocol_type']
status_device = await decode_mask_status(stat_2)
if json_data[device]['job_mod'] == 'protect_only':
# Первая версия обработки охранного прибора
if protocol_type == '1':
analog_data = json_data[device]['analog_inputs'].split(',')
analog_status = {'protect': str_to_bool(analog_data[2]), 'cabel': str_to_bool(analog_data[1]),
'periphery': str_to_bool(analog_data[0]), 'right': str_to_bool(analog_data[3]),
'left': str_to_bool(analog_data[4]), 'water': str_to_bool(analog_data[5])}
if analog_status['protect']:
protect_message = 'Поставлен на охрану'
if analog_status['protect'] and analog_status['cabel'] is True:
protect_message = 'Кража кабеля'
if analog_status['protect'] and analog_status['periphery'] is True:
protect_message = 'Кража периферии'
if analog_status['protect'] and analog_status['cabel'] is True and analog_status[
'periphery'] is True: \
protect_message = 'Кража кабеля и периферии'
if analog_status['protect'] is False:
protect_message = 'Снят с охраны'
# Вторая версия обработки охранного прибора
if protocol_type == '2':
analog_data = json_data[device]['analog_inputs'].split(',')
analog_status = {'protect': str_to_bool(analog_data[0]), 'cabel': str_to_bool(analog_data[1]),
'periphery': str_to_bool(analog_data[2]),
'status_move': str_to_bool(analog_data[3]),
'water': str_to_bool(analog_data[4])}
if analog_status['protect']:
protect_message = 'Поставлен на охрану'
if analog_status['protect'] and analog_status['cabel'] is True:
protect_message = 'Кража кабеля'
if analog_status['protect'] and analog_status['periphery'] is True:
protect_message = 'Кража периферии'
if analog_status['protect'] and analog_status['cabel'] is True \
and analog_status['periphery'] is True:
protect_message = 'Кража кабеля и периферии'
if analog_status['protect'] is False:
protect_message = 'Снят с охраны'
if json_data[device]['job_mod'] == 'protect_only':
if protocol_type == '1':
if analog_status['right']:
start = 1
if analog_status['left']:
start = 2
if analog_status['right'] is False and analog_status['left'] is False:
start = 0
if analog_status['water']:
water = 1
else:
water = 0
if start == 0 and water == 0:
protect = "Остановлен"
uiMoveCmd_protect = 0
xOnWaterSupply_protect = 0
if start == 0 and water == 1:
protect = "Остановлен"
uiMoveCmd_protect = 0
xOnWaterSupply_protect = 1
if start == 1 and water == 0:
protect = "Движение вперед без воды"
uiMoveCmd_protect = 1
xOnWaterSupply_protect = 0
if start == 2 and water == 0:
protect = "Движение назад без воды"
uiMoveCmd_protect = 2
xOnWaterSupply_protect = 0
if start == 2 and water == 1:
protect = "Движение назад с водой"
uiMoveCmd_protect = 2
xOnWaterSupply_protect = 1
if start == 1 and water == 1:
protect = "Движение вперед с водой"
uiMoveCmd_protect = 1
xOnWaterSupply_protect = 1
if protocol_type == '2':
if analog_status['status_move']:
start = 1
if analog_status['status_move'] is False:
start = 0
if analog_status['protect']:
start = 0
if analog_status['water']:
water = 1
else:
water = 0
if start == 0 and water == 0:
protect = "Остановлен"
uiMoveCmd_protect = 0
xOnWaterSupply_protect = 0
if start == 0 and water == 1:
protect = "Остановлен"
uiMoveCmd_protect = 0
xOnWaterSupply_protect = 1
if start == 1 and water == 0:
protect = "Движение без воды"
uiMoveCmd_protect = 1
xOnWaterSupply_protect = 0
if start == 1 and water == 1:
protect = "Движение с водой"
uiMoveCmd_protect = 1
xOnWaterSupply_protect = 1
if json_data[device]['job_mod'] == 'control_only':
direction = False
irrigation = False
position = json_data[device]['angle']
rate = json_data[device]['outflow_precent'] / 100
date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if status_device['start_right']:
start = 1
if status_device['start_left']:
start = 2
if status_device['stop']:
start = 0
if status_device['water']:
water = 1
else:
water = 0
if status_device['crash']:
crash = 1
else:
crash = 0
if status_device['him']:
him = 1
else:
him = 0
if start == 0 and water == 0:
message_status = "Остановлен"
if start == 1 and water == 0:
message_status = "Движение вперед без воды"
if start == 2 and water == 0:
message_status = "Движение назад без воды"
if start == 2 and water == 1:
message_status = "Движение назад с водой"
if start == 1 and water == 1:
message_status = "Движение вперед с водой"
if start == 1 and water == 1 and him == 1:
message_status = "Движение вперед с фертигацией"
if start == 2 and water == 1 and him == 1:
message_status = "Движение назад с фертигацией"
if crash == 1:
message_status = "Остановлено, авария"
if status_device['power_error']:
message_status = 'Отсутствует питание.'
if message_status == 'Движение вперед без воды':
irrigation = 0
direction = True
if message_status == 'Движение назад без воды':
irrigation = 0
direction = False
if message_status == 'Остановлен':
irrigation = 0
direction = False
if message_status == 'Движение назад с водой':
irrigation = 1
direction = False
if message_status == 'Движение вперед с водой':
irrigation = 1
direction = True
# db_con.write_develop(position, rate, irrigation, date, i[0], direction)
if json_data[device]['job_mod'] == 'protect_only':
uiMoveCmd = uiMoveCmd_protect
xOnWaterSupply = xOnWaterSupply_protect
status_protect = 'true'
lite_version = json_data[device]['lite_version']
stop_plc = json_data[device]['stop_plc']
him_status = json_data[device]['him']
press_2 = json_data[device]['press_2']
consumption = json_data[device]['consumption']
target_pressure = json_data[device]['target_pressure']
start_program_barrier = json_data[device]['start_program_barrier']
end_program_barrier = json_data[device]['end_program_barrier']
enable_barrier = json_data[device]['enable_barrier_start']
position_list = json_data[device]['position_list']
device_list.append([json_data[device]['id_device'], json_data[device]['number_control_cabinet'],
json_data[device]['angle'], message_status, "", status_ping, "",
json_data[device]['outflow_mm'],
json_data[device]['control_status_panel'], str(status_device['water']), outflow_precent,
json_data[device]['power'], json_data[device]['press'], json_data[device]['temp'],
'', 0, 0, str(json_data[device]['outflow_hour']), uiMoveCmd, xOnWaterSupply,
json_data[device]['udiMinOutflowAtMaxSpeed'], json_data[device]['uiTimeOneCircle'],
json_data[device]['center_lat'], json_data[device]['center_lon'],
json_data[device]['radius'], json_data[device]['type_device'],
json_data[device]['start_angle'], json_data[device]['end_angle'],
status_protect, analog_status, '', IndicateLeft, IndicateRight, IndicateStop,
status_plc_mask, stop_angle_status, stop_angle_value, protect_message, last_update_data,
time_to_stop, start_angle_fill, sector_fill, irrigation_start_angle, start_gen_st,
status_water_gun1, data_water_gun1, status_water_gun2, data_water_gun2, lite_version,
stop_plc, him_status, press_2, consumption, target_pressure, start_program_barrier,
end_program_barrier, enable_barrier, position_list, json_data[device]['plc_id']])
response_obj['devices'] = [device_list, 1]
response_obj['update_status'] = 'true'
time_answer = datetime.datetime.now().timestamp() - start_time
response_obj['time_operation'] = f'{time_answer} сек.'
return web.Response(text=json.dumps(response_obj), status=200)
except Exception as e:
response_obj = {'status': 'failed', 'reason': str(e)}
return web.Response(text=json.dumps(response_obj), status=500)
Функция отдающая ответ с БД(асинхронная): db.py
async def connection():
global pool
try:
if pool is None:
pool = await aiomysql.create_pool(host='',
user='',
password='',
db='WEB_PANEL',
charset='utf8mb4',
maxsize=350)
return pool
except Exception as e:
print(f'{e} {traceback.format_exc()}')
async def select_device(id_user: int):
try:
pool = await connection()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(f'SELECT web_app_plk.ip, web_app_control_cabinet.number_control_cabinet, '
f'web_app_plk.center_lat, web_app_plk.center_lon, web_app_plk.angle, '
f'web_app_plk.IMEI, '
f'web_app_plk.status_device, web_app_plk.outflow_mm, '
f'web_app_plk.control_status_panel, '
f'web_app_plk.outflow_precent, web_app_plk.power, web_app_plk.press, '
f'web_app_plk.temp, '
f'web_app_plk.online, web_app_plk.analog_inputs, web_app_plk.job_mod, '
f'web_app_plk.outflow_hour, web_app_plk.uiMovedCmd, web_app_plk.xOnWaterSupply,'
f'web_app_plk.udiMinOutflowAtMaxSpeed, web_app_plk.uiTimeOneCircle, '
f'web_app_plk.radius, '
f'web_app_plk.type_device, web_app_plk.start_angle, web_app_plk.end_angle, '
f'web_app_plk.power_error_galileosky, web_app_control_cabinet.id, '
f'web_app_plk.IndicateLeft, web_app_plk.IndicateRight, web_app_plk.IndicateStop, '
f'web_app_plk.stop_angle_status, web_app_plk.stop_angle_value, '
f'web_app_plk.protocol_type, '
f'web_app_plk.id, web_app_plk.last_update_data, web_app_plk.time_to_stop, '
f'web_app_plk.sector_angle, web_app_plk.start_angle_fill, '
f'web_app_plk.irrigation_start_angle, web_app_plk.start_gen_st, '
f'web_app_plk.status_water_gun1, web_app_plk.data_water_gun1, '
f'web_app_plk.status_water_gun2, web_app_plk.data_water_gun2, '
f'web_app_plk.lite_version, '
f'web_app_plk.stop_plc, web_app_plk.him, web_app_plk.press_2, '
f'web_app_plk.consumption, '
f'web_app_plk.target_pressure, web_app_plk.start_program_barrier, '
f'web_app_plk.end_program_barrier, web_app_plk.enable_barrier_start, '
f'web_app_plk.last_update, web_app_access.position_list FROM web_app_access '
f'INNER JOIN '
f'web_app_plk ON web_app_plk.id = web_app_access.id_PLK_id INNER '
f'JOIN web_app_control_cabinet ON web_app_control_cabinet.id_PLK_id = '
f'web_app_access.id_PLK_id WHERE id_user_id= {id_user}')
await conn.commit()
data = await cursor.fetchall()
return data
except Exception as e:
print(f'{e} {traceback.format_exc()}')
Функция отдающая ответ с БД(синхронная):
def select_device(id_user, allow_date):
con = connector()
try:
cursor = con.cursor()
if allow_date:
cursor.execute(f'SELECT web_app_plk.ip, web_app_control_cabinet.number_control_cabinet, '
f'web_app_plk.center_lat, web_app_plk.center_lon, '
f'web_app_plk.angle, web_app_plk.IMEI, '
f'web_app_plk.status_device, web_app_plk.outflow_mm, web_app_plk.control_status_panel, '
f'web_app_plk.outflow_precent, web_app_plk.power, web_app_plk.press, web_app_plk.temp, '
f'web_app_plk.online, web_app_plk.last_update, web_app_plk.analog_inputs FROM '
f'web_app_access INNER JOIN '
f'web_app_plk ON web_app_plk.id = web_app_access.id_PLK_id INNER '
f'JOIN web_app_control_cabinet ON web_app_control_cabinet.id_PLK_id = '
f'web_app_access.id_PLK_id WHERE id_user_id= {id_user}')
else:
cursor.execute(f'SELECT web_app_plk.ip, web_app_control_cabinet.number_control_cabinet, '
f'web_app_plk.center_lat, web_app_plk.center_lon, '
f'web_app_plk.angle, web_app_plk.IMEI, '
f'web_app_plk.status_device, web_app_plk.outflow_mm, web_app_plk.control_status_panel, '
f'web_app_plk.outflow_precent, web_app_plk.power, web_app_plk.press, web_app_plk.temp, '
f'web_app_plk.online, web_app_plk.analog_inputs, web_app_plk.job_mod, '
f'web_app_plk.outflow_hour, web_app_plk.uiMovedCmd, web_app_plk.xOnWaterSupply,'
f'web_app_plk.udiMinOutflowAtMaxSpeed, web_app_plk.uiTimeOneCircle, web_app_plk.radius,'
f'web_app_plk.type_device, web_app_plk.start_angle, web_app_plk.end_angle, '
f'web_app_plk.power_error_galileosky, web_app_control_cabinet.id,'
f'web_app_plk.IndicateLeft, web_app_plk.IndicateRight, web_app_plk.IndicateStop, '
f'web_app_plk.stop_angle_status, web_app_plk.stop_angle_value, web_app_plk.protocol_type, '
f'web_app_plk.id, web_app_plk.last_update_data, web_app_plk.time_to_stop, web_app_plk.sector_angle,'
f'web_app_plk.start_angle_fill, web_app_plk.irrigation_start_angle, web_app_plk.start_gen_st, '
f'web_app_plk.status_water_gun1, web_app_plk.data_water_gun1, web_app_plk.status_water_gun2, '
f'web_app_plk.data_water_gun2, web_app_plk.lite_version, web_app_plk.stop_plc, '
f'web_app_plk.him, web_app_plk.press_2, web_app_plk.consumption, web_app_plk.target_pressure, '
f'web_app_plk.start_program_barrier, web_app_plk.end_program_barrier, '
f'web_app_plk.enable_barrier_start, web_app_plk.last_update, web_app_access.position_list FROM '
f'web_app_access INNER JOIN '
f'web_app_plk ON web_app_plk.id = web_app_access.id_PLK_id INNER '
f'JOIN web_app_control_cabinet ON web_app_control_cabinet.id_PLK_id = '
f'web_app_access.id_PLK_id WHERE id_user_id= {id_user}')
data = cursor.fetchall()
return data
except Exception as e:
print(f'{e} {traceback.format_exc()}')
finally:
con.close()
Проблема в том, что синхронные запросы к базе работают лучше, чем асинхронные запросы, хотя должна быть ситуация обратная. Тестирование API проводил через Яндекс.Танк. Нагрузка линейная от 5 до 300 rps и по времени минуту. Конфигурация базы данных одинаковая при тестирование, сама база MySQL.
Графики тестирования:
Запросы к БД асинхронные:

Запросы к БД синхронные:
Ссылки на графики:
Синхронные запросы к БД: https://overload.yandex.net/568614
Асинхронные запросы к БД:https://overload.yandex.net/568630
Прощу помощи от вас!