[Алгоритмы, Высокая производительность, Математика, Программирование] Динамическая балансировка нагрузки в pull-схеме

Автор Сообщение
news_bot ®

Стаж: 6 лет 4 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
02-Сен-2020 00:31

В прошлой новости про принципы работы коллекторов логов PostgreSQL я упомянул, что одним из недостатков pull-модели является необходимость динамической балансировки нагрузки. Но если делать ее аккуратно, то недостаток превращается в достоинство, а система в целом становится гораздо более устойчивой к изменениям потока данных.

Давайте посмотрим, какие решения есть у этой задачи.
Распределение объектов «по мощности»
Чтобы не углубляться в неинтересные абстракции, будем рассматривать на примере конкретной задачи — мониторинга. Соотнести предлагаемые методики на свои конкретные задачи, уверен, вы сможете самостоятельно.
«Равномощные» объекты мониторинга
В качестве примера можно привести наши коллекторы метрик для Zabbix, которые исторически имеют с коллекторами логов PostgreSQL общую архитектуру.
И правда, каждый объект мониторинга (хост) генерирует для zabbix практически стабильно один и тот набор метрик с одной и той же частотой все время:

Как видно на графике, разница между min-max значениями количества генерируемых метрик не превышает 15%. Поэтому мы можем считать все объекты равными в одинаковых «попугаях».
Сильный «дисбаланс» между объектами
В отличие от предыдущей модели, для коллекторов логов наблюдаемые хосты совсем не являются однородными.
Например, база рабочего один хост может генерировать в лог миллион планов за сутки, другой десятки тысяч, а какой-то — и вовсе единицы. Да и сами эти планы по объему и сложности и по распределению во времени суток сильно отличаются. Так и получается, что нагрузку сильно «качает», в разы:

Ну, а раз нагрузка может меняться настолько сильно, то надо учиться ей управлять
Координатор
Сразу понимаем, что нам явно понадобится масштабирование системы коллекторов, поскольку один отдельный узел со всей нагрузкой когда-то точно перестанет справляться. А для этого нам потребуется координатор — тот, кто будет управлять всем зоопарком.
Получается примерно такая схема:

Каждый worker свою нагрузку «в попугаях» и в процентах CPU периодически сбрасывает master'у, те — коллектору. А он, на основании этих данных, может выдать команду типа «новый хост посадить на ненагруженный worker#4» или «hostA надо пересадить на worker#3».
Тут еще надо помнить, что, в отличие от объектов мониторинга, сами коллекторы имеют вовсе не равную «мощность» — например, на одном у вас может оказаться 8 ядер CPU, а на другом — только 4, да еще и меньшей частоты. И если нагрузить их задачами «поровну», то второй начнет «затыкаться», а первый — простаивать. Отсюда и вытекают…
Задачи координатора
По сути, задача всего одна — обеспечивать максимально равномерное распределение всей нагрузки (в %cpu) по всем доступным worker'ам. Если мы сможем решить ее идеально, то и равномерность распределения %cpu-нагрузки по коллекторам получим «автоматом».
Понятно, что, даже если каждый объект генерирует одинаковую нагрузку, со временем какие-то из них могут «отмирать», а какие-то возникать новые. Поэтому управлять всей ситуацией надо уметь динамически и поддерживать баланс постоянно.
Динамическая балансировка
Простую задачу (zabbix) мы можем решить достаточно банально:
  • вычисляем относительную мощность каждого коллектора «в задачах»
  • делим все задачи между ними пропорционально
  • между worker'ами распределяем равномерно


Но что делать в случае «сильно неравных» объектов, как для коллектора логов?..
Оценка равномерности
Выше мы все время употребляли термин "максимально равномерное распределение", а как вообще можно формально сравнить два распределения, какое из них «равномернее»?
Для оценки равномерности в математике давно существует такая вещь как среднеквадратичное отклонение. Кому лениво вчитываться:
S[X] = sqrt( sum[ ( x - avg[X] ) ^ 2 of X ] / count[X] )

