ВНИМАНИЕ!

Большая часть контента статьи представлена как комментарии в коде!

Введение

Изначально этот проект был всего лишь экспериментом с libffi и можно ли сделать с помощью него полноценный P2P RPC. Вроде бы получилось :-) И в здесь я попытаюсь рассказать что я сделал и какие у этого rpc особенности.

Особенности

Самое главное что дает этот RPC это возможность использовать готовые функции без их переписывания или оберток(под обертки попадает и IDL, которого здесь нет), но есть некоторые ограничения, например структуры или 2+D массивы, для них тут существует специальные типы. Ну и главная особенность, возможность отправлять часть аргументов обратно на клиент, что вызовет обновление их оригиналов на клиенте.

Протокол

Протокол не был особенно продуман но свои задачи выполнял, все сетевое взаимодействие завязано на 2 функциях

ssize_t rpcmsg_write_to_fd(struct rpcmsg* msg, int fd); //Пишет сообщение в сокет
int get_rpcmsg_from_fd(struct rpcmsg* msg ,int fd);  //Получает сообщение из сокета

/*Структура которую эти функции шлют в сокет*/
struct rpcmsg{
    enum rpcmsg_type msg_type; //тип сообщения
    uint64_t payload_len;      //Длина полезных данных в байтах
    char* payload;             //Сами полезные данные
    uint8_t payload_crc;       //Самая тупейшая контрольная сумма в мире,crc = crc ^ msg->payload[i] ^ i;
};

Я не выкладываю исходники этих функций в статью, ибо мне стыдно за них, но они они работают и выполняют свою задачу.

Сам протокол (до начала работы цикла обработки запросов) достаточно прост:

А вот сам протокол обмена данных между клиентом из его потоком на сервере будет чуть-чуть интереснее, но сначала про аутентификацию, она реализована через хэш от пароля и его поиска в хэштаблице

    if(get_rpcmsg_from_fd(&gotmsg,thrd->client_fd) == 0 && gotmsg.msg_type == AUTH && gotmsg.payload != NULL){
        /*понимаем что клиент нам отослал все правильно и не словил timeout*/
        struct rpctype type; //Сюда будет распакован упакованный uint64_t хеш
        uint64_t hash = type_to_uint64(arr_to_type(gotmsg.payload,&type)); //распаковавываем gotmsg.payload в тип а этот тип в uint64_t
        free(gotmsg.payload); //Очищаем уже лишние данные,да free тут будет ОЧЕНЬ много
        if(hash != 0){        //хеш правильный? / Был упакован validный тип?
            int* gotusr = NULL; //указатель на привелегии пользователя
            if(hashtable_get_by_hash(thrd->serv->users,hash,(void**)&gotusr) == 0){
                /*Получаем эти привелегии по хешу пароля*/
                assert(gotusr); //проверка что все правильно, потому что если не правильно то нельзя быть увереным что в сервере все хорошо
                is_authed = 1;  //ставим переменную что пользователь аутентифицирован
                user_perm = *gotusr;  //устанавлием привелегии в локальную переменную,они влиляют можно ли вызвать конкретную функцию конкретному клиенту
            }
        }
      free(type.data); //Да, опять очистка, но теперь уже чистим наш распакованный тип
    }else printf("%s: no auth provided\n",__PRETTY_FUNCTION__); // если клиент прислал что-то не то или отключился

