В первой части нашего цикла статей «Пишем блог на микросервисах» мы описали общий подход к решению задачи.

Теперь пришла очередь API Gateway или API GW.

В нашем c ptimofeev API GW мы реализуем следующие функции:

  • Конвертация REST запросов в gRPC запросы и наоборот.
  • Логирование запросов.
  • Аутентификация запросов.
  • Присвоение каждому запросу Trace ID для дальнейшей передачи его между микросервисами по всей цепочке выполнения запроса.

Итак, поехали…

Для реализации функции конвертация REST/gRPC будем использовать гошную библиотеку grpc-gateway.

Далее в протофайле каждого микросервиса, который хотим опубликовать по REST, необходимо добавить описание option в секции описания интерфейсов сервиса. Здесь собственно указывается путь и метод, по которому будет осуществляться доступ по REST.

//Описание сервиса Category
service CategoryService {
  
//Создание записи
  rpc Create (CreateCategoryRequest) returns (CreateCategoryResponse) {
    option (google.api.http) = {
      post: "/api/v1/category"
    };
  }    
}

На основе этой информации скрипт генерации кода (./bin/protogen.sh) создаст код gRPC сервера (в каталоге микросервиса), gRPC клиента (в каталоге api-gw) и сгенерирует актуальную документацию API (в формате {{имя сервиса}}.swagger.json)

Далее нам нужно написать код HTTP Proxy, который с одной стороны будет HTTP сервером (для обработки REST запросов), а с другой стороны будет gRPC клиентом для наших микросервисов (gRPC серверов).

Этот код мы разместим в файле ./services/api-gw/main.go.

В начале в секции import подключаем клиентские библиотеки к наших микросервисов
(их нам сгенерил protogen.sh):

