Проблема с зависанием скрипта, определяющего наличие аутентификации на rtsp-потоке
Я пишу свой скрипт на Python, который находит открытые камеры с помощью shodan.
От API у меня есть около 10 url-адресов, работающих через rtsp (предположительно) и не требующих аутентификации.
Но, как правило, API shodan`a не всегда выдаёт свежую информацию, и url-адресам нужна дополнительная проверка на наличие аутентификации.
Сам отрывок из кода:
def some_func(ip):
cap = cv2.VideoCapture(f"rtsp://{ip}/").isOpened() # В ip включен порт
if not cap:
print("CLOSED:", ip)
else:
print("OPENED:", ip)
Если поток открыт (то есть не требует аутентификации) возвращается True
, иначе False
.
Суть проблемы в том, что я пытался сделать это через библиотеку opencv на python через метод VideoCapture
— если поток открыт, то где-то через короткий промежуток времени код выполняется, но если rtsp-поток закрыт (то есть выводятся ошибки типа 404, 403 и т. п.), то скрипт встаёт намертво (через 2 минуты может отвиснуть).
Вот пример вывода:
[rtsp @ 000002cccd9d3f80] method DESCRIBE failed: 404 Stream Not Found
Я бы смирился с этим, если бы у этого метода был бы параметр timeout
, но я его не нашёл!
Может есть какой-нибудь параметр, отвечающий за timeout
в VideoCapture
, или любая другая библиотека, имеющая похожий метод с таймаутом?
Ответы (1 шт):
В результате поиска я узнал, что метод VideoCapture
заимствует весь поток, в итоге я нашел решение проблемы через библиотеку threading с помощью ChatGPT, т.к. в потоке как раз можно выставить timeout
, что мне и требовалось.
Вот отрывок кода решающий мою проблему:
def check_rtsp_stream(ip, Timeout):
result = {'open': False} # словарь для хранения результата
def worker():
cap = cv2.VideoCapture(f"rtsp://{ip}/")
result['open'] = cap.isOpened()
cap.release()
thread = threading.Thread(target=worker)
thread.start()
thread.join(timeout=Timeout) # ждем завершения потока с таймаутом
# Проверяем, завершился ли поток
if thread.is_alive():
print("TIMEOUT:", ip)
return False # если поток не завершился, считаем, что поток закрыт
return result['open'] # возвращаем результат проверки
# Пример использования
if check_rtsp_stream(ip):
print("OPENED:", ip)
else:
print("CLOSED:", ip)