Статья разбита на несколько частей:
- часть 1
- часть 3 - в работе
исходники этой части

В прошлой части мы развернули методы бизнес логики, защитились от CSRF и настроили GraphQL сервер.

В этой части нужно авторизовать Клиента и создать слушателей готовых принимать авторизацию пользователя. Необходимо иметь возможность удаленного завершения сессии Клиента.

Реализация

Упрощенная схема:

  1. При создании подключения Клиент получает ClientID

  2. Сервер создает сессию websocket привязанную к ClientID

  3. Сообщения отправляются всем слушателям привязанным к ClientID

  4. Слушателей ClientID может быть минимум 2, до передачи соединения в новую вкладку

Задачи

  1. Менеджер сессий

  2. Как Gqlgen обрабатывает websocket

  3. Создание Клиента и его наблюдателей

  4. Сохраняем сессию Клиента в JWT-токен

  5. Создание слушателей websocket

  6. Отправляем сообщение по websocket

1. Менеджер сессий

Мы уже имеем модель сессии /models/models_gen.go сгенерирована из схемы session.graphqls. Создадим в этой же директории файл с методами для безопасной работы с сессией /models/session.go

// Создает новую сессию
func NewSession() *Session {
	return &Session{

		// Идентификатором будет UUID
		// go get github.com/google/uuid
		// или go mod vendor при указании импорта
		Sid: uuid.New().String(),
	}
}

// Создаст сессию с существующим идентификатором
func NewSessionWithSid(sid string) *Session {
	return &Session{
		Sid: sid,
	}
}

// Получить идентификатор сессии
func (s *Session) GetSid() (sid string, err error) {
	if s.Sid == "" {
		return "", fmt.Errorf("session: not found")
	}
	return s.Sid, nil
}

// Подтверждает активность клиента
func (s *Session) SetOnline() {
	s.Online = true
	return
}

// Сохраняет сессию в контекст
func (s *Session) WithContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, sessionCtxKey{"session"}, s)
}

// Ключ контекста
type sessionCtxKey struct {
	name string
}

Создаем новый файл session.go в директории pkg/store

// Обрабатывает сессию клиента
func (s *Store) SessionHandleClient(w http.ResponseWriter, r *http.Request) *http.Request {

	// Получим контекст
	ctx := r.Context()

	// Создадим сессию
	var sess *model.Session

	// Проверим наличие токена c ClientID
	cookie, err := r.Cookie("_sid")
	if err != nil {

		// Нет ClientID, создадим сессию
		sess = model.NewSession()

	} else {

		// Тут должна быть логика валидации
		// Но нам сейчас удобно видеть действительную запись
		sess = model.NewSessionWithSid(cookie.Value)

		// Клиент имеет ID
    // Вебсокет не может устанавливать cookie, 
    // значит если идентификатор отсутствует, 
    // то возможные соединения websocket 
    // являются не авторизованными и должны быть отклонены
    // 
    // В данный кейс попадает Клиент с ранее имеющимся 
    // ClientId, в этом случае соединение 
    // по websocket возможно настроим это
		sess.SetOnline()
	}

	// Если есть ошибка – устанавливаем новые cookie
	if err != nil {

		// Получим ID клиента
		sid, err2 := sess.GetSid()
		if err2 != nil {
			fmt.Printf(err.Error())
			return r
		}

		// Создадим cookie
		cookie = &http.Cookie{
			Name: "_sid",
			// Сid следует завернуть в токен, например JWT
			Value: sid,
			HttpOnly: true,
			//Secure: true,
		}

		// Установим cookie
		http.SetCookie(w, cookie)
	}

	// Сохраним сессию в контекст и вернем *http.Request
	return r.WithContext(sess.WithContext(ctx))
}

Теперь мы умеем обрабатывать сессию Клиента.

Создадим метод обработки HTTP запросов в файле pkg/store/auth.go и в нем подключим только что созданный метод:

