Всем привет! Продолжаем разбираться с io_uring. Сегодня попробуем использовать io_uring для решения прикладных задач. Сначала мигрируем приложения из предыдущего материала с C на GO, затем подготовим почву для написания полноценных TCP серверов. Ну и без сравнения производительности не обойдется. Обойдемся без длинного вступления, вперед экспериментировать!


Дисклеймер

Это вторая статья из серии, посвященной io_uring. Чтобы понимать, о чем речь — ознакомьтесь с первым материалом.  

Сегодня оставим в стороне теорию io_uring, но cфокусируемся на практических аспектах применения этой технологии, так что кода будет много. Для практических экспериментов была выбрана работа с сокетами, а в качестве эталона — runtime GO. Таким образом, в этом материале, код будет на языке GO, да и применение io_uring рассмотрим в контексте GO.

Если данная тема будет неинтересна, то подходите к статье номер 3. Там снова будет обсуждение фич io_uring, настроек и режимов работы, тюнинга производительности.

Echo-сервер, тушёный с приправами

Прежде чем соперничать с runtime'ом GO, немного поговорим об инструменте, который будем использовать. Для работы с io_uring можно было воспользоваться связкой CGO + liburing, но, так как особых помех для работы с системными вызовами семейства io_uring в GO нет (ну ладно, они есть, но говорить сегодня о них не будем), почему бы не сделать полностью нативную библиотеку? Давайте перепишем echo-сервер из прошлой статьи с помощью библиотеки go-uring:

GO echo-server
package main

import (
	"errors"
	"flag"
	"fmt"
	"github.com/godzie44/go-uring/uring"
	"log"
	"strconv"
	"syscall"
)

const MaxConns = 4096
const Backlog = 512
const MaxMsgLen = 2048

type connType int

const (
	_ connType = iota
	ACCEPT
	READ
	WRITE
)

// Для каждого активного соединения будем держать в памяти connInfo структуру.
// fd файловый дескриптор сокета.
// typ - состояние в котором находится сокет (ожидает чтения/записи/accept'а).
// sendOp, recvOp - закешированные операции чтения из сокета/записи в сокет.
type connInfo struct {
	fd  int
	typ connType

	sendOp *uring.SendOp
	recvOp *uring.RecvOp
}

// Для каждого соединения предалоцируем операции чтения и записи.
// Переиспользование операция чтения и записи снизит нагрузку на GC.
func makeConns() [MaxConns]connInfo {
	var conns [MaxConns]connInfo
	for fd := range conns {
		conns[fd].recvOp = uring.Recv(uintptr(fd), nil, 0)
		conns[fd].sendOp = uring.Send(uintptr(fd), nil, 0)
	}
	return conns
}

// Буфер для соединений.
var conns = makeConns()

// Для каждого соединения инициализируем буфера для записи/чтения.
func makeBuffers() [][]byte {
	buffs := make([][]byte, MaxConns)
	for i := range buffs {
		buffs[i] = make([]byte, MaxMsgLen)
	}
	return buffs
}

var buffs = makeBuffers()

func main() {
	flag.Parse()
	port, _ := strconv.Atoi(flag.Arg(0))

  // Создаем серверный сокет и слушаем порт.
 	// Отметим, что при создании сокета флаг O_NON_BLOCK не устанавливается,
	// при этом все операции read/write и тд преобразуются в неблокирующие системные вызовы внутри io_uring
	serverSockFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	checkErr(err)
	defer syscall.Close(serverSockFd)

	checkErr(syscall.SetsockoptInt(serverSockFd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1))

	checkErr(syscall.Bind(serverSockFd, &syscall.SockaddrInet4{
		Port: port,
	}))
	checkErr(syscall.Listen(serverSockFd, Backlog))

	fmt.Printf("io_uring echo server listening for connections on port: %d\n", port)

	// Создаем экземпляр io_uring, не используем никакие кастомные настройки.
	// Вместимость SQ/SQ буферов устанавливаем в 4096 элементов.
	ring, err := uring.New(4096)
	checkErr(err)
	defer ring.Close()

	// Проверяем наличие фичи IORING_FEAT_FAST_POLL.
  // Для нас это наиболее "перформящая" фича в данном приложении,
  // фактически это встроенный в io_uring движок для поллинга I/O.
	if !ring.Params.FastPollFeature() {
		checkErr(errors.New("IORING_FEAT_FAST_POLL not available"))
	}

	// Добавляем первую операцию в SQ - слушаем серверный сокет на предмет новых входящих соединений.
	acceptOp := uring.Accept(uintptr(serverSockFd), 0)
	addAccept(ring, acceptOp)

	cqes := make([]*uring.CQEvent, Backlog)

	var cqe *uring.CQEvent
	for {
		// Сабмитим все SQE которые были добавлены на предыдущей итерации..
		_, err = ring.Submit()
		checkErr(err)

		// Ждем когда в CQ буфере появится хотя бы одно CQE.
		_, err = ring.WaitCQEvents(1)
		if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) {
			continue
		}
		checkErr(err)

		// Помещаем все "готовые" CQE в буфер cqes.
		n := ring.PeekCQEventBatch(cqes)

		for i := 0; i < n; i++ {
			cqe = cqes[i]

      // В поле user_data находится индекс соответствующего connInfo
      // в которой находится служебная информация по сокету.
			ud := conns[cqe.UserData]
			typ := ud.typ
			res := cqe.Res

			ring.SeenCQE(cqe)

			// Проверяя тип мы идентифицируем операцию результат которой находится в CQE (accept / recv / send).
			switch typ {
			case ACCEPT:
				addRead(ring, int(res))
				addAccept(ring, acceptOp)
			case READ:
				if res <= 0 {
					_ = syscall.Shutdown(ud.fd, syscall.SHUT_RDWR)
				} else {
					addWrite(ring, ud.fd, res)
				}
			case WRITE:
				addRead(ring, ud.fd)
			}
		}
	}
}

