[Программирование, Java] Итак, вы хотите оптимизировать gRPC. Часть 1 (перевод)
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Часто возникает вопрос о том, как ускорить gRPC. gRPC позволяет реализовать высокопроизводительный RPC, но не всегда понятно как достичь этого быстродействия. И я решил попытаться показать ход своих мыслей при оптимизации программ.Рассмотрим простой сервис "ключ-значение", который используется несколькими клиентами. Сервис должен корректно работать при параллельных операциях изменения данных. Также должна быть возможность масштабирования. И, в конце концов, он должен быть быстрым. Для реализации подобного сервиса gRPC будет идеальным вариантом. Давайте посмотрим, как его реализовать.Я написал пример клиента и сервера на Java. В примере три основных класса и protobuf-файл, описывающий API:
- KvClient — имитирует пользователя сервиса "ключ-значение". Он случайным образом создает, извлекает, изменяет и удаляет ключи и значения. Размер используемых ключей и значений также определяется случайным образом, используя экспоненциальное распределение.
- KvService — реализация сервиса "ключ-значение". Он обрабатывает запросы от клиентов. Для имитации хранения данных на диске при операциях чтения и записи добавляются небольшие задержки в 10 и 50 мс, чтобы сделать пример похожим на настоящую базу данных.
- KvRunner — организует взаимодействие между клиентом и сервером. Это основная точка входа, запускающая процессы клиента и сервера, и ожидающая, пока клиент выполнит свою работу. Runner работает в течение 60 секунд, а затем выводит количество выполненных RPC.
- kvstore.proto — определение Protocol Buffers нашего сервиса. Здесь описывается, что клиенты могут ожидать от сервиса. Для простоты в качестве операций мы будем использовать Create, Retrieve, Update и Delete (широко известные как CRUD). Эти операции работают с ключами и значениями, состоящими из произвольных байт. Хотя это в некоторой степени похоже на REST, но у нас остается возможность добавить более сложные операции в будущем.
Protocol Buffers можно использовать и без gRPC — это удобный способ определения интерфейсов сервисов, а также генерации клиентского и серверного кода. Генерируемый код действует как клей между логикой приложения и библиотекой gRPC. Этот gRPC-код, используемый клиентом, мы называем стабом (stub, заглушка).Первоначальная версияКлиентТеперь, когда мы разобрались с тем, что программа должна делать, можно начать изучать, как она это делает. Как уже упоминалось выше, клиент выполняет случайные RPC. Например, следующий код делает запрос create:
private void doCreate(KeyValueServiceBlockingStub stub) {
ByteString key = createRandomKey();
try {
CreateResponse res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
if (!res.equals(CreateResponse.getDefaultInstance())) {
throw new RuntimeException("Invalid response");
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
knownKeys.remove(key);
logger.log(Level.INFO, "Key already existed", e);
} else {
throw e;
}
}
}
Создается случайный ключ и случайное значение. Запрос отправляется на сервер, и клиент ждет ответа. Когда ответ получен, проверяется, соответствует ли он тому, что ожидалось, и если не соответствует, то бросается исключение. Несмотря на то что ключи генерируются случайным образом, они должны быть уникальны, поэтому необходимо убедиться, что ключ еще не используется. Для проверки уникальности мы отслеживаем созданные ключи, чтобы не использовать один и тот же ключ дважды. Однако вполне вероятно, что какой-то другой клиент уже создал определенный ключ, поэтому мы его логируем и идем дальше. В противном случае выбрасывается исключение.Здесь мы используем блокирующий gRPC API, который отправляет запрос и ждет ответа. Это самый простой gRPC-стаб, блокирующий поток. Получается, что клиент одновременно может выполнять не более одного RPC.СерверНа стороне сервера запрос обрабатывает следующий код:
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override
public synchronized void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
Сервис извлекает из запроса ключ и значение как ByteBuffer. И для контроля, что одновременные запросы не повредят хранилище, метод объявлен как synchronized. После имитации записи на диск данные сохраняются в Map.В отличие от клиентского кода, серверный обработчик является неблокирующим. Для отправки ответа вызывается onNext() у responseObserver. Для завершения отправки сообщения вызывается onCompleted().ПроизводительностьКод выглядит корректным и безопасным — давайте посмотрим, как он работает. Я использую компьютер с Ubuntu, 12-ти ядерным процессором и 32 ГБ памяти. Давайте выполним сборку и запустим:
$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s
real 1m0.927s
user 0m10.688s
sys 0m1.456s
Ну и дела! На такой мощной машине только 16 RPC в секунду. Процессор почти не используется, и мы не знаем, сколько потреблялось памяти. Давайте выяснять, почему получился такой плохой результат.ОптимизацияАнализПрежде чем вносить какие-либо изменения, давайте разберемся, что делает программа. Нам нужно выяснить, где код тратит свое время, чтобы понять что необходимо оптимизировать. На этом этапе инструменты профилирования пока не нужны, мы можем просто проанализировать код.Клиент запускается и последовательно выполняет RPC примерно в течение минуты. На каждой итерации он случайным образом решает, какую операцию выполнить:
void doClientWork(AtomicBoolean done) {
Random random = new Random();
KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);
while (!done.get()) {
// Pick a random CRUD action to take.
int command = random.nextInt(4);
if (command == 0) {
doCreate(stub);
continue;
}
/* ... */
rpcCount++;
}
}
Это означает, что одновременно может выполняться не более одного RPC. Каждый вызов должен ждать завершения предыдущего. А сколько времени занимает каждый RPC? После изучения кода сервера мы видим, что самая тяжелая операция занимает около 50 мс. При максимальной эффективности получится выполнить только 20 запросов в секунду:20 запросов = 1000 мс / (50 мс / запрос)Наш код может выполнять около 16 запросов в секунду, что кажется правдой. Мы можем проверить это предположение, посмотрев на вывод команды time, используемой для запуска кода. При выполнении запросов в методе simulateWork сервер просто спит (sleep). Получается, что программа в основном простаивает, ожидая завершения RPC.Можно подтвердить это, посмотрев выше на реальное время выполнения между стартом и завершением (real) и на время использования процессора (user). Прошла одна минута, но процессор потратил только 10 секунд. Мой мощный многоядерный процессор был занят только 16% времени. Таким образом, если заставить программу выполнять больше работы в течение этого времени, то, похоже, можно увеличить количество RPC.ГипотезаТеперь мы четко видим проблему и можем предложить решение. Один из способов ускорить работу — убедиться, что процессор не простаивает. Для этого мы будем выполнять работу параллельно.В gRPC-библиотеке для Java есть три типа стабов: блокирующие, неблокирующие и ListenableFuture. Мы уже видели блокирующий в клиенте и неблокирующий на сервере. ListenableFuture API — это компромисс между ними, предлагающий как блокирующее, так и не блокирующее поведение. До тех пор, пока мы не блокируем поток, ожидающий завершения работы, мы можем запускать новые RPC, не дожидаясь завершения старых. ЭкспериментДля проверки нашей гипотезы давайте используем в клиенте ListenableFuture. А это значит, что теперь нам надо больше думать о конкурентности в коде. Например, при отслеживании используемых ключей на клиенте нам нужно безопасно читать, изменять и записывать ключи. Также необходимо убедиться, что в случае ошибки мы прекратим выполнять новые RPC (правильная обработка ошибок будет рассмотрена в следующем посте). Наконец, нам нужно увеличить счетчик выполненных RPC и не забыть, что RPC выполняются в разных потоках.Все эти изменения усложняют код. Но это компромисс, на который мы идем для улучшения производительности. Часто простота кода противоречит оптимизации. Тем не менее приведенный ниже код по-прежнему вполне читабельный и поток выполнения идет сверху вниз. Вот исправленный метод doCreate():
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
Futures.addCallback(res, new FutureCallback<CreateResponse>() {
@Override
public void onSuccess(CreateResponse result) {
if (!result.equals(CreateResponse.getDefaultInstance())) {
error.compareAndSet(null, new RuntimeException("Invalid response"));
}
synchronized (knownKeys) {
knownKeys.add(key);
}
}
@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.ALREADY_EXISTS) {
synchronized (knownKeys) {
knownKeys.remove(key);
}
logger.log(Level.INFO, "Key already existed", t);
} else {
error.compareAndSet(null, t);
}
}
});
}
Стаб был изменен на KeyValueServiceFutureStub, который создает Future вместо прямого возврата значения. В gRPC Java используется ListenableFuture, который позволяет добавить обратный вызов при завершении Future. Здесь нас не сильно беспокоит ответ. Нас больше волнует, выполнился RPC или нет. Поэтому код в основном обрабатывает ошибки, а не ответ.Первое изменение в том как мы считаем количество RPC. Вместо того чтобы увеличивать счетчик в основном цикле, мы увеличиваем его по завершении RPC.Далее для каждого RPC мы создаем новый объект, который обрабатывает как успешные, так и неудачные попытки. Так как doCreate()уже завершится к моменту запуска метода обратного вызова после выполненного RPC, нам нужен способ распространения ошибок, отличный от throw. Вместо этого мы пытаемся обновить ссылку атомарно. Основной цикл время от времени проверяет, не произошла ли ошибка, и останавливается при ее обнаружении.И, наконец, в коде добавляется ключ в knownKeys только тогда, когда RPC действительно завершен, и удаляется тогда, когда известно, что произошел сбой. Мы синхронизируемся на переменной, чтобы убедиться, что два потока не конфликтуют. Примечание: хотя доступ к knownKeys потокобезопасный, но вероятность возникновения состояния гонки остается. Один поток может прочитать из knownKeys, потом второй удалить из knownKeys, а потом первый может выполнить RPC, используя прочитанный ключ. Синхронизация на ключах гарантирует только согласованность, но не их правильность. Исправление этого выходит за рамки данного поста, поэтому мы просто логируем это и двигаемся дальше. Вы увидите несколько таких сообщений в логе, когда запустите программу.Запускаем кодЕсли вы запустите эту программу, то увидите, что она не работает:
WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
...
Что?! Зачем я показываю вам код, который не работает? Обычно в реальной жизни с первой попытки ничего не работает. В данном случае не хватило памяти. А когда программе не хватает памяти, начинают происходить странные вещи. И часто первопричину найти довольно трудно, так как бывает много сбивающих с толку моментов. Сообщение об ошибке говорит: "unable to create new native thread" (не удалось создать новый нативный поток), хотя мы не создавали никаких новых потоков. В устранении подобных проблем мне очень помогает опыт, а не отладка. Я часто встречался с OOM и понял, что Java говорит нам о последней капле, переполнившей чашу. Наша программа начала использовать много памяти и последнее выделение памяти, которое не удалось выполнить, случайно произошло при создании потока.Итак, что же случилось? В блокирующей версии очередной RPC не запускался, пока не завершался предыдущий. Это было медленно, но это и помогало нам не создать тонны RPC, для которых у нас в итоге не хватило памяти. Мы должны учесть это в версии с ListenableFuture.Для решения этой проблемы можно реализовать ограничение на количество активных RPC. Перед запуском нового RPC мы постараемся получить разрешение. Если мы его получаем, то выполняем RPC. Если нет, то ждем, пока оно не станет доступным. Когда RPC завершится (успешно или неудачно), разрешение возвращается. Для этого мы будем использовать семафор (Semaphore):
private final Semaphore limiter = new Semaphore(100);
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
throws InterruptedException {
limiter.acquire();
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> {
rpcCount.incrementAndGet();
limiter.release();
}, MoreExecutors.directExecutor());
/* ... */
}
Теперь код работает успешно и память не заканчивается.РезультатыПосле внесенных изменений все выглядит намного лучше:
$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s
real 1m0.923s
user 0m12.772s
sys 0m1.572s
Наш код выполняет на 46% больше RPC в секунду, чем раньше. Также мы видим, что используется примерно на 20% больше процессора. Наша гипотеза оказалась верной, и внесенные изменения сработали. И при этом мы не делали никаких изменений на сервере. А также не использовали какие-то специальные профилировщики или трассировщики.Есть ли смысл в этих цифрах? Мы ожидаем, что примерно 1/4 операций будут мутабельными (create, update и delete). И чтение также будет в 1/4 случаях. Среднее время RPC должно быть примерно равно средневзвешенному значению:
.25 * 50ms (create)
.25 * 10ms (retrieve)
.25 * 50ms (update)
+.25 * 50ms (delete)
------------
40ms
При среднем значении в 40 мс на один RPC мы ожидаем, что количество RPC в секунду будет:25 запросов = 1000 мс / (40 мс / запрос)Это примерно то, что мы и видим. Но сервер по прежнему обрабатывает запросы последовательно, поэтому в будущем нам предстоит еще поработать над ним. Хотя на данный момент наша оптимизация, похоже, сработала.ВыводыЕсть много способов оптимизировать gRPC-код. Для этого необходимо понимать, что ваш код делает и что он должен делать. В этом посте показаны только самые основы того, как подходить к оптимизации. Всегда измеряйте производительность до и после внесения изменений и используйте эти измерения как руководство для оптимизации.
Перевод статьи подготовлен в преддверии старта курса Java Developer. Basic. Приглашаем всех желающих посетить бесплатный вебинар, в рамках которого наши эксперты расскажут о карьерных перспективах после прохождения курса, а также ответят на интересующие вас вопросы. Записаться на вебинар.
===========
Источник:
habr.com
===========
===========
Автор оригинала: Carl Mastrangelo
===========Похожие новости:
- [JavaScript, Программирование, Расширения для браузеров, Браузеры] Hello, Word! Разрабатываем браузерное расширение в 2021-м
- [JavaScript, ReactJS] Небольшая практика с JS Proxy для оптимизации перерисовок React компонентов при использовании useContext
- [Python, Программирование, Программирование микроконтроллеров] Маленькие Python для маленьких embedded-программистов: CircuitPython и MicroPython для MeowBit
- [Программирование, Разработка под Android, Kotlin] О взаимосвязи между корутинами, потоками и проблемами параллелизма (перевод)
- [Программирование, C++, C, Разработка под Linux] Введение в неблокирующие алгоритмы (перевод)
- [Высокая производительность, Программирование, Java, Параллельное программирование] Реактивное программирование на Java: как, зачем и стоит ли? Часть II
- [Высокая производительность, JavaScript, Программирование, WebAssembly] Разгоняем JS-парсер с помощью WebAssembly (часть 1: базовые возможности)
- [Программирование, Облачные сервисы, IT-компании] Обладательница фамилии True полгода не может воспользоваться своим аккаунтом в Apple iCloud
- [Разработка веб-сайтов, Программирование, Анализ и проектирование систем, SQL, Прототипирование] Применяем NOCODE и LOWCODE для вычислений
- [Разработка систем связи, Программирование микроконтроллеров] Составное устройство USB на STM32. Часть 3: Звуковое устройство отдельно, виртуальный СОМ-порт отдельно
Теги для поиска: #_programmirovanie (Программирование), #_java, #_java, #_grpc, #_optimizatsija (оптимизация), #_blog_kompanii_otus (
Блог компании OTUS
), #_programmirovanie (
Программирование
), #_java
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 10:02
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Часто возникает вопрос о том, как ускорить gRPC. gRPC позволяет реализовать высокопроизводительный RPC, но не всегда понятно как достичь этого быстродействия. И я решил попытаться показать ход своих мыслей при оптимизации программ.Рассмотрим простой сервис "ключ-значение", который используется несколькими клиентами. Сервис должен корректно работать при параллельных операциях изменения данных. Также должна быть возможность масштабирования. И, в конце концов, он должен быть быстрым. Для реализации подобного сервиса gRPC будет идеальным вариантом. Давайте посмотрим, как его реализовать.Я написал пример клиента и сервера на Java. В примере три основных класса и protobuf-файл, описывающий API:
private void doCreate(KeyValueServiceBlockingStub stub) {
ByteString key = createRandomKey(); try { CreateResponse res = stub.create( CreateRequest.newBuilder() .setKey(key) .setValue(randomBytes(MEAN_VALUE_SIZE)) .build()); if (!res.equals(CreateResponse.getDefaultInstance())) { throw new RuntimeException("Invalid response"); } } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Code.ALREADY_EXISTS) { knownKeys.remove(key); logger.log(Level.INFO, "Key already existed", e); } else { throw e; } } } private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override public synchronized void create( CreateRequest request, StreamObserver<CreateResponse> responseObserver) { ByteBuffer key = request.getKey().asReadOnlyByteBuffer(); ByteBuffer value = request.getValue().asReadOnlyByteBuffer(); simulateWork(WRITE_DELAY_MILLIS); if (store.putIfAbsent(key, value) == null) { responseObserver.onNext(CreateResponse.getDefaultInstance()); responseObserver.onCompleted(); return; } responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException()); } $ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient INFO: Starting Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient INFO: Did 16.55 RPCs/s real 1m0.927s user 0m10.688s sys 0m1.456s void doClientWork(AtomicBoolean done) {
Random random = new Random(); KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel); while (!done.get()) { // Pick a random CRUD action to take. int command = random.nextInt(4); if (command == 0) { doCreate(stub); continue; } /* ... */ rpcCount++; } } private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
ByteString key = createRandomKey(); ListenableFuture<CreateResponse> res = stub.create( CreateRequest.newBuilder() .setKey(key) .setValue(randomBytes(MEAN_VALUE_SIZE)) .build()); res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor()); Futures.addCallback(res, new FutureCallback<CreateResponse>() { @Override public void onSuccess(CreateResponse result) { if (!result.equals(CreateResponse.getDefaultInstance())) { error.compareAndSet(null, new RuntimeException("Invalid response")); } synchronized (knownKeys) { knownKeys.add(key); } } @Override public void onFailure(Throwable t) { Status status = Status.fromThrowable(t); if (status.getCode() == Code.ALREADY_EXISTS) { synchronized (knownKeys) { knownKeys.remove(key); } logger.log(Level.INFO, "Key already existed", t); } else { error.compareAndSet(null, t); } } }); } WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) ... private final Semaphore limiter = new Semaphore(100);
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) throws InterruptedException { limiter.acquire(); ByteString key = createRandomKey(); ListenableFuture<CreateResponse> res = stub.create( CreateRequest.newBuilder() .setKey(key) .setValue(randomBytes(MEAN_VALUE_SIZE)) .build()); res.addListener(() -> { rpcCount.incrementAndGet(); limiter.release(); }, MoreExecutors.directExecutor()); /* ... */ } $ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient INFO: Starting Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient INFO: Did 24.283 RPCs/s real 1m0.923s user 0m12.772s sys 0m1.572s .25 * 50ms (create)
.25 * 10ms (retrieve) .25 * 50ms (update) +.25 * 50ms (delete) ------------ 40ms Перевод статьи подготовлен в преддверии старта курса Java Developer. Basic. Приглашаем всех желающих посетить бесплатный вебинар, в рамках которого наши эксперты расскажут о карьерных перспективах после прохождения курса, а также ответят на интересующие вас вопросы. Записаться на вебинар.
=========== Источник: habr.com =========== =========== Автор оригинала: Carl Mastrangelo ===========Похожие новости:
Блог компании OTUS ), #_programmirovanie ( Программирование ), #_java |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 10:02
Часовой пояс: UTC + 5