// Вызывается в AuthMiddleware
// Обрабатывает HTTP заголовки
// Проводит авторизации клиента и пользователя
func (s *Store) HandleAuthHTTP(w http.ResponseWriter, r *http.Request) *http.Request {

	// Обработаем сессию клиента
	r = s.SessionHandleClient(w, r)

	return r
}

Необходимо опубликовать HandleAuthHTTP в качестве промежуточного ПО HTTP-роутера. Для этого создадим файл auth.go в директории pkg/middleware:

func AuthMiddleware(store *store.Store) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

			// Метод из Store, обрабатывает логику авторизации
			r = store.HandleAuthHTTP(w, r)
			next.ServeHTTP(w, r)
		})
	}
}

Подключим его в main.go:

func main() {
  // ...
  router := mux.NewRouter()
  
  // Подключим Auth middleware и передадим store в качестве параметра
	router.Use(middleware.AuthMiddleware(store))
  
  // ...
}

Запускаем сервер, открываем браузер. Теперь Клиент имеет свой идентификатор в виде cookie с именем _sid

Коммит данного этапа

2. Как Gqlgen обрабатывает websocket

По мере развития проекта мы собираемся подписываться не только на авторизацию. Необходимо реализовать общую логику подписки для всех сущностей проекта.

Важно учесть:

  1. ClientID – может быть присвоен только браузеру

  2. Разные вкладки браузера, имеют свои уникальные websocket идентификаторы, не связанные с ClientID

Клиент подключающийся по websocket должен иметь cookie с ClientID, которую он получает после GET-запроса. Отсекаем запросы не имеющие авторизации: параметры можно узнать из model.Session которую извлекаем из контекста – необходимо создать метод.

Фронтенд получает состоние авторизации по вебсокет, слушая изменения в subscription.auth. Для этих целей у на сервере есть метод Auth обрабатывающий subscriptionResolver. На его примере разберем процесс добавления и удаления слушателя. Для этого добавим 2 метода в models/session:

// Получает сессию из контекста
func SessionFromContext(ctx context.Context) (*Session, error) {
	if meta := ctx.Value(sessionCtxKey{"session"}); meta != nil {
		return meta.(*Session), nil
	}
	return nil, fmt.Errorf("meta: not found")
}

// Подтверждает активность клиента
func (s *Session) CheckOnline() bool {
	return s.Online
}

Отредактируем сам метод подписки Auth, он импортирует AuthWebsocket – из Store. Откроем файл /pkg/store/auth.go и отредактируем этот метод:

// Авторизовывает websocket
// Создает сессию
// Обрабатывает подключение и создает канал
//
// Каждый клиент вызывавший данный
// метод – является уникальным
func (r *Store) AuthWebsocket(ctx context.Context) (<-chan *model.Auth, error) {

	// Получим сессию из контекста
	sess, err := model.SessionFromContext(ctx)
	if err != nil {

		// Если произошла ошибка то не стоит здесь
		// ее отправлять дальше.
		//
		// Ее нужно логировать и вернуть на фронт
		// что-то более обобщенное
		fmt.Printf("Auth subscriptionResolver. %v", err)

		return nil, gqlerror.Errorf("internal error")
	}

	// Проверим инициатора запроса.
	// Если запрос поступил по вебсокет и от Клиента
	// ранее не имеющего ClientID – не обрабатываем его
	if ok := sess.CheckOnline(); !ok {

		// Если клиент не имеет авторизации
		return nil, gqlerror.Errorf("unauthorized")
	}

	// Подключившийся клиент – уникален
	// Создадим websocket ID
	wsid := uuid.New().String()

	// Создаем канал в который будем писать сообщения
	in := make(chan *model.Auth)

	// Выведем в терминал сообщение при подключении
	fmt.Printf("WS connect. ID: %v\n", wsid)

	// Обработаем остановку соединения
	go func() {

		// Чтобы узнать об отключении websocket
		// достаточно слушать сигнал из контекста
		<- ctx.Done()
		fmt.Printf("WS disconnect. ID: %v\n", wsid)
	}()

	// Тестовая публикация сообщения
	go func() {
		
		// Сразу опубликуем сообщение
		in <- &model.Auth{
			ClientID: time.Now().String(),
		}

		// Небольшая задержка и отправим следующее
		time.Sleep(time.Second * 2)
		in <- &model.Auth{
			ClientID: time.Now().String(),
		}
	}()

	// Вернем канал
	return in, nil
}

