[Java, Kotlin] Реактивный масштабируемый чат на Kotlin + Spring + WebSockets

Автор Сообщение
news_bot ®

Стаж: 6 лет 3 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
13-Апр-2021 20:31

Содержание
  • Конфигурация проекта
    • Логгер
    • Домен
    • Маппер
  • Настройка Spring Security
  • Конфигурация веб-сокетов
  • Архитектура решения
  • Реализация
    • Интеграция с Redis
    • Импелементация сервиса
  • Заключение
ПредисловиеВ данном туториале будет рассмотрено создание масштабируемого приложения, подключение и общение с котором происходит по веб-сокетам. Рассмотрим и мужественно преодолеем проблему передачи сообщений между инстансами с помощью месседж брокера. В качестве месседж брокера будет использован Redis.Конфигурация проектаНачнём с самого важного, конфигурации логгера!Конфигурация логгера состоит из того, что нам нужно создать prototype bean, который будет конфигурировать логгер для класса, в которой этот бин инжектится.
@Configuration
class LoggingConfig {
    @Bean
    @Scope("prototype")
    fun logger(injectionPoint: InjectionPoint): Logger {
        return LoggerFactory.getLogger(
                injectionPoint.methodParameter?.containingClass
                        ?: injectionPoint.field?.declaringClass
        )
    }
}
Теперь, чтобы получить сконфигурированный логгер, нам достаточно внедрить его.
@Component
class ChatWebSocketHandlerService(
    private val logger: Logger
)
Далее создадим доменку и сконфигурируем маппер для неёКласс чата содержит базовую информацию, включая участников чата.
data class Chat(
    val chatId: UUID,
    val chatMembers: List<ChatMember>,
    @JsonSerialize(using = LocalDateTimeSerializer::class)
    @JsonDeserialize(using = LocalDateTimeDeserializer::class)
    val createdDate: LocalDateTime,
    var lastMessage: CommonMessage?
)
Класс ChatMember описывает участника чата. Из интересного тут - это флаг deletedChat. Его назначение - убрать чат из выборки списка чатов для пользователя с userId.
data class ChatMember(
        val userId: UUID,
        var fullName: String,
        var avatar: String,
        var deletedChat: Boolean
)
Ниже представлен базовый класс для всех сообщений в чате. Аннотация @JsonTypeInfo тут нужна для того, чтобы классам-наследникам при заворачивании в JSON проставлялось поле @type с указанием типа сообщения, а при разворачивании были проставлены поля базового класса.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)
open class CommonMessage(
    val messageId: UUID,
    val chatId: UUID,
    val sender: ChatMember,
    @field:JsonSerialize(using = LocalDateTimeSerializer::class) @field:JsonDeserialize(using = LocalDateTimeDeserializer::class)
    val messageDate: LocalDateTime,
    var seen: Boolean
)
Пример конкретного класса сообщения TextMessage - текстового сообщения
class TextMessage(
    messageId: UUID,
    chatId: UUID,
    sender: ChatMember,
    var content: String,
    messageDate: LocalDateTime,
    seen: Boolean
) : CommonMessage(messageId, chatId, sender, messageDate, messageType, seen)
Сконфигурируем ObjectMapperВ registerSubtypes добавляются классы-наследники, которые будут сериализоваться и десериализоваться в JSON. Это необходимо для того, чтобы после десериализации определить тип конкретный тип и передать на обработку в нужный метод
@Configuration
class ObjectMapperConfig {
    @Bean
    fun objectMapper(): ObjectMapper = ObjectMapper()
        .registerModule(JavaTimeModule())
        .registerModule(Jdk8Module())
        .registerModule(ParameterNamesModule())
        .registerModule(KotlinModule())
        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
        .apply {
            registerSubtypes(
                NamedType(NewMessageEvent::class.java, "NewMessageEvent"),
                NamedType(MarkMessageAsRead::class.java, "MarkMessageAsRead"),
                NamedType(TextMessage::class.java, "TextMessage"),
                NamedType(ImageMessage::class.java, "ImageMessage")
            )
        }
}
Конфигурация Spring SecurityДля начала нам понадобится ReactiveAuthenticationManager и SecurityContextRepository. Для аутентификации будем использовать JWT, поэтому создаем класс JwtAuthenticationManager со следующим содержанием:
@Component
class JwtAuthenticationManager(val jwtUtil: JwtUtil) : ReactiveAuthenticationManager {
    override fun authenticate(authentication: Authentication): Mono<Authentication> {
        val token = authentication.credentials.toString()
        val validateToken = jwtUtil.validateToken(token)
        var username: String?
        try {
            username = jwtUtil.extractUsername(token)
        } catch (e: Exception) {
            username = null
            println(e)
        }
        return if (username != null && validateToken) {
            val claims = jwtUtil.getClaimsFromToken(token)
            val role: List<String> = claims["roles"] as List<String>
            val authorities = role.stream()
                    .map { role: String? -> SimpleGrantedAuthority(role) }
                    .collect(Collectors.toList())
            val authenticationToken = UsernamePasswordAuthenticationToken(
                    username,
                    null,
                    authorities
            )
            authenticationToken.details = claims
            Mono.just(authenticationToken)
        } else {
            Mono.empty()
        }
    }
}
Чтобы везде, где необходимо, иметь возможность извлечь информацию из seucirty context, заносим claims в details токена (строка 25).Для извлечения токена из запроса создаем класс SecurityContextRepository. Извлекать токен будем двумя способами:
  • Заголовок Authorization: Bearer ${JWT_TOKEN}
  • GET параметр ?access_token=${JWT_TOKEN}