import (        
    …
    userService "./services/user/protobuf"
    postService "./services/post/protobuf"
    commentService "./services/comment/protobuf"
    categoryService "./services/category/protobuf"
    …

Далее указываем адреса и порты на которых «висят» наши gRPC сервисы (значения берем из переменных окружений):

var (
    // gRPC services
    userServerAdress=fmt.Sprintf("%s:%s",os.Getenv("USER_HOST"),os.Getenv("USER_PORT"))
    postServerAdress=fmt.Sprintf("%s:%s",os.Getenv("POST_HOST"),os.Getenv("POST_PORT"))
    commentServerAdress=fmt.Sprintf("%s:%s",os.Getenv("COMMENT_HOST"),os.Getenv("COMMENT_PORT"))
    categoryServerAdress=fmt.Sprintf("%s:%s",os.Getenv("CATEGORY_HOST"),os.Getenv("CATEGORY_PORT"))
)   

И, наконец, реализуем сам HTTP Proxy:

func HTTPProxy(proxyAddr string){

    grpcGwMux:=runtime.NewServeMux()

    //----------------------------------------------------------------
    // настройка подключений со стороны gRPC
    //----------------------------------------------------------------
    //Подключение к сервису User
    grpcUserConn, err:=grpc.Dial(
        userServerAdress,
        grpc.WithInsecure(),
    )
    if err!=nil{
        log.Fatalln("Failed to connect to User service", err)
    }
    defer grpcUserConn.Close()

    err = userService.RegisterUserServiceHandler(
        context.Background(),
        grpcGwMux,
        grpcUserConn,
    )
    if err!=nil{
        log.Fatalln("Failed to start HTTP server", err)
    }

    //----------------------------------------------------------------
    //Подключение к сервису Post
    grpcPostConn, err:=grpc.Dial(
        postServerAdress,
        grpc.WithUnaryInterceptor(AccessLogInterceptor), 
        grpc.WithInsecure(),
    )
    if err!=nil{
        log.Fatalln("Failed to connect to Post service", err)
    }
    defer grpcPostConn.Close()
    
    err = postService.RegisterPostServiceHandler(
        context.Background(),
        grpcGwMux,
        grpcPostConn,
    )
    if err!=nil{
        log.Fatalln("Failed to start HTTP server", err)
    }

    //----------------------------------------------------------------
    //Подключение к сервису Comment
    grpcCommentConn, err:=grpc.Dial(
        commentServerAdress,
        grpc.WithInsecure(),
    )
    if err!=nil{
        log.Fatalln("Failed to connect to Comment service", err)
    }
    defer grpcCommentConn.Close()

    err = commentService.RegisterCommentServiceHandler(
        context.Background(),
        grpcGwMux,
        grpcCommentConn,
    )
    if err!=nil{
        log.Fatalln("Failed to start HTTP server", err)
    }

    //----------------------------------------------------------------
    //Подключение к сервису Category
    grpcCategoryConn, err:=grpc.Dial(
        categoryServerAdress,
        grpc.WithInsecure(),
    )
    if err!=nil{
        log.Fatalln("Failed to connect to Category service", err)
    }
    defer grpcCategoryConn.Close()

    err = categoryService.RegisterCategoryServiceHandler(
        context.Background(),
        grpcGwMux,
        grpcCategoryConn,
    )
    if err!=nil{
        log.Fatalln("Failed to start HTTP server", err)
    }

    //----------------------------------------------------------------
    //  Настройка маршрутов с стороны REST
    //----------------------------------------------------------------
    mux:=http.NewServeMux()
    
    mux.Handle("/api/v1/",grpcGwMux)
    mux.HandleFunc("/",helloworld)

    fmt.Println("starting HTTP server at "+proxyAddr)
    log.Fatal(http.ListenAndServe(proxyAddr,mux))
}

В настройке подключения к микросервисам мы используем опцию grpc.WithUnaryInterceptor(AccessLogInterceptor), в которую в качестве параметра передаем функцию AccessLogInterceptor. Это не что иное как реализация middleware слоя, т.е. функция AccessLogInterceptor будет выполняться при каждом gRPC вызове дочернего микросервиса.

…
    //----------------------------------------------------------------
    //Подключение к сервису Post
    grpcPostConn, err:=grpc.Dial(
        …
        grpc.WithUnaryInterceptor(AccessLogInterceptor), 
        …
    )

В свою очередь, в функции AccessLogInterceptor мы уже реализуем механизмы аутентификации, логирования и генерации TraceId.

Если во входящем (REST) запросе в Header был указан атрибут authorization, то парсим и валидируем его в функции CheckGetJWTToken, которая либо возвращает ошибку, либо в случае успеха возвращает UserId и UserRole.

var traceId,userId,userRole string
    if len(md["authorization"])>0{
        tokenString:= md["authorization"][0]
        if tokenString!=""{
            err,token:=userService.CheckGetJWTToken(tokenString)
            if err!=nil{
                return err
            }
            userId=fmt.Sprintf("%s",token["UserID"])
            userRole=fmt.Sprintf("%s",token["UserRole"])
        }
    }

Далее формируем TraceId и заворачиваем его вместе с UserId и UserRole в контекст вызова и осуществляем gRPC вызов нашего микросервиса.

//Присваиваю ID запроса
    traceId=fmt.Sprintf("%d",time.Now().UTC().UnixNano())

    callContext:=context.Background()
    mdOut:=metadata.Pairs(
        "trace-id",traceId,
        "user-id",userId,
        "user-role",userRole,
    )
    callContext=metadata.NewOutgoingContext(callContext,mdOut)
    
    err:=invoker(callContext,method,req,reply,cc, opts...)

И, наконец, пишем в лог событие вызова сервиса.

msg:=fmt.Sprintf("Call:%v, traceId: %v, userId: %v, userRole: %v, time: %v", method,traceId,userId,userRole,time.Since(start))
app.AccesLog(msg)

Еще один middleware обработчик «вешаем» на ответы конкретных методов (SignIn, SignUp) сервиса User. Этот обработчик перехватывает gRPC ответы, забирает ответ UserID и UserRole, преобразует в JWT Token и отдает его (JWT Token) в REST ответе в качестве Header атрибута «authorization». Описанный middleware код реализован на стороне gRPC клиента в файле ./api-gw/services/user/protobuf/functions.go.

Подключаем обработчик ответов.

func init() {
    //Переопределяю обработку ответа для вызовова SignIn
    forward_UserService_SignIn_0 = forwardSignIn

    //Переопределяю обработку ответа для вызовова SignUp
    forward_UserService_SignUp_0 = forwardSignUp
}

Пример — обработчик ответа SignIn (обработчик SignUp аналогичен).

func forwardSignIn(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler, w http.ResponseWriter, req *http.Request, resp proto.Message, opts ...func(context.Context, http.ResponseWriter, proto.Message) error) {
    
    //Преобразую proto.Message в SignInResponse
    signInResponse:=&SignInResponse{}
    signInResponse.XXX_Merge(resp)

    token,err:=GetJWTToken(signInResponse.Slug,signInResponse.Role)
    if err!=nil{
        http.Error(w, fmt.Sprintf("%v",err), http.StatusUnauthorized)
        return
    }
    w.Header().Set("authorization", token)
    runtime.ForwardResponseMessage(ctx, mux, marshaler, w, req, resp, opts...)
}

Продолжение следует…

Да, демо проекта можно посмотреть здесь, а исходный код здесь.

Комментарии (7)


  1. codemafia
    29.10.2019 21:25

    Это нормально, что столько условных операторов идет в одной функции?


    1. Akuma
      29.10.2019 21:40

      Это же Go, там на каждый чих if err!=nil

      Мне кажется более ненормальным разбивать блог на столько микросервисов, которые друг от друга не отличаются.


      1. VolCh
        29.10.2019 22:50

        Я так думаю что это просто пример


      1. xkondorx
        29.10.2019 08:39

        Микросервисы могут не сильно отличаться, но некоторые могут быть более нагруженны чем другие. Намного легче оптимизировать маленький микросервис, пусть и путем его переусложнения, чем то-же самое делать с более универсальным вариантом. Есть еще нюанс с горизонтальным масштабированием, когда не потребуется увеличивать количество экземпляров тяжелого сервиса часть функций которого тянется ненужным балластом. В принципе для блога, применение микросервисов это немного не целевое использование, но это же пример)


      1. time2rfc
        29.10.2019 12:27

        каждый чих на if err не причина допускать такую огромную внутреннюю вложенность


  1. ferocactus
    29.10.2019 16:25
    +1

    Во сколько мифических человеко-месяцев (и чьих — junior..senior, web designer, backend...frontend developer, DevOps engineer) Вы оцениваете реализацию данного блога на контейнерах?


  1. piton_nsk
    29.10.2019 21:02

    Я правильно понимаю, что AccessLogInterceptor и DbConnect скопипашены между всеми сервисами? А если удалить пост, то комментарии остаются или если удалить категорию, то остаются посты?