Сегодня захотелось написать небольшую заметку об асинхронной работе с PostgreSQL в C. Мотивы просты: для небольшой утилитки встала необходимость реализовать такой функционал, гугл на тему понятных и рабочих примеров предательски молчал (нашелся только пример в pqxx для C++ — там есть метод асинхронного соединения и pipeline-класс для запросов), а официальная документация по этому вопросу хоть и весьма подробная, но не слишком структурированная, да и сам алгоритм работы с библиотекой libpq в асинхронном режиме имеет много подводных камней. Поэтому разобравшись в вопросе хочется поделиться результатами с общественностью, на случай если кому-то это будет полезным.

Итак, будем считать, что рассказывать что такое PostgreSQL никому не нужно, и чем синхронный (блокирующий) режим работы отличается от асинхронного читатели тоже примерно понимают. Кстати, кроме первого и очевидного достоинства асинхронных вызовов (они не блокируют ввод-вывод и выполнение потока, что освобождает от необходимости создавать дополнительные треды, синхронизировать их, и т.д.), в случае с Postgre есть еще один плюс: обычный метод PQexec позволяет за один раз получить результат выполнения только одного SQL-запроса, а асинхронные функции libpq такого ограничения не имеют.

Как я уже говорил, у libpq в асинхронном режиме есть довольно много подводных камней. Бывают библиотеки, где асинхронный режим реализован красиво и завершенно (разработчик вызывает абсолютно любой асинхронный метод, назначив ему callback, а после этого достаточно просто «вращать» event loop библиотеки (бесконечно или по таймеру вызывать метод), а далее уже сама библиотека позаботится об обработке команд в нужной последовательности, отлове событий и вызове колбэков), то у PostgreSQL модель работы другая.

Имеется большое множество команд для асинхронных соединений и запросов, которые должны быть вызываны в строго определенной последовательности в зависимости от текущего состояния и результата предыдущей операции, плюс необходимо вручную проверять готовность сокетов. Достаточно в каком-то месте ошибиться и вызвать функцию не вовремя или наоборот не вызвать, или попробовать обратиться к занятому сокету, и это может привести к блокировке потока (в худшем случае — бесконечной, то есть зависанию). И еще библиотека в асинхронном режиме почти никак не контроллирует таймаут операций — обо всем нужно будет заботиться самим.

В официальной документации большая часть информации по работе в асинхронном режиме приведена в следующих двух разделах: раз и два.

Ну а мы перейдем сразу к делу.

Чтобы установить соединение с БД в асинхронном режиме, порядок действий должен быть примерно таков:

1. Выделить память под структуру соединения и начать подключение методом PQconnectStart()

2. Запомнить текущее время, чтобы можно было в дальнейшем контроллировать таймаут операции.

3. Проверить успешность подключения, вызвав PQstatus(). Если результат равен CONNECTION_BAD, значит инициализация была не успешной (Например, ошибка в строке подключения или не удалось аллоцировать память), иначе же можно продолжать

4. Проверить методом PQconnectPoll() текущий статус подключения.

Возможные результаты:

PGRES_POLLING_WRITING - ожидание завершения отправки данных из сокета
PGRES_POLLING_READING - ожидание завершения чтения данных из сокета
PGRES_POLLING_FAILED - произошла ошибка во время обмена данными с сервером
PGRES_POLLING_OK - подключение выполнено успешно

5. В случае статуса PGRES_POLLING_WRITING или PGRES_POLLING_READING необходимо получить используемый сокет подключения методом PQsocket() и системными функциями select() или poll() проверять его доступность для записи или чтения данных до тех пор пока он не освободится, после чего повторить пункт 4 до достижения результата OK или FAILED, либо до истечения таймаута (не забываем, таймаут нужно проверять вручную).

Если следующий вызов PQconnectPoll() будет _до_ освобождения сокета, поток заблокируется, и это надо иметь в виду.

После всего этого, если все прошло успешно, мы получаем установленное соединение с БД. Порядок действий для выполнения SQL-запросов будет выглядить же примерно так:

1. Подготовить запрос к отправке на сервер командой PQsendQuery().

2. Установить неблокирующий режим для отправки запроса методом PQsetnonblocking(), потому что по умолчанию в libPq асинхронно выполняется только чтение, а не запись в сокет.

3. Выполнять PQflush() до тех пор пока она не выдаст 0 (запрос отправлен успешно) или -1 (ошибка).

4. Получить активный сокет и проверить его на готовность к чтению через select() или poll(), до тех пор пока он не будет готов к операции.

5. Выполнить PQconsumeInput(). Если функция вернула 0, то произошла ошибка.

6. Выполнить PQisBusy(). Если функция вернула 1, значит обработка запроса или чтения ответа сервера еще не завершено и нужно заново повторить алгоритм начиная с пункта 4.
Ну и не забываем контроллировать таймауты, само собой.

