Всем привет! Многие знают что такое Redis, а если вы не знаете, то официальный сайт может ввести вас в курс дела. Большинство знают Redis как кэш и иногда как очередь сообщений. Но что если немножко сойти с ума и попытаться спроектировать целое приложение, используя в качестве хранилища данных только лишь Redis? Какие задачи можно решить с использованием Redis'а?

В этой статье мы и попытаемся ответить на эти вопросы.

Не переключайтесь.

Чего не будет в этой статье?

  • В ней не будет подробного рассказа про каждую структуру данных Redis’а. Для этого лучше поискать отдельные статьи или почитать документацию.

  • Так же тут не будет продакшн реди кода, который вы могли бы использовать в своей работе.

Что будет в этой статье?

  • Будут использованы различные структуры данных Redis’а на примере реализации аналога дейтинг приложения.

  • Будут примеры кода на kotlin + spring boot.

Учимся создавать и запрашивать профили пользователей

Давайте первым шагом научимся создавать карточки с нашими пользователями: имена, лайки т.е. их профили по сути.

Для этого нам пригодится простое key value хранилище. Как это сделать?

Очень просто, у редиса есть структура данных - hash. По сути своей это просто всем нам привычная hash map’а.

Достаешь по ключу. Кладешь по ключу. Все просто.

Команды к редису напрямую можно найти тут и тут. в документации даже есть интерактивное окошко чтобы прямо на сайте попробовать выполнить эти команды. А со всем списком команд можно ознакомиться вот тут.

Аналогичные ссылки работают и для всех последующих команд, которые мы будем рассматривать.

В коде же мы используем RedisTemplate и используем его практически везде. Это такая базовая вещь в экосистеме Spring'а для работы с Redis.

Единственное отличие тут от map’ы, что первым аргументом мы передаем так называемый field, т.е. наш hash с которым мы будем работать.

fun addUser(user: User) {
  val hashOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
	hashOps.put(Constants.USERS, user.name, user)
}

fun getUser(userId: String): User {
	val userOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
	return userOps.get(Constants.USERS, userId)?: throw NotFoundException("Not found user by $userId")
}

Собственно выше приведен пример того, как это может выглядеть на kotlin с использованием библиотеки Spring’а.

На протяжении всей статьи будут использоваться аналогичные куски кода из github.

Обновляем лайки пользователей с использованием Redis листов

Отлично, теперь у нас есть пользователи с информация про лайки.

Теперь нужно найти способ как правильно обновлять эти лайки. Мы предполагаем что события могут происходить очень часто. И поэтому давайте используем асинхронную схему обновления через некую очередь. А информацию из очереди мы будем вычитывать по расписанию.

У редиса есть листы. И вот этот набор команд. Redis lists можно использовать и как очередь fifo, и как стек lifo.

В Spring’е работаем все по той же схеме, достаем необходимые operations, в этот раз нам нужны ListOperations из RedisTemplate.

Чтобы записать в list, нам нужно записать вправо, поскольку тут мы имитируем fifo очередь справа налево.

fun putUserLike(userFrom: String, userTo: String, like: Boolean) {
  val userLike = UserLike(userFrom, userTo, like)
  val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
	listOps.rightPush(Constants.USER_LIKES, userLike)
}

И теперь по расписанию будем запускать нашу job'у и выполнять в ней уже необходимую нам логику.

По сути своей мы просто перекладываем информацию из одного типа данных редиса в другой. Этого нам достаточно для наглядного примера.

fun processUserLikes() {
  val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
	userLikes.forEach{updateUserLike(it)}
}

Обновление нашего юзера максимально простое, привет HashOperations из предыдущего раздела.

private fun updateUserLike(userLike: UserLike) {
  val userOps: HashOperations<String, String, User> = userLikeRedisTemplate.opsForHash()
  val fromUser = userOps.get(Constants.USERS, userLike.fromUserId)?: throw UserNotFoundException(userLike.fromUserId) 
  fromUser.fromLikes.add(userLike)
  val toUser = userOps.get(Constants.USERS, userLike.toUserId)?: throw UserNotFoundException(userLike.toUserId)
  toUser.fromLikes.add(userLike)

  userOps.putAll(Constants.USERS, mapOf(userLike.fromUserId to fromUser, userLike.toUserId to toUser))
}

Достаем значения из списка мы соответсвенно слева и мы хотим достать не одно значение, а сразу несколько и для этого используем метод range. И тут есть важный момент. С помощью range мы лишь получим данные из списка, но не удалим их.

Поэтому после получения наших данных нужно отдельной командой удалить их используя уже операцию trim(кажется тут уже могут возникнуть вопросы).

