Java. Повний огляд мережевих моделей. Socket API, forking, non-blocking sockets, event-driven API

Зміст

Вступ

При створенні мережевих додатків, я вважаю, потрібно розуміти принципи роботи сокетів і специфіку використання UNIX File API. Мережевий інтерфейс програмування сокетів був вперше розроблений для UNIX-систем як частина Berkeley Software Distribution (BSD) і з часом став стандартом для багатьох операційних систем. В цьому довгочиті розглянемо основи сокетів, блокуючі та неблокуючі виклики, багатопотокові сервери та подійно-орієнтовані API, а також їх реалізацію в Java.

Основи сокетів та UNIX File API

UNIX File API дозволяє користувацьким програмам взаємодіяти з файлами за допомогою файлових дескрипторів. Файловий дескриптор - це числовий ідентифікатор, який програма(процес) може використовувати для читання, запису та закриття файлів. Ця концепція поширюється на мережеву комунікацію через сокети. Сокети є кінцевими точками для відправки та отримання даних через мережу, схожими на файлові дескриптори, але пристосованими для мережевих з'єднань.

У Java сокети представлені класами java.net.Socket для клієнтської комунікації та java.net.ServerSocket для серверної комунікації. Ці класи забезпечують методи для читання та запису в мережеві з'єднання, аналогічно до файлових операцій у UNIX.

Хоча сокет - це один файловий дескриптор, він є двонаправленим каналом комунікації, який використовується для одночасного відправлення і отримання даних. Операційна система та мережеві протоколи забезпечують управління потоками даних, що дозволяє коректно розрізняти дані, що надходять і відправляються.

Як розрізняються дані в сокеті:

  • На рівні операційної системи: ОС використовує черги (buffers) для кожного сокета, що дозволяє одночасно обробляти вхідні та вихідні дані. *в моєму пості про мережі було про це

  • На рівні протоколу (TCP): TCP забезпечує механізми сегментації та реасемблювання даних. Кожен сегмент даних маркується інформацією про відправника і отримувача, що дозволяє розмежовувати різні потоки даних.

TCP сокет сервер

Почнемо з простого TCP сокет сервера в Java:

package org.archivisiondev;

import java.io.*;
import java.net.*;

public class ThreadedTCPServer {
    public static void main(String[] args) throws IOException {
        int port = 9000;
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println("Server is listening on port " + port);

        while (true) {
            Socket socket = serverSocket.accept();
            System.out.println("New client connected: " + socket.getPort());

            new Thread(new ClientHandler(socket)).start();
        }
    }
}

class ClientHandler implements Runnable {
    private Socket socket;

    public ClientHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            InputStream input = socket.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(input));

            OutputStream output = socket.getOutputStream();
            PrintWriter writer = new PrintWriter(output, true);

            String text;
            while ((text = reader.readLine()) != null) {
                System.out.println("Received: " + text);
                writer.println("Echo: " + text);
            }

            socket.close();
        } catch (IOException e) {
            if (e instanceof SocketException) {
                if (e.getMessage().equals("Connection reset")) {
                    System.out.println("Client is disconnected");
                }
            } else {
                e.printStackTrace();
            }
        }
    }
}

Цей сервер слухає на порту 9000, приймає вхідні з'єднання та відправляє отримані повідомлення назад клієнту. Однак, цей сервер обробляє одне з'єднання за раз і є блокуючим, тобто чекає завершення операцій перед тим, як продовжити. Інтерфейс для сокетів аналогічний файловим операціям: використовуються виклики read та write, а після завершення роботи з сокетом його закривають за допомогою close. Однак, ці виклики можуть бути блокуючими. На відміну від файлів, мережеві з'єднання не передбачувані, тому коли ви читаєте з файлу, ви впевнені, що отримаєте дані або отримаєте помилку відразу, але при читанні з сокета дані можуть просто не прийти.

Це легко продемонструвати. Наприклад, наш сервер працює, і ми маємо клієнта, який блокується. Замість використання curl для відправки запиту, використовуємо netcat. Якщо ми частково відправимо запит без завершення його відправки, наша програма застрягне в циклі читання запиту. В цей час сервер не зможе приймати нові з'єднання, що є проблемою. Один із способів вирішення цієї проблеми - використання потоків.

  • Що ми маємо зараз

Щоб не бавитися на стороні клієнта з сокетами, я просто використовую командний рядок щоб приєднатися до сервера. Більше мені нічого від нього не потрібно.

package org.archivisiondev;

import java.io.IOException;