Теперь сам протокол клиента, он происходит в цикле и представляет из себя "простенький" конечный автомат)

        if(is_authed){
            /* говорим клиенту: "Все отлично,бро, вот тебе тут твои привелегии только хз зачем они"*/
            repl.msg_type = OK;
            struct rpctype perm = {0};
            int32_to_type(user_perm,&perm);
            repl.payload = malloc((repl.payload_len = type_buflen(&perm)));
            assert(repl.payload);
            type_to_arr(repl.payload,&perm); //тут переводится rpcтип в массив
            rpcmsg_write_to_fd(&repl,thrd->client_fd); repl.msg_type = 0;
            free(repl.payload);
            free(perm.data);
            repl.payload = NULL;
            repl.payload_len = 0;
            /*=====================================================================================*/
            printf("%s: auth ok, OK is replied to client\n",__PRETTY_FUNCTION__);
            while(thrd->serv->stop == 0){   /*крутимся пока сервер "включен"*/
                /*установка всех локальных переменных*/
                repl.msg_type = 0; repl.payload = NULL; repl.payload_len = 0; 
                gotmsg.msg_type = 0; gotmsg.payload = NULL; gotmsg.payload_len = 0;
                struct rpccall call = {0}; struct rpcret ret = {0};
                /*===================================*/
                if(get_rpcmsg_from_fd(&gotmsg,thrd->client_fd) < 0) {printf("%s: client disconected badly\n",__PRETTY_FUNCTION__);goto exit;}
                /*Получаем и обрабрабатываем что хочет клиент*/
                switch(gotmsg.msg_type){
                    case PING:
                                    //это сообщение от клиента просто чтобы сбросить timeout
                                    free(gotmsg.payload);
                                    break;
                    case DISCON:
                                    //обработка "нормального отключения" клиента,она также выходит из потока
                                    free(gotmsg.payload);
                                    printf("%s: client disconnected normaly\n",__PRETTY_FUNCTION__);
                                    goto exit;
                    case CALL:
                                    //ГЛАВНОЕ и ЕДИНСТВЕННОЕ не служебное сообщение которое обрабатывает сервер
                                    
                                    //здесь из gotmsg.payload распаковывается структура вызова функции, в ней находятся аргументы и их количество + имя функции
                                    if(buf_to_rpccall(&call,gotmsg.payload) != 0 ){
                                        printf("%s: internal server error or server closing\n",__PRETTY_FUNCTION__);
                                        rpctypes_free(call.args,call.args_amm);
                                        free(call.fn_name);
                                        free(gotmsg.payload);
                                        goto exit;
                                    }
                                    free(gotmsg.payload);
                                    //проверяем что сервер не находится в состояние изменения (регистрация или удалении функции)
                                    pthread_mutex_lock(&thrd->serv->edit); pthread_mutex_unlock(&thrd->serv->edit);

                                    //получаем структуру функции которая будет использоваться в функции вызова
                                    /*
                                    struct fn{
                                        char* fn_name; //имя функции
                                        void* fn; //libffi;
                                        uint8_t nargs; //кол-во аргументов
                                        enum rpctypes rtype; //возращаемый тип
                                        enum rpctypes* argtypes; //типы аргументов в RPC формате
                                        ffi_type** ffi_type_free;  // заглушка для высвобождения, не спрашивайте
                                        ffi_cif cif;               //cif от libffi, нужен для вызова функции, для большего смотрите в доки libffi
                                        void* personal;            //для PSTORAGE
                                        int perm;                  //минимальные привелегии для вызова функции
                                    };
                                    */
                                    struct fn* cfn = NULL;hashtable_get(thrd->serv->fn_ht,call.fn_name,strlen(call.fn_name) + 1,(void**)&cfn);
                                    if(cfn == NULL){
                                        repl.msg_type = NOFN;
                                        printf("%s: '%s' no such function\n",__PRETTY_FUNCTION__,call.fn_name);
                                        rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                        free(call.fn_name); rpctypes_free(call.args,call.args_amm);
                                        break;
                                    //проверка привелегий клиента, -1 позволяет вызывать все функции!
                                    }else if(cfn->perm > user_perm && user_perm != -1){
                                        repl.msg_type = LPERM;
                                        printf("%s: low permissions, need %d, have %d\n",__PRETTY_FUNCTION__,cfn->perm,(int)user_perm);
                                        rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                        free(call.fn_name);
                                        rpctypes_free(call.args,call.args_amm);
                                        break;
                                    }else repl.msg_type = OK;

                                    //Отдаем сгенерированный предыдущими проверками ответ клиенту
                                    if(rpcmsg_write_to_fd(&repl,thrd->client_fd) < 0){
                                        free(call.fn_name);
                                        rpctypes_free(call.args,call.args_amm);
                                        printf("%s: client connection closed\n",__PRETTY_FUNCTION__);
                                        goto exit;
                                    }
                                    //получаем от клиента ответ
                                    if(get_rpcmsg_from_fd(&gotmsg,thrd->client_fd) < 0) {
                                        free(gotmsg.payload);
                                        free(call.fn_name);
                                        rpctypes_free(call.args,call.args_amm);
                                        printf("%s: client disconected badly\n",__PRETTY_FUNCTION__);
                                        goto exit;
                                    }
                                    free(gotmsg.payload);
                                    if(gotmsg.msg_type == NONREADY){printf("%s: client nonready\n",__PRETTY_FUNCTION__);free(call.fn_name); rpctypes_free(call.args,call.args_amm); break;} // я понятия не умею зачем оно тут -____- Клиент его не использует
                                    if(gotmsg.msg_type == READY){
                                        //клиент готов и тут начинается вызов функции
                                        repl.msg_type = RET;
                                        int err = 0;
                                        int callret = 0;
                                        //вызов функции, и обработка ошибок
                                        if((callret = __rpcserver_call_fn(&ret,thrd->serv,&call,cfn,&err)) != 0 && err == 0){
                                            free(call.fn_name);
                                            rpctypes_free(call.args,call.args_amm);
                                            printf("%s: internal server error\n",__PRETTY_FUNCTION__);
                                            goto exit;
                                        }else if(callret != 0 && err != 0){
                                            //Если клиент выдал не те аргументы(отличные от прототипа функции)
                                            repl.msg_type = BAD;
                                            printf("%s: client provided wrong arguments\n",__PRETTY_FUNCTION__);
                                            rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                            free(call.fn_name);
                                            rpctypes_free(call.args,call.args_amm);
                                            break;
                                        }
                                        //отправка rpcret и очистка
                                        free(call.fn_name);
                                        repl.payload = rpcret_to_buf(&ret,&repl.payload_len); //упаковка return функции и возвращаемых аргументов
                                        rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                        rpctypes_free(ret.resargs,ret.resargs_amm);
                                        free(repl.payload);
                                        if(ret.ret.data) free(ret.ret.data);
                                        break;
                                    }
                                    free(gotmsg.payload);
                                    free(call.fn_name);
                                    rpctypes_free(call.args,call.args_amm);
                                    printf("%s: client bad reply\n",__PRETTY_FUNCTION__);
                                    goto exit;
                    default:
                            free(gotmsg.payload);
                            free(call.fn_name);
                            rpctypes_free(call.args,call.args_amm);
                            printf("%s: client sent non call or disconected badly(%d), exiting\n",__PRETTY_FUNCTION__,gotmsg.msg_type);
                            goto exit;
                }
            } //всякие ошибки
        }else {repl.msg_type = BAD;rpcmsg_write_to_fd(&repl,thrd->client_fd);printf("%s: client not passed auth\n",__PRETTY_FUNCTION__);}
    } else printf("%s: no auth provided\n",__PRETTY_FUNCTION__);