@Component
class SecurityContextRepository(val authenticationManager: ReactiveAuthenticationManager) : ServerSecurityContextRepository {
    override fun save(exchange: ServerWebExchange, context: SecurityContext): Mono<Void> {
        return Mono.error { IllegalStateException("Save method not supported") }
    }
    override fun load(exchange: ServerWebExchange): Mono<SecurityContext> {
        val authHeader = exchange.request
            .headers
            .getFirst(HttpHeaders.AUTHORIZATION)
        val accessToken: String = if (authHeader != null && authHeader.startsWith("Bearer ")) {
            authHeader.substring(7)
        } else exchange.request
            .queryParams
            .getFirst("access_token") ?: return Mono.empty()
        val auth = UsernamePasswordAuthenticationToken(accessToken, accessToken)
        return authenticationManager
            .authenticate(auth)
            .map { authentication: Authentication -> SecurityContextImpl(authentication) }
    }
}
Теперь имея два необходимых класса мы можем сконфигурировать сам Spring Security.
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
class SecurityConfig(
    val reactiveAuthenticationManager: ReactiveAuthenticationManager,
    val securityContextRepository: SecurityContextRepository
) {
    @Bean
    fun securityWebFilterChain(httpSecurity: ServerHttpSecurity): SecurityWebFilterChain {
        return httpSecurity
            .exceptionHandling()
            .authenticationEntryPoint { swe: ServerWebExchange, e: AuthenticationException ->
                Mono.fromRunnable { swe.response.statusCode = HttpStatus.UNAUTHORIZED }
            }
            .accessDeniedHandler { swe: ServerWebExchange, e: AccessDeniedException ->
                Mono.fromRunnable { swe.response.statusCode = HttpStatus.FORBIDDEN }
            }
            .and()
            .csrf().disable()
            .cors().disable()
            .formLogin().disable()
            .httpBasic().disable()
            .authenticationManager(reactiveAuthenticationManager)
            .securityContextRepository(securityContextRepository)
            .authorizeExchange()
            .pathMatchers("/actuator/**").permitAll()
            .pathMatchers(HttpMethod.GET, "/ws/**").hasAuthority("ROLE_USER")
            .anyExchange().authenticated()
            .and()
            .build()
    }
}
Здесь из интересного: конфигурация позволяет подключиться по путям начинающимся с /ws только аутентифицированным пользователям, у которых есть роль ROLE_USER.С конфигурацией Security закончили, теперь необходимо сконфигурировать подключение по сокетам.Конфигурация веб-сокетовВ первую очередь нам необходимо задать маппинг между запросом и обработчиком. Чтобы добавить обработчик сокетов по определенному адресу, мы делаем следующее:
  • Создаем мапу, где ключ - uri, а значение - обработчик. В этом конкретном случае WebSocketHandler.
  • Создаем обработчик для ранее определенного маппинга и cors.