После выполнения всех вышеперечисленных операций, работать с результатами запроса можно как обычно — PQgetResult(), PQgetvalue(), и т.д.

А теперь перейдем к практике. Код на C, однако если захочется обернуть его в класс для использования в программе на C++, то as you wish, всё очень просто.

// Компилировать будем как-то так: gcc pgtest4.c -I/usr/include/postgresql -lpq
#include <libpq-fe.h>   //< Си библиотека для работы PostgreSQL
#include <sys/socket.h> //< setsockopt() и некоторые константы
#include <sys/select.h> //< select()
#include <sys/time.h>   //< gettimeoftheday()
#include <unistd.h>     //< usleep() тоже может пригодиться

#define SOCK_POLL_TIMEOUT   100 // таймаут ожидания освобождения сокета (на сколько можно максимально блокировать основной поток?) в мс

typedef enum {
    DISCONNECTED = 0, 
    CONN_POLLING, 
    CONN_READING,
    CONN_WRITING,
    READY,
    QUERY_SENT,
    QUERY_FLUSHING,
    QUERY_BUSY,
    QUERY_READING,
    CLOSING,
    ERROR   
} pq_state;

typedef enum {
    NO_ERROR = 0,
    ALLOCATION_FAIL,
    POLLING_FAIL,
    READING_FAIL,
    WRITING_FAIL,
    TIMEOUT_FAIL
} pq_error;

struct pqconn_s{
    pq_state state;         //< текущее действие
    PGconn*  conn;          //< указатель на структуру с данными о соединении
    unsigned long start;    //< время начала текущей операции (для таймаута)
    long timeout;           //< таймаут текущей операции
    pq_error error;         //< если случится что-то не то, сюда прилетит код ошибки
};

/**
 * @brief  получить текущеем время
 * @return время в мс
 */
unsigned long time_ms(void)
{
    struct timespec tp;
    // gettimeoftheday() тут использовать нельзя, оно может плавать 
    clock_gettime(CLOCK_MONOTONIC, &tp);
    return (tp.tv_sec * 1000 + tp.tv_nsec / 1000000);
}

/**
 * @brief проверить готовность (свободность) сокета к записи/чтению
 * @param socket_fd - дескриптор интересующего сокета
 * @param rw - 0 если проверяем на чтение, 1 если на запись
 * @return как и select(): -1 = ошибка, 0 - свободен (готов), 1 - занят
 */
int try_socket(int socket_fd, int rw)
{
    fd_set fset;
    struct timeval sock_timeout;

    sock_timeout.tv_sec = 0;
    sock_timeout.tv_usec = SOCK_POLL_TIMEOUT;

    FD_ZERO(&fset);
    FD_SET(socket_fd, &fset);

    setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&sock_timeout, sizeof(struct timeval));
    //здесь кстати возможно не помешает еще выставить SO_SNDTIMEO. экспериментируйте.
    
    return select(socket_fd + 1, ((!rw) ? &fset : NULL),  ((rw) ? &fset : NULL), NULL, &sock_timeout);
    
}

/**
 * @brief начать процесс подключения к серверу БД
 * @param conninfo - строка подключения к БД
 * @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии
 * @param timeout - таймаут операции в мс
 * @return 0 - ошибка (можно узнать ее код в s->error), 1 - успех
 */
int pgsql_connection_start(const char* conninfo, struct pqconn_s* s, long timeout)
{
    if (!s) return 0;

    if (!conninfo) 
    {
        s->error = ALLOCATION_FAIL;
        return 0;
    }

    s->conn = PQconnectStart(conninfo);
    s->state = CONN_POLLING;
    s->start = time_ms();
    s->timeout = timeout;
    s->error = NO_ERROR;

    ConnStatusType status;

    status = PQstatus(s->conn);
    if (status == CONNECTION_BAD) 
    {
        s->state = ERROR;
        s->error = POLLING_FAIL;
        return 0; 
    }

    return 1;
}

/**
 * @brief начать отправку запроса на сервер БД и получение ответа
 * @param command - SQL-запрос
 * @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии
 * @param timeout - таймаут операции в мс
 * @return 0 - ошибка, 1 - успех
 */
int pgsql_send_query(struct pqconn_s* s, const char *command, long timeout)
{
    if (s->state != READY)
    {
        return 0;
    }

    if (!PQsendQuery(s->conn, command))
    {
        return 0;
    }

    PQsetnonblocking(s->conn, 0);

    s->state = QUERY_FLUSHING;
    s->start = time_ms();
    s->timeout = timeout;
    s->error = NO_ERROR;

    return 1;
}

/**
 * @brief основной цикл, метод должен вызываться периодично
  * @param s - указатель на структуру pqconn_s с данными о подключении и текущем состоянии
  */