exit:
    //главный кусок завершения потока чтобы не дублировать код еще больше)
    if(thrd->serv->stop == 1) printf("%s: server stopping, exiting\n",__PRETTY_FUNCTION__);
    struct rpcmsg lreply = {DISCON,0,NULL,0};
    rpcmsg_write_to_fd(&lreply,thrd->client_fd);
    close(thrd->client_fd);
    thrd->serv->clientcount--;
    free(thrd);
    pthread_detach(pthread_self());
    pthread_exit(NULL);

Вызов функции

Этим занимается функция __rpcserver_call_fn, она держит все задачи по распаковке аргументов, упаковки возвращаемых аргументов, упаковки значения возвращаемых функцией, эта функция базируется на libffi (это вы уже могли заметить в struct fn). Там ОЧЕНЬ много бойлерплейт кода, и я не особо знаю что тут говорить, поэтому просто покажу и распишу её работу там

int __rpcserver_call_fn(struct rpcret* ret,struct rpcserver* serv,struct rpccall* call,struct fn* cfn, int* err_code){
    void** callargs = calloc(cfn->nargs, sizeof(void*)); //создается массив аргументов для вызова из libffi
    assert(callargs);
    uint8_t j = 0;
    struct tqueque* rpcbuff_upd = tqueque_create();
    struct tqueque* rpcbuff_free = tqueque_create();
    struct tqueque* rpcstruct_upd = tqueque_create();
    struct tqueque* _rpcstruct_free = tqueque_create();
    assert(rpcbuff_upd);
    assert(rpcbuff_free);
    //создаем из аргументов прототип и проверяем его
    enum rpctypes* check = rpctypes_get_types(call->args,call->args_amm);
    if(!is_rpctypes_equal(cfn->argtypes,cfn->nargs,check,call->args_amm)){
        //посылаем такой вызов функции куда-подальше иначе свалится весь сервер
        *err_code = 7;
        free(check);
        goto exit;
    }
    free(check);
    assert(callargs != NULL && call->args_amm != 0);
    for(uint8_t i = 0; i < cfn->nargs; i++){
      /*Это специальные типы данных которые не существуют для клиента,они предоставляют локальное хранилище для функций или глобальное хранилище для функции*/
        if(cfn->argtypes[i] == PSTORAGE){
            callargs[i] = calloc(1,sizeof(void*));
            assert(callargs[i]);
            *(void**)callargs[i] = cfn->personal;
            continue;
        }
        if(cfn->argtypes[i] == INTERFUNC){
            callargs[i] = calloc(1,sizeof(void*));
            assert(callargs[i]);
            *(void**)callargs[i] = serv->interfunc;
            continue;
        }
      /*====================================================================================================================================================*/
        if(j < call->args_amm){
            //здесь идет распаковка аргументов по типам
            if(call->args[j].type == RPCBUFF){
                callargs[i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                *(void**)callargs[i] = unpack_rpcbuff_type(&call->args[j]);
                if(call->args[j].flag == 1)
                    tqueque_push(rpcbuff_upd,*(void**)callargs[i],1,NULL);
                else        //это костыль для обновления распакованного оригинала ибо он создает полностью новый кусок памяти в отличии от STR и SIZEDBUF
                    tqueque_push(rpcbuff_free,*(void**)callargs[i],1,NULL);
                free(call->args[j].data);
                call->args[j].data = NULL;
                j++;
                continue;
            }
            if(call->args[j].type == RPCSTRUCT){
                callargs[i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                *(void**)callargs[i] = unpack_rpcstruct_type(&call->args[j]);
                if(call->args[j].flag == 1)
                    tqueque_push(rpcstruct_upd,*(void**)callargs[i],1,NULL);
                else //тоже самое но для rpcstruct
                    tqueque_push(_rpcstruct_free,*(void**)callargs[i],1,NULL);
                free(call->args[j].data);
                call->args[j].data = NULL;
                j++;
                continue;
            }
            if(call->args[j].type == SIZEDBUF){
                //это тоже полу-специальный тип но он предстовляется для функции как char*, uint64_t
                callargs[i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                uint64_t buflen = 0;
                *(void**)callargs[i] = unpack_sizedbuf_type(&call->args[j],&buflen);
                callargs[++i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                *(uint64_t*)callargs[i] = buflen;
                j++;
                continue;
            }
            if(call->args[j].type == STR){  //распаковка "обычных" типов, поэтому я пропущу остальные
                callargs[i] = calloc(1,sizeof(char*));
                assert(callargs[i]);
                *(void**)callargs[i] =  unpack_str_type(&call->args[j]);
                j++;
                continue;
            }
            .........подобный STR код..........
        } else {*err_code = 7; goto exit;}
    }
    //кусок памяти под return функции
    void* fnret = NULL;
    if(cfn->rtype != VOID){
        fnret = calloc(1,sizeof(uint64_t)); //Самый большой тип стандартоного C(кроме структур), это шаг для отвязки от 64битных платформ в некотором роде
        assert(fnret);
    }
    ffi_call(&cfn->cif,cfn->fn,fnret,callargs); // наконец-то вызов функции!
    ret->resargs = rpctypes_clean_nonres_args(call->args,call->args_amm,&ret->resargs_amm); //понимаем какие аргументы нужно переслать обратно а какие очистить
    enum rpctypes rtype = cfn->rtype; //Просто копируем тип который вернет функция чтобы не долбится в heap(Это все отговорки, мне просто было лень писать длинное название)
    ret->ret.type = VOID; //инициализация к void, он тут останется если не распакуется другой тип
    if(rtype != VOID){
        if(rtype == CHAR)    char_to_type(*(char*)fnret,&ret->ret); //Упаковка возврата функции в rpcтип
        ...........Остальные типы упаковываются также как и CHAR...........

        if(rtype == STR && *(void**)fnret != NULL){
            create_str_type(*(char**)fnret,0,&ret->ret); //упаковываем строку
            free(*(char**)fnret); //чистим строку
        }else if(rtype == STR && *(void**)fnret == NULL){  //если функция вернула NULL то возвращаемый типо void
            ret->ret.type = VOID;
        }
    }
    /*Перепаковка расспакованных rpcbuff и rpcstruct т.к они не достают данные из типа а распаковывают их и создают свои структуры*/
    for(uint8_t i = 0; i < ret->resargs_amm; i++){
        if(ret->resargs[i].type == RPCBUFF){
            struct rpcbuff* buf = tqueque_pop(rpcbuff_upd,NULL,NULL);
            if(!buf) break;
            create_rpcbuff_type(buf,ret->resargs[i].flag,&ret->resargs[i]);
            _rpcbuff_free(buf);
        }
        if(ret->resargs[i].type == RPCSTRUCT){
            struct rpcstruct* buf = tqueque_pop(rpcstruct_upd,NULL,NULL);
            if(!buf) break;
            create_rpcstruct_type(buf,ret->resargs[i].flag,&ret->resargs[i]);
            rpcstruct_free(buf);free(buf);
        }
    }
    /*===========================================================================================================================*/
    tqueque_free(rpcbuff_upd);
    tqueque_free(rpcstruct_upd);
    void* buf = NULL;
    while((buf = tqueque_pop(rpcbuff_free,NULL,NULL)) != NULL) _rpcbuff_free(buf);
    while((buf = tqueque_pop(_rpcstruct_free,NULL,NULL)) != NULL) {rpcstruct_free(buf);free(buf);}
    tqueque_free(rpcbuff_free);
    tqueque_free(_rpcstruct_free);
    free(fnret);
    for(uint8_t i = 0; i < cfn->nargs; i++){
        free(callargs[i]);
    }
    free(callargs);
    return 0;
exit:
    while((buf = tqueque_pop(rpcbuff_free,NULL,NULL)) != NULL) _rpcbuff_free(buf);
    while((buf = tqueque_pop(rpcbuff_upd,NULL,NULL)) != NULL) _rpcbuff_free(buf);
    while((buf = tqueque_pop(rpcstruct_upd,NULL,NULL)) != NULL) {rpcstruct_free(buf); free(buf);}
    while((buf = tqueque_pop(_rpcstruct_free,NULL,NULL)) != NULL) {rpcstruct_free(buf);free(buf);}
    tqueque_free(rpcbuff_free);
    tqueque_free(rpcbuff_upd);
    for(uint8_t i = 0; i < cfn->nargs; i++){
        free(callargs[i]);
    }
    free(callargs);
    return 1;

API сервера

Здесь я расскажу про Api регистрации функций и общую для клиента и сервера систему типов, она не идеальная и есть одно болезненное ограничение(пока-что? Временно?) Нельзя возвращать RPCBUFF и RPCSTRUCT. Так-же нельзя вернуть SIZEDBUF ибо нельзя нормально указать откуда брать размер для его упаковки.

Система типов по большей части состоит из привязок к stdint.h из Си:

enum rpctypes{
  VOID = 0, //Нельзя использовать в аргументах функции
  CHAR = 1,
  STR = 2, //char* строки
  UINT16 = 3,
  INT16 = 4,
  UINT32 = 5,
  INT32 = 6,
  UINT64 = 7,
  INT64 = 8,
  FLOAT = 9,
  DOUBLE = 10,
  RPCBUFF = 11, //!!!НЕЛЬЗЯ ВЕРНУТЬ В RETURN!!! многомерный массив, размерность которого не известна коду сервера(массив в массиве в массиве......)
  SIZEDBUF = 12, //!!!НЕЛЬЗЯ ВЕРНУТЬ В RETURN!!! char* массив который передастся в функцию как void*,uint64_t
  PSTORAGE = 13, //локальный для одной функции(но для всех ее итераций глобальный) кусок памяти
  INTERFUNC = 14, // Глобальный для всех функций и ее итераций кусок памяти, нужно в ручную указывать через модифицирований struct rpcserver, rpcserver->interfunc = какой_то_указатель
  RPCSTRUCT = 15, //!!!НЕЛЬЗЯ ВЕРНУТЬ В RETURN!!! Структура аля хештаблица
};

Для вызова функции в сервере используется прототип функции в в виде этого самого enum из него генерирует CIF из libffi. Тут будет пример с регистрацией функции на сервере:

void* pstorage = NULL; //персональное хранилище функции
int perm = 1235; //уровень привелегий необходимый для вызова функции
struct rpcserver* serv = rpcserver_create(1234); //создаем сервер на порту 1234
rpcserver_load_keys("keys.txt"); //загрузка ключей в формате "ключ"разрешение_как_число
enum rpctypes fn_proto[] = {STR}; //прототип функции (ПОМЕТКА: SIZEDBUF на сервере объявляется как SIZEDBUF,UINT64 а на клиенте как SIZEDBUF)
rpcserver_register_fn(serv, example_function, "example_function",
                      VOID, fn_proto,
                      sizeof(fn_proto) / sizeof(fn_proto[0]), pstorage, perm); //регистрация функции
rpcserver_start(serv); //запуск сервера

Так-же конечно же есть функции rpcserver_stop() и rpcserver_free(), первая просто останавливает сервер а вторая останавлиет и высвобождает память(не дает запустить сново через rpcserver_start)

Клиент

Клиент в моем rpc получился очень простой, в нем всего лишь 3 функции для конечного пользователя. Эти функции покрывают все нужные как по мне задачи а именно: подключение, вызов функции, отключение. Тут нет мусора(чисто для меня, не бейте меня за мое мнение) аля передача данных без вызова функций.


int rpcserver_connect(char* host,char* key,int portno,struct rpccon* con){
   if(!host || !key)
      return -1;
   int sockfd;
   struct sockaddr_in serv_addr;
   struct hostent *server;
   con->stop = 0;

   .......Бойлерплейт для настройки сокета........

   struct rpcmsg req = {0};
   struct rpcmsg ans = {0};
   req.msg_type = CON; //Это то самое магическое число из начала статьй(Число:53)
   if(rpcmsg_write_to_fd(&req,sockfd) == -1){
      close(sockfd);
      return 2;
   }
   struct rpctype auth = {0};
   uint64_to_type(_hash_fnc(key,strlen(key) + 1),&auth); //генерируем uint64_t тип из хэша пароля
   req.msg_type = AUTH;
   req.payload = malloc((req.payload_len = type_buflen(&auth)));
   assert(req.payload);
   type_to_arr(req.payload,&auth);  //запихиваем сгенерированый тип в req.payload
   if(rpcmsg_write_to_fd(&req,sockfd) == -1){  //пишем это в сокет!
      free(auth.data);
      free(req.payload);
      close(sockfd);
      return 2;
   }
   free(auth.data);
   free(req.payload);
   if(get_rpcmsg_from_fd(&ans,sockfd) != 0){
      close(sockfd);
      return 3;
   } //получаем ответ от сервера и если он не OK отключаемся от сервера
   if(ans.msg_type != OK) {
      free(ans.payload);
      close(sockfd);
      return 4;
   }
   //получаем уровень разрешений из ответа сервера
   arr_to_type(ans.payload,&auth);
   con->perm = type_to_int32(&auth);
   free(ans.payload);
   free(auth.data);
   con->fd = sockfd;
   pthread_mutex_init(&con->send,NULL);
   pthread_create(&con->ping,NULL,rpccon_keepalive,con); //создается keep-alive поток который будет сбрасывать timeout на сервере
   return 0;
}

Функция rpcclient_call сама упаковывает аргументы из variable arguments, сама распаковывает return и возвращенные аргументы(ВНИМАНИЕ: запрет на return rpcbuff,rpcstruct пока-что только на сервере из-за потенциальной утечки памяти). Сразу покажу затравку в виде API клиента:

struct rpccon con;
rpcserver_connect("localhost", "my_key", 1234, &con); //подключаемся к серверу на localhost:1234 по ключу "my_key"
char* arg = "Hello, server!";
enum rpctypes proto[] = {STR}; //прототип функции на клиенте, нужен для парсинга varargs в rpcclient_call
rpcclient_call(&con, "example_function", proto, NULL, 1, NULL, arg); //вызов "example_function"
rpcclient_discon(&con); //остановка подключения

А вот и код с пояснением того что он делает

int rpcclient_call(struct rpccon* con,char* fn,enum rpctypes* rpctypes,char* flags, int rpctypes_len,void* fnret,...){
   //система флагов используется для отслеживание нужно ли пересылать сигнал назад или нет, 1 если нужно чтобы он был переслан, 0 чтобы нет, NULL - все аргументы не пересылаются
   pthread_mutex_lock(&con->send);  //блокируем mutex чтобы keep-alive не сбивал конечный автомат клиентского потока
   va_list vargs;
   void** resargs_upd = NULL; //массив на указатели на оригиналы аргументов
   uint8_t resargs_updl = 0;
   struct rpcret ret = {0};
   if(flags)
      for(uint8_t i = 0; i < rpctypes_len; i++)
         if(flags[i] == 1)
            resargs_updl++; //считаем сколько аргументов будет пересланно, чтобы создать массив нужного размера под указатели на оригиналы
   if(resargs_updl != 0){
      resargs_upd = calloc(resargs_updl,sizeof(void*));
      assert(resargs_upd);
   }
   va_start(vargs, fnret);
   struct rpctype* args = calloc(rpctypes_len,sizeof(*args));
   assert(args);
   uint8_t j = 0;
   for(uint8_t i = 0; i < rpctypes_len; i++){
      if(rpctypes[i] == CHAR){
         char ch = va_arg(vargs,int); // получаем данные из varargs
         char_to_type(ch,&args[i]);
         continue;
      }
      .......Код упаковки остальных типов такой-же как CHAR........
      if(rpctypes[i] == SIZEDBUF){
         char* ch = va_arg(vargs,char*);
         char flag = 0;
         if(flags != NULL) flag = flags[i]; // если вместо flags был NULL то флаг будет 0
         if(flag == 1) {resargs_upd[j] = ch; j++;} //записываем указатель на оригинал в чтобы обновить его в будущем
         uint64_t buflen = va_arg(vargs,unsigned int); //получаем еще и длину SIZEDBUF
         create_sizedbuf_type(ch,buflen,flag,&args[i]); //пакуем в sizedbuf
         continue;
      }
      if(rpctypes[i] == RPCBUFF){
         struct rpcbuff* buf = va_arg(vargs,struct rpcbuff*);
         assert(flags);
         char flag = 0;
         if(flags != NULL) flag = flags[i]; // если вместо flags был NULL то флаг будет 0
         if(flag == 1) {resargs_upd[j] = buf; j++;} //записываем указатель на оригинал в чтобы обновить его в будущем
         create_rpcbuff_type(buf,flag,&args[i]);
         continue;
      }
      if(rpctypes[i] == RPCSTRUCT){
         struct rpcstruct* buf = va_arg(vargs,struct rpcstruct*);
         assert(flags);
         char flag = 0;
         if(flags != NULL) flag = flags[i]; // если вместо flags был NULL то флаг будет 0
         if(flag == 1) {resargs_upd[j] = buf; j++;} //записываем указатель на оригинал в чтобы обновить его в будущем
         create_rpcstruct_type(buf,flag,&args[i]);
         continue;
      }
   }
   struct rpccall call = {fn,rpctypes_len,args}; //собираем вызов функции
   struct rpcmsg req = {0};
   struct rpcmsg ans = {0};
   req.msg_type = CALL;
   req.payload = rpccall_to_buf(&call,&req.payload_len); //собираем struct rpccall в массив и запихиваем в сообщение
   if(rpcmsg_write_to_fd(&req,con->fd) < 0){ //шлём наш вызов функции
      rpctypes_free(args,rpctypes_len);
      free(req.payload);
      pthread_mutex_unlock(&con->send);
      return 1;
   }
   free(req.payload);
   rpctypes_free(args,rpctypes_len);
   if(get_rpcmsg_from_fd(&ans,con->fd) != 0){ //получаем ответ от сервера на наш запрос на вызов функции
      pthread_mutex_unlock(&con->send);
      return 2;
   };
   if(ans.msg_type != OK){   //если не OK возвращаемся из функции с ошибкой
      pthread_mutex_unlock(&con->send);
      return ans.msg_type;
   }
   memset(&req,0,sizeof(req));
   req.msg_type = READY;
   rpcmsg_write_to_fd(&req,con->fd);
   get_rpcmsg_from_fd(&ans,con->fd);
   if(ans.msg_type != RET){
      if(ans.msg_type == DISCON){  //если сервер прислал что он отключается, то мы тожет отключаемся и стопаем keep-alive поток
          close(con->fd);
          con->stop = 1;
          pthread_mutex_unlock(&con->send);
          return DISCON;
      }else return ans.msg_type;
   }
   pthread_mutex_unlock(&con->send);
   buf_to_rpcret(&ret,ans.payload); //распаковываем ответ от сервера в struct rpcret(структура с return функции и возвращенными аргументами)
   free(ans.payload);
   assert(resargs_updl == ret.resargs_amm);  // на всякий случай
   /*Это код для обновления аргументов, он распаковывает тип и записывает его в оригинал*/
   for(uint8_t i = 0; i < ret.resargs_amm; i++){
      if(ret.resargs[i].type == STR){
         char* new = unpack_str_type(&ret.resargs[i]);
         assert(new);
         strcpy(resargs_upd[i],new);
      }
      if(ret.resargs[i].type == SIZEDBUF){
         uint64_t cpylen = 0; //длина для memcpy
         char* new = unpack_sizedbuf_type(&ret.resargs[i],&cpylen); //распаковка SIZEDBUF
         assert(new); 
         memcpy(resargs_upd[i],new,cpylen); //Копируем в оригинал, тут нет утечки памяти так-как ret.resargs буду очищенны позже
      }
      if(ret.resargs[i].type == RPCBUFF){
         struct rpcbuff* new = unpack_rpcbuff_type(&ret.resargs[i]);
         assert(new);
         __rpcbuff_free_N_F_C(resargs_upd[i]); //функция которая удаляет rpcbuff но не очищает сам struct rpcbuff* (только его внутренности)
         memcpy(resargs_upd[i],new,sizeof(struct rpcbuff));
         free(new);
      }
      if(ret.resargs[i].type == RPCSTRUCT){
         struct rpcstruct* new = unpack_rpcstruct_type(&ret.resargs[i]);
         rpcstruct_free(resargs_upd[i]); //this is not heap-use-after-free since rpcstruct_free ONLY freeding struct internals
         memcpy(resargs_upd[i],new,sizeof(struct rpcstruct));
         free(new);
      }
   }
   enum rpctypes type = ret.ret.type;
   assert(fnret != NULL && type != VOID);
   if(type == CHAR){
      char ch = type_to_char(&ret.ret);
      *(char*)fnret = ch;
   }
   .......Весь код распаковки такой-же как CHAR.......
  
   free(resargs_upd);
   free(ret.ret.data);
   rpctypes_free(ret.resargs,ret.resargs_amm);
   return 0;
}

Вишенка на подгорелом тортике?

Это я сейчас про RPCBUFF. Это одна из самых интересных фишек моего RPC. Как я уже и говорил это массив в массиве и так далее. У него как по мне самый интересный код здесь.

struct rpcbuff* rpcbuff_create(uint64_t* dimsizes,uint64_t dimsizes_len,uint64_t lastdim_len){
    struct __rpcbuff_el* md_array = calloc(1,sizeof(struct __rpcbuff_el)); //создаем стартовый элемент
    struct rpcbuff* cont = NULL;
    if(dimsizes != NULL || dimsizes_len == 0){
        cont = calloc(1,sizeof(struct rpcbuff)); //создание struct rpcbuff
        assert(cont);
    }
    assert(md_array);
    if(dimsizes_len > 0){
        //копируем dimsizes в struct rpcbuff
        cont->dimsizes = calloc(dimsizes_len, sizeof(uint64_t));
        assert(cont->dimsizes);
        memcpy(cont->dimsizes,dimsizes, sizeof(uint64_t) *dimsizes_len);
    }
    cont->dimsizes_len = dimsizes_len;
    cont->start = md_array;
    cont->lastdim_len = lastdim_len;
    if(dimsizes != NULL && dimsizes_len > 0){
        struct tqueque* que = tqueque_create();
        assert(que);
        /*Сам алгоритм аллокации, так как код писал давно не смогу вспомнить что он точно делает*/
        assert(tqueque_push(que,md_array,sizeof(struct __rpcbuff_el*),NULL) == 0);
        for(uint64_t i = 0; i < dimsizes_len; i++){
            struct __rpcbuff_el* cur = NULL;
            uint64_t iter = tqueque_get_tagamm(que,NULL);
            for(uint64_t j = 0; j < iter; j++){
                cur = tqueque_pop(que,NULL,NULL);
                cur->childs = calloc(dimsizes[i],sizeof(struct __rpcbuff_el));
                assert(cur->childs);
                for(uint64_t k = 0; k <dimsizes[i]; k++){
                    assert(tqueque_push(que,&cur->childs[k], sizeof(struct __rpcbuff_el*), NULL) == 0);
                }
            }
        }
        struct __rpcbuff_el* cur = NULL;
        while((cur = tqueque_pop(que,NULL,NULL)) != NULL){
            cur->endpoint = (char*)0xCAFE;
        }
        tqueque_free(que);
        return cont;
      /*=======================================================================================*/
    }
    md_array->endpoint = (char*)0xCAFE;
    md_array->elen = 0;
    assert(md_array->endpoint);
    return cont;
}

Пример использования rpcbuff и его API:

uint64_t dimsizes[] = {3, 4}; //размеры массива
struct rpcbuff* my_buff = rpcbuff_create(dimsizes, 2, 1); //создаем массив, 1 это заглушка
int data[] = {1, 2, 3, 4};
uint64_t index[] = {0, 0};
rpcbuff_pushto(my_buff, index, 2, (char*)data, sizeof(data));

Заключение?

В этой статье я попытался описать свой проект и показать его внутренности. Если кому-то нужны полные исходные код и API то они будут тут. Так-же RPC получился достаточно портируемым и его получилось запустить с небольшими изменениями на esp32s3 (пришлось компилировать libffi для xtensa linux и линковать в esp-idf проект) и без изменений он запустился на arm64 debian

Комментарии (8)