Запустим сервер, откроем http://localhost:2000/ в открывшемся окне Playground выполним запрос:

subscription{
  auth{
    client_id
  }
}

Получаем 2 сообщения с задержкой:

Теперь проверим ответ если Клиент является не авторизованным. При соединении браузер получил cookie с ClientID, зайдем в инструменты разработчика, удалим cookie и снова попробуем подключиться:

Мы разобрали механизм подписки в Gqlgen, теперь можно приступить к этапу сборки слушателя.

Исходники этапа

3. Создание Клиента и его наблюдателей

Структура Клиента и его слушателя:

  1. Client: объединяет слушателей единым ClientID. В нашем контексте – браузер

  2. Observer: является непосредственным получателем сообщений – разные вкладки браузера. Имеет свой уникальный идентификатор

Важный момент!

Observer-ом, может являться не только новая вкладка. Но и новый запрос в текущей вкладке.

На схеме:

Observer – вкладка браузера

Синим обозначены соединения Client-1. Видим что через Observer-1 проходит 2 канала. Это происходит из-за отсутствия идентификатора у Observer.

Как решаем?

При создании нового слушателя – создаем сессию, со своим идентификатором SessionID.

  1. Создаем сессию для каждой вкладки браузера

  2. Создаем сессию, только для websocket соединений

  3. Клиент получает Session-ID в ответе websocket

  4. Сессия активна – пока активно соединение

  5. Если Клиент имеет SessionID, то при любых запросах включает его в HTTP-заголовок: Session-ID

Схема после создания идентификатора:

Приступим к реализации

Откроем файл /pkg/store/session.go, нас интересует код в методе SessionHandleClient:

func (s *Store) SessionHandleClient(w http.ResponseWriter, r *http.Request) *http.Request {
  // ...
  
  // Проверим наличие токена c ClientID
	cookie, err := r.Cookie("_cid")
	if err != nil {

		// Нет ClientID, создадим сессию
		sess = model.NewSession()
	} else {
    
    // ...
	}
  
  // ...
}

Здесь при отсутствии сессии Клиента мы создаем новую сессию, далее сохраняем ее в контекст. В случае если клиент имеет ClientID, мы снова создаем сессию с ClientID полученным из cookie.

Будет разумнее если при наличии cookie мы будем не создавать, а получать существующую сессию.

Нам не выгодно создавать хранилище данных сессий на стороне сервера. Значит будем хранить ее у клиента. Здесь будут разумны 2 варианта:

  1. Сохранить сессию в cookie как JSON

  2. Сохранить сессию в JWT-токен

4. Сохраняем сессию Клиента в JWT-токен

В прошлой главе мы разобрали необходимость создания сессии для каждого слушателя, также пришли к выводу что ее необходимо где-то сохранять.

На первый взгляд, самое простое решение: сохранять сессию в cookie. Но нам нужна сессия привязанная к конкретной вкладке. В случае с cookie, она станет видна всему браузеру: придется создавать уникальные cookie. В случае с уникальным cookie – как их чистить? Берем во внимание что websocket, не работает с cookie и не сможет их удалить после отписки.

Чтобы не изобретать: сохраним сессию в JWT-токен и передадим его Клиенту. При следующем обращении Клиент: отправит токен в HTTP-заголовке Session-ID.

Почему именно Session-ID?
1. Мы можем хранить сессию на стороне Сервера и при получении Session-ID запрашивать ее из хранилища
2. Технически, JWT-токен не является сессией. В зашифрованом виде бесполезен для Клиента

JWT-валидация

