Принципы обработки тысяч соединений в Java

Проблема C10k – это термин, обозначающий десять тысяч
одновременно обрабатываемых соединений. Для решения проблемы часто
приходится вносить изменения в настройки сетевых сокетов и Linux, следить за использованием буферов отправки и приёма
TCP и очередей.

1. Сделайте
приложение под C10k


Когда нужно максимально
продуктивно использовать процессор, следует держать количество потоков близким к числу процессоров, выделенных для приложения. Опираясь на это, следует подобрать неблокирующую логику с высоким соотношением времени
обработки CPU/IO.

Иногда будет нужно
перестроить код, добавив RabbitMQ или Kafka, для изменения распределённой
системы, чтобы иметь возможность буферизировать таски и отделить неблокирующий
код от того кода, который не получится легко переписать.

Неблокирующий код
позволит:

  • Разделить приложение на две части. Например, одна часть – это REST, которая может быть реализована с помощью HTTP-сервера на основе пула потоков, а вторая часть – клиент, записывающий что-то в БД.
  • Масштабировать количество экземпляров этих двух частей по-разному, потому как очень вероятно, что нагрузка/процессор/память абсолютно разные.

На что ещё обратить внимание:

  • Храните как можно меньше потоков. Проверяйте не только серверные потоки, но и другие части приложения: клиент, драйвер БД, настройки журнала. Всегда делайте дамп потока, чтобы видеть количество потоков и их содержание. Выполняйте это под нагрузкой, иначе пулы потоков не будут полностью инициализированы. Называйте пользовательские потоки понятным образом – так будет проще сопровождать код.
  • Помните о блокировке вызовов HTTP/DB. Можно использовать реактивные клиенты, которые автоматически регистрируют для входящего ответа обратный вызов. Рассмотрите возможность использования протокола для связи service-2-service, например, RSocket.
  • Проверьте, содержит ли приложение постоянно низкое количество потоков, имеет ли оно пулы потоков и способно ли выдержать необходимую нагрузку.

Если приложение
имеет несколько потоков обработки, всегда проверяйте, какие из них
блокируются. При большом количестве блокирующих потоков вам потребуется почти каждый запрос обработать в другом потоке, чтобы
освободить поток цикла событий для следующего соединения.

В этом случае
рассмотрите возможность использования HTTP-сервера на основе thread-pool с воркерами,
где все запросы помещаются в другой поток из огромного пула для увеличения
пропускной способности – другого способа нет, если мы не в состоянии избавиться
от блокирующих вызовов.

2. Кэшируйте соединения, а не потоки

Этот принцип тесно
связан с темой программирования моделей для HTTP-сервера. Основная идея
заключается не в привязке соединения к одному потоку, а в использовании
некоторых библиотек, поддерживающих эффективный подход чтения из TCP.

Самая важная часть – это
рукопожатие TCP. Вы всегда должны поддерживать keep-alive соединение.
Если бы мы использовали TCP-соединение только для отправки одного сообщения, мы
бы оплатили накладные расходы в размере 8 сегментов TCP (connect and close the
connection
= 7 сегментов).

Подтверждение нового
TCP-соединения

Если мы находимся в
ситуации, когда нет возможности использовать постоянное соединение, то очень
вероятно, что за короткий промежуток времени накопится большое количество
созданных соединений. Они должны быть поставлены в очередь и ждать сигнала от
приложения.

Структура взаимодействия бэклогов TCP-соединения
Структура взаимодействия бэклогов TCP-соединения

На рисунке бэклоги SYN
и LISTEN. В SYN находятся соединения, ожидающие использования в TCP-рукопожатии.
А в LISTEN
полностью инициализированные соединения, с буферами отправки и приёма. Если хотите узнать,
почему нужны два бэклога, почитайте данный материал.

Есть ещё одна проблема. Под нагрузкой при большом числе входящих соединений потоки
приложений, отвечающие за приём соединений, могут быть заняты другой работой – выполнением
IO для уже подключенных клиентов.

            var bossEventLoopGroup = new EpollEventLoopGroup(1);
var workerEventLoopGroup = new EpollEventLoopGroup();

new ServerBootstrap()
     .channel(EpollServerSocketChannel.class)
     .group(bossEventLoopGroup, workerEventLoopGroup)
     .localAddress(8080)
     .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
     .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
     .childHandler(new CustomChannelInitializer());
        

