Почему метод users.search VK API находит меньше пользователей, чем сторонние инструменты, такие как TargetHunter?

Я пытаюсь оптимизировать поиск пользователей ВКонтакте через API с использованием метода users.search. Для повышения эффективности добавил использование метода execute. Однако, даже при этом, результативность ниже, чем у сторонних инструментов, таких как TargetHunter, которые находят значительно больше пользователей при тех же вводных данных. Я ищу пользователей по ВУЗу, году выпуска и городу университета.

Что я пробовал:

  • Использование стандартного метода users.search с фильтрацией по university, university_year и city.
  • Дополнение запросов методом execute, чтобы объединять результаты и искать пользователей обоих полов (мужчины и женщины).

Почему использование методов users.search и execute находит меньше пользователей, чем аналогичный поиск в TargetHunter? Какие настройки или подходы могут улучшить эффективность поиска?

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
from ratelimit import limits, sleep_and_retry
import os
import pandas as pd
from pathlib import Path
import requests
import asyncio
import zipfile
from typing import List, Dict, Optional

from .logger import get_logger

# Загрузка переменных окружения
load_dotenv()

# Инициализация логгера
logger = get_logger()

app = FastAPI()

# Настройка статических файлов
BASE_DIR = Path(__file__).resolve().parent
STATIC_DIR = BASE_DIR / "static"
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# Настройка шаблонов
templates = Jinja2Templates(directory=BASE_DIR / "templates")

# Конфигурация VK API
ACCESS_TOKEN = os.getenv('VK_ACCESS_TOKEN')
API_VERSION = '5.199'
API_URL = 'https://api.vk.com/method/'

# Папка для сохранения ID пользователей
OUTPUT_DIR = BASE_DIR / "user_ids"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Папка для загрузки файлов
UPLOAD_DIR = BASE_DIR / "uploads"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

ONE_MINUTE = 60
MAX_CALLS_PER_MINUTE = 60  # Настройте в соответствии с требованиями VK API
city_cache: Dict[str, int] = {}

city_cache: Dict[str, int] = {}


@sleep_and_retry
@limits(calls=MAX_CALLS_PER_MINUTE, period=ONE_MINUTE)
def get_city_id(city_name: str, country_id: int = 1) -> Optional[int]:
    """
    Получение ID города по его названию.
    Кэширует результаты для уменьшения количества запросов к API.
    """
    if city_name.lower() in city_cache:
        return city_cache[city_name.lower()]

    method = 'database.getCities'
    params = {
        'access_token': ACCESS_TOKEN,
        'v': API_VERSION,
        'country_id': country_id,
        'q': city_name,
        'count': 1
    }

    try:
        response = requests.get(API_URL + method, params=params)
        response.raise_for_status()
        data = response.json()
        logger.debug(f"Получен ответ от VK API для поиска города '{city_name}': {data}")
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP Ошибка при поиске города '{city_name}': {e}")
        return None
    except ValueError:
        logger.error(f"Ошибка декодирования JSON при поиске города '{city_name}'.")
        return None

    if 'error' in data:
        logger.error(f"Ошибка API при поиске города '{city_name}': {data['error']['error_msg']}")
        return None

    if data['response']['count'] > 0:
        city_id = data['response']['items'][0]['id']
        city_cache[city_name.lower()] = city_id
        logger.info(f"Найден город '{city_name}' с ID: {city_id}")
        return city_id
    else:
        logger.warning(f"Город '{city_name}' не найден.")
        return None


@sleep_and_retry
@limits(calls=MAX_CALLS_PER_MINUTE, period=ONE_MINUTE)
def execute_user_search(university_id: int, graduation_year: int, city_id: int) -> List[int]:
    """
    Выполняет пакетный поиск пользователей с учетом возраста и пола через VK API method `execute`.
    """
    method = 'execute'
    code = f"""
    var offset = 0;
    var male_results = [];
    var female_results = [];

    // Цикл для поиска мужчин
    while (offset < 1000) {{
        var male_part = API.users.search({{
            "university": {university_id},
            "university_year": {graduation_year},
            "city": {city_id},
            "count": 100,
            "sex": 2,
            "offset": offset
        }}).items;
        male_results = male_results + male_part;
        offset = offset + 100;
    }}

    offset = 0; // Сброс смещения

    // Цикл для поиска женщин
    while (offset < 1000) {{
        var female_part = API.users.search({{
            "university": {university_id},
            "university_year": {graduation_year},
            "city": {city_id},
            "count": 100,
            "sex": 1,
            "offset": offset
        }}).items;
        female_results = female_results + female_part;
        offset = offset + 100;
    }}

    // Объединение списков мужчин и женщин
    return male_results + female_results;
    """

    params = {
        'access_token': ACCESS_TOKEN,
        'v': API_VERSION,
        'code': code,
        'university_id': university_id,
        'graduation_year': graduation_year,
        'city_id': city_id
    }

    try:
        response = requests.post(API_URL + method, params=params)
        response.raise_for_status()
        data = response.json()
        if 'error' in data:
            raise ValueError(f"VK API error: {data['error']['error_msg']}")
        return [user['id'] for user in data['response']]
    except Exception as e:
        print(f"Ошибка при выполнении запроса execute: {e}")
        return []