Состояния при валидации токена:

  1. Token invalid – не рабочее: когда то, что пришло вообще не является токеном

  2. Claims invalid – не рабочее: когда не удалось извлечь полезную нагрузку

  3. Expired – подходит для дальнейшей валидации: в этом случае токен должен быть обновлен. Возможно, после дополнительной валидации сессии

  4. Valid – рабочий токен

При разработке авторизации Пользователя, нам понядобится параметр Expired. В этом случае: токен должен будет содержать в себе поле: AccessToken. С ним мы будем обращаться в хранилище сессий на Сервере. И если он валиден – будем обновлять JWT-токен.

– Именно по такому принципу работает OAuth

Не будем углубляться в структуру токена. Тем более, есть хорошая статья:
Пять простых шагов для понимания JSON Web Tokens (JWT)

Нас интересует как работать с JWT в Golang, для этого возьмем популярный пакет jwt-go. Добавим его к проекту:

go get github.com/dgrijalva/jwt-go

Создадим файл /pkg/token/jwt.go:

package token

import (
	"fmt"
	"github.com/dgrijalva/jwt-go"
	model "react-apollo-gqlgen-tutorial/backoffice/models"
	"time"
)

// Создадим структуру Jwt
type Jwt struct {
	SecretKey  string
	Issuer     string
	Expiration int64
}

// Опции при генерации токена
type JwtClaims struct {

	// Нужен для обновления токена
	AccessToken string

	// Прикрепим сессию
	Sess *model.Session

	jwt.StandardClaims
}

// Генерация токена
func (j *Jwt) Generate(opt JwtClaims) (token string, err error) {

	// Получим Claims
	claims := &opt

	// Инициализация StandardClaims
	//
	// Здесь "подключаются" все настройки
	// Необходимые для валидации токена
	//
	// Указываются при инициализации структуры Jwt
	claims.StandardClaims = jwt.StandardClaims{
		ExpiresAt: time.Now().Local().Add(time.Second * time.Duration(j.Expiration)).Unix(),
		Issuer:    j.Issuer,
	}

	// Генерация токена
	t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)

	return t.SignedString([]byte(j.SecretKey))
}

// Опции при валидации токена
type JwtValidateOptions struct {
	Token 	string
}

// Валидация токена
func (j *Jwt) Validate(opt JwtValidateOptions) (claims *JwtClaims, err error) {

	// Попробуем получить полезную нагрузку
	token, err := jwt.ParseWithClaims(
		opt.Token,
		&JwtClaims{},
		func(token *jwt.Token) (interface{}, error) {
			return []byte(j.SecretKey), nil
		},
	)

	// Полезной нагрузки нет
	// Что-то явно не валидное – вернем ошибку
	if token == nil {
		return nil, fmt.Errorf("token invalid")
	}

	// Получим Claims
	claims, ok := token.Claims.(*JwtClaims)
	if !ok {
		return nil, fmt.Errorf("error token claims")
	}

	// Проверим срок жизни токена
	if j.Expiration > 0 && claims.ExpiresAt < time.Now().Local().Unix() {

		// Если токен протух
		// Вернем полезную нагрузку вместе с ошибкой
    // 
    // Для дальнейшей валидации:
    // claims будет содержать AccessToken
		return claims, fmt.Errorf("token is expired")
	}

	return claims, nil
}

// Опции структуры Jwt
type JwtOptions struct {
	SecretKey 	string
	Issuer 		string
	ExpSeconds 	int64
}

func NewJwt(opt JwtOptions) *Jwt {
	return &Jwt{
		SecretKey: 	opt.SecretKey,
		Issuer: 	opt.Issuer,
		Expiration: opt.ExpSeconds,
	}
}

Теперь мы можем сохранить сессию в токене, вернемся к файлу /pkg/store/session.go. Добавим метод вадидирующий токен и метод валидирующий сессию слушателя:

// Валидирует сессию слушателя
func (s *Store) ValidateClientSession(ctx context.Context) (sessionID string, err error) {

	// Получим сессию из контекста
	sess, err := model.SessionFromContext(ctx)
	if err != nil {
		return "", fmt.Errorf("internal error")
	}

	if ok := sess.CheckOnline(); !ok {

		// Если клиент не авторизован: SessionID отсутствует
		// Создадим SessionID, и отправим клиенту
		sessionToken, err2 := s.token.SessionID.Generate(token.JwtClaims{
			Sess: sess,
		})

		if err2 != nil {
			fmt.Println(err2)
			return "", fmt.Errorf("internal error")
		}

		return sessionToken, nil
	}

	return "", nil
}

// Валидирует токен слушателя
func (s *Store) ValidateSessionToken(sid string) (*model.Session, error) {

	// Валидируем токен
	// Считаем токен не валидным если нет claims
	if claims, _ := s.token.SessionID.Validate(token.JwtValidateOptions{
		Token: sid,
	}); claims != nil {
		sess := claims.Sess

		// Сессию получили из заголовка: клиент онлайн
		sess.SetOnline()

		// Сохраним сессию в контекст
		return sess, nil
	}

	return nil, fmt.Errorf("invalid session token")
}

В этом же файле /pkg/store/session.go , перейдем к методу SessionHandleClient, отредактируем его так:

// Обрабатывает сессию клиента
func (s *Store) SessionHandleClient(w http.ResponseWriter, r *http.Request) *http.Request {

	// Получим контекст
	ctx := r.Context()

	// Сюда запишем сессию, если сработает кейс
	var sess *model.Session
	var ClientID string

	// Проверим наличие cookie c ClientID
	cookie, err := r.Cookie("_cid")
	if err == nil {
		ClientID = cookie.Value

		// У клиента есть ClientID
		// 1. Проверим наличие заголовка Session-ID
		// 2. Получаем токен и валидируем его
		// 2.1. Токен валидный: сохраним сессию из токена
		// 2.2. Токен протух: сохраним сессию из токена
		// 2.3. Токен Invalid: создадим новую сессии

		// Ищем заголовок Session-ID
		if t := r.Header.Get("Session-ID"); t != "" {

			// Нашли сессию
			if ss, err2 := s.ValidateSessionToken(t); err2 == nil {
				sess = ss
			}
		}

		// Этот метод теперь удален
		//sess = model.NewSessionWithSid(cookie.Value)
	}

	// Если сессии нет: создаем сессию
	if sess == nil {
		sess = model.NewSession()
		
		if ClientID != "" {
			sess.AddClientID(ClientID)
		}
	}

	// Если есть ошибка при чтении cookie
	if err != nil {

		// Получим ID клиента
		cid, err2 := sess.GetSid()
		if err2 != nil {
			fmt.Printf(err.Error())
			return r
		}

		// Создадим cookie
		cookie = &http.Cookie{
			Name: "_cid",
			Value: cid,
			HttpOnly: true,
			//Secure: true,
		}

		// Установим cookie
		http.SetCookie(w, cookie)
	}

	// Сохраним сессию в контекст и вернем *http.Request
	return r.WithContext(sess.WithContext(ctx))
}

Мы получаем HTTP-заголовки. Но websocket авторизовывается иначе. Откроем файл /pkg/graph/resolver.go. Тут необходимо принять заголовок Session-ID, провалидировать токен, в случае успеха: получить сессию и сохранить ее в контекст.

func NewServer(opt Options) *handler.Server {
	// ...
  
	srv.AddTransport(transport.Websocket{
    
    // ...
		InitFunc: transport.WebsocketInitFunc(func(ctx context.Context, initPayload transport.InitPayload) (context.Context, error) {

			// Тут обрабатываются websocket соединения
			// Получим заголовок "Session-ID"
			if sid, ok := initPayload["Session-ID"]; ok {
				if sess, err := opt.Store.ValidateSessionToken(sid.(string)); err == nil {
					
          // Сохраним сессию в контекст
          ctx = sess.WithContext(ctx)
				}
			}

			return ctx, nil
		}),
	})
  
  // ...
}

Методы авторизации

Мы все подготовили, теперь нужно, где-то получить токен SessionID. При любом запросе model.Auth, нужно создавать сессию. За исключение случая когда она имеется.