// addAccept - добавляет в SQ accept операцию, fd - файловый дескриптор сервер сокета.
func addAccept(ring *uring.Ring, acceptOp *uring.AcceptOp) {
	conns[acceptOp.Fd()].fd = acceptOp.Fd()
	conns[acceptOp.Fd()].typ = ACCEPT

	err := ring.QueueSQE(acceptOp, 0, uint64(acceptOp.Fd()))
	checkErr(err)
}

// addRead - добавляет в SQ read операцию, fd - файловый дескриптор клиентского сокета.
func addRead(ring *uring.Ring, fd int) {
	buff := buffs[fd]

	ci := &conns[fd]
	ci.fd = fd
	ci.typ = READ
	ci.recvOp.SetBuffer(buff)

	err := ring.QueueSQE(ci.recvOp, 0, uint64(fd))
	checkErr(err)
}

// addWrite - добавляет в SQ write операцию, fd - файловый дескриптор клиентского сокета.
func addWrite(ring *uring.Ring, fd int, bytesRead int32) {
	buff := buffs[fd]

	ci := &conns[fd]
	ci.fd = fd
	ci.typ = WRITE
	ci.sendOp.SetBuffer(buff[:bytesRead])

	err := ring.QueueSQE(ci.sendOp, 0, uint64(fd))
	checkErr(err)
}

