[Java, Kotlin] Реактивный масштабируемый чат на Kotlin + Spring + WebSockets
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Содержание
- Конфигурация проекта
- Логгер
- Домен
- Маппер
- Настройка Spring Security
- Конфигурация веб-сокетов
- Архитектура решения
- Реализация
- Интеграция с Redis
- Импелементация сервиса
- Заключение
ПредисловиеВ данном туториале будет рассмотрено создание масштабируемого приложения, подключение и общение с котором происходит по веб-сокетам. Рассмотрим и мужественно преодолеем проблему передачи сообщений между инстансами с помощью месседж брокера. В качестве месседж брокера будет использован Redis.Конфигурация проектаНачнём с самого важного, конфигурации логгера!Конфигурация логгера состоит из того, что нам нужно создать prototype bean, который будет конфигурировать логгер для класса, в которой этот бин инжектится.
@Configuration
class LoggingConfig {
@Bean
@Scope("prototype")
fun logger(injectionPoint: InjectionPoint): Logger {
return LoggerFactory.getLogger(
injectionPoint.methodParameter?.containingClass
?: injectionPoint.field?.declaringClass
)
}
}
Теперь, чтобы получить сконфигурированный логгер, нам достаточно внедрить его.
@Component
class ChatWebSocketHandlerService(
private val logger: Logger
)
Далее создадим доменку и сконфигурируем маппер для неёКласс чата содержит базовую информацию, включая участников чата.
data class Chat(
val chatId: UUID,
val chatMembers: List<ChatMember>,
@JsonSerialize(using = LocalDateTimeSerializer::class)
@JsonDeserialize(using = LocalDateTimeDeserializer::class)
val createdDate: LocalDateTime,
var lastMessage: CommonMessage?
)
Класс ChatMember описывает участника чата. Из интересного тут - это флаг deletedChat. Его назначение - убрать чат из выборки списка чатов для пользователя с userId.
data class ChatMember(
val userId: UUID,
var fullName: String,
var avatar: String,
var deletedChat: Boolean
)
Ниже представлен базовый класс для всех сообщений в чате. Аннотация @JsonTypeInfo тут нужна для того, чтобы классам-наследникам при заворачивании в JSON проставлялось поле @type с указанием типа сообщения, а при разворачивании были проставлены поля базового класса.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)
open class CommonMessage(
val messageId: UUID,
val chatId: UUID,
val sender: ChatMember,
@field:JsonSerialize(using = LocalDateTimeSerializer::class) @field:JsonDeserialize(using = LocalDateTimeDeserializer::class)
val messageDate: LocalDateTime,
var seen: Boolean
)
Пример конкретного класса сообщения TextMessage - текстового сообщения
class TextMessage(
messageId: UUID,
chatId: UUID,
sender: ChatMember,
var content: String,
messageDate: LocalDateTime,
seen: Boolean
) : CommonMessage(messageId, chatId, sender, messageDate, messageType, seen)
Сконфигурируем ObjectMapperВ registerSubtypes добавляются классы-наследники, которые будут сериализоваться и десериализоваться в JSON. Это необходимо для того, чтобы после десериализации определить тип конкретный тип и передать на обработку в нужный метод
@Configuration
class ObjectMapperConfig {
@Bean
fun objectMapper(): ObjectMapper = ObjectMapper()
.registerModule(JavaTimeModule())
.registerModule(Jdk8Module())
.registerModule(ParameterNamesModule())
.registerModule(KotlinModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.apply {
registerSubtypes(
NamedType(NewMessageEvent::class.java, "NewMessageEvent"),
NamedType(MarkMessageAsRead::class.java, "MarkMessageAsRead"),
NamedType(TextMessage::class.java, "TextMessage"),
NamedType(ImageMessage::class.java, "ImageMessage")
)
}
}
Конфигурация Spring SecurityДля начала нам понадобится ReactiveAuthenticationManager и SecurityContextRepository. Для аутентификации будем использовать JWT, поэтому создаем класс JwtAuthenticationManager со следующим содержанием:
@Component
class JwtAuthenticationManager(val jwtUtil: JwtUtil) : ReactiveAuthenticationManager {
override fun authenticate(authentication: Authentication): Mono<Authentication> {
val token = authentication.credentials.toString()
val validateToken = jwtUtil.validateToken(token)
var username: String?
try {
username = jwtUtil.extractUsername(token)
} catch (e: Exception) {
username = null
println(e)
}
return if (username != null && validateToken) {
val claims = jwtUtil.getClaimsFromToken(token)
val role: List<String> = claims["roles"] as List<String>
val authorities = role.stream()
.map { role: String? -> SimpleGrantedAuthority(role) }
.collect(Collectors.toList())
val authenticationToken = UsernamePasswordAuthenticationToken(
username,
null,
authorities
)
authenticationToken.details = claims
Mono.just(authenticationToken)
} else {
Mono.empty()
}
}
}
Чтобы везде, где необходимо, иметь возможность извлечь информацию из seucirty context, заносим claims в details токена (строка 25).Для извлечения токена из запроса создаем класс SecurityContextRepository. Извлекать токен будем двумя способами:
- Заголовок Authorization: Bearer ${JWT_TOKEN}
- GET параметр ?access_token=${JWT_TOKEN}
@Component
class SecurityContextRepository(val authenticationManager: ReactiveAuthenticationManager) : ServerSecurityContextRepository {
override fun save(exchange: ServerWebExchange, context: SecurityContext): Mono<Void> {
return Mono.error { IllegalStateException("Save method not supported") }
}
override fun load(exchange: ServerWebExchange): Mono<SecurityContext> {
val authHeader = exchange.request
.headers
.getFirst(HttpHeaders.AUTHORIZATION)
val accessToken: String = if (authHeader != null && authHeader.startsWith("Bearer ")) {
authHeader.substring(7)
} else exchange.request
.queryParams
.getFirst("access_token") ?: return Mono.empty()
val auth = UsernamePasswordAuthenticationToken(accessToken, accessToken)
return authenticationManager
.authenticate(auth)
.map { authentication: Authentication -> SecurityContextImpl(authentication) }
}
}
Теперь имея два необходимых класса мы можем сконфигурировать сам Spring Security.
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
class SecurityConfig(
val reactiveAuthenticationManager: ReactiveAuthenticationManager,
val securityContextRepository: SecurityContextRepository
) {
@Bean
fun securityWebFilterChain(httpSecurity: ServerHttpSecurity): SecurityWebFilterChain {
return httpSecurity
.exceptionHandling()
.authenticationEntryPoint { swe: ServerWebExchange, e: AuthenticationException ->
Mono.fromRunnable { swe.response.statusCode = HttpStatus.UNAUTHORIZED }
}
.accessDeniedHandler { swe: ServerWebExchange, e: AccessDeniedException ->
Mono.fromRunnable { swe.response.statusCode = HttpStatus.FORBIDDEN }
}
.and()
.csrf().disable()
.cors().disable()
.formLogin().disable()
.httpBasic().disable()
.authenticationManager(reactiveAuthenticationManager)
.securityContextRepository(securityContextRepository)
.authorizeExchange()
.pathMatchers("/actuator/**").permitAll()
.pathMatchers(HttpMethod.GET, "/ws/**").hasAuthority("ROLE_USER")
.anyExchange().authenticated()
.and()
.build()
}
}
Здесь из интересного: конфигурация позволяет подключиться по путям начинающимся с /ws только аутентифицированным пользователям, у которых есть роль ROLE_USER.С конфигурацией Security закончили, теперь необходимо сконфигурировать подключение по сокетам.Конфигурация веб-сокетовВ первую очередь нам необходимо задать маппинг между запросом и обработчиком. Чтобы добавить обработчик сокетов по определенному адресу, мы делаем следующее:
- Создаем мапу, где ключ - uri, а значение - обработчик. В этом конкретном случае WebSocketHandler.
- Создаем обработчик для ранее определенного маппинга и cors.
@Configuration
class ReactiveWebSocketConfig {
@Bean
fun webSocketHandlerMapping(chatWebSocketHandler: ChatWebSocketHandler): HandlerMapping {
val map: MutableMap<String, WebSocketHandler> = HashMap()
map["/ws/chat"] = chatWebSocketHandler
val handlerMapping = SimpleUrlHandlerMapping()
handlerMapping.setCorsConfigurations(Collections.singletonMap("*", CorsConfiguration().applyPermitDefaultValues()))
handlerMapping.order = 1
handlerMapping.urlMap = map
return handlerMapping
}
@Bean
fun handlerAdapter(): WebSocketHandlerAdapter {
return WebSocketHandlerAdapter()
}
}
Здесь в качестве обработчика для uri /ws/chat указываем chatWebSocketHandler, его вид представлен ниже, имплементацией займемся позднее. Этот класс реализует интерфейс WebSocketHandler, содержащий один метод handle(session: WebSocketSession): Mono<Void>
@Component
class ChatWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
TODO("Not yet implemented")
}
}
С базовой конфигурацией закончили.Поговорим об архитектуре решенияНа данным момент наш чат нельзя масштабировать. Так как подключение по веб-сокету дуплексное и, установив соединение с одним инстансом, все запросы по этому сокету будут направляться на этот инстанс, так при запуске двух и более инстансов у нас не будет доступа до всех имеющихся сессий. Становится возможной ситуация, когда собеседники из одного чата подключены к разным инстансам и нет возможности сразу отправить сообщение по сокетам всем участникам. Для решения необходим Message Broker, который будет получать на вход сообщение для определенного чата и рассылать его на все инстансы. Инстансы посмотрят на сообщение, возьмут информацию об участниках чата из БД, и будут искать этих участников среди своих подключенных пользователей.
Представим, что участники одного чата User 1 и User 2 подключены к разным инстансам чата. User 1 подключен к Chat-Instance-0, а User 2 к Chat-Instance-1. Тогда, когда User 1 отправит сообщение в Chat-Instance-0 (зеленая пунктирная линия), это сообщение попадёт в чат и будет отправлено в Message broker, оттуда разослано по всем инстансам. Chat-Instance-1 получит это сообщение и увидит, что у него есть User 2, который относится к этому чату и ему необходимо отправить это сообщение.РеализацияТеперь займемся имплементацией нашего обработчика ChatWebSocketHandlerНам понадобится мапа userId => session, для того, чтобы хранить открытые сессии и иметь возможность достать их по userId. Для поддержки одновременной работы с несколькими сессиями из под одного userId интерфейс мапы будет следующим: MutableMap<UUID, LinkedList<WebSocketSession>>.Добавлять в мапу запись мы будем при подписке на стрим session.receive, а подчищать будем в doFinally.В методе getReceiverStream создается стрим-обработчик сообщений, пришедших от клиента. Мы получаем payload как строку и преобразуем его к базовому WebSocketEvent, после чего в зависимости от типа event'a передаем его на обработку в слой сервисов.В методе getSenderStream происходит конфигурация стрима, который занимается отправкой сообщений по сокету клиенту
@Component
class ChatWebSocketHandler(
val objectMapper: ObjectMapper,
val logger: Logger,
val chatService: ChatService,
val objectStringConverter: ObjectStringConverter,
val sinkWrapper: SinkWrapper
) : WebSocketHandler {
private val userIdToSession: MutableMap<UUID, LinkedList<WebSocketSession>> = ConcurrentHashMap()
override fun handle(session: WebSocketSession): Mono<Void> {
return ReactiveSecurityContextHolder.getContext()
.flatMap { ctx ->
val userId = UUID.fromString((ctx.authentication.details as Claims)["id"].toString())
val sender = getSenderStream(session, userId)
val receiver = getReceiverStream(session, userId)
return@flatMap Mono.zip(sender, receiver).then()
}
}
private fun getReceiverStream(session: WebSocketSession, userId: UUID): Mono<Void> {
return session.receive()
.filter { it.type == WebSocketMessage.Type.TEXT }
.map(WebSocketMessage::getPayloadAsText)
.flatMap {
objectStringConverter.stringToObject(it, WebSocketEvent::class.java)
}
.flatMap { convertedEvent ->
when (convertedEvent) {
is NewMessageEvent -> chatService.handleNewMessageEvent(userId, convertedEvent)
is MarkMessageAsRead -> chatService.markPreviousMessagesAsRead(convertedEvent.messageId)
else -> Mono.error(RuntimeException())
}
}
.onErrorContinue { t, _ -> logger.error("Error occurred with receiver stream", t) }
.doOnSubscribe {
val userSession = userIdToSession[userId]
if (userSession == null) {
val newUserSessions = LinkedList<WebSocketSession>()
userIdToSession[userId] = newUserSessions
}
userIdToSession[userId]?.add(session)
}
.doFinally {
val userSessions = userIdToSession[userId]
userSessions?.remove(session)
}
.then()
}
private fun getSenderStream(session: WebSocketSession, userId: UUID): Mono<Void> {
val sendMessage = sinkWrapper.sinks.asFlux()
.filter { sendTo -> sendTo.userId == userId }
.map { sendTo -> objectMapper.writeValueAsString(sendTo.event) }
.map { stringObject -> session.textMessage(stringObject) }
.doOnError { logger.error("", it) }
return session.send(sendMessage)
}
}
Для того чтобы писать в websocket нам необходимо создать поток данных, в который мы сможем добавлять данные. С reactora 3.4 для этого рекомендуется использовать Sinks.Many. Создадим такой поток в классе SinkWrapper.
@Component
class SinkWrapper {
val sinks: Sinks.Many<SendTo> = Sinks.many().multicast().onBackpressureBuffer()
}
Теперь, отправив данные в этот поток, они будут обработаны в потоке, сформированном в getSenderStream.Интеграция с RedisУ Redis есть PUB/SUB модель общения, которая прекрасно решает задачу транслирования сообщений между инстансами. Итак, для приготовления данного блюда нам понадобится:
- RedisChatMessageListener - подписка на топики и перенаправление сообщение в слой сервисов
- RedisChatMessagePublisher - публикация сообщений в топики
- RedisConfig - конфигурация редиса
- RedisListenerStarter - старт листенеров при старте инстанса
Реализация:RedisConfig стандартный, ничего особенного
@Configuration
class RedisConfig {
@Bean
fun reactiveRedisConnectionFactory(redisProperties: RedisProperties): ReactiveRedisConnectionFactory {
val redisStandaloneConfiguration = RedisStandaloneConfiguration(redisProperties.host, redisProperties.port)
redisStandaloneConfiguration.setPassword(redisProperties.password)
return LettuceConnectionFactory(redisStandaloneConfiguration)
}
@Bean
fun template(reactiveRedisConnectionFactory: ReactiveRedisConnectionFactory): ReactiveStringRedisTemplate {
return ReactiveStringRedisTemplate(reactiveRedisConnectionFactory)
}
}
RedisChatMessageListenerЗдесь мы создаем подписку на топик по имени базового класса (обычно название топиков выносят в проперти). Получив сообщение из канала преобразуем его в объект (строка 13) и дальше передаем в sendMessage, который достанет участников чата и попробует разослать им это сообщение, если таковы имеются среди подключенных к инстансу.
@Component
class RedisChatMessageListener(
private val logger: Logger,
private val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,
private val objectStringConverter: ObjectStringConverter,
private val chatService: ChatService
) {
fun subscribeOnCommonMessageTopic(): Mono<Void> {
return reactiveStringRedisTemplate.listenTo(PatternTopic(CommonMessage::class.java.name))
.map { message -> message.message }
.doOnNext { logger.info("Receive new message: $it") }
.flatMap { objectStringConverter.stringToObject(it, CommonMessage::class.java) }
.flatMap { message ->
when (message) {
is TextMessage -> chatService.sendMessage(message)
is ImageMessage -> chatService.sendMessage(message)
else -> Mono.error(RuntimeException())
}
}
.then()
}
}
RedisChatMessagePublisherПаблишер имеет один метод для транслирования CommonMessage на все инстансы. Объект сообщения приводится к строке и публикуется в топик по имени базового класса.
@Component
class RedisChatMessagePublisher(
val logger: Logger,
val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,
val objectStringConverter: ObjectStringConverter
) {
fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {
return objectStringConverter.objectToString(commonMessage)
.flatMap {
logger.info("Broadcast message $it to channel ${CommonMessage::class.java.name}")
reactiveStringRedisTemplate.convertAndSend(CommonMessage::class.java.name, it)
}
.then()
}
}
RedisListenerStarterВ этом классе стартуются все листенеры из RedisChatMessageListener. В нашем случае - единственный листенер subscribeOnCommonMessageTopic
@Component
class RedisListenerStarter(
val logger: Logger,
val redisChatMessageListener: RedisChatMessageListener
) {
@Bean
fun newMessageEventChannelListenerStarter(): ApplicationRunner {
return ApplicationRunner { args: ApplicationArguments ->
redisChatMessageListener.subscribeOnCommonMessageTopic()
.doOnSubscribe { logger.info("Start NewMessageEvent channel listener") }
.onErrorContinue { throwable, _ -> logger.error("Error occurred while listening NewMessageEvent channel", throwable) }
.subscribe()
}
}
}
Импелементация сервисаУпрощенная реализация, без сохранения сообщений в БД с замоканным chatRepository. В виду того, что статья выходит и так больше, чем я рассчитывал.Метод handleNewMessageEvent вызывается из WebSocketHandler и получает на вход userId отправителя и NewMessageEvent - простое текстовое сообщение. В методе происходит проверка на то, что отправитель действительно является участником чата и дальше это сообщение транслируется между инстансами.
@Service
class DefaultChatService(
val logger: Logger,
val sinkWrapper: SinkWrapper,
val chatRepository: ChatRepository,
val redisChatPublisher: RedisChatMessagePublisher
) : ChatService {
override fun handleNewMessageEvent(senderId: UUID, newMessageEvent: NewMessageEvent): Mono<Void> {
logger.info("Receive NewMessageEvent from $senderId: $newMessageEvent")
return chatRepository.findById(newMessageEvent.chatId)
.filter { it.chatMembers.map(ChatMember::userId).contains(senderId) }
.flatMap { chat ->
val textMessage = TextMessage(UUID.randomUUID(), chat.chatId, chat.chatMembers.first { it.userId == senderId }, newMessageEvent.content, LocalDateTime.now(), false)
chat.lastMessage = textMessage
return@flatMap Mono.zip(chatRepository.save(chat), Mono.just(textMessage))
}
.flatMap { broadcastMessage(it.t2) }
}
/**
* Broadcast the message between instances
*/
override fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {
return redisChatPublisher.broadcastMessage(commonMessage)
}
/**
* Send the message to all of chatMembers of message chat direct
*/
override fun sendMessage(message: CommonMessage): Mono<Void> {
return chatRepository.findById(message.chatId)
.map { it.chatMembers }
.flatMapMany { Flux.fromIterable(it) }
.flatMap { member -> sendEventToUserId(member.userId, ChatMessageEvent(message.chatId, message)) }
.then()
}
override fun sendEventToUserId(userId: UUID, webSocketEvent: WebSocketEvent): Mono<Void> {
return Mono.fromCallable { sinkWrapper.sinks.emitNext(SendTo(userId, webSocketEvent), Sinks.EmitFailureHandler.FAIL_FAST) }
.then()
}
}
ЗаключениеВ качестве дальнейших доработок можно произвести разделение получаемых и отправляемых ивентов на отдельные классы. Также в месте, где происходит получение сообщения по сокетам от клиента, его приведение к WebSocketEvent и передача в обработчик, можно попробовать избавиться от хардкодного маппинка event => handler. Пока не думал, как это можно сделать красивее, но уверен, что решение есть.Проект на GitHub
===========
Источник:
habr.com
===========
Похожие новости:
- [JavaScript, Программирование, Клиентская оптимизация, Математика] Кэширование данных увеличивает скорость даже в неожиданных случаях
- [JavaScript, Node.JS, Управление разработкой] Крупные компании, использующие Node.js (перевод)
- [Kotlin, Карьера в IT-индустрии, Конференции, Kubernetes] 22 апреля — новый QIWI Server Party
- [Программирование, Разработка мобильных приложений, Учебный процесс в IT, Карьера в IT-индустрии] Апрельский дайджест: приглашаем на онлайн-практикумы и митапы
- [JavaScript, ReactJS] Исходники React.memo или что такое SimpleMemo
- [Разработка веб-сайтов, JavaScript, Отладка, Браузеры] Используй console.log () как про (перевод)
- [Информационная безопасность, Разработка веб-сайтов, JavaScript, CTF] Как хакнуть Github и заработать $35000? (перевод)
- [Java] What can we do with Java16? Краткий обзор нового релиза JDK (март 2021)
- [JavaScript, Программирование, Совершенный код] Стек вызовов JavaScript и ещё большая магия
- [Разработка веб-сайтов, JavaScript, Программирование] Создаем Booking приложение с Webix UI
Теги для поиска: #_java, #_kotlin, #_java, #_kotlin, #_spring, #_microservice, #_redis, #_chat, #_java, #_kotlin
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:51
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Содержание
@Configuration
class LoggingConfig { @Bean @Scope("prototype") fun logger(injectionPoint: InjectionPoint): Logger { return LoggerFactory.getLogger( injectionPoint.methodParameter?.containingClass ?: injectionPoint.field?.declaringClass ) } } @Component
class ChatWebSocketHandlerService( private val logger: Logger ) data class Chat(
val chatId: UUID, val chatMembers: List<ChatMember>, @JsonSerialize(using = LocalDateTimeSerializer::class) @JsonDeserialize(using = LocalDateTimeDeserializer::class) val createdDate: LocalDateTime, var lastMessage: CommonMessage? ) data class ChatMember(
val userId: UUID, var fullName: String, var avatar: String, var deletedChat: Boolean ) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)
open class CommonMessage( val messageId: UUID, val chatId: UUID, val sender: ChatMember, @field:JsonSerialize(using = LocalDateTimeSerializer::class) @field:JsonDeserialize(using = LocalDateTimeDeserializer::class) val messageDate: LocalDateTime, var seen: Boolean ) class TextMessage(
messageId: UUID, chatId: UUID, sender: ChatMember, var content: String, messageDate: LocalDateTime, seen: Boolean ) : CommonMessage(messageId, chatId, sender, messageDate, messageType, seen) @Configuration
class ObjectMapperConfig { @Bean fun objectMapper(): ObjectMapper = ObjectMapper() .registerModule(JavaTimeModule()) .registerModule(Jdk8Module()) .registerModule(ParameterNamesModule()) .registerModule(KotlinModule()) .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) .apply { registerSubtypes( NamedType(NewMessageEvent::class.java, "NewMessageEvent"), NamedType(MarkMessageAsRead::class.java, "MarkMessageAsRead"), NamedType(TextMessage::class.java, "TextMessage"), NamedType(ImageMessage::class.java, "ImageMessage") ) } } @Component
class JwtAuthenticationManager(val jwtUtil: JwtUtil) : ReactiveAuthenticationManager { override fun authenticate(authentication: Authentication): Mono<Authentication> { val token = authentication.credentials.toString() val validateToken = jwtUtil.validateToken(token) var username: String? try { username = jwtUtil.extractUsername(token) } catch (e: Exception) { username = null println(e) } return if (username != null && validateToken) { val claims = jwtUtil.getClaimsFromToken(token) val role: List<String> = claims["roles"] as List<String> val authorities = role.stream() .map { role: String? -> SimpleGrantedAuthority(role) } .collect(Collectors.toList()) val authenticationToken = UsernamePasswordAuthenticationToken( username, null, authorities ) authenticationToken.details = claims Mono.just(authenticationToken) } else { Mono.empty() } } }
@Component
class SecurityContextRepository(val authenticationManager: ReactiveAuthenticationManager) : ServerSecurityContextRepository { override fun save(exchange: ServerWebExchange, context: SecurityContext): Mono<Void> { return Mono.error { IllegalStateException("Save method not supported") } } override fun load(exchange: ServerWebExchange): Mono<SecurityContext> { val authHeader = exchange.request .headers .getFirst(HttpHeaders.AUTHORIZATION) val accessToken: String = if (authHeader != null && authHeader.startsWith("Bearer ")) { authHeader.substring(7) } else exchange.request .queryParams .getFirst("access_token") ?: return Mono.empty() val auth = UsernamePasswordAuthenticationToken(accessToken, accessToken) return authenticationManager .authenticate(auth) .map { authentication: Authentication -> SecurityContextImpl(authentication) } } } @EnableWebFluxSecurity
@EnableReactiveMethodSecurity class SecurityConfig( val reactiveAuthenticationManager: ReactiveAuthenticationManager, val securityContextRepository: SecurityContextRepository ) { @Bean fun securityWebFilterChain(httpSecurity: ServerHttpSecurity): SecurityWebFilterChain { return httpSecurity .exceptionHandling() .authenticationEntryPoint { swe: ServerWebExchange, e: AuthenticationException -> Mono.fromRunnable { swe.response.statusCode = HttpStatus.UNAUTHORIZED } } .accessDeniedHandler { swe: ServerWebExchange, e: AccessDeniedException -> Mono.fromRunnable { swe.response.statusCode = HttpStatus.FORBIDDEN } } .and() .csrf().disable() .cors().disable() .formLogin().disable() .httpBasic().disable() .authenticationManager(reactiveAuthenticationManager) .securityContextRepository(securityContextRepository) .authorizeExchange() .pathMatchers("/actuator/**").permitAll() .pathMatchers(HttpMethod.GET, "/ws/**").hasAuthority("ROLE_USER") .anyExchange().authenticated() .and() .build() } }
@Configuration
class ReactiveWebSocketConfig { @Bean fun webSocketHandlerMapping(chatWebSocketHandler: ChatWebSocketHandler): HandlerMapping { val map: MutableMap<String, WebSocketHandler> = HashMap() map["/ws/chat"] = chatWebSocketHandler val handlerMapping = SimpleUrlHandlerMapping() handlerMapping.setCorsConfigurations(Collections.singletonMap("*", CorsConfiguration().applyPermitDefaultValues())) handlerMapping.order = 1 handlerMapping.urlMap = map return handlerMapping } @Bean fun handlerAdapter(): WebSocketHandlerAdapter { return WebSocketHandlerAdapter() } } @Component
class ChatWebSocketHandler : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { TODO("Not yet implemented") } } Представим, что участники одного чата User 1 и User 2 подключены к разным инстансам чата. User 1 подключен к Chat-Instance-0, а User 2 к Chat-Instance-1. Тогда, когда User 1 отправит сообщение в Chat-Instance-0 (зеленая пунктирная линия), это сообщение попадёт в чат и будет отправлено в Message broker, оттуда разослано по всем инстансам. Chat-Instance-1 получит это сообщение и увидит, что у него есть User 2, который относится к этому чату и ему необходимо отправить это сообщение.РеализацияТеперь займемся имплементацией нашего обработчика ChatWebSocketHandlerНам понадобится мапа userId => session, для того, чтобы хранить открытые сессии и иметь возможность достать их по userId. Для поддержки одновременной работы с несколькими сессиями из под одного userId интерфейс мапы будет следующим: MutableMap<UUID, LinkedList<WebSocketSession>>.Добавлять в мапу запись мы будем при подписке на стрим session.receive, а подчищать будем в doFinally.В методе getReceiverStream создается стрим-обработчик сообщений, пришедших от клиента. Мы получаем payload как строку и преобразуем его к базовому WebSocketEvent, после чего в зависимости от типа event'a передаем его на обработку в слой сервисов.В методе getSenderStream происходит конфигурация стрима, который занимается отправкой сообщений по сокету клиенту @Component
class ChatWebSocketHandler( val objectMapper: ObjectMapper, val logger: Logger, val chatService: ChatService, val objectStringConverter: ObjectStringConverter, val sinkWrapper: SinkWrapper ) : WebSocketHandler { private val userIdToSession: MutableMap<UUID, LinkedList<WebSocketSession>> = ConcurrentHashMap() override fun handle(session: WebSocketSession): Mono<Void> { return ReactiveSecurityContextHolder.getContext() .flatMap { ctx -> val userId = UUID.fromString((ctx.authentication.details as Claims)["id"].toString()) val sender = getSenderStream(session, userId) val receiver = getReceiverStream(session, userId) return@flatMap Mono.zip(sender, receiver).then() } } private fun getReceiverStream(session: WebSocketSession, userId: UUID): Mono<Void> { return session.receive() .filter { it.type == WebSocketMessage.Type.TEXT } .map(WebSocketMessage::getPayloadAsText) .flatMap { objectStringConverter.stringToObject(it, WebSocketEvent::class.java) } .flatMap { convertedEvent -> when (convertedEvent) { is NewMessageEvent -> chatService.handleNewMessageEvent(userId, convertedEvent) is MarkMessageAsRead -> chatService.markPreviousMessagesAsRead(convertedEvent.messageId) else -> Mono.error(RuntimeException()) } } .onErrorContinue { t, _ -> logger.error("Error occurred with receiver stream", t) } .doOnSubscribe { val userSession = userIdToSession[userId] if (userSession == null) { val newUserSessions = LinkedList<WebSocketSession>() userIdToSession[userId] = newUserSessions } userIdToSession[userId]?.add(session) } .doFinally { val userSessions = userIdToSession[userId] userSessions?.remove(session) } .then() } private fun getSenderStream(session: WebSocketSession, userId: UUID): Mono<Void> { val sendMessage = sinkWrapper.sinks.asFlux() .filter { sendTo -> sendTo.userId == userId } .map { sendTo -> objectMapper.writeValueAsString(sendTo.event) } .map { stringObject -> session.textMessage(stringObject) } .doOnError { logger.error("", it) } return session.send(sendMessage) } } @Component
class SinkWrapper { val sinks: Sinks.Many<SendTo> = Sinks.many().multicast().onBackpressureBuffer() }
@Configuration
class RedisConfig { @Bean fun reactiveRedisConnectionFactory(redisProperties: RedisProperties): ReactiveRedisConnectionFactory { val redisStandaloneConfiguration = RedisStandaloneConfiguration(redisProperties.host, redisProperties.port) redisStandaloneConfiguration.setPassword(redisProperties.password) return LettuceConnectionFactory(redisStandaloneConfiguration) } @Bean fun template(reactiveRedisConnectionFactory: ReactiveRedisConnectionFactory): ReactiveStringRedisTemplate { return ReactiveStringRedisTemplate(reactiveRedisConnectionFactory) } } @Component
class RedisChatMessageListener( private val logger: Logger, private val reactiveStringRedisTemplate: ReactiveStringRedisTemplate, private val objectStringConverter: ObjectStringConverter, private val chatService: ChatService ) { fun subscribeOnCommonMessageTopic(): Mono<Void> { return reactiveStringRedisTemplate.listenTo(PatternTopic(CommonMessage::class.java.name)) .map { message -> message.message } .doOnNext { logger.info("Receive new message: $it") } .flatMap { objectStringConverter.stringToObject(it, CommonMessage::class.java) } .flatMap { message -> when (message) { is TextMessage -> chatService.sendMessage(message) is ImageMessage -> chatService.sendMessage(message) else -> Mono.error(RuntimeException()) } } .then() } } @Component
class RedisChatMessagePublisher( val logger: Logger, val reactiveStringRedisTemplate: ReactiveStringRedisTemplate, val objectStringConverter: ObjectStringConverter ) { fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> { return objectStringConverter.objectToString(commonMessage) .flatMap { logger.info("Broadcast message $it to channel ${CommonMessage::class.java.name}") reactiveStringRedisTemplate.convertAndSend(CommonMessage::class.java.name, it) } .then() } } @Component
class RedisListenerStarter( val logger: Logger, val redisChatMessageListener: RedisChatMessageListener ) { @Bean fun newMessageEventChannelListenerStarter(): ApplicationRunner { return ApplicationRunner { args: ApplicationArguments -> redisChatMessageListener.subscribeOnCommonMessageTopic() .doOnSubscribe { logger.info("Start NewMessageEvent channel listener") } .onErrorContinue { throwable, _ -> logger.error("Error occurred while listening NewMessageEvent channel", throwable) } .subscribe() } } } @Service
class DefaultChatService( val logger: Logger, val sinkWrapper: SinkWrapper, val chatRepository: ChatRepository, val redisChatPublisher: RedisChatMessagePublisher ) : ChatService { override fun handleNewMessageEvent(senderId: UUID, newMessageEvent: NewMessageEvent): Mono<Void> { logger.info("Receive NewMessageEvent from $senderId: $newMessageEvent") return chatRepository.findById(newMessageEvent.chatId) .filter { it.chatMembers.map(ChatMember::userId).contains(senderId) } .flatMap { chat -> val textMessage = TextMessage(UUID.randomUUID(), chat.chatId, chat.chatMembers.first { it.userId == senderId }, newMessageEvent.content, LocalDateTime.now(), false) chat.lastMessage = textMessage return@flatMap Mono.zip(chatRepository.save(chat), Mono.just(textMessage)) } .flatMap { broadcastMessage(it.t2) } } /** * Broadcast the message between instances */ override fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> { return redisChatPublisher.broadcastMessage(commonMessage) } /** * Send the message to all of chatMembers of message chat direct */ override fun sendMessage(message: CommonMessage): Mono<Void> { return chatRepository.findById(message.chatId) .map { it.chatMembers } .flatMapMany { Flux.fromIterable(it) } .flatMap { member -> sendEventToUserId(member.userId, ChatMessageEvent(message.chatId, message)) } .then() } override fun sendEventToUserId(userId: UUID, webSocketEvent: WebSocketEvent): Mono<Void> { return Mono.fromCallable { sinkWrapper.sinks.emitNext(SendTo(userId, webSocketEvent), Sinks.EmitFailureHandler.FAIL_FAST) } .then() } } =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:51
Часовой пояс: UTC + 5