Каждый запрос model.Auth: возвращает текущее состояние авторизации, за него отвечает метод Auth() в файле /pkg/store/auth.go. Откроем и отредактируем его:

// Возвращает состояние Auth исходя из текущего контекста
func (s *Store) Auth(ctx context.Context) (*model.Auth, error) {

	// создадим модель
	auth := &model.Auth{}

	// Проверим сессию
	sid, err := s.ValidateClientSession(ctx)
	if err != nil {
		return nil, gqlerror.Errorf("internal error")
	}

	// Если есть sid – добавим его к Auth
	if sid != "" {
		auth.AddSessionId(sid)
	}

	// Отправим текущее состояние
	return auth, nil
}

Также отредактируем метод подписки на websocket model.Auth:

func (s *Store) AuthWebsocket(ctx context.Context) (<-chan *model.Auth, error) {

	// Получим текущее состояние авторизации
	auth, err := s.Auth(ctx)
	if err != nil {
		return nil, gqlerror.Errorf("internal error")
	}

	// Создаем канал в который будем писать сообщения
	ch := make(chan *model.Auth)

	// Нужно вернуть текущее состояние
	go func() {
		ch <- auth
	}()

	// Вернем канал
	return ch, nil
}

Для чистоты эксперимента

Добавим слушатель websocket для User. Откроем файл schema.graphqls и добавим метод:

"""
Подписки на websocket
"""
type Subscription {

  """
  Добавлен метод:
  Подписка на Auth
  """
  user: User!

  """
  Подписка на Auth
  """
  auth: Auth!
}

Введем команду генерации смехы в терминале:

go run cmd/gqlgen.go

Переместим сгенерированный метод:
User(ctx context.Context) (<-chan *model.User, error)
в /pkg/graph/user и отредактируем его:

func (r *subscriptionResolver) User(ctx context.Context) (<-chan *model.User, error) {
	user := make(chan *model.User)

	// Получим сессию из контекста
	sess, err := model.SessionFromContext(ctx)
	if err != nil {
		return nil, gqlerror.Errorf("internal error")
	}

	fmt.Printf("User. Session: %v\n", sess.Sid)

	return user, nil
}

Также добавим похожий код в метод подписки на Auth:

func (s *Store) AuthWebsocket(ctx context.Context) (<-chan *model.Auth, error) {

	// Получим сессию из контекста
	sess, err := model.SessionFromContext(ctx)
	if err != nil {
		return nil, gqlerror.Errorf("internal error")
	}

	fmt.Printf("Auth. Session: %v\n", sess.Sid)
  
  // ...
}

Откроем терминал:

go run cmd/main.go

В браузере: http://localhost:2000/

Вводим запрос:

query{
  auth{
    sessionId
  }
}

Получаем токен:

Теперь подключаемся к Auth, используя данный токен. Для этого в Playground есть инструмент:

Вводим запрос:

subscription{
  auth{
    sessionId
  }
}

Следом вводим второй:

subscription {
  user{
    uid
  }
}

Смотрим в терминал, и видим что Auth и User имеют одинаковые идентификаторы. Это значит что сессия создается и читается:

Мы создали сессию для каждого окна, теперь нужно создать сам обработчик соединений.

5. Создание слушателей websocket

При подписке нового слушателя Auth или User необходимо:

  1. Получить: ClientID и SessionID

  2. Создать Клиента по ClientID

  3. Создать слушателя по SessionID

При отписке:

  1. Удалить слушатель Auth или User

  2. Проверить наличие других слушателей, в случае отсутствия: полностью удалить запись с сессией Клиента

Мы хотим иметь унифицированный обработчик для Auth и User или для других сущностей проекта. По этому не можем знать тип принимаемого канала или сообщения. Для решения этой задачи в качесте типа будем принимать пустой интерфейс и извлекать канал согласно его типа.

Создадим файл /pkg/websocket/observer.go обрабатывающий тип канала:

type observer struct {
	auth chan 	*model.Auth
	user chan 	*model.User
}