def get_university_id(country_id: int, city_id: int, name: str) -> Optional[int]:
    """
    Получение ID университета по названию, стране и городу.
    """
    method = 'database.getUniversities'
    params = {
        'access_token': ACCESS_TOKEN,
        'v': API_VERSION,
        'country_id': country_id,
        'city_id': city_id,
        'q': name,
        'count': 1
    }

    try:
        response = requests.get(API_URL + method, params=params)
        response.raise_for_status()
        data = response.json()
        logger.debug(f"Получен ответ от VK API для поиска университета '{name}': {data}")
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP Ошибка при получении ID университета '{name}': {e}")
        return None
    except ValueError:
        logger.error(f"Ошибка декодирования JSON при получении ID университета '{name}'.")
        return None

    if 'error' in data:
        logger.error(f"Ошибка API при получении ID университета '{name}': {data['error']['error_msg']}")
        return None

    if data['response']['count'] > 0:
        university_id = data['response']['items'][0]['id']
        logger.info(f"Найден университет '{name}' с ID: {university_id}")
        return university_id
    else:
        logger.warning(f"Университет '{name}' не найден.")
        return None


async def get_all_user_ids_via_execute(university_id: int, graduation_year: int, city_id: int) -> List[int]:
    """
    Асинхронно вызывает `execute_user_search` и объединяет результаты.
    """
    user_ids = execute_user_search(university_id, graduation_year, city_id)
    return user_ids


def write_user_info_to_file(university: str, graduation_year: int, city: str, user_ids: List[int], output_dir: Path) -> str:
    """
    Запись количества найденных пользователей и их ID в txt файл.
    Имя файла содержит название ВУЗа, город и год выпуска.
    Возвращает путь к файлу.
    """
    # Удаляем запрещенные символы для имени файла
    safe_university = ''.join(c for c in university if c.isalnum() or c in (' ', '_', '-')).rstrip()
    safe_city = ''.join(c for c in city if c.isalnum() or c in (' ', '_', '-')).rstrip()
    file_name = f"{safe_university}_{safe_city}_{graduation_year}.txt"
    file_path = output_dir / file_name
    try:
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(f"ВУЗ: {university}\n")
            f.write(f"Город: {city}\n")
            f.write(f"Год выпуска: {graduation_year}\n")
            f.write(f"Количество найденных пользователей: {len(user_ids)}\n")
            f.write("ID пользователей:\n")
            for uid in user_ids:
                f.write(f"{uid}\n")
        logger.info(f"Сохранен файл: {file_path} с {len(user_ids)} пользователями.")
    except Exception as e:
        logger.error(f"Ошибка при записи файла {file_path}: {e}")
    return str(file_path)