@Configuration
class ReactiveWebSocketConfig {
    @Bean
    fun webSocketHandlerMapping(chatWebSocketHandler: ChatWebSocketHandler): HandlerMapping {
        val map: MutableMap<String, WebSocketHandler> = HashMap()
        map["/ws/chat"] = chatWebSocketHandler
        val handlerMapping = SimpleUrlHandlerMapping()
        handlerMapping.setCorsConfigurations(Collections.singletonMap("*", CorsConfiguration().applyPermitDefaultValues()))
        handlerMapping.order = 1
        handlerMapping.urlMap = map
        return handlerMapping
    }
    @Bean
    fun handlerAdapter(): WebSocketHandlerAdapter {
        return WebSocketHandlerAdapter()
    }
}
Здесь в качестве обработчика для uri /ws/chat указываем chatWebSocketHandler, его вид представлен ниже, имплементацией займемся позднее. Этот класс реализует интерфейс WebSocketHandler, содержащий один метод handle(session: WebSocketSession): Mono<Void>
@Component
class ChatWebSocketHandler : WebSocketHandler {
    override fun handle(session: WebSocketSession): Mono<Void> {
        TODO("Not yet implemented")
    }
}
С базовой конфигурацией закончили.Поговорим об архитектуре решенияНа данным момент наш чат нельзя масштабировать. Так как подключение по веб-сокету дуплексное и, установив соединение с одним инстансом, все запросы по этому сокету будут направляться на этот инстанс, так при запуске двух и более инстансов у нас не будет доступа до всех имеющихся сессий. Становится возможной ситуация, когда собеседники из одного чата подключены к разным инстансам и нет возможности сразу отправить сообщение по сокетам всем участникам. Для решения необходим Message Broker, который будет получать на вход сообщение для определенного чата и рассылать его на все инстансы. Инстансы посмотрят на сообщение, возьмут информацию об участниках чата из БД, и будут искать этих участников среди своих подключенных пользователей.
Представим, что участники одного чата User 1 и User 2 подключены к разным инстансам чата. User 1 подключен к Chat-Instance-0, а User 2 к Chat-Instance-1. Тогда, когда User 1 отправит сообщение в Chat-Instance-0 (зеленая пунктирная линия), это сообщение попадёт в чат и будет отправлено в Message broker, оттуда разослано по всем инстансам. Chat-Instance-1 получит это сообщение и увидит, что у него есть User 2, который относится к этому чату и ему необходимо отправить это сообщение.РеализацияТеперь займемся имплементацией нашего обработчика ChatWebSocketHandlerНам понадобится мапа userId => session, для того, чтобы хранить открытые сессии и иметь возможность достать их по userId. Для поддержки одновременной работы с несколькими сессиями из под одного userId интерфейс мапы будет следующим: MutableMap<UUID, LinkedList<WebSocketSession>>.Добавлять в мапу запись мы будем при подписке на стрим session.receive, а подчищать будем в doFinally.В методе getReceiverStream создается стрим-обработчик сообщений, пришедших от клиента. Мы получаем payload как строку и преобразуем его к базовому WebSocketEvent, после чего в зависимости от типа event'a передаем его на обработку в слой сервисов.В методе getSenderStream происходит конфигурация стрима, который занимается отправкой сообщений по сокету клиенту
@Component
class ChatWebSocketHandler(
    val objectMapper: ObjectMapper,
    val logger: Logger,
    val chatService: ChatService,
    val objectStringConverter: ObjectStringConverter,
    val sinkWrapper: SinkWrapper
) : WebSocketHandler {
    private val userIdToSession: MutableMap<UUID, LinkedList<WebSocketSession>> = ConcurrentHashMap()
    override fun handle(session: WebSocketSession): Mono<Void> {
        return ReactiveSecurityContextHolder.getContext()
            .flatMap { ctx ->
                val userId = UUID.fromString((ctx.authentication.details as Claims)["id"].toString())
                val sender = getSenderStream(session, userId)
                val receiver = getReceiverStream(session, userId)
                return@flatMap Mono.zip(sender, receiver).then()
            }
    }
    private fun getReceiverStream(session: WebSocketSession, userId: UUID): Mono<Void> {
        return session.receive()
            .filter { it.type == WebSocketMessage.Type.TEXT }
            .map(WebSocketMessage::getPayloadAsText)
            .flatMap {
                objectStringConverter.stringToObject(it, WebSocketEvent::class.java)
            }
            .flatMap { convertedEvent ->
                when (convertedEvent) {
                    is NewMessageEvent -> chatService.handleNewMessageEvent(userId, convertedEvent)
                    is MarkMessageAsRead -> chatService.markPreviousMessagesAsRead(convertedEvent.messageId)
                    else -> Mono.error(RuntimeException())
                }
            }
            .onErrorContinue { t, _ -> logger.error("Error occurred with receiver stream", t) }
            .doOnSubscribe {
                val userSession = userIdToSession[userId]
                if (userSession == null) {
                    val newUserSessions = LinkedList<WebSocketSession>()
                    userIdToSession[userId] = newUserSessions
                }
                userIdToSession[userId]?.add(session)
            }
            .doFinally {
                val userSessions = userIdToSession[userId]
                userSessions?.remove(session)
            }
            .then()
    }
    private fun getSenderStream(session: WebSocketSession, userId: UUID): Mono<Void> {
        val sendMessage = sinkWrapper.sinks.asFlux()
            .filter { sendTo -> sendTo.userId == userId }
            .map { sendTo -> objectMapper.writeValueAsString(sendTo.event) }
            .map { stringObject -> session.textMessage(stringObject) }
            .doOnError { logger.error("", it) }
        return session.send(sendMessage)
    }
}
Для того чтобы писать в websocket нам необходимо создать поток данных, в который мы сможем добавлять данные. С reactora 3.4 для этого рекомендуется использовать Sinks.Many. Создадим такой поток в классе SinkWrapper.
@Component
class SinkWrapper {
    val sinks: Sinks.Many<SendTo> = Sinks.many().multicast().onBackpressureBuffer()
}
Теперь, отправив данные в этот поток, они будут обработаны в потоке, сформированном в getSenderStream.Интеграция с RedisУ Redis есть PUB/SUB модель общения, которая прекрасно решает задачу транслирования сообщений между инстансами. Итак, для приготовления данного блюда нам понадобится:
  • RedisChatMessageListener - подписка на топики и перенаправление сообщение в слой сервисов
  • RedisChatMessagePublisher - публикация сообщений в топики
  • RedisConfig - конфигурация редиса
  • RedisListenerStarter - старт листенеров при старте инстанса