public class TCPClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Process exec = Runtime.getRuntime().exec("nc localhost 9000");
        exec.waitFor();
    }
}
Сервер

Запускаю двох клієнтів. Під’єднується лише один, оскільки ми в нашому потоці, де читаємо дані від клієнта, чекаємо - але їх немає. Через це другий клієнт чекає. Якщо відключити першого, другий успішно під’єднується. Щоб вирішити цю проблему, ми можемо створювати окремий потік на обробку запиту від кожного клієнта, щоб мати змогу завжди реагувати на вхідні з’єднання.

Багатопотоковий сервер

“Форкінг“ сервера передбачає створення нового процесу або потоку для кожного клієнтського з'єднання. У Java це зазвичай робиться за допомогою потоків.

import java.io.*;
import java.net.*;

public class ThreadedTCPServer {
    public static void main(String[] args) throws IOException {
        int port = 9000;
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println("Server is listening on port " + port);

        while (true) {
            Socket socket = serverSocket.accept();
            System.out.println("New client connected");

            new Thread(new ClientHandler(socket)).start();
        }
    }
}

class ClientHandler implements Runnable {
    private Socket socket;

    public ClientHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            InputStream input = socket.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(input));

            OutputStream output = socket.getOutputStream();
            PrintWriter writer = new PrintWriter(output, true);

            String text;
            while ((text = reader.readLine()) != null) {
                System.out.println("Received: " + text);
                writer.println("Echo: " + text);
            }

            socket.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}
Всі клієнти приєднуються, незалежно один від одного

Один з недоліків цього підходу полягає в тому, що для кожного клієнта створюється новий потік. Це може призвести до значного споживання ресурсів при великій кількості одночасних підключень. Якщо ми продовжимо нашу аналогію з файлами, ми б не створювали новий процес кожного разу, коли хочемо працювати з файлами. Створення тисяч потоків для роботи з мережевими підключеннями може бути не оптимальним рішенням.

Java Blocking IO

Ранні веб-сервіси і мережеві програми використовували саме цей механізм. Але що, якщо ми хочемо обробляти всі підключення в одному потоці?

Неблокуючі виклики з каналами (java.nio.Channel)

Щоб подолати обмеження блокуючих викликів, Java NIO (New I/O) надає неблокуючі операції введення/виведення за допомогою каналів і селекторів. Це дозволяє одному потоку ефективно керувати кількома з'єднаннями.

Java NIO

Як працюють неблокуючі сокети під капотом

Неблокуючі сокети дозволяють виконувати операції читання і запису без необхідності чекати, поки дані стануть доступними або поки операція завершиться. Це досягається встановленням спеціального прапорця для сокету, який робить його неблокуючим. Коли сокет знаходиться в неблокуючому режимі, методи читання і запису повертають управління негайно, навіть якщо операція не завершена.

  • NON_BLOCKING Mode: Встановлення сокету в неблокуючий режим (NON_BLOCKING) означає, що операції введення/виведення не блокують виконання програми. Це робиться шляхом встановлення прапорця O_NONBLOCK або FIONBIO (залежно від операційної системи) на сокеті.

  • Читання: Якщо при спробі читання з неблокуючого сокету дані ще не доступні, операція повертає відразу з кодом помилки EAGAIN або EWOULDBLOCK. Це означає, що програма повинна повторити спробу читання пізніше.

  • Запис: Аналогічно, при спробі запису в неблокуючий сокет, якщо операція не може бути завершена відразу (наприклад, буфер повний), вона також повертає негайно з кодом помилки EAGAIN або EWOULDBLOCK.

Використання каналів і селекторів

У Java NIO неблокуючі операції виконуються за допомогою каналів (java.nio.channels.SocketChannel) та селекторів (java.nio.channels.Selector). Селектор дозволяє одному потоку відслідковувати події на багатьох каналах, таких як готовність до читання, запису або прийняття нових з'єднань.

  • Канали (Channels): Канали представляють зв'язок між програмою і I/O пристроєм, таким як файл або мережевий сокет. SocketChannel є реалізацією каналу для TCP сокетів, яка підтримує неблокуючий режим.

  • Селектори (Selectors): Селектори використовуються для управління множинними каналами в одному потоці. Вони дозволяють програмі ефективно визначати, який канал готовий до операції без блокування. Це досягається реєстрацією каналів на селекторі і опитуванням їх стану.

