[Python, PostgreSQL] Обрезаем большую таблицу PostgreSQL в production

Автор Сообщение
news_bot ®

Стаж: 6 лет 3 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
11-Мар-2021 13:31

Всем привет. Сегодня я хотел бы поделиться рецептом по обрезанию большой таблицы PostgreSQL в production. Пример: мы имеем в продовой БД достаточно большую таблицу с именем task (несколько сотен миллионов строк) с устаревшими данными, которые нам уже не нужны. Точнее, они мешают — БД долго дампится, а индексы становятся неэффективными. Мы хотим обрезать эту таблицу (удалить старые строки ранее определенной даты). Для простоты предположим, что в базе нет входящих foreign key на таблицу task (при их наличии решение задачи немного усложняется). Структура таблицы такая (упрощенный пример):
Решение в лоб (delete from task where id < 1234567) работает очень долго из-за большого количества индексов и ограничений в таблице и нас не устраивает.Более быстрый алгоритм:
  • Создаем новую таблицу task_new, в которую перенесем актуальные строки:
    CREATE TABLE task_new LIKE tasks;
    ALTER TABLE task_new ADD PRIMARY KEY (id);
  • Запускаем триггер для обновления новой таблицы.
  • Запускаем скрипт для переноса актуальных строк.
  • Добавляем индексы и ограничения для новой таблицы.
  • Меняем таблицы местами.
Шаг 1. Создание новой таблицыНовую таблицу создаем на основе текущей (вместе с типами данных, ограничениями на NULL и значениями по умолчанию). Индексы и ограничения навесим позднее, чтобы не замедлять копирование данных.
CREATE TABLE task_new (LIKE task INCLUDING DEFAULTS);
ALTER TABLE task_new ADD PRIMARY KEY (id);
Шаг 2. Запуск триггераДалее нам нужно задать триггер, который будет обновлять данные в соответствии с исходной таблицей (команды insert/update/delete).
create or replace function task_replication_trg_func()
returns trigger
AS $func$
begin
    if TG_OP = 'INSERT' then
        insert into task_new(
            id,
            created,
            updated,
            status,
            json_data,
            project_id,
            service_id,
            error
        )
        values (
            NEW.id,
            NEW.created,
            NEW.updated,
            NEW.status,
            NEW.json_data,
            NEW.project_id,
            NEW.service_id,
            NEW.error
        );
    elsif TG_OP = 'UPDATE' then
        update task_new set
            created = NEW.created,
            updated = NEW.updated,
            status = NEW.status,
            json_data = NEW.json_data,
            project_id = NEW.project_id,
            service_id = NEW.service_id,
            error = NEW.error
        where id = NEW.id;
    elsif TG_OP = 'DELETE' then
        delete from task_new
            where id = OLD.id;
    end if;
    return NULL;
end;
$func$ LANGUAGE plpgsql;
create trigger task_replication_trg
after insert or update or delete on task
for each row EXECUTE PROCEDURE task_replication_trg_func();
Шаг 3. Запуск скрипта для переноса строкПосле запуска триггера нам нужно запустить скрипт, который перенесет остальные актуальные данные из текущей таблицы в новую.Скрипт (написан на Python 3) доступен по ссылке - https://gist.github.com/olegborzov/1d056104875a5a8eb769c3c441ab6068 Входные параметры скрипта: 
  • conn_str — строка для подключения к БД;
  • first_row_id — начиная с этого идентификатора мы переносим строки в новую таблицу (можно получить запросом select id from task where created::date = '2020-01-01'::date - interval '1 day' limit 1;);
  • first_trigger_id — идентификатор первой строки, созданной в новой таблице триггером (можно получить запросом select id from task_new order by id limit 1);
  • sleep_ms — сколько миллисекунд должен спать скрипт между итерациями;
  • chunk_size — количество строк, переносимых за один раз.
Команда для запуска скрипта:
pip3 install psycopg2-binary~=2.8
python3 mastersber_transfer.py task --conn_str=postgresql://{username}:{password}@{host}:{port}/{db_name} --first_row_id={first_row_id} --first_trigger_id={first_trigger_id} 
Шаг 4. Добавляем индексы и ограниченияТеперь нам нужно навесить на новую таблицу индексы и ограничения в соответствии со старой.Получаем запросы для добавления индексов в текущую таблицу (кроме PK):
SELECT pg_get_indexdef(indexrelid) || ';' AS idx
FROM pg_index
WHERE indrelid = 'task'::regclass and not indisprimary;
Получили набор команд:
CREATE INDEX ix_task_status ON public.task USING btree (status);
CREATE INDEX task_project_id_index ON public.task USING btree (project_id);
Команды нужно немного преобразовать: добавить ключевое слово CONCURRENTLY, заменить название таблицы на новую и добавить префикс или суффикс “_new” к названиям индексов:
CREATE INDEX CONCURRENTLY new_ix_task_status ON public.task_new USING btree (status);
CREATE INDEX CONCURRENTLY new_task_project_id_index ON public.task_new USING btree (project_id);
Теперь получим команды для добавления ограничения для новой таблицы:
SELECT 'ALTER TABLE task_new ADD CONSTRAINT ' || conname || ' ' || pg_get_constraintdef(oid) || ';'
FROM pg_constraint
WHERE contype = 'f' AND conrelid::regclass::text = 'task'
ORDER BY conrelid::regclass::text, conname;
Получили набор команд (в них ничего менять не нужно):
ALTER TABLE task_new ADD CONSTRAINT task_project_id_fkey FOREIGN KEY (project_id) REFERENCES project(id);
ALTER TABLE task_new ADD CONSTRAINT task_service_id_fkey FOREIGN KEY (service_id) REFERENCES service(id);
Теперь нужно выполнить в транзакции эти команды:
BEGIN;
CREATE INDEX CONCURRENTLY new_ix_task_status ON public.task_new USING btree (status);
CREATE INDEX CONCURRENTLY new_task_project_id_index ON public.task_new USING btree (project_id);
ALTER TABLE task_new ADD CONSTRAINT task_project_id_fkey FOREIGN KEY (project_id) REFERENCES project(id);
ALTER TABLE task_new ADD CONSTRAINT task_service_id_fkey FOREIGN KEY (service_id) REFERENCES service(id);
COMMIT;
Шаг 5. Меняем таблицы местамиПоследний этап: нужно поменять таблицы местами, переключить sequence со старой таблицы на новую и удалить триггер.
BEGIN;
set statement_timeout = 3000;
set deadlock_timeout = '3s';
ALTER TABLE task_new ALTER COLUMN id SET DEFAULT nextval('task_id_seq');
ALTER SEQUENCE task_id_seq OWNED BY task_new.id;
ALTER TABLE task RENAME TO task_old;
ALTER TABLE task_new RENAME TO task;
DROP TRIGGER IF EXISTS task_replication_trg ON task_old CASCADE;
COMMIT;
-- Если не нужна старая таблица - удаляем ее:
DROP TABLE task CASCADE;
ЗаключениеТаким образом можно быстро очистить большую таблицу от устаревших данных, которые уже не нужны, но ухудшают производительность базы данных.Если на таблицу есть входящие foreign keys, перед чисткой основной таблицы аналогично чистим ссылающиеся на нее (по условию на колонку c foreign key на нужную).Если есть вопросы - пишите в комментариях.
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_python, #_postgresql, #_python, #_postgresql, #_blog_kompanii_domklik (
Блог компании ДомКлик
)
, #_python, #_postgresql
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 14-Май 08:44
Часовой пояс: UTC + 5