private fun getUserLikesLast(number: Long): List<UserLike> {
  val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
  return (listOps.range(Constants.USER_LIKES, 0, number)?:mutableListOf()).filterIsInstance(UserLike::class.java)
    .also{ listOps.trim(Constants.USER_LIKES, number, -1) }
}

А вопросы, которые могут тут возникнуть это:

  • А как доставать данные из списка в несколько потоков?

  • И как гарантировать, что данные не потеряются в случае ошибки?

Такой возможности нет из коробки. Нужно получать данные из списка в одном потоке, а все возникающие нюансы обрабатывать самостоятельно.

Отправляем пуши пользователям с использованием pub/sub

Движемся, не останавливаемся!

У нас уже есть профили пользователей, и мы разобрались как обрабатывать поток лайков от этих самых пользователей.

Но что делать, если в случае взаимного лайка мы хотим сразу отправить push уведомление пользователю?

У нас уже есть асинхронный процесс обработки лайков, так давайте туда и встроим отправку push уведомлений. Отправлять мы их будем конечно по web socket’ам, и можно прям сразу как достали наш лайк, отправить его по вебсокету.

Но допустим, что мы хотим выполнить над ним некую долго-работающую логику или просто передать работу с вебсокетами одному компоненту, то как нам обойти новые ограничения?

Мы возьмем и вновь переложим наши данные из одного типа данных Redis'а (list) в другой тип данных Redis'а (pub/sub).

fun processUserLikes() {
  val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
  pushLikesToUsers(userLikes)
  userLikes.forEach{updateUserLike(it)}
}

private fun pushLikesToUsers(userLikes: List<UserLike>) {
  GlobalScope.launch(Dispatchers.IO){
		userLikes.forEach {
			pushProducer.publish(it)
		}
  }
}
@Component
class PushProducer(val redisTemplate: RedisTemplate<String, String>, val pushTopic: ChannelTopic, val objectMapper: ObjectMapper) {

    fun publish(userLike: UserLike) {
        redisTemplate.convertAndSend(pushTopic.topic, objectMapper.writeValueAsString(userLike))
    }
}

Привязка слушателя к топику располагается в конфигурации.

Таким образом мы можем вынести нашего подписчика в отдельный сервис.

@Component
class PushListener(val objectMapper: ObjectMapper): MessageListener {
    private val log = KotlinLogging.logger {}

    override fun onMessage(userLikeMessage: Message, pattern: ByteArray?) {
        // websocket functionality would be here
        log.info("Received: ${objectMapper.readValue(userLikeMessage.body, UserLike::class.java)}")
    }
}

Находим ближайших пользователей через geo операции

С лайками разобрались, но, что за тиндер без возможности находить ближайших пользователей к заданной точке.

GeoOperations - наше решение. Мы будем хранить key value пары, id пользователя и его координаты.

А использовать мы будем метод radius передавая id пользователя, относительно которого будем искать, и соответственно сам радиус поиска.

Redis возвращает результат включая user id, которое мы передали

fun getNearUserIds(userId: String, distance: Double = 1000.0): List<String> {
    val geoOps: GeoOperations<String, String> = stringRedisTemplate.opsForGeo()
    return geoOps.radius(USER_GEO_POINT, userId, Distance(distance, RedisGeoCommands.DistanceUnit.KILOMETERS))
        ?.content?.map{ it.content.name}?.filter{ it!= userId}?:listOf()
}

Обновляем расположение пользователей через streams

Почти все, что необходимо реализовали, но у нас снова ситуация как с лайками пользователей, только теоретически вызовов метода изменения координат пользователя может происходить на много чаще.

Поэтому нам снова нужна очередь, но хорошо бы иметь что-то более масштабируемое.

Redis streams могут помочь в решении этой проблемы.

Вероятно многие знают про то, что такое Kafka и что есть Kafka streams, но это совершенно разные вещи. А вот сама концепция Kafka это как раз очень похоже на Redis streams. Это так же write ahead log структура данных, у которой есть consumer group и offset.

Это уже более сложная структура данных, но она позволяет в отличии от lists отрабатывать данные параллельно и используя реактивный подход.

Для подробностей стоит обратиться к документации.

В Spring для работы со структурами данных Redis'а есть ReactiveRedisTemplate и RedisTemplate. Чтобы записать значение нам более удобно было бы использовать RedisTemplate, а для чтения уже ReactiveRedisTemplate. Если речь про streams. Но в таком случаи ничего работать не будет.

Если кто-то в курсе про то из-за чего это так работает, из-за Spring или Redis, то напишите в комментариях.