package org.archivisiondev;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NonBlockingTCPServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9000));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();

            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();

                if (!key.isValid()) continue;

                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    private static void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(key.selector(), SelectionKey.OP_READ);
        System.out.println("New connection from " + socketChannel.getRemoteAddress());
    }

    private static void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(256);
        int numRead = socketChannel.read(buffer);

        if (numRead == -1) {
            socketChannel.close();
            key.cancel();
            return;
        }

        String received = new String(buffer.array()).trim();
        System.out.println("Received: " + received);
        buffer.flip();
        socketChannel.write(ByteBuffer.wrap(("Echo: " + received).getBytes()));
    }
}

Цей сервер використовує Selector і ServerSocketChannel для обробки кількох з'єднань без блокування. Selector моніторить канали на події, такі як прийняття та читання, дозволяючи одному потоку ефективно керувати багатьма з'єднаннями.

Мінуси неблокуючого підходу з селекторами в NIO

Неблокуючий підхід з використанням селекторів у Java NIO має певні недоліки, які можуть вплинути на продуктивність і складність реалізації системи. Основною проблемою є так званий busy-loop: коли немає даних для читання або запису, селектор постійно опитує канали, що призводить до зайвого використання ресурсів процесора. Складність реалізації неблокуючих сокетів вимагає глибокого розуміння асинхронного введення/виведення та обробки подій, що може бути не тривіальною задачею для розробників.

Netty. Подійно-орієнтований API

Netty, високопродуктивний фреймворк на базі NIO для створення мережевих додатків, вирішує багато проблем, пов'язаних із неблокуючим введенням/виведенням та селекторами в Java, включаючи проблему busy-loops. Netty забезпечує вищий рівень абстракції над Java NIO, спрощуючи процес розробки та приховуючи нюанси, пов'язані з селекторами, буферами і каналами. Він використовує event-driven архітектуру з ефективним циклом подій, який оптимізує пробудження селектора та використовує черги завдань для уникнення busy waiting , ефективно переходячи в стан очікування при відсутності завдань.

Netty також забезпечує ефективне управління ресурсами завдяки власній системі управління буферами та ефективному управлінню з'єднаннями. Його архітектура для обробки конкурентності зменшує складність управління декількома потоками та уникає таких проблем, як умови гонок та взаємні блокування. Netty абстрагує платформо-залежні деталі, забезпечуючи нормальну роботу на різних операційних системах, що зменшує проблеми з портативністю.

Сервер на Netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.109.Final</version>
</dependency>
package org.archivisiondev;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelHandlerContext;

public class NettyTCPServer {
    private final int port;

    public NettyTCPServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new EchoServerHandler());
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    private static class EchoServerHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("Received: " + msg);
            ctx.writeAndFlush("Echo: " + msg);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("New connection established on " + ctx.channel().remoteAddress());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyTCPServer(9000).start();
    }
}

Архітектура Netty

Netty використовує подієву архітектуру, засновану на циклі подій (event loop), який обробляє події за допомогою каналів (Channel) і обробників (ChannelHandler). Кожен цикл подій керує групою потоків, які обробляють завдання асинхронно. Це дозволяє ефективно обробляти велику кількість одночасних з'єднань без зайвих витрат на ресурси, забезпечуючи масштабованість і високу продуктивність.

Разработка высоконагруженного игрового WebSocket сервера на Java, Netty с  поддержкой BattleRoyale/Matchmaking / Хабр

Відео про BSD сокети

Поділись своїми ідеями в новій публікації.
Ми чекаємо саме на твій довгочит!
Oleksandr Klymenko
Oleksandr Klymenko@overpathz

Java Software Engineer

4.6KПрочитань
1Автори
71Читачі
На Друкарні з 19 квітня

Більше від автора

  • Secure networking. Deep Dive

    Глибоке занурення в протоколи TLS/SSL та інфраструктуру відкритих ключів (PKI). Основні поняття, процес встановлення захищеного з'єднання, роль сертифікатів та ланцюжка довіри

    Теми цього довгочиту:

    Security
  • Поширені помилки у дизайні REST API

    У довгочиті розглядаються поширені помилки при проектуванні REST API та способи їх уникнення: версіонування, використання DTO, підхід CQRS, робота з мікросервісами, та інші практики для підвищення продуктивності, безпеки й зручності API

    Теми цього довгочиту:

    Java
  • Java. Короткий огляд еволюції багатопотоковості

    У перших версіях Java багатопоточність реалізовувалася за допомогою класу Thread, який дозволяв створювати нові потоки. Проте ця модель мала багато недоліків:

    Теми цього довгочиту:

    Java

Вам також сподобається

Коментарі (0)

Підтримайте автора першим.
Напишіть коментар!

Вам також сподобається