Почему второй клиент не может получить новые сообщения?

use Workerman\Worker;
use Workerman\Lib\Timer;

require './vendor/autoload.php';
$connect = require './system/db.php';
$ws_worker = new Worker("websocket://0.0.0.0:2346");

function sendMessage($userId, $receiverId, $messageText)
{
    global $connect;

    $query = "INSERT INTO messages (sender_id, receiver_id, message_text, status, read_status, date, time)
                VALUES (:userId, :receiverId, :messageText, 1, 0, NOW(), NOW())";
    $stmt = $connect->prepare($query);
    $stmt->bindParam(':userId', $userId, PDO::PARAM_INT);
    $stmt->bindParam(':receiverId', $receiverId, PDO::PARAM_INT);
    $stmt->bindParam(':messageText', $messageText, PDO::PARAM_STR);
    $stmt->execute();

    $lastMessageId = $connect->lastInsertId();

    foreach ($GLOBALS['ws_worker']->connections as $connection) {
    if (isset($connection->userId) && $connection->userId == $userId && isset($connection->receiverId) && $connection->receiverId == $receiverId) {
        $connection->lastMessageId = $lastMessageId;
        break;
    }
    }
}


function getMessages($userId, $receiverId)
{
    global $connect;

    $query = "SELECT * FROM messages 
                WHERE ((sender_id = :userId AND receiver_id = :receiverId) 
                OR (sender_id = :receiverId AND receiver_id = :userId))
                AND status = 2
                ORDER BY date, time";

    $stmt = $connect->prepare($query);
    $stmt->bindParam(':userId', $userId, PDO::PARAM_INT);
    $stmt->bindParam(':receiverId', $receiverId, PDO::PARAM_INT);
    $stmt->execute();

    return $stmt->fetchAll(PDO::FETCH_ASSOC);
}

function getNewMessages($userId, $receiverId, $lastMessageId)
{
    global $connect;

    $query = "SELECT * FROM messages 
                WHERE id > :lastMessageId AND read_status = 0 AND status = 1 
                AND ((sender_id = :userId AND receiver_id = :receiverId) 
                OR (sender_id = :receiverId AND receiver_id = :userId))
                ORDER BY date, time";

    $stmt = $connect->prepare($query);
    $stmt->bindParam(':userId', $userId, PDO::PARAM_INT);
    $stmt->bindParam(':receiverId', $receiverId, PDO::PARAM_INT);
    $stmt->bindParam(':lastMessageId', $lastMessageId, PDO::PARAM_INT);
    $stmt->execute();

    $updateStatusQuery = "UPDATE messages SET status = 2 WHERE status = 1 AND ((sender_id = :userId AND receiver_id = :receiverId) OR (sender_id = :receiverId AND receiver_id = :userId))";
    $updateStatusStmt = $connect->prepare($updateStatusQuery);
    $updateStatusStmt->bindParam(':userId', $userId, PDO::PARAM_INT);
    $updateStatusStmt->bindParam(':receiverId', $receiverId, PDO::PARAM_INT);
    $updateStatusStmt->execute();

    return $stmt->fetchAll(PDO::FETCH_ASSOC);
}

function getUnreadMessages($userId, $receiverId)
{
    global $connect;

    $query = "SELECT * FROM messages 
                WHERE sender_id = :receiverId AND receiver_id = :userId
                AND read_status = 0 AND status = 2
                ORDER BY date, time";

    $stmt = $connect->prepare($query);
    $stmt->bindParam(':userId', $userId, PDO::PARAM_INT);
    $stmt->bindParam(':receiverId', $receiverId, PDO::PARAM_INT);
    $stmt->execute();

    return $stmt->fetchAll(PDO::FETCH_ASSOC);
}

function markMessageAsRead($userId, $receiverId, $messageId)
{
    global $connect;

    $updateQuery = "UPDATE messages SET read_status = 1 
                    WHERE id = :messageId AND receiver_id = :userId AND sender_id = :receiverId AND read_status = 0";
    $updateStmt = $connect->prepare($updateQuery);
    $updateStmt->bindParam(':userId', $userId, PDO::PARAM_INT);
    $updateStmt->bindParam(':receiverId', $receiverId, PDO::PARAM_INT);
    $updateStmt->bindParam(':messageId', $messageId, PDO::PARAM_INT);
    $updateStmt->execute();
}

$ws_worker->count = 4;


$ws_worker->onConnect = function ($connection) {
    global $connect;
    echo "Соединение открыто\n";
};

$ws_worker->onClose = function ($connection) {
    echo "Соединение закрыто\n";
};



$ws_worker->onMessage = function ($connection, $data) {
    global $connect;

    $decodedData = json_decode($data, true);

    if (isset($decodedData['user_id']) && isset($decodedData['receiver_id'])) {
    $userId = $decodedData['user_id'];
    $receiverId = $decodedData['receiver_id'];

    Timer::add(1, function () use ($connection, $userId, $receiverId) {
        try {
        $lastMessageId = $connection->lastMessageId ?? 0;

        $unreadMessages = getUnreadMessages($receiverId, $userId);

        $data = [
            'unread_messages' => $unreadMessages ?? [],
        ];

        $jsonData = json_encode($data);
        $connection->send($jsonData);
        } catch (\Exception $e) {
        echo "Error: " . $e->getMessage() . "\n";
        }
    });

    if (isset($decodedData['message_text'])) {
        sendMessage($userId, $receiverId, $decodedData['message_text']);
        $connection->lastMessageId = $connect->lastInsertId();
    }

    $lastMessageId = $decodedData['last_message_id'] ?? 0;

    if (isset($decodedData['load_all']) && $decodedData['load_all'] === true) {
        $allMessages = getMessages($userId, $receiverId);
        $responseData = [
        'all_messages' => array_unique($allMessages, SORT_REGULAR),
        'new_messages' => [],
        ];
    } else {
        $newMessages = getNewMessages($userId, $receiverId, $lastMessageId);
        markMessageAsRead($userId, $receiverId, $lastMessageId);
        $responseData = [
        'new_messages' => $newMessages,
        ];
    }

    $responseData['user_id'] = $userId;
    $responseData['receiver_id'] = $receiverId;

    $response = json_encode($responseData);
    $connection->send($response);
    }
};

Worker::runAll();

Почему-то собеседник не получает мои сообщения асинхронно: даже если он отправит какое-то сообщение, моё ранее отправленное сообщение он не увидит.
Работают только функции sendMessage, getMessages, getNewMessages - и всё, а остальные выполняются асинхронно, вероятно, ошибка как раз-таки в функциях, которые выполняются не асинхронно.


Ответы (0 шт):