fun publishUserPoint(userPoint: UserPoint) {
  val userPointRecord = ObjectRecord.create(USER_GEO_STREAM_NAME, userPoint)
  reactiveRedisTemplate
    .opsForStream<String, Any>()
    .add(userPointRecord)
    .subscribe{println("Send RecordId: $it")}
}

А наш метод слушателя будет выглядеть следующим образом:

@Service
class UserPointsConsumer(
  private val userGeoService: UserGeoService
): StreamListener<String, ObjectRecord<String, UserPoint>> {
  
  override fun onMessage(record: ObjectRecord<String, UserPoint>) {
      userGeoService.addUserPoint(record.value)
  }
}

Тут мы просто перемещаем наши данные уже в структуру данных для geo поиска.

Считаем уникальные сеансы с использованием HyperLogLog

Ну и напоследок давайте представим, что нам нужно посчитать сколько пользователей зашли в приложение за день.

Причем условимся, что пользователей у нас может быть оооочень много. Поэтому простой вариант через hash map нам не подходит из-за того, что будет потреблять много памяти. Как мы это можем сделать используя меньше ресурсов?

Тут на сцену выходит такая вероятностная структура данных как hyper log log. Интересующиеся могут почитать подробнее про нее уже самостоятельно в википедии. Ключевая ее особенность в том, что эта структура данных позволяет нам решать нашу задачу с использованием значительно меньшего количества памяти, чем вариант с hash map.

fun uniqueActivitiesPerDay(): Long {
  val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
  return hyperLogLogOps.size(Constants.TODAY_ACTIVITIES)
}

fun userOpenApp(userId: String): Long {
  val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
  return hyperLogLogOps.add(Constants.TODAY_ACTIVITIES, userId)
}

Заключение

В данной статье мы рассмотрели различные структуры данных Redis’а. Включая geo операции и hyper log log, о которых мало кто знает. Использовали их для решения реальных задач.

Спроектировали по сути целый Tinder, можно и в FAANG после такого :-) Попутно подсветили основные нюансы и проблемы, с которыми можно столкнуться при работе с Redis.

Redis очень функциональное хранилище данных и возможно, если в вашей инфраструктуре он уже есть, то стоит посмотреть, какие ещё свои задачи вы можете решать используя уже то, что у вас и так есть без лишних усложнений.

P.S.:

Все примеры кода можно найти на github.

Пишите в комментарии, если что-то не достаточно точно описано.

Так же пишите как вам вообще подобный формат описания того как использовать ту или иную технологию.

И подписывайтесь в твиттере ???? @de____ro

