Can't grab frame opencv и beeware python

писал приложение на python используя библиотеку beeware и opencv для подсчета количества труб на фото, когда пользователь нажимает на кнопку "Сделать фото", то программа зависает и вылазит следующая ошибка:

[ WARN:[email protected]] global cap_msmf.cpp:476 `anonymous-namespace'::SourceReaderCB::OnReadSample videoio(MSMF): OnReadSample() is called with error status: -1072875772
[ WARN:[email protected]] global cap_msmf.cpp:488 `anonymous-namespace'::SourceReaderCB::OnReadSample videoio(MSMF): async ReadSample() call is failed with error status: -1072875772
[ WARN:[email protected]] global cap_msmf.cpp:1769 CvCapture_MSMF::grabFrame videoio(MSMF): can't grab frame. Error: -1072875772

после открывается это же приложение поверх старого и все зависает окончательно, читал на некоторых форумах, что из-за драйвера может быть, я искал обновления для своей камеры, но их нет вот характеристика:

камера встроенная в ноутбук, питон 3.7, opencv-python 4.10.0.84, briefcase 0.3.9

помогите решить пожалуйста вот код:

import toga
from toga.style import Pack
from toga.style.pack import COLUMN, CENTER, ROW
import cv2
import numpy as np
import threading
import os
from ultralytics import YOLO
from PIL import Image
import io
import time


class PipeCounterApp(toga.App):

    def startup(self):
        self.main_window = toga.MainWindow(title=self.formal_name)
        self.main_window.size = (1920, 1080)  # Set window size

        # Create a container for camera display and processing status
        self.camera_container = toga.Box(
            style=Pack(direction=COLUMN, padding=10, alignment=CENTER, background_color='#333333'))
        self.image_view = toga.ImageView(style=Pack(height=600, width=900))
        self.processing_label = toga.Label("", style=Pack(padding=(10, 10), font_size=20, color='#FFFFFF',
                                                          text_align=CENTER))
        self.camera_container.add(self.image_view)
        self.camera_container.add(self.processing_label)

        # Create modern styled buttons
        self.capture_button = toga.Button("Сделать фото", on_press=self.capture_image,
                                          style=Pack(padding=(15, 25), font_size=18, background_color='#FFFFFF',
                                                     color='#333333'))
        self.exit_button = toga.Button("Выйти", on_press=self.exit_app,
                                       style=Pack(padding=(15, 25), font_size=18, background_color='#FFFFFF',
                                                  color='#333333'))

        # Create status label to display results
        self.status_label = toga.Label("Нажмите 'Сделать фото' что бы посчитать количество труб",
                                       style=Pack(padding=(10, 10), font_size=18, color='#333333', text_align=CENTER))

        # Organize elements in a column
        button_box = toga.Box(style=Pack(direction=ROW, alignment=CENTER, padding=10, background_color='#333333'))
        button_box.add(self.capture_button)
        button_box.add(self.exit_button)

        main_box = toga.Box(style=Pack(direction=COLUMN, alignment=CENTER, padding=10, background_color='#333333'))
        main_box.add(self.camera_container)
        main_box.add(self.status_label)
        main_box.add(button_box)

        self.main_window.content = main_box
        self.main_window.show()

        # Initialize camera
        self.camera = cv2.VideoCapture(0)

        # Load model
        self.model = YOLO('C:/Users/User/PycharmProjects/dinal_count_pipe/yolo_dataset/best.pt')

        # Folder to save processed images
        self.processed_images_folder = 'processed_images'
        os.makedirs(self.processed_images_folder, exist_ok=True)

        # Start thread to display camera feed
        self.camera_thread = threading.Thread(target=self.update_camera_feed, daemon=True)
        self.camera_thread.start()

    def update_camera_feed(self):
        while True:
            ret, frame = self.camera.read()
            if ret:
                # Convert image to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                self.display_image(frame_rgb)
            else:
                self.status_label.text = "Failed to capture image."

            # Reduce sleep time to increase FPS
            time.sleep(0.00001)  # Update frame every 10 ms

    def capture_image(self, widget):
        self.processing_label.text = "Processing... Please wait."
        ret, frame = self.camera.read()
        if ret:
            image_path = 'captured_image.jpg'
            cv2.imwrite(image_path, frame)

            count = self.process_image(image_path)

            self.status_label.text = f"Pipe Count: {count}"

            # Save processed image with annotations
            self.save_processed_image(frame, count)

            os.remove(image_path)
        else:
            self.status_label.text = "Failed to capture image."

        self.processing_label.text = ""  # Clear text after processing

    def process_image(self, image_path):
        try:
            results = self.model.predict(image_path, save=False)
            count = len(results[0].boxes)
            return count
        except Exception as e:
            print(f'Error processing image: {e}')
            return 0

    def save_processed_image(self, frame, count):
        try:
            # Draw annotations on the image
            annotated_frame = frame.copy()
            for box in self.model.predict('captured_image.jpg')[0].boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(annotated_frame, f'Pipe', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            # Save annotated image
            processed_image_path = os.path.join(self.processed_images_folder, f'processed_image_{count}.jpg')
            cv2.imwrite(processed_image_path, annotated_frame)
        except Exception as e:
            print(f'Error saving processed image: {e}')

    def display_image(self, frame):
        try:
            # Convert image from RGB to format suitable for Toga
            pil_image = Image.fromarray(frame)
            with io.BytesIO() as output:
                pil_image.save(output, format="PNG")
                data = output.getvalue()

            # Create a toga Image object
            image = toga.Image(data=data)
            self.image_view.image = image
        except Exception as e:
            print(f'Error displaying image: {e}')

    def exit_app(self, widget):
        self.main_window.close()

    def shutdown(self):
        if self.camera.isOpened():
            self.camera.release()


def main():
    return PipeCounterApp('PipeCounterApp', 'com.example.pipecounter')


if __name__ == '__main__':
    main().main_loop()

Ответы (2 шт):

Автор решения: Валентин Димитерко

Попробуйте другой видеобэкэнд например CAP_DSHOW (DirectShow) вместо MSMF.

→ Ссылка
Автор решения: Валентин Димитерко

что-то по типу такого :

import threading
import cv2
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW

class PipeCounterApp(toga.App):
    def startup(self):
        # Создание основного окна
        self.main_window = toga.MainWindow(title=self.name)
        
        # Создание кнопки
        self.capture_button = toga.Button('Сделать фото', on_press=self.capture_photo)
        
        # Расположение кнопки в интерфейсе
        box = toga.Box(style=Pack(direction=COLUMN))
        box.add(self.capture_button)
        
        self.main_window.content = box
        self.main_window.show()

    def capture_photo(self, widget):
        # Создание и запуск потока для захвата фото
        capture_thread = threading.Thread(target=self.capture_frame)
        capture_thread.start()

    def capture_frame(self):
        # Захват видео с камеры
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        
        if not cap.isOpened():
            print("Не удалось открыть камеру")
            return

        ret, frame = cap.read()
        if ret:
            # Сохранение или обработка кадра
            cv2.imwrite('photo.jpg', frame)
            print("Фото сделано и сохранено как 'photo.jpg'")
        else:
            print("Не удалось захватить кадр")
        
        cap.release()

def main():
    return PipeCounterApp('Pipe Counter', 'org.beeware.pipe_counter')

if __name__ == '__main__':
    main().main_loop()
→ Ссылка