void pgsql_event_loop(struct pqconn_s* s)
{
    if ((s->state == DISCONNECTED) || (s->state == READY))
        return;

    if ((time_ms() - s->start) > s->timeout)
    {
        s->state = CLOSING;
        s->error = TIMEOUT_FAIL;
    }
    
    if (s->state == CONN_POLLING)
    {
        PostgresPollingStatusType poll_result;
        poll_result = PQconnectPoll(s->conn);
            
        if  (poll_result == PGRES_POLLING_WRITING)
            s->state = CONN_WRITING;
            
        if  (poll_result == PGRES_POLLING_READING)
            s->state = CONN_READING;
            
        if  (poll_result == PGRES_POLLING_FAILED)
        {
            s->state = ERROR;
            s->error = POLLING_FAIL;
        }
            
        if  (poll_result == PGRES_POLLING_OK)
            s->state = READY;
    }

    if (s->state == CONN_READING)
    {
        int sock_state = try_socket(PQsocket(s->conn), 0);
        if (sock_state == -1) 
        {
            s->error = READING_FAIL;
            s->state = CLOSING;
        }   
        if (sock_state > 0)     
            s->state = CONN_POLLING;
    }

    if (s->state == CONN_WRITING)
    {
        int sock_state = try_socket(PQsocket(s->conn), 1);
        if (sock_state == -1)
        {
            s->error = WRITING_FAIL;
            s->state = CLOSING;
        }   
        if (sock_state > 0)     
            s->state = CONN_POLLING;
    }

    if (s->state == CLOSING)
    {
        PQfinish(s->conn);
        s->state = ERROR;
    }

    if (s->state == QUERY_FLUSHING)
    {
        int flush_res = PQflush(s->conn);
        if (0 == flush_res)
            s->state = QUERY_READING;
        if (-1 == flush_res)
        {
            s->error = WRITING_FAIL;
            s->state = CLOSING;
        }
            
    }

    if (s->state == QUERY_READING)
    {
        int sock_state = try_socket(PQsocket(s->conn), 0);
        if (sock_state == -1) 
        {
            s->error = READING_FAIL;
            s->state = CLOSING;
        }   
        if (sock_state > 0)     
            s->state = QUERY_BUSY;
    }

    if (s->state == QUERY_BUSY)
    {
        if (!PQconsumeInput(s->conn))
        {
            s->error = READING_FAIL;
            s->state = CLOSING;
        }
        if (PQisBusy(s->conn))
            s->state = QUERY_READING;
        else
            s->state = READY;
    }
}

В начале описываем все необходимые нам состояния и возможные ошибки, и объявляем структуру в которой будут храниться данные подключения и выполняемого действия — указатель на структур PGconn необходимую библиотеке для работы с сервером, состояния автомата, код ошибок (если они будут) и время начала текущей операции (для контроля тайм-аута).

Две маленькие функции time_ms() и try_socket() представляют собой обертки над функциями стандартной библиотеки для получения текущего времени в миллисекундах и проверки сокета на занятость соответственно.

Использовать же все это можно как-то примерно так:

int main(void)
{
    struct pqconn_s s;

    pgsql_connection_start("dbname=db1 user=user1 password=password1 hostaddr=10.0.0.1 port=5432", &s, 15000);

    while ((s.state != ERROR) && (s.state != READY))
    {
        pgsql_event_loop(&s);
    }

    if (s.state == ERROR)
    {
        perror("DB connection failed \n");
        return 1;
    }

    pgsql_send_query(&s, "SELECT * FROM history;", 50000);
    while ((s.state != ERROR) && (s.state != READY))
    {
        pgsql_event_loop(&s);
    }

    if (s.state == ERROR)
    {
        perror("DB query failed \n");
        return 1;
    }

    PGresult        *res;
    int             rec_count;
    int             row;
    int             col;
    
    res = PQgetResult(s.conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
               perror("We did not get any data!\n");
               return 1;
    }
 
    rec_count = PQntuples(res);
 
    printf("Received %d records.\n", rec_count);
 
    for (row=0; row<rec_count; row++) 
    {
        for (col=0; col<3; col++) 
        {
            printf("%s\t", PQgetvalue(res, row, col));
        }
        puts("");
    }
 
    PQclear(res);
}

Понятно дело, что приведенный пример работает по факту все-таки в блокирующем режиме (т.к. происходит принудительно ожидание установки поля state структуры в состояние ERROR или READY), однако как можно догадаться, дело осталось за малым: нужно вместо этого добавить в pgsql_event_loop() вызов callback'ов в случае успешного соединения, получения данных или возникновения ошибки, а event loop крутить вместе с остальными действиями в основном цикле программы или вызывать его по таймеру, и тогда работа с базой будет идти по-настоящему асинхронно.

Искренне надеюсь, что вышеописанное окажется кому-нибудь полезным.

Комментарии (1)


  1. Hixon10
    28.02.2018 22:05

    Ох, как же стыдно. Никогда не думал, что в Postgre есть поддержка асинхронной работы. Всегда выделял тред пул, в котором работал с БД синхронно. Век живи — век учись…

    Спасибо за статью!