Комментарии (19)


  1. korsetlr473
    05.01.2022 01:47
    +2

    и сколько выдержит ситуация когда ЛедиГага за 1 час получает около 10 млн лайков на 1 пост ?


    1. BugM
      05.01.2022 02:08
      +2

      2.7к рпс. Выглядит нестрашно для Редиса.


    1. polearnik
      05.01.2022 12:38

      зависит от сервера но у меня на XEON X5550 было 80к RPS к редису. одно ядро было занято на ~80%. Если грамотно подобрать структуры данных для хранения тобутылочным горлышком редис никогда не будет


      1. korsetlr473
        05.01.2022 16:49

        вы даже вопроса не поняли , понятное дело что контекст не в редисе , а в вашей структуре данных сетов , которая получается key with nested array . В Той же mongodb в структуре key - nested array в одном документе после 100к записей в массиве уже начинает >1секунды запросы.


        1. Antharas
          06.01.2022 16:42

          А индексы уже не используют? Почитайте спеку на монгу более детально.


          1. korsetlr473
            06.01.2022 17:43

            зачем мне читать то в чем я контрибутер?


  1. laatoo
    05.01.2022 02:19
    +11

    Спроектировали по сути целый Tinder, можно и в FAANG после такого :-)

    вы всего лишь записали хаотично выбранные данные в in-memory key-value хранилище и прочитали их оттуда, спокойно


    1. DaniilRoman Автор
      06.01.2022 11:33
      +1

      в этом и была шутка)


  1. sse
    05.01.2022 03:07
    +6

    Что будет, если этот код выполнится одновременно от двух "лайкеров" одному и тому же юзеру? Чей-то UserLike потеряется?


    1. ggo
      05.01.2022 12:08
      +1

      потеряется.

      но кто ж лайки считает, это же не деньги. одним больше, одним меньше.


  1. OkunevPY
    05.01.2022 16:10
    +2

    Блин дичь дичайшая, что за бред с очередями.

    Redis выполняет операции атомарно, для счётчиков есть INCR который избавит от всей этой дичи.

    Redis это in-memory хранилище, как только подключите Persistance, плакала ваша производительность.

    Все возможные статистики по посещениям, просиотрам и т.д. формируються поверх TimeSeries структур и уж точно не в редисе.

    Что за странная философия "Я узнал что есть инструмент, а давайте мы на нём сделаем всё!!!".

    Вот после таких постов люди творят дичь, а потом бегут к нам и спрашивают, а чёй-то у меня приложуха жрёт всю память на сервере, а после ребута инфа теряеться и данные попадают.


    1. vadlit
      07.01.2022 23:37

      >Redis это in-memory хранилище, как только подключите Persistance, плакала ваша производительность.

      Он же вроде не сразу скидывает на диск, а только по прошествии времени/наступления события? Или я путаю?


  1. Semenych
    05.01.2022 16:19

    Очень полезная статья. Наблюдаю посление лет 10 попытки хранить транзакционные данные в NoSQL базах. В зависимости от выбора БД это имеет разную степень успешности. Но Редис тут не годится совершенно. Что не так

    1. не масштабируется - редис быстрый но, но по сути однопоточный

    2. не скалируется по объему данных. Если у вас база 64Гб, то вам надо два сервера (мы же хотим отказоустойчивость) с 64Гб памяти для в общем-то крохотной базы которую легко держит postgres установленный на R Pi

    3. как только у вас начнутся конкрурентные изменения данных пользователя - вам придется либо существенно все замедлять и делать слой синхронизации, либо изменения будут иногда теряться

    4. Представьте, что вам нужен список всех пользователей из Костромы? А теперь все пользователи из Ростова старше 40 лет. То что в РСУБД делается за минуту человеком поверхностно знающим SQL тут требует в лучшем случае часов работы программиста

    Похожая история была вот у этих ребят https://freefeed.net/ - они сделали микро соц сетку на Редисе и уперлись в перечисленные выше проблемы, особенно больной была проблема номер два. В результате переписывали на постгрес.

    Буду обязательно давать читать эту статью начинающим архитекторам.


    1. aleks_raiden
      05.01.2022 22:05
      +1

      Часть замечаний справедлива, часть обходиться заменой редиса на KVRocks ) Редис прекрасно масштабируеться, если что ) До момента памяти, хотя сейчас норма сервера это уже 128+ Гб. Ну и конечно, данные из редиса можно перекладывать потом в другие базы для аналитики.


    1. feoktant
      06.01.2022 14:26
      +1

      Представьте, что вам нужен список всех пользователей из Костромы?

      Мешать прод базу с аналитикой очень плохая идея. Работать будет только на маленьких данных, или начальных этапах проекта.


      1. Semenych
        06.01.2022 14:42

        Написал и стер три комментария. Рассказыать бизнесу, что ему нужна ОТДЕЛЬНА АНАЛИТИЧЕСКАЯ БАЗА при том, что у него <100 запросов в секунду и все база жалкие 100Гб. Ну так тоже можно конечно, лох не мамонт.

        Но я человек старой закалки и мне такое совесть не позволяет.

        Отдельное спасибо, еще один пункт в копилку - как не надо делать.


        1. feoktant
          06.01.2022 15:03
          +1

          Если у вас Постгрес, то отдельной аналитической базой может быть тот же Постгрес с отдельной схемой, но с денормализированными таблицами. И уже всеми нужными индексами, которые нужны аналитикам.

          Если же <100 запросов в секунду и все база жалкие 100Гб, и роста не предвидется... Что ж) и такие проекты бывают. Если аналитический запрос с кучей джоинов и без нужных индексов не съедает весь ваш процессор и память на сервере, при этом не тормозя 100 запросов в секунду до 2 - почему бы и нет.

          Всё относительно, закалка тут не причем


  1. bondeg
    05.01.2022 20:13

    Не описаны крайне важные моменты.

    • Критичные данные типа пользовательских данных, которые редко меняются, обязаны храниться в режиме немедленной записи на диск в отдельном инстансе.

    • Чтобы не было описанных в комментах проблем, фичи нужно хранить в раздельных инстансах. При росте это позволит мигрировать данные между серверами без геморроя

    • Шардирование данных в целом

    • Использование транзакций при необходимости (см. watch или использовать локи для проверки атомарности)

    P.S. GEORADIUS: Deprecation notice: as of Redis version 6.2.0 this command is considered as deprecated. While it is unlikely that it will be completely removed, prefer using `GEOSEARCH` and `GEOSEARCHSTORE` with the `BYRADIUS` argument in its stead.


  1. Antharas
    06.01.2022 16:46

    Как это не может "многопоточно"?

    С точки зрения выполнения команд в коре - да. С точки зрения вашего кода - CAS и Retry уже не используется?