Реализация:RedisConfig стандартный, ничего особенного
@Configuration
class RedisConfig {
    @Bean
    fun reactiveRedisConnectionFactory(redisProperties: RedisProperties): ReactiveRedisConnectionFactory {
        val redisStandaloneConfiguration = RedisStandaloneConfiguration(redisProperties.host, redisProperties.port)
        redisStandaloneConfiguration.setPassword(redisProperties.password)
        return LettuceConnectionFactory(redisStandaloneConfiguration)
    }
    @Bean
    fun template(reactiveRedisConnectionFactory: ReactiveRedisConnectionFactory): ReactiveStringRedisTemplate {
        return ReactiveStringRedisTemplate(reactiveRedisConnectionFactory)
    }
}
RedisChatMessageListenerЗдесь мы создаем подписку на топик по имени базового класса (обычно название топиков выносят в проперти). Получив сообщение из канала преобразуем его в объект (строка 13) и дальше передаем в sendMessage, который достанет участников чата и попробует разослать им это сообщение, если таковы имеются среди подключенных к инстансу.
@Component
class RedisChatMessageListener(
    private val logger: Logger,
    private val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,
    private val objectStringConverter: ObjectStringConverter,
    private val chatService: ChatService
) {
    fun subscribeOnCommonMessageTopic(): Mono<Void> {
        return reactiveStringRedisTemplate.listenTo(PatternTopic(CommonMessage::class.java.name))
            .map { message -> message.message }
            .doOnNext { logger.info("Receive new message: $it") }
            .flatMap { objectStringConverter.stringToObject(it, CommonMessage::class.java) }
            .flatMap { message ->
                when (message) {
                    is TextMessage -> chatService.sendMessage(message)
                    is ImageMessage -> chatService.sendMessage(message)
                    else -> Mono.error(RuntimeException())
                }
            }
            .then()
    }
}
RedisChatMessagePublisherПаблишер имеет один метод для транслирования CommonMessage на все инстансы. Объект сообщения приводится к строке и публикуется в топик по имени базового класса.
@Component
class RedisChatMessagePublisher(
    val logger: Logger,
    val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,
    val objectStringConverter: ObjectStringConverter
) {
    fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {
        return objectStringConverter.objectToString(commonMessage)
            .flatMap {
                logger.info("Broadcast message $it to channel ${CommonMessage::class.java.name}")
                reactiveStringRedisTemplate.convertAndSend(CommonMessage::class.java.name, it)
            }
            .then()
    }
}
RedisListenerStarterВ этом классе стартуются все листенеры из RedisChatMessageListener. В нашем случае - единственный листенер subscribeOnCommonMessageTopic
@Component
class RedisListenerStarter(
    val logger: Logger,
    val redisChatMessageListener: RedisChatMessageListener
) {
    @Bean
    fun newMessageEventChannelListenerStarter(): ApplicationRunner {
        return ApplicationRunner { args: ApplicationArguments ->
            redisChatMessageListener.subscribeOnCommonMessageTopic()
                .doOnSubscribe { logger.info("Start NewMessageEvent channel listener") }
                .onErrorContinue { throwable, _ -> logger.error("Error occurred while listening NewMessageEvent channel", throwable) }
                .subscribe()
        }
    }
}
Импелементация сервисаУпрощенная реализация, без сохранения сообщений в БД с замоканным chatRepository. В виду того, что статья выходит и так больше, чем я рассчитывал.Метод handleNewMessageEvent вызывается из WebSocketHandler и получает на вход userId отправителя и NewMessageEvent - простое текстовое сообщение. В методе происходит проверка на то, что отправитель действительно является участником чата и дальше это сообщение транслируется между инстансами.
@Service
class DefaultChatService(
    val logger: Logger,
    val sinkWrapper: SinkWrapper,
    val chatRepository: ChatRepository,
    val redisChatPublisher: RedisChatMessagePublisher
) : ChatService {
    override fun handleNewMessageEvent(senderId: UUID, newMessageEvent: NewMessageEvent): Mono<Void> {
        logger.info("Receive NewMessageEvent from $senderId: $newMessageEvent")
        return chatRepository.findById(newMessageEvent.chatId)
            .filter { it.chatMembers.map(ChatMember::userId).contains(senderId) }
            .flatMap { chat ->
                val textMessage = TextMessage(UUID.randomUUID(), chat.chatId, chat.chatMembers.first { it.userId == senderId }, newMessageEvent.content, LocalDateTime.now(), false)
                chat.lastMessage = textMessage
                return@flatMap Mono.zip(chatRepository.save(chat), Mono.just(textMessage))
            }
            .flatMap { broadcastMessage(it.t2) }
    }
    /**
     * Broadcast the message between instances
     */
    override fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {
        return redisChatPublisher.broadcastMessage(commonMessage)
    }
    /**
     * Send the message to all of chatMembers of message chat direct
     */
    override fun sendMessage(message: CommonMessage): Mono<Void> {
        return chatRepository.findById(message.chatId)
            .map { it.chatMembers }
            .flatMapMany { Flux.fromIterable(it) }
            .flatMap { member -> sendEventToUserId(member.userId, ChatMessageEvent(message.chatId, message)) }
            .then()
    }
    override fun sendEventToUserId(userId: UUID, webSocketEvent: WebSocketEvent): Mono<Void> {
        return Mono.fromCallable { sinkWrapper.sinks.emitNext(SendTo(userId, webSocketEvent), Sinks.EmitFailureHandler.FAIL_FAST) }
            .then()
    }
}
ЗаключениеВ качестве дальнейших доработок можно произвести разделение получаемых и отправляемых ивентов на отдельные классы. Также в месте, где происходит получение сообщения по сокетам от клиента, его приведение к WebSocketEvent и передача в обработчик, можно попробовать избавиться от хардкодного маппинка event => handler. Пока не думал, как это можно сделать красивее, но уверен, что решение есть.Проект на GitHub
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_java, #_kotlin, #_java, #_kotlin, #_spring, #_microservice, #_redis, #_chat, #_java, #_kotlin
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 14-Май 07:41
Часовой пояс: UTC + 5