func checkErr(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

Код на GO получился несколько компактнее чем этот же сервер на plain C. С другой стороны, оба исходника довольно схожи как семантически, так и синтаксически. А что по поводу производительности? С подробным описанием методики сравнения можно ознакомиться здесь. Рассмотрим исключительно результаты:

c: 100 bytes: 128

c: 50 bytes: 1024

c: 500 bytes: 128

c: 500 bytes: 1024

c: 1000 bytes: 128

c: 1000 bytes: 1024

io_uring-echo-server (C lang)

235356

224783

173670

155477

149407

139987

echo-server

227884

222709

169001

150275

143664

128783

Отлично! GO-код не сильно проигрывает коду на C в контексте I/O нагрузки. Но, есть проблема, реализация транспортного слоя прибита гвоздями к логике приложения, что не позволяет использовать ее где-либо кроме как в echo-сервере. Для того чтобы писать полноценные приложения с io_uring в GO, нам нужно реализовать интерфейсы из пакета net: net.Conn и net.Listener. А для этого нужно разобраться в реализации стандартных net.Conn и net.Listener для протокола TCP и написать свои реализации. Но обо всем по порядку.

В логове net package

Рассмотрим необходимые нам интерфейсы подробнее.

net.Listener

Задача listener'а — слушать определенный порт и при подключении к серверу очередного клиента создать структуру реализующую net.Conn, для работы с этим клиентом. Соответствующий интерфейс:

// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
	// Accept waits for and returns the next connection to the listener.
	Accept() (Conn, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() Addr
}

net.Conn

net.Conn - абстракция над клиентским соединением, позволяет выполнять чтение и запись в соответствующий клиентский сокет. Интерфейс (для удобства немного упрощен):

type Conn interface{
	// Read reads data from the connection.
	Read(b []byte) (n int, err error)

	// Write writes data to the connection.
	Write(b []byte) (n int, err error)

	// Close closes the connection.
	Close() error
}

Соответствующие реализации этих интерфейсов лежат в основе работы транспортного уровня в GO. Имея на руках реализации net.Listner и net.Conn для, например, TCP протокола можно реализовать любой вышестоящий протокол, будь то HTTP или любой другой. К примеру, вот так будет выглядеть echo-server с использованием стандартных net.TCPListener и net.TCPConn:

обычный echo-server
package main

import (
	"io"
	"log"
	"net"
)

const MaxConns = 4096
const MaxMsgLen = 2048

func initBuffs() [][]byte {
	buffs := make([][]byte, MaxConns)
	for i := range buffs {
		buffs[i] = make([]byte, MaxMsgLen)
	}
	return buffs
}

var buffs = initBuffs()

func main() {
	listener, err := net.ListenTCP("tcp", &net.TCPAddr{
		Port: 8080,
	})
	checkErr(err)

	for {
		conn, err := listener.Accept()
		checkErr(err)

		go handleConn(conn)
	}
}

func handleConn(conn net.Conn) {
	f, _ := conn.(*net.TCPConn).File()
	buff := buffs[int(f.Fd())]
	for {
		n, err := conn.Read(buff)
		if err == io.EOF || n == 0 {
			checkErr(conn.Close())
			return
		}
		checkErr(err)

		_, err = conn.Write(buff[:n])
		checkErr(err)
	}
}

func checkErr(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

Все довольно просто, в бесконечном цикле принимаем входящие соединения, а затем, для каждого соединения, создаем горутину в которой начинаем читать данные из клиентского сокета и писать туда же. Но представьте, что какой-то из клиентов решил не передавать нам данные, к примеру, в течение минуты. В таком случае поток, на котором находится горутина выполняющая чтение, заблокируется? Естественно нет! Любой GO разработчик знает — это не проблема для нашего сервера. Но каким образом, казалось бы, блокирующие вызовы conn.Read не блокируют поток исполнения? И тут в дело вступает runtime, вернее его компонент — netpoller.

netpoller - тропа через block I/O

Рассмотрим как горутины работают с I/O операциями над сокетами на примере чтения. В GO горутина (G) желающая прочитать данные из сокета, сначала попытается выполнить чтение в неблокирующем режиме. Если чтение в настоящий момент невозможно (к примеру, в сокете нет данных) — G будет заблокирована. При этом поток выполнения (M) не блокируется, а G инициирующая чтение уходит в состояние сна. Ее место на логическом процессоре (P) займет другая горутина. Наконец, когда сокет будет готов для I/O (в нашем случае появились данные для чтения) - планировщик разбудит G, и наша программа выполнит запланированный системный вызов.

Для реализации подобного механизма необходим компонент, который:

  • позволяет G добавить себя в очередь для оповещения о готовности I/O

  • будет использован планировщиком, чтобы понять, каким G требуется процессорное время в связи с появлением I/O events

Этот компонент runtime'а называется netpoller и в том числе, благодаря ему, сервера, написанные на GO, обладают такой скоростью.

А вот его упрощенный интерфейс, который нас интересует:

	func poll_runtime_pollWait(pd *pollDesc, mode int) int
	func netpoll(delay int64) gList
  • poll_runtime_pollWait -  предоставляет возможность для G подписаться на интересующее I/O событие в сокете. Структура pollDesc описывает файловый дескриптор сокета, а mode - тип события, которого следует ждать (read/write)

  • netpoll - эта функция используется планировщиком для получения списка G которым нужно дать процессорное время

Реализация netpoller зависит от конкретной OS. В Linux netpoller реализован с помощью семейства системных вызовов epoll. Таким образом, в GO используется паттерн подписки на I/O события:

так горутина общается с netpoller'ом
так горутина общается с netpoller'ом

Итак, с помощью netpoller'а можно делать системные вызовы, не боясь заблокировать приложение. В свою очередь, с помощью io_uring можно добиться тех же целей, но другими средствами. Концепция событий отходит в сторону, ведь с io_uring можно просто поставить необходимую операцию в очередь на исполнение и позднее узнать результат операции.

а так могла бы ощаться с io_uring
а так могла бы ощаться с io_uring

Тут нужно сделать ремарку: можно точно так же отлавливать I/O события и с помощью io_uring операции IORING_OP_POLL_ADD, правда, на производительности такой подход отражается скорее в негативную сторону.

Приручение reactor

Ну что же, теперь можно написать конкурента netpoller'у с использованием io_uring. Назовем его reactor. Суть идеи проста, реактор выполняет операции поддерживаемые io_uring и дергает callback в userspace по завершению той или иной операции. Таким образом, reactor будет полноценным асинхронным I/O бэкендом. Вот его наивная реализация:

reactor.go
type Callback func(event uring.CQEvent)

type Reactor struct {
	ring *uring.Ring
	callbacks     map[uint64]Callback
	callbacksLock sync.Mutex
	queueSQELock sync.Mutex
	currentNonce uint64
	submitSignal chan struct{}
}

func New(ring *uring.Ring) *Reactor {
	return &Reactor{
		ring:         ring,
		submitSignal: make(chan struct{}),
		callbacks:    map[uint64]Callback{},
	}
}

func (r *Reactor) Run(ctx context.Context) {
	wg := &sync.WaitGroup{}

	//запускаем два компонента реактора
	//consumer - обрабатывает новые CQE
	//publisher - отправляет новые SQE на обработку
	wg.Add(2)
	go func() {
		defer wg.Done()
		r.runConsumer(ctx)
	}()
	go func() {
		defer wg.Done()
		r.runPublisher(ctx)
	}()
	wg.Wait()
}

// сабмитим новые SQE в отдельной горутине
func (r *Reactor) runPublisher(ctx context.Context) {
	defer close(r.submitSignal)
	for {
		select {
		//при поступлении сигнала обрабатываем новые SQE
		case <-r.submitSignal:
			r.queueSQELock.Lock()
			_, err := r.ring.Submit()
			r.queueSQELock.Unlock()
			if err != nil {
				log.Println("io_uring submit", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

// получаем результаты выполнения команд и дергаем соответствующие колбеки
func (r *Reactor) runConsumer(ctx context.Context) {
	cqeBuff := make([]*uring.CQEvent, 512)
	// цикл обработки результатов операций (CQE)
	for {
		// командуем Publisher'у обработать новые SQE
		r.submitSignal <- struct{}{}

		// ждем хотябы одну завершенную операцию
		_, err := r.ring.WaitCQEventsWithTimeout(1, time.Millisecond)
		if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.ETIME) {
			runtime.Gosched()
			goto CheckCtxAndContinue
		}

		if err != nil {
			log.Println("io_uring wait", err)
			goto CheckCtxAndContinue
		}

		for n := r.ring.PeekCQEventBatch(cqeBuff); n > 0; n = r.ring.PeekCQEventBatch(cqeBuff) {
			for i := 0; i < n; i++ {
				cqe := cqeBuff[i]

				nonce := cqe.UserData

				//находим соответствующий callback
				r.callbacksLock.Lock()
				cb := r.callbacks[nonce]
				delete(r.callbacks, nonce)
				r.callbacksLock.Unlock()

				cb(uring.CQEvent{
					UserData: cqe.UserData,
					Res:      cqe.Res,
					Flags:    cqe.Flags,
				})
			}

			//сообщаем io_uring о N просмотренных CQE
			r.ring.AdvanceCQ(uint32(n))
		}

	CheckCtxAndContinue:
		select {
		case <-ctx.Done():
			return
		default:
			continue
		}
	}
}

//Queue добавляет операцию в SQ.
func (r *Reactor) Queue(op uring.Operation, cb Callback) (uint64, error) {
	//генерируем уникальное значение UserData
	nonce := r.nextNonce()

	r.queueSQELock.Lock()
	defer r.queueSQELock.Unlock()

	//помещаем операцию в SQ
	err := r.ring.QueueSQE(op, 0, nonce)
	if err == nil {
		r.callbacksLock.Lock()
		r.callbacks[nonce] = cb
		r.callbacksLock.Unlock()
	}

	return nonce, err
}

func (r *Reactor) nextNonce() uint64 {
	return atomic.AddUint64(&r.currentNonce, 1)
}

C помощью подобного компонента можно добавить операцию в SQ очередь io_uring и реактивно обработать результат этой операции. Внутри параллельно работают consumer для добавления операций в SQ и publisher для обработки результатов из CQ. 
 
Теперь, при помощи reactor'а, можно реализовать заветные net.Listener и net.Conn интерфейсы.

net.Listener. Реализация с использованием reactor (очевидные участки кода выброшены, ссылка на полную реализацию будет ниже)
type Listener struct {
	fd         int
	reactor    *Reactor
	acceptChan chan uring.CQEvent
	addr       net.Addr
}

func NewListener(addr string, reactor *Reactor) (*Listener, error) {
	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, err
	}
	
	// serverSocket - создает и биндит серверный сокет
	sockFd, err := serverSocket(tcpAddr)
	if err != nil {
		return nil, err
	}

	return &Listener{
		fd:         sockFd,
		addr:       tcpAddr,
		reactor:    reactor,
		acceptChan: make(chan uring.CQEvent),
	}, nil
}

func (l *Listener) Accept() (net.Conn, error) {
	//помещаем accept операцию в реактор
	op := uring.Accept(uintptr(l.fd), 0)
	l.reactor.Queue(op, func(event uring.CQEvent) {
		l.acceptChan <- event
	})
	// ждем появление подключения
	cqe := <-l.acceptChan

	if err := cqe.Error(); err != nil {
		return nil, err
	}

	fd := int(cqe.Res)
	rAddr, _ := op.Addr()
	tc := newConn(fd, l.addr, rAddr, l.reactor)
	return tc, nil
}
net.Conn. Реализация с использованием reactor (очевидные участки кода выброшены, ссылка на полную реализацию будет ниже)
type Conn struct {
	Fd                  int
	lAddr, rAddr        net.Addr
	reactor             *Reactor
	readChan, writeChan chan uring.CQEvent
	readLock, writeLock sync.Mutex
}

func newConn(fd int, lAddr, rAddr net.Addr, r *Reactor) *Conn {
	return &Conn{
		lAddr:     lAddr,
		rAddr:     rAddr,
		Fd:        fd,
		reactor:   r,
		readChan:  make(chan uring.CQEvent),
		writeChan: make(chan uring.CQEvent),
	}
}

func (c *Conn) Read(b []byte) (n int, err error) {
	c.readLock.Lock()
	defer c.readLock.Unlock()

	//помещаем Recv операцию в реактор
	op := uring.Recv(uintptr(c.Fd), b, 0)
	c.reactor.Queue(op, func(event uring.CQEvent) {
		c.readChan <- event
	})
	//ждем завершения чтения из сокета
	cqe := <-c.readChan

	if err = cqe.Error(); err != nil {
		return 0, &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err}
	}
	if cqe.Res == 0 {
		err = &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.EOF}
	}

	runtime.KeepAlive(b)
	return int(cqe.Res), err
}

func (c *Conn) Write(b []byte) (n int, err error) {
	c.writeLock.Lock()
	defer c.writeLock.Unlock()

	//помещаем Send операцию в реактор
	op := uring.Send(uintptr(c.Fd), b, 0)
	c.reactor.Queue(op, func(event uring.CQEvent) {
		c.writeChan <- event
	})
	//ждем завершения записи в сокет
	cqe := <-c.writeChan

	if err = cqe.Error(); err != nil {
		return 0, &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err}
	}
	if cqe.Res == 0 {
		err = &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.ErrUnexpectedEOF}
	}

	runtime.KeepAlive(b)
	return int(cqe.Res), err
}

func (c *Conn) Close() error {
	return syscall.Close(c.Fd)
}

Проверим на практике новые реализации, давно мы не писали echo-server! Вот его полный код (вместе с reactor'ом, net.Listener и net.Conn).

Сравним новый сервер с io_uring reactor'ом под капотом, с таким же сервером, который использует net.TCPListener и net.TCPConn из стандартной бибилиотеки (а значит, под капотом там netpoller). Как обычно смотрим на rps.

c: 100 bytes: 128

c: 50 bytes: 1024

c: 500 bytes: 128

c: 500 bytes: 1024

c: 1000 bytes: 128

c: 1000 bytes: 1024

net/http

132664

139206

133039

139171

133480

139617

reactor (io_uring)

30892

30077

39192

38375

46120

51204

Да уж, сравнение не в нашу пользу. Текущая реализация не использует на полную ресурсы железа и возможности io_uring — расплата за это низкая производительность. Тем не менее в плане функциональности даже такой, наивной, реализации reactor достаточно чтобы заменить собой netpoller. Кроме того, reactor можно использовать не только в контексте socket I/O, но и для всех прочих I/O.

О завершающей части

Конечно, такая низкая производительность не может нас устроить. Поэтому в третьей и заключительной части разберемся, почему решение с "наивным" reactor работает так медленно, рассмотрим способы ускорения (как с помощью настройки io_uring, так и с помощью оптимизаций кода reactor), а также поставим точку (хотя скорее многоточие) в битве с netpoller. Спасибо за внимание, stay tuned.


Дата-центр ITSOFT — размещение и аренда серверов и стоек в двух дата-центрах в Москве. За последние годы UPTIME 100%. Размещение GPU-ферм и ASIC-майнеров, аренда GPU-серверов, лицензии связи, SSL-сертификаты, администрирование серверов и поддержка сайтов.

Комментарии (1)


  1. yellow79
    30.12.2021 14:24
    +1

    Спасибо за статью, очнь ценный материал. Крайне непредсказуемый финал, с нетерпением жду третью часть