Поскольку количество worker'ов на каждом из коллекторов у нас тоже может отличаться, то нормировать разброс по нагрузке надо не только между ними, но и между коллекторами в целом.
То есть распределение нагрузки по worker'ам двух коллекторов [ (10%, 10%, 10%, 10%, 10%, 10%) ; (20%) ] — это тоже не очень хорошо, поскольку на первом получается 10%, а на втором — 20%, что как бы вдвое больше в относительных величинах.
Поэтому введем единую метрику-расстояние для общей оценки «равномерности»:
d([%wrk], [%col]) = sqrt( S[%wrk] ^ 2 + S[%col] ^ 2 )
То есть величины среднеквадратичного отклонения для наборов величин нагрузки по всем worker'ам и по всем коллекторам воспринимаем как координаты вектора, длину которого будем стараться минимизировать.
Моделирование
Если бы объектов у нас было немного, то мы могли бы полным перебором «разложить» их между worker'ами так, чтобы метрика оказалась минимальной. Но объектов у нас — тысячи, поэтому такой способ не подойдет. Зато мы знаем, что коллектор умеет «перемещать» объект с одного worker'а на другой — давайте этот вариант и смоделируем, используя метод градиентного спуска.
Понятно, что «идеальный» минимум метрики мы так можем и не найти, но локальный — точно. Да и сама нагрузка может изменяться во времени настолько сильно, что искать за бесконечное время «идеал» абсолютно незачем.
То есть нам осталось всего лишь определить, какой объект и на какой worker эффективнее всего «переместить». И сделаем это банальным переборным моделированием:
  • для каждой пары (целевой host, worker) моделируем перенос нагрузки
  • нагрузку от host внутри исходного worker'а считаем пропорционально «попугаям»
    В нашем случае за «попугая» оказалось вполне разумно взять объем получаемого потока логов в байтах.
  • относительную мощность между коллекторами считаем пропорциональной «суммарным попугаям»
  • вычисляем метрику d для «перенесенного» состояния

Выстраиваем все пары по возрастанию метрики. В идеале, нам всегда стоит реализовать перенос именно первой пары, как дающий минимальную целевую метрику. К сожалению, в реальности сам процесс переноса «стоит ресурсов», поэтому не стоит запускать его для одного и того же объекта чаще определенного интервала «охлаждения».
В этом случае мы можем взять вторую, третью,… по рангу пару — лишь бы целевая метрика уменьшалась относительно текущего значения.
Если же уменьшать некуда — вот он локальный минимум!
Пример на картинке:

Запускать итерации «до упора» при этом вовсе не обязательно. Например, можно делать усредненный анализ нагрузки на интервале 1 мин, и по его завершению делать единственный перенос.
Микро-оптимизации
Понятно, что алгоритм со сложностью T(целей) x W(процессов) — это не очень хорошо. Но в нем стоит не забыть применить некоторые более-менее очевидные оптимизации, которые его могут ускорить в разы.
Нулевые «попугаи»
Если на замеренном интервале объект/задача/хост сгенерировал нагрузку «0 штук», то его не то что перемещать куда-то — его даже рассматривать и анализировать не надо.
Самоперенос
При генерации пар нет необходимости оценивать эффективность переноса объекта на тот же самый worker, где он и так находится. Все-таки уже будет T x (W - 1) — мелочь, а приятно!
Неразличимая нагрузка
Поскольку мы моделируем все-таки перенос именно нагрузки, а объект — всего лишь инструмент, то пробовать переносить «одинаковый» %cpu нет смысла — значения метрик останутся точно те же, хоть и для другого распределения объектов.
То есть достаточно оценить единственную модель для кортежа (wrkSrc, wrkDst, %cpu). Ну, а «одинаковыми» вы можете считать, например, значения %cpu, совпадающие до 1 знака после запятой.

Пример реализации на JavaScript

