Java Работа с Thread и BlockingQueue queue; Как составить условие для добавления яда в очередь для завершения работы потоков

Есть задача - потоки - исполнители берут из очереди задачу, исполняют очередную ее стадию, после чего, если это не последняя стадия, ставят задачу обратно в очередь.

Реализовано на BlockingQueue<MutliStageTask> queue; вот моя реализация:

while (true) {
            try {
                mutliStageTask = queue.take();
                System.out.println("Размер очереди - " + queue.size());
                System.out.println("Размер листа подзадач - " + mutliStageTask.getList().size());

                //Записываем в экземпляр класса ball параметры из нулевого элемента листа.
                Ball ball1 = (Ball) mutliStageTask.getList().get(0);

                if (ball1.getResult() == "null"){
                    System.out.println("toxic value add " + "\n" + "***** getTask finished work /////");
                    break;
                }

                if (ball1.getResult().equals("Put end")){
                    PutFinish = true;
                }

                if (!mutliStageTask.getList().isEmpty()){         //Если лист не пуст - то.
                    ball1.execute();                              //Вызываем execute.
                    mutliStageTask.getList().remove(0);     //Удаляем 0 элемент списка.
                    if (!mutliStageTask.getList().isEmpty()){     //Если список задач не пустой, добавляем задачу в очередь.
                        queue.put(mutliStageTask);
                        System.out.println("'List.Not.isEmpty' - return task to work");
                    }
                }
                System.out.println("---------------------------");
                System.out.println("Размер очереди 2 -- " + queue.size());
                System.out.println("Размер листа подзадач 2 -- " + mutliStageTask.getList().size());
                Thread.sleep(100);

                if ( PutFinish && queue.isEmpty() && mutliStageTask.getList().isEmpty()) {   //Ожидание завершения работы других потоков.
                    end = true;
                    System.out.println("List empty");
                }
            } catch (InterruptedException e) {

Суть задачи, в очередь добавляется класс у которого поле list, в него записываем другой класс в котором поле result(добавляю его 3 экземпляра), в классе implement Ru_nable run() достаю из очереди задачу, у неё извлекаю лист, и удаляю из list элемент, проверяю что лист не пуст и снова добавляю задачу в очередь. По завершению обработки мне нужно отравить в очередь яд и завершить работу потоков. Я реализовал проверку завершения на основе isEmpty списка задач и isEmpty list(подзадач), считаю что это момент для завершения.

Решение не верное, так как второй поток может еще работать. Как более правильно реализовать проверку момента, для добавления яда, что бы оба потока успели завершить работу?

На данный момент немного поправил код, избавился от "index 0 out of bounds for length 0". Пока решил добавлением List и после того как очередь задач обработана, записываю в list 1(просто условность), таким образом жду завершения обработки второго потока, затем в main проверяю list.getSize() == 2 и добавляю яд в очередь. Уверенности в правильности решения нет.

Если у кого то есть вариант, как правильнее это реализовать, буду признателен.

  @Override
    public void run() {
        System.out.println("Get " + Thread.currentThread().getName() + " Started");
        while (true) {

            try {
                mutliStageTask = queue.take();                      //Блокирующая очередь.

                //Записываем в экземпляр класса ball параметры из нулевого элемента листа.
                if (!mutliStageTask.getList().isEmpty()) {          //Добавил проверку на пустой лист перед записью
                    ball1 = (Ball) mutliStageTask.getList().get(0); //вызывало ошибку index of bound 0.
                }

                if (ball1.getResult() == "null"){                   //Проверка для добавления яда.
                    System.out.println("toxic value add " + "\n" + "***** getTask finished work /////");
                    break;                                          //Выходим из цикла.
                }

                if (!mutliStageTask.getList().isEmpty()){         //Если лист не пуст - то.
                    mutliStageTask.getList().remove(0);     //Удаляем 0 элемент списка.
                    ball1.execute();                              //Вызываем execute.
                    if (!mutliStageTask.getList().isEmpty()){     //Если список задач не пустой, добавляем задачу в очередь.
                        queue.put(mutliStageTask);
                        System.out.println("'List.Not.isEmpty' - return task to work - " + Thread.currentThread().getName());
                    }
                }
                System.out.println(Thread.currentThread().getName() + " Размер очереди 2 -- " + queue.size());
                System.out.println(Thread.currentThread().getName() + " Размер листа подзадач 2 -- " + mutliStageTask.getList().size());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if ( queue.isEmpty() && mutliStageTask.getList().isEmpty()) {   //Ожидание завершения работы других потоков.
                listDone.add(1);
            }
            System.out.println("ЗАДАЧ НЕТ! размер листа - " + listDone.size() + " " + Thread.currentThread().getName() + " - " +Thread.currentThread().getState());
        }
    }


Ответы (0 шт):