Scala akka | Проблема с обменом между клиентами сообщениями

Пишу irc p2p chat на scala с akka typed cluster. Проблема в том, что при создании нескольких клиентов, также создаются и новые акторы и соответственно мапа с пользователями создается снова и всегда будет 1 пользователь. Клиенты подключаются к единому кластеру, но более ничего не получается, не могу разобраться, как сделать так, чтобы можно было рассылать общие сообщения между клиентами, без применения сервер-клиент и получился p2p чат, вот ссылка на репозиторий https://github.com/vdavdov/irc-akka-chat

Также мои классы:

ChatActor
package by.vdavdov.akka

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object ChatActor {

  sealed trait Command
  case class SendMessage(user: String, message: String) extends Command
  case class JoinGroup(user: String, replyTo: ActorRef[String]) extends Command
  case class PeerJoined(peerAddress: String) extends Command
  case class DirectMessage(from: String, to: String, message: String) extends Command
  case class LeaveGroup(user: String) extends Command

  // Структура для хранения данных пользователя
  case class User(ref: ActorRef[String], name: String)

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    var users = Map.empty[String, User]

    Behaviors.receiveMessage {
      case SendMessage(user, message) =>
        context.log.info(s"Group message from $user: $message")
        // Отправляем сообщения всем пользователям
        context.log.info(s"Now in group is ${users.keys}")
        users.values.foreach { u =>
          context.log.info(s"Sending message to ${u.name}: $message")
          u.ref ! s"$user: $message"
        }
        Behaviors.same

      case JoinGroup(user, replyTo) =>
        context.log.info(s"$user is trying to join the group")
        if (!users.contains(user)) {
          val newUser = User(replyTo, user)
          users += (user -> newUser)
          // Уведомляем пользователя о том, что он успешно присоединился
          replyTo ! s"$user has joined the chat."
          context.log.info(s"$user has joined the group.")
        } else {
          // Уведомляем пользователя о том, что он уже в чате
          replyTo ! s"$user is already in the chat."
          context.log.warn(s"$user is already in the chat.")
        }
        Behaviors.same

      case PeerJoined(peerAddress) =>
        context.log.info(s"New peer joined: $peerAddress")
        Behaviors.same

      case DirectMessage(from, to, message) =>
        users.get(to) match {
          case Some(user) =>
            user.ref ! s"Direct message from $from: $message"
            context.log.info(s"Direct message from $from to $to: $message")
          case None =>
            context.log.warn(s"$to is not in the chat, unable to send direct message from $from")
        }
        Behaviors.same

      case LeaveGroup(user) =>
        if (users.contains(user)) {
          users = users - user
          context.log.info(s"$user has left the group.")
        } else {
          context.log.warn(s"$user attempted to leave the group but was not found.")
        }
        Behaviors.same
    }
  }
}

ChatApplication

package by.vdavdov.akka

import akka.actor.Address
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.cluster.typed.{Cluster, Join}
import javafx.application.Application
import javafx.fxml.FXMLLoader
import javafx.scene.Scene
import javafx.stage.Stage

import java.util.UUID

object ChatApp {
  // Храним ActorSystem и ActorRef для одного экземпляра
  private var system: ActorSystem[ChatActor.Command] = _
  private var chatActorRef: ActorRef[ChatActor.Command] = _


  def main(args: Array[String]): Unit = {
    // Инициализация ActorSystem и ChatActor
    system = ActorSystem(ChatActor(), "ChatCluster")
    chatActorRef = system.systemActorOf(ChatActor(), "ChatActor")

    // Присоединение к кластеру
    val cluster = Cluster(system)


    cluster.manager ! Join(Address("akka", "ChatCluster", "127.0.0.1", 2552)) // и тут меняю каждый раз

    Application.launch(classOf[ChatApplication], args: _*)
  }

  // Метод для доступа к ActorRef
  def getChatActor: ActorRef[ChatActor.Command] = chatActorRef
  def getReplyActor: ActorRef[String] = system.systemActorOf(ReplyActor(), "ReplyActor")
}

class ChatApplication extends Application {

  override def start(primaryStage: Stage): Unit = {
    // Загружаем FXML
    val loader = new FXMLLoader(getClass.getResource("/fxml/chat.fxml"))
    val scene = new Scene(loader.load())

    // Генерируем уникальное имя пользователя
    val username = UUID.randomUUID().toString

    // Получаем ActorRef для ChatActor
    val chatActorRef = ChatApp.getChatActor

    // Передаем actorRef и имя пользователя в контроллер UI
    val replyActor : ActorRef[String] = ChatApp.getReplyActor
    val controller = loader.getController[ChatController]
    if (controller != null) {
      controller.setActorRef(chatActorRef, username, replyActor) // Передаем ActorRef и имя пользователя
    } else {
      println("Ошибка: контроллер равен null.")
    }

    // Настраиваем и показываем основное окно
    primaryStage.setTitle("Chat Application")
    primaryStage.setScene(scene)
    primaryStage.show()
  }
}