В приведённом фрагменте (конфиг Netty Server API) находятся bossEventLoopGroup и
workerEventLoopGroup. В то время как workerEventLoopGroup по умолчанию
создаётся с (CPU * 2) потоками/циклами для выполнения IO-операций,
bossEventLoopGroup содержит один поток для приёма новых соединений. Netty
позволяет иметь только одну группу для обоих действий, но в этом случае принятие
новых соединений может долго ждать из-за выполнения IO
или более длительных операций.

Легко
проверить, способен ли процесс выдержать нагрузку входящих соединений – немного
модифицировав Websocket-Broadcaster для подключения 20 000 клиентов:

            $ ss -plnt sport = :8081|cat
State    Recv-Q    Send-Q        Local Address:Port        Peer Address:Port
LISTEN   42        128                       *:8081                   *:*        users:(("java",pid=7418,fd=86))
$ ss -plnt sport = :8081|cat
State    Recv-Q    Send-Q        Local Address:Port        Peer Address:Port
LISTEN   0         128                       *:8081                   *:*        users:(("java",pid=7418,fd=86))
$ ss -plnt sport = :8081|cat
State    Recv-Q    Send-Q        Local Address:Port        Peer Address:Port
LISTEN   20        128                       *:8081                   *:*        users:(("java",pid=7418,fd=86))
$ ss -plnt sport = :8081|cat
State    Recv-Q    Send-Q        Local Address:Port        Peer Address:Port
LISTEN   63        128                       *:8081                   *:*        users:(("java",pid=7418,fd=86))
$ ss -plnt sport = :8081|cat
State    Recv-Q    Send-Q        Local Address:Port        Peer Address:Port
LISTEN   0         128                       *:8081                   *:*        users:(("java",pid=7418,fd=86))
        
  • Send-Q: общий размер бэклога LISTEN.
  • Recv-Q: текущее количество подключений в бэклоге LISTEN.
Текущий дефолтный размер бэклога LISTEN

            cat /proc/sys/net/core/somaxconn
128
        

Буферы Send/Receive

Когда соединение
готово, наиболее проблемными частями являются буферы send/receive,
которые используются для передачи байтов из приложения, в базовый сетевой стек.
Размер этих буферов можно настраивать:

            new ServerBootstrap()
     .channel(EpollServerSocketChannel.class)
     .group(bossEventLoopGroup, workerEventLoopGroup)
     .localAddress(8080)
     .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
     .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
     .childHandler(new CustomChannelInitializer());
        

Дополнительные сведения
о параметрах сокетов в Java прочитайте в официальном хелпе StandardSocketOptions. Новые версии Linux способны автоматически
настраивать буферы для достижения оптимального размера в текущей нагрузке с
помощью TCP Congestion Window.

Прочитайте про TCP
Buffer Sizing
. Большие буферы могут
привести к утечкам памяти, а маленькие – задушат приложение т. к. не будет
места для передачи байтов в/из сетевого стека.

Почему кэширование
потока это плохо?

Java Thread – «дорогой» объект, ведь он сопоставлен один в один с потоком ядра. В Java мы
можем ограничить размер стека потока с помощью опции -Xss, которая по умолчанию
установлена в 1 Мб. Это означает, что один поток занимает 1 Мю виртуальной
памяти, а не Committed Memory. Если не используются жадные фреймворки или рекурсия, размер потока составляет 200-300 кБ. Этот вид памяти принадлежит к Native Memory, а все моменты можно отследить
с помощью Native Memory Tracking.

            $ java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary /
-XX:+PrintNMTStatistics -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.2+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.2+9, mixed mode)
Native Memory Tracking:
Total: reserved=6643041KB, committed=397465KB
-                 Java Heap (reserved=5079040KB, committed=317440KB
                            (mmap: reserved=5079040KB, committed=317440KB)
-                     Class (reserved=1056864KB, committed=4576KB)
                            (classes #426)
                            (  instance classes #364, array classes #62)
                            (malloc=96KB #455)
                            (mmap: reserved=1056768KB, committed=4480KB)
                            (  Metadata:   )
                            (    reserved=8192KB, committed=4096KB)
                            (    used=2849KB)
                            (    free=1247KB)
                            (    waste=0KB =0,00%)
                            (  Class space:)
                            (    reserved=1048576KB, committed=384KB)
                            (    used=270KB)
                            (    free=114KB)
                            (    waste=0KB =0,00%)
-                    Thread (reserved=15461KB, committed=613KB)
                            (thread #15)
                            (stack: reserved=15392KB, committed=544KB)
                            (malloc=52KB #84)
                            (arena=18KB #28)
        

Другая проблема с
большим количеством потоков – огромный Root Set. Например, есть 4
процессора и 200 потоков. В этом случае вы можете запустить только 4 потока, но
если все 200 потоков уже заняты обработкой, вы заплатите большую цену за
объекты, расположенные в куче, так как данный поток ждёт свободное процессорное
время. Все объекты, которые уже были выделены и всё ещё используются, не будут
затронуты сборщиком мусора.

Почему Root Set – это
такая большая проблема?


Красные точки могут обозначать
любой момент времени, когда работают только 4 потока, а остальные просто ждут в
очереди. Область Completeness of the
task
не означает, что все объекты, выделенные до сих пор, все еще живы, они
уже могли быть собраны в мусор или ожидают следующего цикла сборщика мусора. Что тут не так?

  • Слишком большой Live Set: каждый поток сохраняет выделенные живые объекты в куче, который просто ждёт. Этот момент нужно держать в памяти, когда мы определяем размер кучи.
  • Большие паузы GC из-за большего Root Set, усложняющего работу сборщика мусора. Современные сборщики начинают с идентификации Root Set (Snapshot-At-The-Beginning – работа с живыми потоками), а затем проходят через граф, чтобы найти текущий Live Set. Чем больше Root Set, тем больше работы для GC.
  • Движение к Old Generation: большой Live Set также будет влиять на время, в течение которого данный объект будет считаться живым. Это увеличивает вероятность того, что объект будет переведен в Old Generation, даже если поток, хранящий этот объект, провел большую часть своего времени вне ЦП.

3. Прекращаем
генерировать мусор


Для написания
приложения, которое будет находиться под большой нагрузкой, необходимо подумать
о выделении всех объектов и не позволять JVM тратить ни одного байта. В этом поможет
ByteBuffers.

Класс ByteBuffer имеет
две опции: HeapByteBuffer и DirectByteBuffer. Ключевая польза DirectByteBuffers
– возможность передавать напрямую в нативные функции ОС выполнение I/O. Другими
словами, когда вы работаете с I/O в Java, вы передаете ссылку на DirectByteBuffer
(со смещением и длиной).

Рассмотрим на примере. У
вас есть 10k соединений, и вы хотите им передать одно и то же строковое
значение. Нет никакой причины передавать строку 10k раз или еще хуже,
генерировать новый объект String для
каждого соединения, засоряя кучу одним и тем же массивом байтов. Вместо этого
мы можем создать собственный DirectByteBuffer и предоставить его всем
соединениям, а затем позволить им передать все в ОС через JVM.

Есть одно но – DirectByteBuffer
очень «дорого» выделять. Поэтому в JDK каждый поток, работающий с I/O, кэширует
один DirectByteBuffer для внутреннего использования. Почитайте об утечках памяти в ByteBuffer для большей наглядности.

Но зачем тогда нужен
HeapByteBuffer, если его нужно преобразовать в DirectByteBuffer для возможности
записи в ОС? HeapByteBuffer гораздо дешевле обходится. Если мы вернемся в пример
выше, то мы могли бы исключить первый шаг и не делать это 10k-раз. Тогда можно
рассчитывать на автоматический механизм кэширования DirectByteBuffer для
каждого потока внутри JDK.

4. Измеряем
нагрузку в часы пик

Ознакомьтесь с полезным
набором утилит для работы с TCP: BCC Tools и bpftrace.

Используя bpftrace, можно получить быстрый результат для исследования возможной проблемы. Пример демонстрирует, как socketio-pid.bt подсчитывает количество переданных
байтов на основе PID:

            #!/snap/bin/bpftrace
#include <linux/fs.h>
BEGIN
{
      printf("Socket READS/WRITES and transmitted bytes, PID: %un", $1);
}
kprobe:sock_read_iter,
kprobe:sock_write_iter
/$1 == 0 || ($1 != 0 && pid == $1)/
{
       @kiocb[tid] = arg0;
}
kretprobe:sock_read_iter
/@kiocb[tid] && ($1 == 0 || ($1 != 0 && pid == $1))/
{
       $file = ((struct kiocb *)@kiocb[tid])->ki_filp;
       $name = $file->f_path.dentry->d_name.name;
       @io[comm, pid, "read", str($name)] = count();
       @bytes[comm, pid, "read", str($name)] = sum(retval > 0 ? retval : 0);
       delete(@kiocb[tid]);
}
kretprobe:sock_write_iter
/@kiocb[tid] && ($1 == 0 || ($1 != 0 && pid == $1))/
{
       $file = ((struct kiocb *)@kiocb[tid])->ki_filp;
       $name = $file->f_path.dentry->d_name.name;
       @io[comm, pid, "write", str($name)] = count();
       @bytes[comm, pid, "write", str($name)] = sum(retval > 0 ? retval : 0);
       delete(@kiocb[tid]);
}
END
{
       clear(@kiocb);
}
        

Видим пять потоков (server-io-x)
и каждый поток висит в цикле событий. Каждый цикл имеет одного подключенного
клиента, и приложение транслирует случайно сгенерированное строковое сообщение
всем подключенным клиентам с помощью протокола Websocket.

  • @bytes – сумма r/w байтов;
  • @io – общее количество операций r/w.
            ./socketio-pid.bt 27069
Attaching 6 probes...
Socket READS/WRITES and transmitted bytes, PID: 27069
^C
@bytes[server-io-3, 27069, read, TCPv6]: 292
@bytes[server-io-4, 27069, read, TCPv6]: 292
@bytes[server-io-0, 27069, read, TCPv6]: 292
@bytes[server-io-2, 27069, read, TCPv6]: 292
@bytes[server-io-1, 27069, read, TCPv6]: 292
@bytes[server-io-3, 27069, write, TCPv6]: 1252746
@bytes[server-io-1, 27069, write, TCPv6]: 1252746
@bytes[server-io-0, 27069, write, TCPv6]: 1252746
@bytes[server-io-4, 27069, write, TCPv6]: 1252746
@bytes[server-io-2, 27069, write, TCPv6]: 1252746
@io[server-io-3, 27069, read, TCPv6]: 1
@io[server-io-4, 27069, read, TCPv6]: 1
@io[server-io-0, 27069, read, TCPv6]: 1
@io[server-io-2, 27069, read, TCPv6]: 1
@io[server-io-1, 27069, read, TCPv6]: 1
@io[server-io-3, 27069, write, TCPv6]: 1371
@io[server-io-1, 27069, write, TCPv6]: 1371
@io[server-io-0, 27069, write, TCPv6]: 1371
@io[server-io-4, 27069, write, TCPv6]: 1371
@io[server-io-2, 27069, write, TCPv6]: 1371
        

5. Баланс между
пропускной способностью и задержкой

Производительность
приложения рано или поздно упирается в компромисс между пропускной способностью
и задержкой. Предположим, что у нас есть Netty и
WebSocket-сервер, который отправляет сообщения подключенным
клиентам. Действительно ли нам нужно отправить сообщение как можно скорее? Или
можно подождать, создать пакет из пяти сообщений и отправить их вместе?

Netty поддерживает механизм,
идеально подходящий для этого случая. Допустим, мы решили пожертвовать
задержкой в пользу общей пропускной способности при создании пакета.

            pbouda.jfr.sockets.netty.server.SlowConsumerDisconnectHandler:
- we need to comment out flushing of every message and use simple write instead
- write method does not automatically write data into the socket, it waits for a flush context.writeAndFlush(obj) -> context.write(obj)
        
            pbouda.jfr.sockets.netty.Start#main 
- uncomment the section at the end of the method `Flush a bulk of 5 messages`
        

Если у вас стоит Java
14, включающая в себя функцию Java Flight Recorder Streaming, то вы можете
увидеть, как действует Netty в данном случае:

            Broadcaster-Server 2020-01-14 22:12:00,937 [client-nioEventLoopGroup-0] INFO p.j.s.n.c.WebSocketClientHandler - Received message: my-message (10 bytes)
Broadcaster-Server 2020-01-14 22:12:00,937 [client-nioEventLoopGroup-0] INFO p.j.s.n.c.WebSocketClientHandler - Received message: my-message (10 bytes)
Broadcaster-Server 2020-01-14 22:12:00,938 [client-nioEventLoopGroup-0] INFO p.j.s.n.c.WebSocketClientHandler - Received message: my-message (10 bytes)
Broadcaster-Server 2020-01-14 22:12:00,938 [client-nioEventLoopGroup-0] INFO p.j.s.n.c.WebSocketClientHandler - Received message: my-message (10 bytes)
Broadcaster-Server 2020-01-14 22:12:00,939 [client-nioEventLoopGroup-0] INFO p.j.s.n.c.WebSocketClientHandler - Received message: my-message (10 bytes)
jdk.SocketWrite {
    startTime = 22:12:01.603
    duration = 2.23 ms
    host = ""
    address = "127.0.0.1"
    port = 42556
    bytesWritten = 60 bytes
    eventThread = "server-nioEventLoopGroup-1" (javaThreadId = 27)
    stackTrace = [
        sun.nio.ch.SocketChannelImpl.write(ByteBuffer[], int, int) line: 167
        io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 420
        io.netty.channel.AbstractChannel$AbstractUnsafe.flush0() line: 931
        io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0() line: 354
        io.netty.channel.AbstractChannel$AbstractUnsafe.flush() line: 898
        ...
    ]
}
jdk.SocketRead {
    startTime = 22:12:01.605
    duration = 0.0757 ms
    host = ""
    address = "127.0.0.1"
    port = 8080
    timeout = 0 s
    bytesRead = 60 bytes
    endOfStream = false
    eventThread = "client-nioEventLoopGroup-0" (javaThreadId = 26)
    stackTrace = [
        sun.nio.ch.SocketChannelImpl.read(ByteBuffer) line: 73
        io.netty.buffer.PooledByteBuf.setBytes(int, ScatteringByteChannel, int) line: 247
        io.netty.buffer.AbstractByteBuf.writeBytes(ScatteringByteChannel, int) line: 1147
        io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(ByteBuf) line: 347
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read() line: 148
        ...
    ]
}
        

Подключенный клиент
получил пять сообщений, но мы видим только один сокет для чтения и записи, и
оба содержат весь пакет сообщений.

6. Не
отставайте от новых тенденций

Здесь представлены две
концепции, которые в настоящее время становятся сильными вариантами обработки
большого количества клиентов/запросов, даже если они явно не связаны с решением
проблемы C10k.

GraalVM Native Image – для экономии ресурсов

GraalVM – мощная штука,
работающая на AOT-компиляторе и
включающая в себя фреймворк SubstrateVM. В двух словах это означает, что Graal
используется во время сборки для генерации машинного кода без каких-либо данных
профилирования.

Он будет генерировать
автономный двоичный файл без каких-либо неиспользуемых классов/методов,
внутренних структур, таких как JIT-структуры данных. Такая оптимизация
гарантирует, что мы сможем запускать наше приложение с гораздо меньшим объемом
памяти, и даже если приложение имеет менее эффективный код, мы можем в конечном
итоге получить лучшее соотношение между потребляемой памятью и общим
количеством обработанных запросов. Это означает, что мы можем развернуть
несколько экземпляров приложения с ещё большей пропускной способностью и меньшим количеством ресурсов.

Проект Loom –
устранение боли от блокировки вызовов

Вы, наверное, слышали о
fibers/green threads/goroutines. Все эти термины означают одно – избежать
планирования потоков в ядре, дилегируя ответственность в пространство
пользователя. Типичным примером является то, что у нас есть много
блокирующих вызовов, каждый запрос к приложению заканчивается в JDBC/HTTP/. Далее
происходит вызов. Нам нужно заблокировать текущий поток Java и ждать, пока не вернется ответ.

Вместо этого мы можем
использовать Fibers из Project Loom. Это гарантирует, что блокирующий вызов
фактически не блокирует поток Java, а только
текущий Fiber. Поэтому мы можем запланировать новый Fiber на текущей запущенной
Java, а затем вернуться к исходному Fiber, когда
блокирующий вызов будет выполнен. Результатом будет возможность обрабатывать
все запросы, даже с очень ограниченным числом потоков Java, потому что Fibers «почти бесплатны».

Заключение

Попытка победить
проблему C10k в основном связана с
эффективностью использования ресурсов. Код, который абсолютно нормально
работает с десятками/сотнями параллельных пользователей, может подвести, когда
мы подсунем ему тысячи параллельных соединений.

Это абсолютно нормально, если приложение не
предназначено для такого большого количества соединений т. к. далеко не весь
софт может вывезти подобное. Но всегда полезно знать об этих принципах и начать
разрабатывать высокопроизводительное приложение с нуля, избежав сложности
расширенных функций для сохранения каждого байта памяти в условиях обработки
большого количества клиентов.

Источники

Специально для сайта ITWORLD.UZ. Новость взята с сайта Библиотека программиста