Не могу подключится к базе данных YDB

Не могу подключиться к базе данных созданных в яндекс облаке YDB. Какую-то конкретную ошибку не выдаёт. Просто никак не может подключится. Читал руководство YDB по подключению, Chat GPT использовал, в интернете искал. Ничего не помогает. Использую Python, FAST API.

async def get_ydb_client():
endpoint = os.getenv("YDB_ENDPOINT")
database = os.getenv("YDB_DATABASE")
oauth_token = os.getenv("OAUTH_TOKEN")

if not endpoint:
    logger.error("Переменная окружения YDB_ENDPOINT не установлена или пуста.")
    raise HTTPException(status_code=500, detail="YDB_ENDPOINT не установлена")

if not database:
    logger.error("Переменная окружения YDB_DATABASE не установлена или пуста.")
    raise HTTPException(status_code=500, detail="YDB_DATABASE не установлена")

if not oauth_token:
    logger.error("Переменная окружения OAUTH_TOKEN не установлена или пуста.")
    raise HTTPException(status_code=500, detail="OAUTH_TOKEN не установлена")

# Используем синхронный AccessTokenCredentials с асинхронным драйвером
credentials = ydb.AccessTokenCredentials(oauth_token)
driver_config = ydb.aio.Driver(endpoint=endpoint, database=database, credentials=credentials)

driver = ydb.aio.Driver(driver_config)
# Ожидаем подключения асинхронно
if not await driver.wait(timeout=20):
    logger.error("Не удалось подключиться к YDB за отведенное время")
    raise HTTPException(status_code=500, detail="Ошибка подключения к YDB")

logger.info("Подключение к YDB успешно установлено.")
return driver

Ответы (1 шт):

Автор решения: Nikolay Matrosov
  1. Если вы используете OAuth токен, то вы должны использовать другой класс YandexPassportOAuthIamCredentials.
  2. У вас и driver_config и driver — это объекты ydb.aio.Driver. Это не правильно. Нужно убрать это дублирование.

Вот полный рабочий код.

import logging
import os

import ydb
from ydb.iam.auth import YandexPassportOAuthIamCredentials
from fastapi import FastAPI
from fastapi import HTTPException

app = FastAPI()
endpoint = os.getenv("YDB_ENDPOINT")
database = os.getenv("YDB_DATABASE")
oauth_token = os.getenv("OAUTH_TOKEN")

@app.get("/")
async def read_root():
    driver = await get_ydb_client()
    res = await driver.scheme_client.describe_path(database)
    return {"name": res.name, "owner": res.owner, "type": res.type}


logger = logging.getLogger(__name__)


async def get_ydb_client():
    if not endpoint:
        logger.error("Переменная окружения YDB_ENDPOINT не установлена или пуста.")
        raise HTTPException(status_code=500, detail="YDB_ENDPOINT не установлена")

    if not database:
        logger.error("Переменная окружения YDB_DATABASE не установлена или пуста.")
        raise HTTPException(status_code=500, detail="YDB_DATABASE не установлена")

    if not oauth_token:
        logger.error("Переменная окружения OAUTH_TOKEN не установлена или пуста.")
        raise HTTPException(status_code=500, detail="OAUTH_TOKEN не установлена")

    # Используем синхронный AccessTokenCredentials с асинхронным драйвером
    credentials = YandexPassportOAuthIamCredentials(oauth_token)
    driver = ydb.aio.Driver(endpoint=endpoint, database=database, credentials=credentials)

    # Ожидаем подключения асинхронно
    await driver.wait(timeout=10)

    logger.info("Подключение к YDB успешно установлено.")
    return driver
→ Ссылка