Scala akka | Проблема с обменом между клиентами сообщениями
Пишу irc p2p chat на scala с akka typed cluster. Проблема в том, что при создании нескольких клиентов, также создаются и новые акторы и соответственно мапа с пользователями создается снова и всегда будет 1 пользователь. Клиенты подключаются к единому кластеру, но более ничего не получается, не могу разобраться, как сделать так, чтобы можно было рассылать общие сообщения между клиентами, без применения сервер-клиент и получился p2p чат, вот ссылка на репозиторий https://github.com/vdavdov/irc-akka-chat
Также мои классы:
ChatActor
package by.vdavdov.akka
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
object ChatActor {
sealed trait Command
case class SendMessage(user: String, message: String) extends Command
case class JoinGroup(user: String, replyTo: ActorRef[String]) extends Command
case class PeerJoined(peerAddress: String) extends Command
case class DirectMessage(from: String, to: String, message: String) extends Command
case class LeaveGroup(user: String) extends Command
// Структура для хранения данных пользователя
case class User(ref: ActorRef[String], name: String)
def apply(): Behavior[Command] = Behaviors.setup { context =>
var users = Map.empty[String, User]
Behaviors.receiveMessage {
case SendMessage(user, message) =>
context.log.info(s"Group message from $user: $message")
// Отправляем сообщения всем пользователям
context.log.info(s"Now in group is ${users.keys}")
users.values.foreach { u =>
context.log.info(s"Sending message to ${u.name}: $message")
u.ref ! s"$user: $message"
}
Behaviors.same
case JoinGroup(user, replyTo) =>
context.log.info(s"$user is trying to join the group")
if (!users.contains(user)) {
val newUser = User(replyTo, user)
users += (user -> newUser)
// Уведомляем пользователя о том, что он успешно присоединился
replyTo ! s"$user has joined the chat."
context.log.info(s"$user has joined the group.")
} else {
// Уведомляем пользователя о том, что он уже в чате
replyTo ! s"$user is already in the chat."
context.log.warn(s"$user is already in the chat.")
}
Behaviors.same
case PeerJoined(peerAddress) =>
context.log.info(s"New peer joined: $peerAddress")
Behaviors.same
case DirectMessage(from, to, message) =>
users.get(to) match {
case Some(user) =>
user.ref ! s"Direct message from $from: $message"
context.log.info(s"Direct message from $from to $to: $message")
case None =>
context.log.warn(s"$to is not in the chat, unable to send direct message from $from")
}
Behaviors.same
case LeaveGroup(user) =>
if (users.contains(user)) {
users = users - user
context.log.info(s"$user has left the group.")
} else {
context.log.warn(s"$user attempted to leave the group but was not found.")
}
Behaviors.same
}
}
}
ChatApplication
package by.vdavdov.akka
import akka.actor.Address
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.cluster.typed.{Cluster, Join}
import javafx.application.Application
import javafx.fxml.FXMLLoader
import javafx.scene.Scene
import javafx.stage.Stage
import java.util.UUID
object ChatApp {
// Храним ActorSystem и ActorRef для одного экземпляра
private var system: ActorSystem[ChatActor.Command] = _
private var chatActorRef: ActorRef[ChatActor.Command] = _
def main(args: Array[String]): Unit = {
// Инициализация ActorSystem и ChatActor
system = ActorSystem(ChatActor(), "ChatCluster")
chatActorRef = system.systemActorOf(ChatActor(), "ChatActor")
// Присоединение к кластеру
val cluster = Cluster(system)
cluster.manager ! Join(Address("akka", "ChatCluster", "127.0.0.1", 2552)) // и тут меняю каждый раз
Application.launch(classOf[ChatApplication], args: _*)
}
// Метод для доступа к ActorRef
def getChatActor: ActorRef[ChatActor.Command] = chatActorRef
def getReplyActor: ActorRef[String] = system.systemActorOf(ReplyActor(), "ReplyActor")
}
class ChatApplication extends Application {
override def start(primaryStage: Stage): Unit = {
// Загружаем FXML
val loader = new FXMLLoader(getClass.getResource("/fxml/chat.fxml"))
val scene = new Scene(loader.load())
// Генерируем уникальное имя пользователя
val username = UUID.randomUUID().toString
// Получаем ActorRef для ChatActor
val chatActorRef = ChatApp.getChatActor
// Передаем actorRef и имя пользователя в контроллер UI
val replyActor : ActorRef[String] = ChatApp.getReplyActor
val controller = loader.getController[ChatController]
if (controller != null) {
controller.setActorRef(chatActorRef, username, replyActor) // Передаем ActorRef и имя пользователя
} else {
println("Ошибка: контроллер равен null.")
}
// Настраиваем и показываем основное окно
primaryStage.setTitle("Chat Application")
primaryStage.setScene(scene)
primaryStage.show()
}
}
ChatController p
ackage by.vdavdov.akka
import akka.actor.typed.ActorRef
import javafx.fxml.FXML
import javafx.scene.control.{Button, ListView, TextArea, TextField}
class ChatController {
@FXML
private var chatArea: TextArea = _
@FXML
private var messageField: TextField = _
@FXML
private var sendButton: Button = _
@FXML
private var userList: ListView[String] = _
@FXML
private var directMessageButton: Button = _
private var actorRef: ActorRef[ChatActor.Command] = _
private var username: String = _
private var actorRefString: ActorRef[String] = _
// Метод для получения сообщения из ChatActor
def receiveMessage(message: String): Unit = {
appendToChatArea(message)
}
// Устанавливаем ActorRef и имя пользователя
def setActorRef(ref: ActorRef[ChatActor.Command], user: String, stringRef : ActorRef[String]): Unit = {
actorRef = ref
username = user
actorRefString = stringRef
actorRef ! ChatActor.JoinGroup(username, actorRefString)
sendButton.setOnAction(_ => {
val message = messageField.getText
if (message.nonEmpty) {
actorRef ! ChatActor.SendMessage(username, message)
appendToChatArea(s"You: $message")
messageField.clear()
}
})
directMessageButton.setOnAction(_ => {
val selectedUser = userList.getSelectionModel.getSelectedItem
val message = messageField.getText
if (selectedUser != null && message.nonEmpty) {
actorRef ! ChatActor.DirectMessage(username, selectedUser, message)
appendToChatArea(s"Private to $selectedUser: $message")
messageField.clear()
}
})
}
private def appendToChatArea(message: String): Unit = {
chatArea.appendText(message + "\n")
}
def leaveGroup(): Unit = {
actorRef ! ChatActor.LeaveGroup(username)
appendToChatArea(s"$username has left the chat.")
}
def updateUserList(users: List[String]): Unit = {
userList.getItems.clear()
users.foreach(userList.getItems.add)
}
}
ReplyActor
package by.vdavdov.akka
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
// Актор для обработки сообщений от ChatActor
object ReplyActor {
def apply(): Behavior[String] = Behaviors.receive { (context, message) =>
// Здесь вы можете обработать входящие сообщения
context.log.info(s"Received message from ChatActor: $message")
Behaviors.same
}
}
application.conf
akka {
actor {
provider = "cluster"
}
remote {
artery {
enabled = true
transport = tcp
canonical.hostname = "127.0.0.1"
canonical.port = 2552
}
}
cluster {
seed-nodes = ["akka://[email protected]:2551",
"akka://[email protected]:2552",
"akka://[email protected]:2553"]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
log-info = on
log-info-verbose = on
}
}
chat.fxml
<?xml version="1.0" encoding="UTF-8"?>
<?import javafx.scene.control.Button?>
<?import javafx.scene.control.ListView?>
<?import javafx.scene.control.TextArea?>
<?import javafx.scene.control.TextField?>
<?import javafx.scene.layout.AnchorPane?>
<AnchorPane xmlns:fx="http://javafx.com/fxml" fx:controller="by.vdavdov.akka.ChatController">
<TextArea fx:id="chatArea" layoutX="14.0" layoutY="14.0" prefHeight="400.0" prefWidth="400.0" editable="false"/>
<TextField fx:id="messageField" layoutX="14.0" layoutY="420.0" prefWidth="300.0"/>
<Button fx:id="sendButton" layoutX="320.0" layoutY="420.0" text="Send"/>
<ListView fx:id="userList" layoutX="420.0" layoutY="14.0" prefHeight="400.0" prefWidth="150.0"/>
<Button fx:id="directMessageButton" text="Direct Message" />
</AnchorPane>
Помогите, пожалуйста, если я расписал что-то не очевидно или требуется добавить больше логирования, просьба написать, не ссылаясь на форму правильного вопроса...пожалуйста ;(