Как-то раз встала задача по написанию простого и быстрого многопоточного TCP/IP сервера на C++ и при этом, чтобы работал из под Windows и Linux без требования как-либо изменять код за пределами класса самого сервера. Ранее, на чистом C++ без библиотек вроде Qt, Tcp-сервер не писал, и предвещал себе долгое время мучений с платформо-зависимостью. Но как оказалось всё гораздо проще чем казалось на первый взгляд, ведь в основном интерфейсы сокетов обоих систем похожи как две капли воды и различаются лишь в мелких деталях.
Итак класс сервера и клиента выглядит следующим образом:
TcpServer.h
#ifndef TCPSERVER_H
#define TCPSERVER_H
#include <cstdint>
#include <functional>
#include <thread>
#include <list>
#ifdef _WIN32 // Windows NT
#include <WinSock2.h>
#else // *nix
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#endif
//Буффер для приёма данных от клиента
static constexpr uint16_t buffer_size = 4096;
struct TcpServer {
class Client;
//Тип Callback-функции обработчика клиента
typedef std::function<void(Client)> handler_function_t;
//Статус сервера
enum class status : uint8_t {
up = 0,
err_socket_init = 1,
err_socket_bind = 2,
err_socket_listening = 3,
close = 4
};
private:
uint16_t port; //Порт
status _status = status::close;
handler_function_t handler;
std::thread handler_thread;
std::list<std::thread> client_handler_threads;
std::list<std::thread::id> client_handling_end;
#ifdef _WIN32 // Windows NT
SOCKET serv_socket = INVALID_SOCKET;
WSAData w_data;
#else // *nix
int serv_socket;
#endif
void handlingLoop();
public:
TcpServer(const uint16_t port, handler_function_t handler);
~TcpServer();
//! Set client handler
void setHandler(handler_function_t handler);
uint16_t getPort() const;
uint16_t setPort(const uint16_t port);
status getStatus() const {return _status;}
status restart();
status start();
void stop();
void joinLoop();
};
class TcpServer::Client {
#ifdef _WIN32 // Windows NT
SOCKET socket;
SOCKADDR_IN address;
char buffer[buffer_size];
public:
Client(SOCKET socket, SOCKADDR_IN address);
#else // *nix
int socket;
struct sockaddr_in address;
char buffer[buffer_size];
public:
Client(int socket, struct sockaddr_in address);
#endif
public:
Client(const Client& other);
~Client();
uint32_t getHost() const;
uint16_t getPort() const;
int loadData();
char* getData();
bool sendData(const char* buffer, const size_t size) const;
};
#endif // TCPSERVER_H
Как можно заметить различия минимальны помимо разных подключаемых заголовочных файлов различаются разве что тип сокета — SOCKET
для Windows и (как бы странно это не выглядело) int
для Linux. Разница здесь лишь в том что Linux использует стандартный int для хранения данных сокета, в то время как в Windows задекларирован собственный тип который относительно архитектуры принимает разный размер и значность целочисленного типа, что можно увидеть в оригинальных заголовочных файлах:
//file _socket_types.h
//...
#if 1
typedef UINT_PTR SOCKET;
#else
typedef INT_PTR SOCKET;
#endif
//...
//file BaseTsd.h
//...
#if defined(_WIN64)
typedef unsigned __int64 UINT_PTR;
#else
typedef unsigned int UINT_PTR;
#endif
//...
#if defined(_WIN64)
typedef __int64 INT_PTR;
#else
typedef int INT_PTR;
#endif
//...
Так же в Windows части TcpServer-хедера присутствует структура для обозначения используемой версии WinSocket — WSAData w_data;
(см. WSAData)
Перейдём к реализации сервера:
TcpServer.cpp
#include "../hdr/TcpServer.h"
#include <chrono>
//Конструктор принимает:
//port - порт на котором будем запускать сервер
//handler - callback-функция запускаямая при подключении клиента
// объект которого и передают первым аргументом в callback
// (пример лямбда-функции: [](TcpServer::Client){...do something...})
TcpServer::TcpServer(const uint16_t port, handler_function_t handler) : port(port), handler(handler) {}
//Деструктор останавливает сервер если он был запущен
//и вычищает заданную версию WinSocket
TcpServer::~TcpServer() {
if(_status == status::up)
stop();
#ifdef _WIN32 // Windows NT
WSACleanup ();
#endif
}
//Задаёт callback-функцию запускаямую при подключении клиента
void TcpServer::setHandler(TcpServer::handler_function_t handler) {this->handler = handler;}
//Getter/Setter порта
uint16_t TcpServer::getPort() const {return port;}
uint16_t TcpServer::setPort( const uint16_t port) {
this->port = port;
restart(); //Перезапустить если сервер был запущен
return port;
}
//Перезапуск сервера
TcpServer::status TcpServer::restart() {
if(_status == status::up)
stop ();
return start();
}
// Вход в поток обработки соединений
void TcpServer::joinLoop() {handler_thread.join();}
//Загружает в буфер данные от клиента и возвращает их размер
int TcpServer::Client::loadData() {return recv(socket, buffer, buffer_size, 0);}
//Возвращает указатель на буфер с данными от клиента
char* TcpServer::Client::getData() {return buffer;}
//Отправляет данные клиенту
bool TcpServer::Client::sendData(const char* buffer, const size_t size) const {
if(send(socket, buffer, size, 0) < 0) return false;
return true;
}
#ifdef _WIN32 // Windows NT
//Запуск сервера
TcpServer::status TcpServer::start() {
WSAStartup(MAKEWORD(2, 2), &w_data); //Задаём версию WinSocket
SOCKADDR_IN address; //Структура хост/порт/протокол для инициализации сокета
address.sin_addr.S_un.S_addr = INADDR_ANY; //Любой IP адресс
address.sin_port = htons(port); //Задаём порт
address.sin_family = AF_INET; //AF_INET - Cемейство адресов для IPv4
//Инициализируем наш сокет и проверяем корректно ли прошла инициализация
//в противном случае возвращаем статус с ошибкой
if(static_cast<int>(serv_socket = socket(AF_INET, SOCK_STREAM, 0)) == SOCKET_ERROR) return _status = status::err_socket_init;
//Присваиваем к сокету адресс и порт и проверяем на коректность сокет
//в противном случае возвращаем статус с ошибкой
if(bind(serv_socket, (struct sockaddr*)&address, sizeof(address)) == SOCKET_ERROR) return _status = status::err_socket_bind;
//Запускаем прослушку и проверяем запустилась ли она
//в противном случае возвращаем статус с ошибкой
if(listen(serv_socket, SOMAXCONN) == SOCKET_ERROR) return _status = status::err_socket_listening;
//Меняем статус, запускаем обработчик соединений и возвращаем статус
_status = status::up;
handler_thread = std::thread([this]{handlingLoop();});
return _status;
}
//Остановка сервера
void TcpServer::stop() {
_status = status::close; //Изменение статуса
closesocket (serv_socket); //Закрытие сокета
joinLoop(); //Ожидание завершения
for(std::thread& cl_thr : client_handler_threads) //Перебор всех клиентских потоков
cl_thr.join(); // Ожидание их завершения
client_handler_threads.clear (); // Очистка списка клиентских потоков
client_handling_end.clear (); // Очистка списка идентификаторов завершённых клиентских потоков
}
// Функиця обработки соединений
void TcpServer::handlingLoop() {
while(_status == status::up) {
SOCKET client_socket; //Сокет клиента
SOCKADDR_IN client_addr; //Адресс клиента
int addrlen = sizeof(client_addr); //Размер адреса клиента
//Получение сокета и адреса клиента
//(если сокет коректен и сервер зарущен запуск потока обработки)
if ((client_socket = accept(serv_socket, (struct sockaddr*)&client_addr, &addrlen)) != 0 && _status == status::up){
client_handler_threads.push_back(std::thread([this, &client_socket, &client_addr] {
handler(Client(client_socket, client_addr)); //Запуск callback-обработчика
//Добавление идентификатора в список идентификаторов завершённых клиентских потоков
client_handling_end.push_back (std::this_thread::get_id());
}));
}
//Очистка отработанных клиентских потоков
if(!client_handling_end.empty())
for(std::list<std::thread::id>::iterator id_it = client_handling_end.begin (); !client_handling_end.empty() ; id_it = client_handling_end.begin())
for(std::list<std::thread>::iterator thr_it = client_handler_threads.begin (); thr_it != client_handler_threads.end () ; ++thr_it)
if(thr_it->get_id () == *id_it) {
thr_it->join();
client_handler_threads.erase(thr_it);
client_handling_end.erase (id_it);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
// Конструктор клиента по сокету и адресу
TcpServer::Client::Client(SOCKET socket, SOCKADDR_IN address) : socket(socket), address(address) {}
// Конструктор копирования
TcpServer::Client::Client(const TcpServer::Client& other) : socket(other.socket), address(other.address) {}
TcpServer::Client::~Client() {
shutdown(socket, 0); //Обрыв соединения сокета
closesocket(socket); //Закрытие сокета
}
// Геттеры хоста и порта
uint32_t TcpServer::Client::getHost() const {return address.sin_addr.S_un.S_addr;}
uint16_t TcpServer::Client::getPort() const {return address.sin_port;}
#else // *nix
//Запуск сервера (по аналогии с реализацией для Windows)
TcpServer::status TcpServer::start() {
struct sockaddr_in server;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons( port );
server.sin_family = AF_INET;
serv_socket = socket(AF_INET, SOCK_STREAM, 0);
if(serv_socket == -1) return _status = status::err_socket_init;
if(bind(serv_socket,(struct sockaddr *)&server , sizeof(server)) < 0) return _status = status::err_socket_bind;
if(listen(serv_socket, 3) < 0)return _status = status::err_socket_listening;
_status = status::up;
handler_thread = std::thread([this]{handlingLoop();});
return _status;
}
//Остановка сервера
void TcpServer::stop() {
_status = status::close;
close(serv_socket);
joinLoop();
for(std::thread& cl_thr : client_handler_threads)
cl_thr.join();
client_handler_threads.clear ();
client_handling_end.clear ();
}
// Функиця обработки соединений (по аналогии с реализацией для Windows)
void TcpServer::handlingLoop() {
while (_status == status::up) {
int client_socket;
struct sockaddr_in client_addr;
int addrlen = sizeof (struct sockaddr_in);
if((client_socket = accept(serv_socket, (struct sockaddr*)&client_addr, (socklen_t*)&addrlen)) >= 0 && _status == status::up)
client_handler_threads.push_back(std::thread([this, &client_socket, &client_addr] {
handler(Client(client_socket, client_addr));
client_handling_end.push_back (std::this_thread::get_id());
}));
if(!client_handling_end.empty())
for(std::list<std::thread::id>::iterator id_it = client_handling_end.begin (); !client_handling_end.empty() ; id_it = client_handling_end.begin())
for(std::list<std::thread>::iterator thr_it = client_handler_threads.begin (); thr_it != client_handler_threads.end () ; ++thr_it)
if(thr_it->get_id () == *id_it) {
thr_it->join();
client_handler_threads.erase(thr_it);
client_handling_end.erase (id_it);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
// Конструктор клиента по сокету и адресу
TcpServer::Client::Client(int socket, struct sockaddr_in address) : socket(socket), address(address) {}
// Конструктор копирования
TcpServer::Client::Client(const TcpServer::Client& other) : socket(other.socket), address(other.address) {}
TcpServer::Client::~Client() {
shutdown(socket, 0); //Обрыв соединения сокета
close(socket); //Закрытие сокета
}
// Геттеры хоста и порта
uint32_t TcpServer::Client::getHost() {return address.sin_addr.s_addr;}
uint16_t TcpServer::Client::getPort() {return address.sin_port;}
#endif
Реализация для Linux и Windows практически идентична за исключением некотрых мест, обусловленных разве что различными структурами хранящим адреса(struct sockaddr_in/SOCKADDR_IN, struct sockaddr/SOCKADDR
) и сокеты(int/SOCKET
), а так же наличием у Windows объекта версии WinSocket(WSAData
).
Пример использования:
main.cpp
#include "server/hdr/TcpServer.h"
#include <iostream>
//Парсер ip в std::string
std::string getHostStr(const TcpServer::Client& client) {
uint32_t ip = client.getHost ();
return std::string() + std::to_string(int(reinterpret_cast<char*>(&ip)[0])) + '.' +
std::to_string(int(reinterpret_cast<char*>(&ip)[1])) + '.' +
std::to_string(int(reinterpret_cast<char*>(&ip)[2])) + '.' +
std::to_string(int(reinterpret_cast<char*>(&ip)[3])) + ':' +
std::to_string( client.getPort ());
}
int main() {
//Создание объекта TcpServer с передачей аргументами порта и лябда-фунции для обработк клиента
TcpServer server( 8080,
[](TcpServer::Client client){
//Вывод адреса подключившего клиента в консоль
std::cout<<"Connected host:"<<getHostStr(client)<<std::endl;
//Ожидание данных от клиента
int size = 0;
while (size == 0) size = client.loadData ();
//Вывод размера данных и самих данных в консоль
std::cout
<<"size: "<<size<<" bytes"<<std::endl
<< client.getData() << std::endl;
//Отправка ответа клиенту
const char answer[] = "Hello World from Server";
client.sendData(answer, sizeof (answer));
}
);
//Запуск серевера
if(server.start() == TcpServer::status::up) {
//Если сервер запущен вывести сообщение и войти в поток ожиданий клиентов
std::cout<<"Server is up!"<<std::endl;
server.joinLoop();
} else {
//Если сервер не запущен вывод кода ошибки и заверешение программы
std::cout<<"Server start error! Error code:"<< int(server.getStatus()) <<std::endl;
return -1;
}
}
Использовавашиеся статьи:
ABBAPOH
Это UB
Arech
Что-то не улавливаю. Поясните, плс?
monah_tuk
Гонка
mayorovp
Во-первых, у компилятора могут оказаться основания полагать, что поле
_status
в цикле не меняется, из-за чего цикл превратится в вечный. Кстати, бесконечный цикл — тоже UB, что разрешает компилятору корёжить программу дальше.Во-вторых, значение
status::up
может "застрять" в кеше процессора, из-за чего цикл сделает кучу холостых операций.Если один поток записал в переменную некоторое значение — это ещё не означает, что другой поток именно это значение и прочитает.
ABBAPOH
Это, кстати, распространённый миф, кэши процессора когерентны и если вы в одну кэш линию что-то записали, то она «разъедется» на все ядра.
Другое дело что никто не гарантирует что запись будет осуществлена в том месте где вы написали (процессор/компилятор вольны переставлять инструкции) или вообще осуществлена — никто же не сказал что это атомик, можно вообще запись в кэш/память выкинуть или на регистрах оставить.
MooNDeaR
Соббсно это и будет наблюдаться как эффект "застревания" в кэше. Инструкции, меняющие переменную могут выполнится хз когда, и все это время цикл в другом потоке будет в холостую гонять.
ABBAPOH
Нельзя менять/читать значение неатомарной переменной из разных потоков. Если компилятор сможет доказать что в этом потоке переменная не меняется, он может выкинуть проверку нафиг или считать значение из памяти один раз, заменив проверку в цикле на if перед бесконечным циклом.
Arech
да хватит, хватит уже, первого комента достаточно.
Я уже так привык, что race condition всегда называют своим термином, что и думать забыл о нём, как об UB.
slamon
Ну, для такого и придумали volatile. Что впрочем не отменяет необходимость синхронизации чтения/записи
IkaR49
Так-то volatile != atomic. Но да, вы правы, использовался он чаще именно для этого
Sild
Ну нет же, не для этого придумали volatile. И даже deprecated ему собираются прикрутить в таких сценариях.
Ryppka
Вроде только в C++, в нормальном языке вроде оставляют, нет?
ABBAPOH
Хз что там в «нормальном» языке, но использование volatile для многопоточки — вещь, за которую надо увольнять. Да, volatile защищает от оптимизаций компилятора, но он не делает UB код меньшим UB, просто заметает баги под ковер. Ровно до тех пор пока вы не запустите свой «офигенный» «быстрый» алгоритм на volatile и reinterpret_cast'ах на каком-нибудь ARM.
К сожалению, x86 и друзья являются «strong ordered» архитектурами и поймать instruction reorder там сложно (вот хорошая статья, демонстрирующая, как его всё-таки словить), а значит ваш супер-мега «локфри» на волатайлах будет успешно «работать», возможно в продакшне и не один год.
Ryppka
Вы такой категоричный! Не поверите, мой код работает как раз на «каких-то» ARM'ах, и volatile там вполне достаточно. На более продвинутых процессорах — там да, нужны барьеры памяти, даже в MMIO регистр записать. Но на тех платформах, работа с которыми заполняет мои дни — нет. Поэтому я использую «нормальный язык», в котором reintrpret_cast — имя переменной. И из него работающий инструмент не выбрасывают.
ABBAPOH
Вы же сейчас о ключевом слове volatile, который вешают на переменную, а не о штуках типа asm volatile("" ::: «memory»);?
Если да, то у меня для вас плохие новости — либо у вас где-то есть барьер, который маскирует проблему, либо вы просто ещё не нашли багу.
Ключевое слово volatile не спасает от проблем многопоточности на relaxed-ordered архитектурах, потому что инструкции может переставлять процессор, а не компилятор (точнее, это могут делать оба, но volatile запрещает только второму).
Другое дело, что стандарт «нормального языка» ничего не говорит о многопоточности и memory ordering (поправьте, если ошибаюсь), а значит оставляет вас и компилятор в серой зоне — компилятор может вам помочь и напихать барьеров памяти, видя volatile, а может и не помогать и тогда ваш код будет содержать трудноотлаживаемую багу.
ABBAPOH
Я дико рекомендую вот этот цикл статей на Хабре, развеивает кучу мифов и даёт представление о том куда и на что смотреть, если поймали data race habr.com/ru/post/195770.