[Разработка веб-сайтов, Системы обмена сообщениями, Scala, ООП, Функциональное программирование] Изучаю Scala: Часть 4 — WebSocket
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Привет, Хабр! На этот раз я по пробовал сделать простенький чат через ВебСокеты. За подробностями добро пожаловать под кат.
Содержание
- Изучаю Scala: Часть 1 — Игра змейка
- Изучаю Scala: Часть 2 — Todo лист с возможностью загрузки картинок
- Изучаю Scala: Часть 3 — Юнит Тесты
- Изучаю Scala: Часть 4 — WebSocket
Ссылки
Собственно весь код находиться в одном объект ChatHub
class ChatHub[F[_]] private(
val topic: Topic[F, WebSocketFrame],
private val ref: Ref[F, Int]
)
(
implicit concurrent: Concurrent[F],
timer: Timer[F]
) extends Http4sDsl[F] {
val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")
.description("Подключает к общему чату")
.in(
stringBody
.description("Сообщение которое будет отправлено пользователям в чате")
.example("Привет!")
)
.out(
stringBody
.description("Сообщение которое кто-то написал в чат")
.example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")
)
//Заглушка которая всегда отвечает ошибкой.
.serverLogic(_ => IO(Left(()): Either[Unit, String]))
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
private def logic(): F[Response[F]] = {
val toClient: Stream[F, WebSocketFrame] =
topic.subscribe(1000)
val fromClient: Pipe[F, WebSocketFrame, Unit] =
handle
WebSocketBuilder[F].build(toClient, fromClient)
}
private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
.collect({
case WebSocketFrame.Text(text, _) => text
})
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
.through(topic.publish)
}
object ChatHub {
def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
ref <- Ref.of[F, Int](0)
topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
} yield new ChatHub(topic, ref)
}
Тут надо сразу сказать про Topic — примитив синхронизации из Fs2 который позволяет сделать модель Publisher — Subscriber причем у вас может быть много Publisher и одновременно много Subscriber. Вообще в него лучшее отправлять сообщения через какой-то буфер вроде Queue потому что у него есть ограничения на количество сообщения в очереди и Publisher ждет пока все Subscriber не получат сообщения в свою очередь сообщений и если она переполнена то может и зависнуть.
val topic: Topic[F, WebSocketFrame],
Тут еще я считаю количество сообщений которые были переданы в чат как номер каждого сообщения. Так как это мне нужно делать из разных потоков я использовал аналог Atomic который тут называется Ref и гарантирует атомарность операции.
private val ref: Ref[F, Int]
Обработка потока сообщений от пользователей.
private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] =
stream
//Достаем из фрейма текстовое сообщение и фильтруем фреймы.
.collect({
case WebSocketFrame.Text(text, _) => text
})
//Атомарно увеличиваем наш счетчик с сохранением нового значения и добавления его значения к тексту сообщения пользователя.
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//Каждое пришедшее сообщение отправляем в топик
.through(topic.publish)
Собственно сама логика создания сокета.
private def logic(): F[Response[F]] = {
//Откуда получать данные для клиента.
val toClient: Stream[F, WebSocketFrame] =
//Просто подписываемся на данные которые будут приходить в топик
topic.subscribe(1000)
//Что будем делать с данными которые приходить от клиента
val fromClient: Pipe[F, WebSocketFrame, Unit] =
//Просто отправляем данные в топик после обработки
handle
//Создаем веб сокет с созданными ранее генератором и потребителем данных.
WebSocketBuilder[F].build(toClient, fromClient)
}
Связываем наш сокет с роутом на сервере (ws://localhost:8080/chat)
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
Собственно на этом все. Дальше уже можно запускать сервер с этим роутом. Мне еще захотелось какую ни какую документацию сделать. Вообще для документирования WebSocket и прочего основанного на событиях взаимодействия вроде RabbitMQ AMPQ есть AsynAPI но под Tapir там нет ничего поэтому просто сделал для Swagger описание эндпойнта как GET запрос. Работать он конечно не будет. Точнее 501 ошибку будет возвращать зато будет отображаться в Swagger
val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")
.description("Подключает к общему чату")
.in(
stringBody
.description("Сообщение которое будет отправлено пользователям в чате")
.example("Привет!")
)
.out(
stringBody
.description("Сообщение которое кто-то написал в чат")
.example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")
)
В самом сваггере это выглядит вот так
Подключаем наш чат к нашему серверу API
todosController = new TodosController()
imagesController = new ImagesController()
//Создаем объект нашего чата
chatHub <- Resource.liftF(ChatHub[IO]())
endpoints = todosController.endpoints ::: imagesController.endpoints
//Добавляем его эндпойнт в документацию Swagger
docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
yml: String = docs.toYaml
//Добавляем его маршрут в список маршрутов приложения
routes = chatHub.routeWs <+>
endpoints.toRoutes <+>
new SwaggerHttp4s(yml, "swagger").routes[IO]
httpApp = Router(
"/" -> routes
).orNotFound
blazeServer <- BlazeServerBuilder[IO](serverEc)
.bindHttp(settings.host.port, settings.host.host)
.withHttpApp(httpApp)
.resource
Подключаемся к чату крайне простым скриптом.
<script>
const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
const webSocket = new WebSocket('ws://localhost:8080/chat');
webSocket.onopen = event => {
alert('onopen ');
};
webSocket.onmessage = event => {
console.log(event);
receive(event.data);
};
webSocket.onclose = event => {
alert('onclose ');
};
function send() {
let text = document.getElementById("message");
webSocket.send(`Сообщение от клиента с Id подключения ${id}: ${text.value}`);
text.value = '';
}
function receive(m) {
let text = document.getElementById("chat");
text.value = text.value + '\n\r' + m;
}
</script>
На этом собственно все. Надеюсь кому-то кто тоже изучает скала будет интересна эта статья а может даже полезна.
===========
Источник:
habr.com
===========
Похожие новости:
- [Разработка веб-сайтов] Проектирование сайта — гарантия успешной его реализации. Личный опыт
- [JavaScript, Node.JS, Open source, Разработка веб-сайтов] Сладкая жизнь, или Создание веб-приложения без написания кода
- [Разработка веб-сайтов, Разработка мобильных приложений, Управление разработкой, Микросервисы] Micro-frontend. Асинхронный подход к мультикомандной разработке
- [Разработка веб-сайтов, JavaScript, Клиентская оптимизация, Angular, Разработка под e-commerce] Дружим Angular с Google
- [Rust, ООП] Rust vs. State
- [Разработка веб-сайтов, 1С-Битрикс, Резервное копирование] Инкрементальный бэкап VDS с сайтом на 1С-Битрикс в Яндекс.Облако
- [Разработка веб-сайтов, JavaScript, Программирование, Node.JS] Руководство по Express.js. Часть 1 (перевод)
- [Разработка веб-сайтов, JavaScript, Программирование] Изучаем Parcel — альтернативу Webpack для небольших проектов
- [Разработка веб-сайтов, Программирование, Java] Пишем чат с использованием Spring Boot и WebSockets (перевод)
- [Поисковая оптимизация, Разработка веб-сайтов] Ленивая загрузка для карт
Теги для поиска: #_razrabotka_vebsajtov (Разработка веб-сайтов), #_sistemy_obmena_soobschenijami (Системы обмена сообщениями), #_scala, #_oop (ООП), #_funktsionalnoe_programmirovanie (Функциональное программирование), #_scala, #_functional_programming, #_funktsionalnoe_programmirovanie (функциональное программирование), #_oop (ооп), #_websocket, #_websockets, #_razrabotka_vebsajtov (
Разработка веб-сайтов
), #_sistemy_obmena_soobschenijami (
Системы обмена сообщениями
), #_scala, #_oop (
ООП
), #_funktsionalnoe_programmirovanie (
Функциональное программирование
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 21:10
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Привет, Хабр! На этот раз я по пробовал сделать простенький чат через ВебСокеты. За подробностями добро пожаловать под кат. Содержание
Ссылки Собственно весь код находиться в одном объект ChatHub class ChatHub[F[_]] private(
val topic: Topic[F, WebSocketFrame], private val ref: Ref[F, Int] ) ( implicit concurrent: Concurrent[F], timer: Timer[F] ) extends Http4sDsl[F] { val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint .get .in("chat") .tag("WebSockets") .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat") .description("Подключает к общему чату") .in( stringBody .description("Сообщение которое будет отправлено пользователям в чате") .example("Привет!") ) .out( stringBody .description("Сообщение которое кто-то написал в чат") .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!") ) //Заглушка которая всегда отвечает ошибкой. .serverLogic(_ => IO(Left(()): Either[Unit, String])) def routeWs: HttpRoutes[F] = { HttpRoutes.of[F] { case GET -> Root / "chat" => logic() } } private def logic(): F[Response[F]] = { val toClient: Stream[F, WebSocketFrame] = topic.subscribe(1000) val fromClient: Pipe[F, WebSocketFrame, Unit] = handle WebSocketBuilder[F].build(toClient, fromClient) } private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s .collect({ case WebSocketFrame.Text(text, _) => text }) .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text")))) .through(topic.publish) } object ChatHub { def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for { ref <- Ref.of[F, Int](0) topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("===")) } yield new ChatHub(topic, ref) } Тут надо сразу сказать про Topic — примитив синхронизации из Fs2 который позволяет сделать модель Publisher — Subscriber причем у вас может быть много Publisher и одновременно много Subscriber. Вообще в него лучшее отправлять сообщения через какой-то буфер вроде Queue потому что у него есть ограничения на количество сообщения в очереди и Publisher ждет пока все Subscriber не получат сообщения в свою очередь сообщений и если она переполнена то может и зависнуть. val topic: Topic[F, WebSocketFrame],
Тут еще я считаю количество сообщений которые были переданы в чат как номер каждого сообщения. Так как это мне нужно делать из разных потоков я использовал аналог Atomic который тут называется Ref и гарантирует атомарность операции. private val ref: Ref[F, Int]
Обработка потока сообщений от пользователей. private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] =
stream //Достаем из фрейма текстовое сообщение и фильтруем фреймы. .collect({ case WebSocketFrame.Text(text, _) => text }) //Атомарно увеличиваем наш счетчик с сохранением нового значения и добавления его значения к тексту сообщения пользователя. .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text")))) //Каждое пришедшее сообщение отправляем в топик .through(topic.publish) Собственно сама логика создания сокета. private def logic(): F[Response[F]] = {
//Откуда получать данные для клиента. val toClient: Stream[F, WebSocketFrame] = //Просто подписываемся на данные которые будут приходить в топик topic.subscribe(1000) //Что будем делать с данными которые приходить от клиента val fromClient: Pipe[F, WebSocketFrame, Unit] = //Просто отправляем данные в топик после обработки handle //Создаем веб сокет с созданными ранее генератором и потребителем данных. WebSocketBuilder[F].build(toClient, fromClient) } Связываем наш сокет с роутом на сервере (ws://localhost:8080/chat) def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] { case GET -> Root / "chat" => logic() } } Собственно на этом все. Дальше уже можно запускать сервер с этим роутом. Мне еще захотелось какую ни какую документацию сделать. Вообще для документирования WebSocket и прочего основанного на событиях взаимодействия вроде RabbitMQ AMPQ есть AsynAPI но под Tapir там нет ничего поэтому просто сделал для Swagger описание эндпойнта как GET запрос. Работать он конечно не будет. Точнее 501 ошибку будет возвращать зато будет отображаться в Swagger val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
.get .in("chat") .tag("WebSockets") .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat") .description("Подключает к общему чату") .in( stringBody .description("Сообщение которое будет отправлено пользователям в чате") .example("Привет!") ) .out( stringBody .description("Сообщение которое кто-то написал в чат") .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!") ) В самом сваггере это выглядит вот так Подключаем наш чат к нашему серверу API todosController = new TodosController()
imagesController = new ImagesController() //Создаем объект нашего чата chatHub <- Resource.liftF(ChatHub[IO]()) endpoints = todosController.endpoints ::: imagesController.endpoints //Добавляем его эндпойнт в документацию Swagger docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1") yml: String = docs.toYaml //Добавляем его маршрут в список маршрутов приложения routes = chatHub.routeWs <+> endpoints.toRoutes <+> new SwaggerHttp4s(yml, "swagger").routes[IO] httpApp = Router( "/" -> routes ).orNotFound blazeServer <- BlazeServerBuilder[IO](serverEc) .bindHttp(settings.host.port, settings.host.host) .withHttpApp(httpApp) .resource Подключаемся к чату крайне простым скриптом. <script>
const id = `f${(~~(Math.random() * 1e8)).toString(16)}`; const webSocket = new WebSocket('ws://localhost:8080/chat'); webSocket.onopen = event => { alert('onopen '); }; webSocket.onmessage = event => { console.log(event); receive(event.data); }; webSocket.onclose = event => { alert('onclose '); }; function send() { let text = document.getElementById("message"); webSocket.send(`Сообщение от клиента с Id подключения ${id}: ${text.value}`); text.value = ''; } function receive(m) { let text = document.getElementById("chat"); text.value = text.value + '\n\r' + m; } </script> На этом собственно все. Надеюсь кому-то кто тоже изучает скала будет интересна эта статья а может даже полезна. =========== Источник: habr.com =========== Похожие новости:
Разработка веб-сайтов ), #_sistemy_obmena_soobschenijami ( Системы обмена сообщениями ), #_scala, #_oop ( ООП ), #_funktsionalnoe_programmirovanie ( Функциональное программирование ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 21:10
Часовой пояс: UTC + 5