Не могу подключится к базе данных YDB
Не могу подключиться к базе данных созданных в яндекс облаке YDB. Какую-то конкретную ошибку не выдаёт. Просто никак не может подключится. Читал руководство YDB по подключению, Chat GPT использовал, в интернете искал. Ничего не помогает. Использую Python, FAST API.
async def get_ydb_client():
endpoint = os.getenv("YDB_ENDPOINT")
database = os.getenv("YDB_DATABASE")
oauth_token = os.getenv("OAUTH_TOKEN")
if not endpoint:
logger.error("Переменная окружения YDB_ENDPOINT не установлена или пуста.")
raise HTTPException(status_code=500, detail="YDB_ENDPOINT не установлена")
if not database:
logger.error("Переменная окружения YDB_DATABASE не установлена или пуста.")
raise HTTPException(status_code=500, detail="YDB_DATABASE не установлена")
if not oauth_token:
logger.error("Переменная окружения OAUTH_TOKEN не установлена или пуста.")
raise HTTPException(status_code=500, detail="OAUTH_TOKEN не установлена")
# Используем синхронный AccessTokenCredentials с асинхронным драйвером
credentials = ydb.AccessTokenCredentials(oauth_token)
driver_config = ydb.aio.Driver(endpoint=endpoint, database=database, credentials=credentials)
driver = ydb.aio.Driver(driver_config)
# Ожидаем подключения асинхронно
if not await driver.wait(timeout=20):
logger.error("Не удалось подключиться к YDB за отведенное время")
raise HTTPException(status_code=500, detail="Ошибка подключения к YDB")
logger.info("Подключение к YDB успешно установлено.")
return driver
Ответы (1 шт):
Автор решения: Nikolay Matrosov
→ Ссылка
- Если вы используете OAuth токен, то вы должны использовать другой класс
YandexPassportOAuthIamCredentials
. - У вас и
driver_config
иdriver
— это объектыydb.aio.Driver
. Это не правильно. Нужно убрать это дублирование.
Вот полный рабочий код.
import logging
import os
import ydb
from ydb.iam.auth import YandexPassportOAuthIamCredentials
from fastapi import FastAPI
from fastapi import HTTPException
app = FastAPI()
endpoint = os.getenv("YDB_ENDPOINT")
database = os.getenv("YDB_DATABASE")
oauth_token = os.getenv("OAUTH_TOKEN")
@app.get("/")
async def read_root():
driver = await get_ydb_client()
res = await driver.scheme_client.describe_path(database)
return {"name": res.name, "owner": res.owner, "type": res.type}
logger = logging.getLogger(__name__)
async def get_ydb_client():
if not endpoint:
logger.error("Переменная окружения YDB_ENDPOINT не установлена или пуста.")
raise HTTPException(status_code=500, detail="YDB_ENDPOINT не установлена")
if not database:
logger.error("Переменная окружения YDB_DATABASE не установлена или пуста.")
raise HTTPException(status_code=500, detail="YDB_DATABASE не установлена")
if not oauth_token:
logger.error("Переменная окружения OAUTH_TOKEN не установлена или пуста.")
raise HTTPException(status_code=500, detail="OAUTH_TOKEN не установлена")
# Используем синхронный AccessTokenCredentials с асинхронным драйвером
credentials = YandexPassportOAuthIamCredentials(oauth_token)
driver = ydb.aio.Driver(endpoint=endpoint, database=database, credentials=credentials)
# Ожидаем подключения асинхронно
await driver.wait(timeout=10)
logger.info("Подключение к YDB успешно установлено.")
return driver