SPL
var col = {
  'c1' : {
    'wrk' : {
      'w1' : {
        'hst' : {
          'h1' : 5
        , 'h2' : 1
        , 'h3' : 1
        }
      , 'cpu' : 80.0
      }
    , 'w2' : {
        'hst' : {
          'h4' : 1
        , 'h5' : 1
        , 'h6' : 1
        }
      , 'cpu' : 20.0
      }
    }
  }
, 'c2' : {
    'wrk' : {
      'w1' : {
        'hst' : {
          'h7' : 1
        , 'h8' : 2
        }
      , 'cpu' : 100.0
      }
    , 'w2' : {
        'hst' : {
          'h9' : 1
        , 'hA' : 1
        , 'hB' : 1
        }
      , 'cpu' : 50.0
      }
    }
  }
};
// вычисляем опорные метрики и нормализуем по "мощности"
let $iv = (obj, fn) => Object.values(obj).forEach(fn);
let $mv = (obj, fn) => Object.values(obj).map(fn);
// initial reparse
for (const [cid, c] of Object.entries(col)) {
  $iv(c.wrk, w => {
    w.hst = Object.keys(w.hst).reduce((rv, hid) => {
      if (typeof w.hst[hid] == 'object') {
        rv[hid] = w.hst[hid];
        return rv;
      }
      // нулевые значения ничего не решают, поэтому сразу отбрасываем
      if (w.hst[hid]) {
        rv[hid] = {'qty' : w.hst[hid]};
      }
      return rv;
    }, {});
  });
  c.wrk = Object.keys(c.wrk).reduce((rv, wid) => {
    // ID воркеров должны быть глобально-уникальны
    rv[cid + ':' + wid] = c.wrk[wid];
    return rv;
  }, {});
}
// среднеквадратичное отклонение
let S = col => {
  let wsum = 0
    , wavg = 0
    , wqty = 0
    , csum = 0
    , cavg = 0
    , cqty = 0;
  $iv(col, c => {
    $iv(c.wrk, w => {
      wsum += w.cpu;
      wqty++;
    });
    csum += c.cpu;
    cqty++;
  });
  wavg = wsum/wqty;
  wsum = 0;
  cavg = csum/cqty;
  csum = 0;
  $iv(col, c => {
    $iv(c.wrk, w => {
      wsum += (w.cpu - wavg) ** 2;
    });
    csum += (c.cpu - cavg) ** 2;
  });
  return [Math.sqrt(wsum/wqty), Math.sqrt(csum/cqty)];
};
// метрика-расстояние
let distS = S => Math.sqrt(S[0] ** 2 + S[1] ** 2);
// выбираем оптимальный перенос и моделируем его
let iterReOrder = col => {
  let qty = 0
    , max = 0;
  $iv(col, c => {
    c.qty = 0;
    c.cpu = 0;
    $iv(c.wrk, w => {
      w.qty = 0;
      $iv(w.hst, h => {
        w.qty += h.qty;
      });
      w.max = w.qty * (100/w.cpu);
      c.qty += w.qty;
      c.cpu += w.cpu;
    });
    c.cpu = c.cpu/Object.keys(c.wrk).length;
    c.max = c.qty * (100/c.cpu);
    qty += c.qty;
    max += c.max;
  });
  $iv(col, c => {
    c.nrm = c.max/max;
    $iv(c.wrk, w => {
      $iv(w.hst, h => {
        h.cpu = h.qty/w.qty * w.cpu;
        h.nrm = h.cpu * c.nrm;
      });
    });
  });
  // "текущее" среднеквадратичное отклонение
  console.log(S(col), distS(S(col)));
  // формируем набор хостов и воркеров
  let wrk = {};
  let hst = {};
  for (const [cid, c] of Object.entries(col)) {
    for (const [wid, w] of Object.entries(c.wrk)) {
      wrk[wid] = {
        wid
      , cid
      , 'wrk' : w
      , 'col' : c
      };
      for (const [hid, h] of Object.entries(w.hst)) {
        hst[hid] = {
          hid
        , wid
        , cid
        , 'hst' : h
        , 'wrk' : w
        , 'col' : c
        };
      }
    }
  }
  // реализация переноса нагрузки на целевой worker
  let move = (col, hid, wid) => {
    let w = wrk[wid]
      , h = hst[hid];
    let wsrc = col[h.cid].wrk[h.wid]
      , wdst = col[w.cid].wrk[w.wid];
    wsrc.cpu -= h.hst.cpu;
    wsrc.qty -= h.hst.qty;
    wdst.qty += h.hst.qty;
    // перенос на другой коллектор с "процентованием" нагрузки на CPU
    if (h.cid != w.cid) {
      let csrc = col[h.cid]
        , cdst = col[w.cid];
      csrc.qty -= h.hst.qty;
      csrc.cpu -= h.hst.cpu/Object.keys(csrc.wrk).length;
      wsrc.hst[hid].cpu = h.hst.cpu * csrc.nrm/cdst.nrm;
      cdst.qty += h.hst.qty;
      cdst.cpu += h.hst.cpu/Object.keys(cdst.wrk).length;
    }
    wdst.cpu += wsrc.hst[hid].cpu;
    wdst.hst[hid] = wsrc.hst[hid];
    delete wsrc.hst[hid];
  };
  // моделирование и оценка переноса для пары (host, worker)
  let moveCheck = (orig, hid, wid) => {
    let w = wrk[wid]
      , h = hst[hid];
    // тот же воркер - ничего не делаем
    if (h.wid == w.wid) {
      return;
    }
    let col = JSON.parse(JSON.stringify(orig));
    move(col, hid, wid);
    return S(col);
  };
  // хэш уже проверенных переносов (hsrc,hdst,%cpu)
  let checked = {};
  // перебираем все возможные пары (какой хост -> на какой воркер)
  let moveRanker = col => {
    let currS = S(col);
    let order = [];
    for (hid in hst) {
      for (wid in wrk) {
        // нет смысла пробовать повторно перемещать одну и ту же (с точностью до 0.1%) "мощность" между одной парой воркеров
        let widsrc = hst[hid].wid;
        let idx = widsrc + '|' + wid + '|' + hst[hid].hst.cpu.toFixed(1);
        if (idx in checked) {
          continue;
        }
        let _S = moveCheck(col, hid, wid);
        if (_S === undefined) {
          _S = currS;
        }
        checked[idx] = {
          hid
        , wid
        , S : _S
        };
        order.push(checked[idx]);
      }
    }
    order.sort((x, y) => distS(x.S) - distS(y.S));
    return order;
  };
  let currS = S(col);
  let order = moveRanker(col);
  let opt = order[0];
  console.log('best move', opt);
  // реализуем перенос
  if (distS(opt.S) < distS(currS)) {
    console.log('move!', opt.hid, opt.wid);
    move(col, opt.hid, opt.wid);
    console.log('after move', JSON.parse(JSON.stringify(col)));
    return true;
  }
  else {
    console.log('none!');
  }
  return false;
};
// пока есть что-куда переносить
while(iterReOrder(col));

В результате, нагрузка по нашим коллекторам распределяется практически одинаково в каждый момент времени, оперативно нивелируя возникающие пики:

===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_algoritmy (Алгоритмы), #_vysokaja_proizvoditelnost (Высокая производительность), #_matematika (Математика), #_programmirovanie (Программирование), #_balansirovka_nagruzki (балансировка нагрузки), #_srednekvadratichnoe_otklonenie (среднеквадратичное отклонение), #_gradientnyj_spusk (градиентный спуск), #_javascript, #_blog_kompanii_tenzor (
Блог компании Тензор
)
, #_algoritmy (
Алгоритмы
)
, #_vysokaja_proizvoditelnost (
Высокая производительность
)
, #_matematika (
Математика
)
, #_programmirovanie (
Программирование
)
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 06-Июл 23:20
Часовой пояс: UTC + 5