def archive_files(file_paths: List[str], archive_name: str = "user_ids.zip") -> str:
    """
    Архивирование файлов в ZIP.
    Возвращает путь к архиву.
    """
    zip_path = OUTPUT_DIR / archive_name
    try:
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for file in file_paths:
                zipf.write(file, Path(file).name)
        logger.info(f"Создан архив: {zip_path}")
    except Exception as e:
        logger.error(f"Ошибка при создании архива {zip_path}: {e}")
    return str(zip_path)


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    logger.info("Получен GET-запрос на главную страницу.")
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/search", response_class=HTMLResponse)
async def perform_search(
    request: Request,
    file: UploadFile = File(...)
):
    """
    Обработка загруженного файла и запуск поиска пользователей.
    """
    logger.info("Получен POST-запрос на /search.")

    # Сохранение загруженного файла
    upload_path = UPLOAD_DIR / file.filename
    try:
        with open(upload_path, 'wb') as buffer:
            buffer.write(await file.read())
        logger.info(f"Загружен файл: {upload_path}")
    except Exception as e:
        logger.error(f"Ошибка при сохранении файла {upload_path}: {e}")
        return templates.TemplateResponse("index.html", {
            "request": request,
            "message": "Ошибка при загрузке файла.",
            "message_type": "danger"
        })

    # Определение типа файла по расширению
    file_extension = upload_path.suffix.lower()
    logger.debug(f"Тип загруженного файла: {file_extension}")

    # Чтение таблицы
    try:
        if file_extension == '.csv':
            df = pd.read_csv(upload_path)
        elif file_extension in ['.xlsx', '.xls']:
            df = pd.read_excel(upload_path)
        else:
            logger.error(f"Неподдерживаемый формат файла: {file_extension}")
            return templates.TemplateResponse("index.html", {
                "request": request,
                "message": "Неподдерживаемый формат файла. Пожалуйста, загрузите .csv или .xlsx файл.",
                "message_type": "danger"
            })
        logger.info(f"Прочитана таблица с {len(df)} строками.")
        logger.debug(f"Названия столбцов: {df.columns.tolist()}")
    except Exception as e:
        logger.error(f"Ошибка при чтении таблицы: {e}")
        return templates.TemplateResponse("index.html", {
            "request": request,
            "message": "Ошибка при чтении таблицы. Убедитесь, что формат файла корректен.",
            "message_type": "danger"
        })

    # Проверка наличия необходимых столбцов
    required_columns = ['ВУЗ', 'Год выпуска', 'Город']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logger.error(f"Таблица не содержит необходимых столбцов: {missing_columns}")
        return templates.TemplateResponse("index.html", {
            "request": request,
            "message": f"Таблица должна содержать столбцы: {', '.join(required_columns)}. Отсутствуют: {', '.join(missing_columns)}.",
            "message_type": "danger"
        })

    # Обработка каждой тройки "ВУЗ + год выпуска + город"
    file_paths = []

    # Интеграция с существующими функциями
    async def process_and_write_with_execute(university: str, graduation_year: int, city: str):
        try:
            city_id = get_city_id(city)
            if not city_id:
                logger.warning(f"Город '{city}' не найден. Пропуск.")
                await manager.broadcast(f"Город '{city}' не найден. Пропуск.")
                return

            university_id = get_university_id(1, city_id, university)  # Предполагается Россия (1)
            if not university_id:
                logger.warning(f"Университет '{university}' не найден. Пропуск.")
                await manager.broadcast(f"Университет '{university}' не найден. Пропуск.")
                return

            user_ids = await get_all_user_ids_via_execute(university_id, graduation_year, city_id)
            if not user_ids:
                logger.info(f"Не удалось найти пользователей для тройки: {university}, {graduation_year}, {city}")
                await manager.broadcast(f"Не удалось найти пользователей для тройки: {university}, {graduation_year}, {city}")
                return

            file_path = write_user_info_to_file(university, graduation_year, city, user_ids, OUTPUT_DIR)
            file_paths.append(file_path)
        except Exception as e:
            logger.error(f"Ошибка при обработке тройки {university}, {graduation_year}, {city}: {e}")
            await manager.broadcast(f"Ошибка при обработке тройки: {university}, {graduation_year}, {city}")

    async def archive_and_notify():
        # Ждём завершения всех задач
        await asyncio.sleep(10)  # Увеличена задержка для большего времени обработки
        if not file_paths:
            logger.info("Не было найдено ни одного пользователя для всех троек.")
            await manager.broadcast("Не было найдено ни одного пользователя для всех троек.")
            return
        archive_path = archive_files(file_paths)
        # Используем archive_path для отправки ссылки на скачивание
        await manager.broadcast("Архив создан: /download_archive")

    # Запуск асинхронных задач
    for index, row in df.iterrows():
        university = row['ВУЗ']
        graduation_year = row['Год выпуска']
        city = row['Город']

        logger.info(f"Обработка тройки {index + 1}: {university}, {graduation_year}, {city}")

        # Запуск асинхронной задачи
        asyncio.create_task(process_and_write_with_execute(university, graduation_year, city))

    # Запуск архивации
    asyncio.create_task(archive_and_notify())

    return templates.TemplateResponse("index.html", {
        "request": request,
        "message": "Обработка начата. Следите за прогрессом через WebSocket.",
        "message_type": "info"
    })


@app.get("/download_archive")
def download_archive():
    """
    Возвращает сформированный ZIP-архив для загрузки.
    """
    zip_path = UPLOAD_DIR / "user_ids.zip"
    return FileResponse(
        path=zip_path,
        filename="user_ids.zip",
        media_type="application/octet-stream"
    )

Ответы (0 шт):