ChatController p

ackage by.vdavdov.akka

import akka.actor.typed.ActorRef
import javafx.fxml.FXML
import javafx.scene.control.{Button, ListView, TextArea, TextField}

class ChatController {

  @FXML
  private var chatArea: TextArea = _
  @FXML
  private var messageField: TextField = _
  @FXML
  private var sendButton: Button = _
  @FXML
  private var userList: ListView[String] = _
  @FXML
  private var directMessageButton: Button = _
  private var actorRef: ActorRef[ChatActor.Command] = _
  private var username: String = _
  private var actorRefString: ActorRef[String] = _

  // Метод для получения сообщения из ChatActor
  def receiveMessage(message: String): Unit = {
    appendToChatArea(message)
  }

  // Устанавливаем ActorRef и имя пользователя
  def setActorRef(ref: ActorRef[ChatActor.Command], user: String, stringRef : ActorRef[String]): Unit = {
    actorRef = ref
    username = user
    actorRefString = stringRef

    actorRef ! ChatActor.JoinGroup(username, actorRefString)

    sendButton.setOnAction(_ => {
      val message = messageField.getText
      if (message.nonEmpty) {
        actorRef ! ChatActor.SendMessage(username, message)
        appendToChatArea(s"You: $message")
        messageField.clear()
      }
    })

    directMessageButton.setOnAction(_ => {
      val selectedUser = userList.getSelectionModel.getSelectedItem
      val message = messageField.getText
      if (selectedUser != null && message.nonEmpty) {
        actorRef ! ChatActor.DirectMessage(username, selectedUser, message)
        appendToChatArea(s"Private to $selectedUser: $message")
        messageField.clear()
      }
    })
  }

  private def appendToChatArea(message: String): Unit = {
    chatArea.appendText(message + "\n")
  }

  def leaveGroup(): Unit = {
    actorRef ! ChatActor.LeaveGroup(username)
    appendToChatArea(s"$username has left the chat.")
  }

  def updateUserList(users: List[String]): Unit = {
    userList.getItems.clear()
    users.foreach(userList.getItems.add)
  }
}

ReplyActor

package by.vdavdov.akka

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}

// Актор для обработки сообщений от ChatActor
object ReplyActor {
  def apply(): Behavior[String] = Behaviors.receive { (context, message) =>
    // Здесь вы можете обработать входящие сообщения
    context.log.info(s"Received message from ChatActor: $message")
    Behaviors.same
  }
}

application.conf

akka {
  actor {
    provider = "cluster"
  }

  remote {
    artery {
      enabled = true
      transport = tcp
      canonical.hostname = "127.0.0.1"
      canonical.port = 2552
    }
  }

  cluster {
    seed-nodes = ["akka://[email protected]:2551",
     "akka://[email protected]:2552",
      "akka://[email protected]:2553"]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    log-info = on
    log-info-verbose = on
  }
}

chat.fxml

<?xml version="1.0" encoding="UTF-8"?>

<?import javafx.scene.control.Button?>
<?import javafx.scene.control.ListView?>
<?import javafx.scene.control.TextArea?>
<?import javafx.scene.control.TextField?>
<?import javafx.scene.layout.AnchorPane?>

<AnchorPane xmlns:fx="http://javafx.com/fxml" fx:controller="by.vdavdov.akka.ChatController">
    <TextArea fx:id="chatArea" layoutX="14.0" layoutY="14.0" prefHeight="400.0" prefWidth="400.0" editable="false"/>
    <TextField fx:id="messageField" layoutX="14.0" layoutY="420.0" prefWidth="300.0"/>
    <Button fx:id="sendButton" layoutX="320.0" layoutY="420.0" text="Send"/>
    <ListView fx:id="userList" layoutX="420.0" layoutY="14.0" prefHeight="400.0" prefWidth="150.0"/>
    <Button fx:id="directMessageButton" text="Direct Message" />
</AnchorPane>

Помогите, пожалуйста, если я расписал что-то не очевидно или требуется добавить больше логирования, просьба написать, не ссылаясь на форму правильного вопроса...пожалуйста ;(


Ответы (0 шт):