ошибка 403 в питоне
пишу репрайсер для озона чтобы он подключался к ozon api и дальше менял сам цены, но выдается ошибка 403. Может надо добавить куки или что то в этом роде ? подскажите пожалуйста.
import tkinter.ttk as ttk
import requests
import time
import threading
import os
import numpy as np
import subprocess
import sys
from sklearn.tree import DecisionTreeRegressor
from statsmodels.tsa.arima.model import ARIMA
from keras.models import Sequential
from keras.layers import LSTM, Dense
import openpyxl
from openpyxl import Workbook
from datetime import datetime
import sqlite3
import logging
import warnings
import base64
import pyperclip
warnings.filterwarnings("ignore")
logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class OzonRepricer:
def __init__(self):
self.root = tk.Tk()
self.root.title("Ozon Repricer")
self.root.geometry("800x600")
self.root.configure(background="#2e2e2e")
self.client_id_label = tk.Label(self.root, text="Client ID:", background="#2e2e2e", foreground="white")
self.client_id_label.pack()
self.client_id_entry = tk.Entry(self.root, background="#2e2e2e", foreground="white")
self.client_id_entry.pack()
self.client_secret_label = tk.Label(self.root, text="Client Secret:", background="#2e2e2e", foreground="white")
self.client_secret_label.pack()
self.client_secret_entry = tk.Entry(self.root, show="*", background="#2e2e2e", foreground="white")
self.client_secret_entry.pack()
self.product_ids_label = tk.Label(self.root, text="Product IDs (через запятую):", background="#2e2e2e", foreground="white")
self.product_ids_label.pack()
self.product_ids_entry = tk.Entry(self.root, background="#2e2e2e", foreground="white")
self.product_ids_entry.pack()
self.interval_label = tk.Label(self.root, text="Интервал (мин):", background="#2e2e2e", foreground="white")
self.interval_label.pack()
self.interval_entry = tk.Entry(self.root, background="#2e2e2e", foreground="white")
self.interval_entry.pack()
self.price_change_label = tk.Label(self.root, text="Процент изменения цены:", background="#2e2e2e", foreground="white")
self.price_change_label.pack()
self.price_change_entry = tk.Entry(self.root, background="#2e2e2e", foreground="white")
self.price_change_entry.pack()
self.start_button = tk.Button(self.root, text="Start", command=self.start_repricer, background="#2e2e2e", foreground="white")
self.start_button.pack()
self.status_label = tk.Label(self.root, text="Статус:", background="#2e2e2e", foreground="white")
self.status_label.pack()
self.error_label = tk.Label(self.root, text="", background="#2e2e2e", foreground="red")
self.error_label.pack()
self.color_button = tk.Button(self.root, text="Сменить цвет", command=self.change_color, background="#2e2e2e", foreground="white")
self.color_button.pack(side=tk.BOTTOM, anchor=tk.SW)
self.color = "white"
self.running = False
self.model = None
self.arima_model = None
self.lstm_model = None
self.cache = {}
self.product_table = ttk.Treeview(self.root, columns=("ID", "Название", "Цена", "Количество"), show="headings")
self.product_table.heading("ID", text="Product ID")
self.product_table.column("ID", width=100)
self.product_table.heading("Название", text="Название товара")
self.product_table.column("Название", width=200)
self.product_table.heading("Цена", text="Цена")
self.product_table.column("Цена", width=100)
self.product_table.heading("Количество", text="Количество")
self.product_table.column("Количество", width=100)
self.product_table.pack()
self.clipboard = pyperclip
self.ctrl_pressed = False
self.root.bind_all("<Control-c>", self.copy_to_clipboard)
self.root.bind_all("<Control-v>", self.paste_from_clipboard)
self.load_data()
self.client_secret_label = tk.Label(self.root, text="Client Secret:", background="#2e2e2e", foreground="white")
self.client_secret_label.pack()
self.client_secret_entry = tk.Entry(self.root, show="*", background="#2e2e2e", foreground="white")
self.client_secret_entry.pack()
# Вставляем код здесь
self.show_password_button = tk.Button(self.root, text="Показать пароль", command=self.show_password)
self.show_password_button.pack()
def show_password(self):
if self.client_secret_entry.cget('show') == '*':
self.client_secret_entry.config(show='')
self.show_password_button.config(text='Скрыть пароль')
else:
self.client_secret_entry.config(show='*')
self.show_password_button.config(text='Показать пароль')
def load_data(self):
try:
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS settings (
client_id TEXT,
client_secret TEXT,
product_ids TEXT,
interval INTEGER,
price_change REAL
)
''')
cursor.execute('SELECT * FROM settings')
data = cursor.fetchone()
if data:
self.client_id_entry.delete(0, tk.END)
self.client_id_entry.insert(0, data[0])
self.client_secret_entry.delete(0, tk.END)
self.client_secret_entry.insert(0, data[1])
self.product_ids_entry.delete(0, tk.END)
self.product_ids_entry.insert(0, data[2])
self.interval_entry.delete(0, tk.END)
self.interval_entry.insert(0, data[3])
self.price_change_entry.delete(0, tk.END)
self.price_change_entry.insert(0, data[4])
conn.close()
except sqlite3.Error as e:
logging.error(f"Ошибка при загрузке данных: {e}")
def copy_to_clipboard(self, event):
self.clipboard.copy("Текст, который будет скопирован")
print("Скопировано!")
def paste_from_clipboard(self, event=None):
try:
clipboard_data = self.clipboard.paste()
if self.client_id_entry.focus_get() == self.client_id_entry:
self.client_id_entry.insert(tk.END, clipboard_data)
elif self.client_secret_entry.focus_get() == self.client_secret_entry:
self.client_secret_entry.insert(tk.END, clipboard_data)
elif self.product_ids_entry.focus_get() == self.product_ids_entry:
self.product_ids_entry.insert(tk.END, clipboard_data)
elif self.interval_entry.focus_get() == self.interval_entry:
self.interval_entry.insert(tk.END, clipboard_data)
elif self.price_change_entry.focus_get() == self.price_change_entry:
self.price_change_entry.insert(tk.END, clipboard_data)
except Exception as e:
logging.error(f"Ошибка при вставке из буфера обмена: {e}")
def create_arima_model(self, prices):
try:
prices = [x for x in prices if x is not None and x!= np.nan]
if len(prices) < 10:
return None
order = (1, 1, 1)
trend = 'c'
model = ARIMA(prices, order=order, trend=trend)
model_fit = model.fit(method_kwargs={'warn_convergence': False}, maxiter=1000)
return model_fit
except Exception as e:
logging.error(f"Ошибка при создании модели ARIMA: {e}")
return None
def fit_lstm_model(self, historical_prices):
if len(historical_prices) < 2:
logging.error("Недостаточно данных для обучения модели.")
return
historical_prices = np.array(historical_prices)
X = []
y = []
for i in range(len(historical_prices) - 1):
X.append(historical_prices[i])
y.append(historical_prices[i + 1])
X = np.array(X).reshape(-1, 1, 1)
y = np.array(y).reshape(-1, 1)
self.lstm_model = self.create_lstm_model(historical_prices)
self.lstm_model.fit(X, y, epochs=100, batch_size=32)
def get_competitor_prices(self, product_id):
# Создаем сессию
session = requests.Session()
# Входим в аккаунт
login_url = "https://www.ozon.ru/login"
login_data = {
"login": "ваш_логин",
"password": "ваш_пароль"
}
response = session.post(login_url, data=login_data)
# Проверяем, что вход был успешным
if response.status_code == 200:
print("Вход успешный")
else:
print("Ошибка входа")
api_url = "https://api.ozon.ru/v2/product/info"
params = {
"product_id": product_id
}
response = session.get(api_url, params=params)
if response.status_code == 200:
print("Данные получены успешно")
data = response.json()
if 'competitors' in data:
competitor_prices = [competitor['price'] for competitor in data['competitors'] if 'price' in competitor]
return competitor_prices
else:
print("Ключ 'competitors' не найден в ответе API.")
return []
else:
print("Ошибка запроса")
return []
try:
url = 'https://api.ozon.ru/v2/product/info'
client_id = "your_client_id_here"
client_secret = "your_client_secret_here"
auth = (client_id, client_secret)
params = {
'product_id': product_id
}
response = requests.get(url, auth=auth, params=params, timeout=10)
response.raise_for_status() # Это вызовет исключение для HTTP-ошибок
data = response.json()
if 'competitors' in data:
competitor_prices = [competitor['price'] for competitor in data['competitors'] if 'price' in competitor]
return competitor_prices
else:
logging.error("Ключ 'competitors' не найден в ответе API.")
return []
except requests.exceptions.HTTPError as errh:
logging.error(f"HTTP-ошибка: {errh}")
except requests.exceptions.ConnectionError as errc:
logging.error(f"Ошибка соединения: {errc}")
except requests.exceptions.Timeout as errt:
logging.error(f"Ошибка таймаута: {errt}")
except requests.exceptions.RequestException as err:
logging.error(f"Что-то пошло не так: {err}")
return []
def calculate_average_price(self, competitor_prices):
if competitor_prices:
return sum(competitor_prices) / len(competitor_prices)
else:
return 0
def create_model(self, prices):
if len(prices) < 5:
logging.error("Недостаточно данных для создания модели.")
return None
X = np.array(prices[:-1]).reshape(-1, 1)
y = np.array(prices[1:])
self.model = DecisionTreeRegressor()
self.model.fit(X, y)
return self.model
def create_lstm_model(self, historical_prices):
model = Sequential()
model.add(LSTM(10, input_shape=(historical_prices.shape[0], 1)))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')
return model
def predict_price(self, average_price):
if self.model is not None:
return self.model.predict(average_price)
else:
return average_price
def predict_price_arima(self, average_price):
if self.arima_model is not None:
return self.arima_model.predict(average_price.reshape(1, 1))
else:
return average_price
def predict_price_lstm(self, average_price):
if self.lstm_model is not None:
average_price = np.array([average_price])
average_price = average_price.reshape((1, 1, 1))
predicted_price = self.lstm_model.predict(average_price)
return predicted_price[0][0][0]
else:
return average_price
def update_price(self, product_id, new_price):
try:
url = 'https://api.ozon.ru/v2/product/price'
auth = f"{self.client_id_entry.get()}:{self.client_secret_entry.get()}"
auth = base64.b64encode(auth.encode()).decode()
headers = {
'Authorization': f"Basic {auth}",
}
params = {
'product_id': product_id,
'price': new_price
}
response = requests.post(url, headers=headers, params=params, timeout=10)
logging.info(f"Запрос к API: {url} с параметрами {params}")
logging.info(f"Ответ от API: {response.text}")
if response.status_code == 200:
self.status_label.config(text=f"Цена товара {product_id} изменена на {new_price}")
else:
logging.error(f"Ошибка при запросе к API: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка при запросе к API: {e}")
def get_historical_prices(self):
try:
url = 'https://api.ozon.ru/v2/product/historical_prices'
auth = f"{self.client_id_entry.get()}:{self.client_secret_entry.get()}"
auth = base64.b64encode(auth.encode()).decode()
headers = {
'Authorization': f"Basic {auth}",
}
params = {
'product_id': self.product_ids_entry.get(),
'limit': 1000 # Извлекать до 1000 исторических цен
}
response = requests.get(url, headers=headers, params=params, timeout=10)
logging.info(f"Запрос к API: {url} с параметрами {params}")
logging.info(f"Ответ от API: {response.text}")
if response.status_code == 200:
data = response.json()
if 'prices' in data:
return data['prices']
else:
logging.error("Ключ 'prices' не найден в ответе API.")
return []
else:
logging.error(f"Ошибка при запросе к API: {response.status_code}")
return []
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка при запросе к API: {e}")
return []
def start_repricer(self):
thread = threading.Thread(target=self._start_repricer)
thread.start()
def _start_repricer(self):
client_id = self.client_id_entry.get()
client_secret = self.client_secret_entry.get()
product_ids = self.product_ids_entry.get().split(",")
price_change = self.price_change_entry.get()
try:
price_change_percentage = float(price_change) / 100
except ValueError:
price_change_percentage = 0
self.error_label.config(text="Пожалуйста, введите процент изменения цены.")
if not client_id or not client_secret or not product_ids:
self.error_label.config(text="Не все поля заполнены.")
return
try:
product_ids = [int(x) for x in product_ids]
except ValueError:
self.error_label.config(text="Product IDs должны быть целыми числами.")
return
interval = int(self.interval_entry.get())
self.status_label.config(text="Repricer запущен.")
self.running = True
# Создание модели ARIMA
self.arima_model = self.create_arima_model(self.get_historical_prices())
# Обучение модели LSTM
historical_prices = self.get_historical_prices()
self.fit_lstm_model(historical_prices)
while self.running:
for product_id in product_ids:
competitor_prices = self.get_competitor_prices(product_id)
average_price = self.calculate_average_price(competitor_prices)
new_price = self.predict_price_lstm(average_price) * (1 + price_change_percentage)
self.update_price(product_id, new_price)
# Обновление таблицы
self.update_product_table(product_id, new_price)
time.sleep(interval * 60)
if not self.running:
break
def update_product_table(self, product_id, new_price):
try:
url = 'https://api.ozon.ru/v2/product/info'
auth = f"{self.client_id_entry.get()}:{self.client_secret_entry.get()}"
auth = base64.b64encode(auth.encode()).decode()
headers = {
'Authorization': f"Basic {auth}",
}
params = {
'product_id': product_id
}
response = requests.get(url, headers=headers, params=params, timeout=10)
logging.info(f"Запрос к API: {url} с параметрами {params}")
logging.info(f"Ответ от API: {response.text}")
if response.status_code == 200:
data = response.json()
product_name = data.get('name', "Название не найдено")
product_quantity = data.get('quantity', 0)
self.product_table.insert('', 'end', values=(product_id, product_name, new_price, product_quantity))
price_history = self.get_price_history(product_id)
price_history.append((datetime.now(), new_price))
self.save_price_history(product_id, price_history)
else:
logging.error(f"Ошибка при запросе к API: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка при запросе к API: {e}")
def save_price_history(self, product_id, price_history):
try:
conn = sqlite3.connect('price_history.db')
cursor = conn.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS price_history (product_id INTEGER, date TEXT, price REAL)')
for date, price in price_history:
cursor.execute('INSERT INTO price_history VALUES (?,?,?)', (product_id, date, price))
conn.commit()
conn.close()
except sqlite3.Error as e:
logging.error(f"Ошибка при сохранении истории изменений цены: {e}")
def get_price_history(self, product_id):
try:
conn = sqlite3.connect('price_history.db')
cursor = conn.cursor()
cursor.execute('SELECT date, price FROM price_history WHERE product_id=?', (product_id,))
price_history = cursor.fetchall()
conn.close()
return price_history
except sqlite3.Error as e:
logging.error(f"Ошибка при получении истории изменений цены: {e}")
return []
def change_color(self):
if self.color == "white":
self.root.configure(background="black")
self.client_id_label.configure(background="black", foreground="white")
self.client_id_entry.configure(background="black", foreground="white")
self.client_secret_label.configure(background="black", foreground="white")
self.client_secret_entry.configure(background="black", foreground="white")
self.product_ids_label.configure(background="black", foreground="white")
self.product_ids_entry.configure(background="black", foreground="white")
self.interval_label.configure(background="black", foreground="white")
self.interval_entry.configure(background="black", foreground="white")
self.price_change_label.configure(background="black", foreground="white")
self.price_change_entry.configure(background="black", foreground="white")
self.start_button.configure(background="black", foreground="white")
self.status_label.configure(background="black", foreground="white")
self.error_label.configure(background="black", foreground="red")
self.color_button.configure(background="black", foreground="white")
self.color = "black"
else:
self.root.configure(background="#2e2e2e")
self.client_id_label.configure(background="#2e2e2e", foreground="white")
self.client_id_entry.configure(background="#2e2e2e", foreground="white")
self.client_secret_label.configure(background="#2e2e2e", foreground="white")
self.client_secret_entry.configure(background="#2e2e2e", foreground="white")
self.product_ids_label.configure(background="#2e2e2e", foreground="white")
self.product_ids_entry.configure(background="#2e2e2e", foreground="white")
self.interval_label.configure(background="#2e2e2e", foreground="white")
self.interval_entry.configure(background="#2e2e2e", foreground="white")
self.price_change_label.configure(background="#2e2e2e", foreground="white")
self.price_change_entry.configure(background="#2e2e2e", foreground="white")
self.start_button.configure(background="#2e2e2e", foreground="white")
self.status_label.configure(background="#2e2e2e", foreground="white")
self.error_label.configure(background="#2e2e2e", foreground="red")
self.color_button.configure(background="#2e2e2e", foreground="white")
self.color = "white"
def on_closing(self):
self.save_data()
self.root.iconify()
def save_data(self):
try:
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS settings (client_id TEXT, client_secret TEXT, product_ids TEXT, interval INTEGER, price_change REAL)')
cursor.execute('INSERT INTO settings VALUES (?,?,?,?,?)', (self.client_id_entry.get(), self.client_secret_entry.get(), self.product_ids_entry.get(), int(self.interval_entry.get()), float(self.price_change_entry.get())))
conn.commit()
conn.close()
except sqlite3.Error as e:
logging.error(f"Ошибка при сохранении данных: {e}")
def run(self):
self.root.mainloop()
def install_libraries():
libraries = [
'tkinter',
'threading',
'time',
'ys',
'numpy',
'pandas',
'klearn'
]
for library in libraries:
try:
__import__(library)
except ImportError:
print(f"Установка библиотеки {library}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", library])
print(f"Библиотека {library} установлена успешно!")
if __name__ == "__main__":
install_libraries()
app = OzonRepricer()
app.run()
Ответы (1 шт):
Озон - это прекрасный пример современного WEB-приложения, у которого первая страница состоит из одного js-скрипта, который грузит с сервера всё остальное, и формирует код страницы, которую уже увидит пользователь.
Чтобы увидеть элементы страницы, а также процесс загрузки страницы, можно просто нажать F12 в браузере.
Почему это важно? потому, что у Озона (также, как и у многих популярных сайтов) есть многоуровневая система защиты от ботов. И, вообще говоря, когда Вы ходите на Озон - вы ходите еще и через что то типа Cloudflare, у которой есть своя система защиты от ботов, прописанная в виде правил.
Зачем нужна такая система? Создатели сервиса заинтересованы в живых пользователях, а не в ботах. И эта система, обнаружив признаки бота, вполне может выдавать "код 403" в ответ на любые действия.
Самое простое, где можно ошибиться в процессе - это заголовки. Их можно просмотреть в браузере, понять и сэмулировать в своём приложении. Что конкретно не работает у Вас - я боюсь, здесь Вам никто не скажет. Потому что код большой, а для вопросов рекомендуется создание минимального воспроизводимого примера. Грубо говоря, Вы пишете минимальный код, который должен делать что то одно, например, логин в Озон. И этот код демонстрирует конкретную проблему. Вот тогда - есть шансы на ответ!