func (o *observer) Add(ch interface{}) error {

	// Получим тип из интерфейса
	switch ch.(type) {
	case chan *model.Auth:
		o.auth = ch.(chan *model.Auth)
		return nil
	case chan *model.User:
		o.user = ch.(chan *model.User)
		return nil
	default:
    
    // Тип не обнаружен
		return fmt.Errorf("observer: unknown type")
	}
}

// Удаляет наблюдателя,
// если вернет true - можно удалить слушатель
func (o *observer) Delete(ch interface{}) bool {

	// Получим тип из интерфейса
	switch ch.(type) {
	case chan *model.Auth:
		o.auth = nil
	case chan *model.User:
		o.user = nil
	}

	return o.checkEmpty()
}

// Вернет истину если нет слушателей
func (o *observer) checkEmpty() bool {
	switch {
	case o.auth != nil:
		return false
	case o.user != nil:
		return false
	}
	return true
}

Слушателей объединяет Клиент, создадим файл /pkg/websocket/client.go:

type client struct {
	observers 	map[string]*observer
	mu 			sync.Mutex
}

// Добавляет слушателя Клиента
func (c *client) Add(sid string, ch interface{}) error {

	// Заблокируем мапу слушателей
	// чтобы безопасно с ней работать работать
	c.mu.Lock()

	// Разблокируем мапу после выхода из функции
	defer c.mu.Unlock()

	// Поищем слушателя
	obs, ok := c.observers[sid]
	if !ok {

		// Слушатель не найден, создадим
		obs = &observer{}

		// Добавим в мапу
		c.observers[sid] = obs
	}

	err := obs.Add(ch)
	if err != nil {
		return err
	}

	return nil
}

// Удаляет слушателя
// Возвращает признак наличия других слушателей
func (c *client) Delete(sid string, ch interface{}) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	obs, ok := c.observers[sid]
	if !ok {
		// Обсервер не найден?
		fmt.Println("panic")
	}

	// Удаляем канал
	if ok = obs.Delete(ch); ok {

		// Если вернулся признак пустоты
		// Удалим слушатель
		delete(c.observers, sid)
	}

	// Посчитаем количество слушателей
	// и вернем результат
	return len(c.observers) == 0
}

func newClient() *client {
	return &client{
		observers: make(map[string]*observer),
	}
}

Остался файл менеджера с единственным методом создающим нового слушателя /pkg/websocket/manager.go:

type Websocket struct {
	clients map[string]*client

	// Защищаем мапу
	mu sync.Mutex
}

// Создает Клиента
func (w *Websocket) NewObserver(ctx context.Context, ch interface{}) error {

	// Заблокируем мапу clients
	// чтобы безопасно с ней работать работать
	w.mu.Lock()

	// Разблокируем мапу после выхода из функции
	defer w.mu.Unlock()

	// Получим сессию из контекста
	sess, err := model.SessionFromContext(ctx)
	if err != nil {
		return err
	}

	cid := sess.ClientID
	sid := sess.Sid

	// Найдем, или создадим клиента
	cli, ok := w.clients[cid]
	if !ok {

		// Клиент не найден, создадим
		cli = newClient()

		// Добавим в мапу
		w.clients[cid] = cli
	}

	// Добавим слушателя клиента
	err = cli.Add(sid, ch)
	if err != nil {
		return err
	}

	// Клиент отписывается – удаляем слушатель
	go func() {
		<- ctx.Done()

		cli.Delete(sid, ch)
	}()

	return nil
}

func New() *Websocket {
	return &Websocket{
		clients: make(map[string]*client),
	}
}

Мы создали менеджера слушателей websocket, осталось его подключить.

Откроем файл /pkg/store/auth.go, и добавим созданный менеджер к методу AuthWebsocket реализующему websocket соединение для Auth:

func (s *Store) AuthWebsocket(ctx context.Context) (<-chan *model.Auth, error) {

	// Получим текущее состояние авторизации
	auth, err := s.Auth(ctx)
	if err != nil {
		fmt.Println(err)
		return nil, gqlerror.Errorf("internal error")
	}

	// Создаем канал в который будем писать сообщения
	ch := make(chan *model.Auth)

	// Подключим канал к менеджеру websocket
	err = s.websocket.NewObserver(ctx, ch)
	if err != nil {
		fmt.Println(err)
		return nil, gqlerror.Errorf("internal error")
	}

	// Нужно вернуть текущее состояние
	go func() {
		ch <- auth
	}()

	// Вернем канал
	return ch, nil
}

Все готово, теперь наши соединения привязываются к ClientID и SessionID

Исходники этапа

6. Отправляем сообщение по websocket

Мы создали слушателей, осталось реализовать методы для отправки сообщений слушателям по их назначению.

Откроем файл /pkg/websocket/observer.go и добавим метод Send:

func (o *observer) Send(ch interface{}) {

	// Получим тип из интерфейса
	switch ch.(type) {
	case *model.Auth:

		// Валидируем канал
		if o.auth == nil {
			fmt.Println("Auth sending error")
			return
		}

		// Отправляем сообщение
		o.auth <- ch.(*model.Auth)
	case *model.User:

		// Валидируем канал
		if o.user == nil {
			fmt.Println("User sending error")
			return
		}

		// Отправляем сообщение
		o.user <- ch.(*model.User)
	default:
		fmt.Println("unknown message type")
	}
}

Далее создадим метод в /pkg/websocket/client.go:

func (c *client) Send(ch interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Необходимо отправить сообщение всем слушателям
	// Пройдемся в цикле и запустим отправку
	//
	// Лучше всего это сделать в отдельной горутине

	// Создадим WaitGroup
	// Про применение описано в этой статье:
	// https://habr.com/ru/company/otus/blog/557312/
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		for _, obs := range c.observers {
			obs.Send(ch)
		}
		wg.Done()
	}()
	wg.Wait()
}

Теперь добавим Send в менеджер /pkg/websoket/manager.go:

func (w *Websocket) Send(ctx context.Context, ch interface{}) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Получим сессию из контекста
	sess, err := model.SessionFromContext(ctx)
	if err != nil {
		return err
	}

	// Получим ClientID
	cid := sess.ClientID

	// Найдем клиента
	cli, ok := w.clients[cid]
	if !ok {
		return fmt.Errorf("client not found")
	}

	// Отправляем сообщение
	cli.Send(ch)

	return nil
}

Отправляем Auth по websocket

Мы подготовились к отправке сообщений по вебсокет. Реализуем метод отправки текущего состояния Auth Клиенту:

func (s *Store) SendAuth(ctx context.Context) error {
	
	// Получим текущее состояние
	auth, err := s.Auth(ctx)
	if err != nil {
		return err
	}

	// Todo: удалить!!!
	// Чтобы увидеть результат изменений
	// Нужно что нибудь рандомное
	auth.Method = time.Now().String()

	if err = s.websocket.Send(ctx, auth); err != nil {
		return err
	}

	return nil
}

Тестируем соединение

Для теста изменим метод User запрашивающий пользователя, в файле /pkg/store/user.go:

func (s *Store) User(ctx context.Context) (*model.User, error) {

	err := s.SendAuth(ctx)
	fmt.Println("Запрашиваем Auth из метода User")
	fmt.Printf("Ошибка: %v\n", err)

	// ...
	return &model.User{
		Username: "LOLO",
	}, nil
}

Запускаем сервер:

go run cmd/main.go

Открываем: http://localhost:2000/

Выполняем запрос:

subscription{
  auth{
    sessionId,
    method
  }
}

Открываем новую вкладку в Playground, полученный токен помещаем в заголовок:

{
  "Session-ID": "TOKEN"
}

Выполняем запрос:

query{
  user{
    username
  }
}

Получаем результат во вкладке auth:

Вкладки браузера, и другие браузеры: тестировать самостоятельно.

На этом можно завершить данную часть. В следующей части мы разработаем механим доставки авторизации Пользователю и развернем фронтенд на React + Apollo.

Комментарии (0)