[Разработка веб-сайтов, Python] Забываете передавать аргументы в функцию? Вам поможет contextvars
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Мы в Яндекс.Такси любим писать логи. Ещё больше мы любим, когда логи помогают нам расследовать проблемы в продакшене. При нагрузке в десятки тысяч RPS просто набора лог-записей мало. Хочется уметь фильтровать логи по пользователю, видеть последовательность вызовов клиентского API, а также углубляться в логи запроса.
Для реализации такого интерфейса каждая лог-запись в обработчике сопровождается метаинформацией: id заказа, пользователя, запроса. Однако иногда разработчики забывают добавить метаинформацию при логировании.
Из логов хорошо понятно, что именно пошло не так
Проблема
Представим себе такой обработчик:
@route('/feedback')
async def feedback_handler(request):
log_extra = {
'handler': 'some-handler',
'user_id': request.user_id,
}
...
order = await db.feedbacks.find_one(order_id=request.order_id)
if order is None:
logger.warning(f'user has no active orders in db', extra=log_extra)
return process_without_order(request)
# output:
# text="user has no active orders in db" handler=some-handler user_id=some-guid
Хендлер запроса POST /feedback?order_id=d34db33f пытается найти указанный в query-параметре заказ, если заказа в БД нет — рапортует об этом в логе. Вызов логера сопровождается явной передачей аргумента extra. В нём содержится метаинформация, которую нужно вывести вместе с текстом лога. У такого подхода есть проблемы:
- Аргумент extra необязателен для функций-логеров.
- Отсутствие extra никак не влияет на результат тестов (тесты на логи мы обычно не пишем).
- Проблемы с extra ловятся на ревью часто, но далеко не всегда.
- Однажды созданный extra нужно тащить сквозь все сигнатуры, хотя фактически он не влияет на логику функции:
# handler.py
async def handler(request):
log_extra = derive_extra(request)
# ...
await foo.some_logic1(arg, kwarg=kwarg, log_extra=log_extra)
# ...
# foo.py
async def some_logic1(..., log_extra=None):
# ...
await bar.some_logic2(..., log_extra=log_extra)
# ...
# bar.py
async def some_logic2(..., log_extra=None):
# ...
await qux.some_logic3(..., log_extra=log_extra)
# ...
# qux.py
async def some_logic3(..., log_extra=None):
# ...
if something:
logger.warning('Something has happened', extra=log_extra)
# ...
На практике разработчики периодически забывают добавлять метаинформацию к логированию, из-за чего поведение некоторых хендлеров тяжело объяснить с точки зрения логов.
Из логов не понять причину 404 кода
Решения
Пропатчить логер
Зададимся целью сделать параметр extra обязательным для логирующих методов. Стандартная библиотека предоставляет возможность задать кастомный класс логера для потребителей функции logging.getLogger(). Воспользуемся этим и реализуем класс, для логинг-методов которого параметр extra будет обязательным.
Например, так
SPL
import logging
class MyLogger(logging.getLoggerClass()):
sentinel = object()
def __init__(self, name):
logging.Logger.__init__(self, name=name)
def _log(self, *args, extra=sentinel, **kwargs):
if extra == self.sentinel:
raise ValueError('Extra missed in logger call')
super()._log(*args, **kwargs)
logging.setLoggerClass(MyLogger)
logger = logging.getLogger(__name__)
logger.error('With extra', extra={'meta': 'data'})
logger.error('Explicit no extra', extra=None)
logger.error('Error message without extra')
# output:
# With extra
# Explicit no extra
# Traceback (most recent call last):
# ...
# ValueError: Extra missed in logger call
При запуске тестов приложение будет падать на каждом вызове логера без extra.
Такая реализация выглядит опасно. Если мы не покроем тестами все строчки с логингом, мы рискуем получить исключение при логировании в продакшене. Да, неприятно попадать в непокрытую тестами строчку на бою, однако вряд ли бизнес согласен пятисотить при непереданном extra. Попробуем развить идею форсирования далее.
Статический анализ
Попробуем автоматически отслеживать наличие extra в вызовах логера до запуска приложения, основываясь на исходном коде. Кажется, что такое делается едва ли не грепом, но раз уж мы пишем на питоне, воспользуемся модулем ast. За несколько минут можно сделать наколеночный скрипт, и он в простейшем случае обнаружит все вызовы объекта logger, которым не был передан параметр extra.
Пример простейшей реализации
SPL
# linter.py
import ast, sys
code = open(sys.argv[1]).read()
for node in ast.walk(ast.parse(code)):
# remember: not production-ready, just proof of concept
if isinstance(node, ast.Call) and not isinstance(node.func, ast.Name):
is_logger = node.func.value.id == 'logger'
if is_logger and 'extra' not in [k.arg for k in node.keywords]:
print(f'Missing extra at line {node.lineno}')
# lint_example.py
def foobar():
log_extra = {'meta': 'data'}
bar()
logger.warning('Has no extra')
baz()
logger.warning('Has extra', extra=log_extra)
logger.warning('No extra again')
# $ python3.7 linter.py lint_example.py
# Missing extra at line 4
# Missing extra at line 7
Вложив в него ещё несколько часов разработки, можно получить приемлемое для внедрения в CI решение. Однако у такого подхода есть два недостатка. Во-первых, существуют лог-записи без метаинформации (например, логи фреймворка до начала обработки запроса). Для таких вызовов разработчику придётся либо явно передавать extra=None, либо добавлять ignore-строку. Во-вторых, разработчики всё ещё вынуждены тащить log_extra сквозь сигнатуры всех методов.
Итог: вариант с линтером приемлем, но не идеален.
Автоматическое добавление extra
Обсудив проблему, мы решили попробовать автоматически добавлять метаинформацию к логированию, сняв эту обязанность с разработчика. Посмотрим, какие у нас есть варианты в зависимости от подхода к написанию приложений. Для простоты заменим обращения к логеру вызовами функции log, обёртки над print.
Синхронное однопоточное приложение
Первое решение, которое приходит в голову, — глобальная переменная, мы сохраняем в неё контекст в начале обработки запроса и достаём его при необходимости что-то залогировать.
LOG_EXTRA = {}
def set_log_extra(extra):
global LOG_EXTRA
LOG_EXTRA = extra
def get_log_extra():
return LOG_EXTRA
def log(text):
extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
msg = f'text="{text}"\t{extra_tskv}'
print(msg)
@app.route('/feedback')
def feedback_handler_sync_singlethreaded(request):
set_log_extra({ ... })
order = db.feedbacks.find_one(order_id=request.order_id)
if order is None:
log('user has no active orders in db')
return process_without_order(request)
# output:
# text="user has no active orders in db" handler=some-handler user_id=some-guid
Такой подход будет действовать, если каждый воркер веб-сервера работает в отдельном процессе. Однако многие синхронные веб-серверы поддерживают работу в многопоточном режиме, а для них решение с глобальной переменной не подходит из-за гонок.
Представим следующую ситуацию
SPL
- Начинаем обрабатывать запрос № 1, с помощью set_log_extra сохраняем в глобальную переменную контекст запроса.
- Не находим заказ в базе — наступает время логировать ошибку.
- В этот момент соседний поток начал выполнять запрос № 2 и вызвал set_log_extra со своим контекстом.
- Поскольку память у потоков общая, set_log_extra обновит значение глобальной переменной для всех тредов.
- В результате обработчик запроса № 1 залогирует ошибку с контекстом из запроса № 2.
Получается, что «всего лишь» глобальной переменной недостаточно для многопоточных серверов.
Многопоточное приложение
Проблема с гонками имеет тривиальное решение: будем хранить глобальную переменную не в общей для потоков памяти, а в локальной памяти каждого треда. В UNIX для этого есть готовый механизм thread-local storage, а Python предоставляет аналогичную по смыслу функциональность через threading.local:
import threading
LOG_EXTRA = threading.local()
LOG_EXTRA.data = {}
def set_log_extra(extra):
global LOG_EXTRA
LOG_EXTRA.data = extra
def get_log_extra():
return LOG_EXTRA.data
# ...
В таком случае каждый поток будет работать со своим собственным значением поля LOG_EXTRA.data.
Однако в последнее время большинство новых бэкендов пишутся на асинхронных фреймворках вроде aiohttp. В мире event-loop’ов все запросы обрабатываются в разных корутинах, но в рамках одного потока. Это значит, что для асинхронных приложений threading.local не решит проблему: с точки зрения корутин мы продолжаем использовать «единую» глобальную переменную.
Асинхронное приложение
А если бы у нас был асинхронный аналог threading.local, то есть нечто, что может хранить состояние «выполняемой в данном контексте» задачи. В случае с asyncio некоторым аналогом тредов являются таски — инстансы asyncio.Task, исполняющие корутины поверх event-loop’а.
В используемом нами фреймворке aiohttp при выполнении конкретного запроса запускается много корутин, но по умолчанию все они будут выполняться в рамках одной asyncio.Task (если мы, конечно, не породим новые таски самостоятельно).
Кажется, что этим фактом можно воспользоваться: asyncio предоставляет интерфейс для получения текущей таски через asyncio.current_task, и мы можем сохранить контекст выполнения нашего запроса в ней через определение собственного атрибута:
import asyncio
_log_extra_key = '_log_extra'
def set_log_extra(log_extra, task=None):
if task is None:
task = asyncio.current_task()
setattr(task, _log_extra_key, log_extra)
def get_log_extra(task=None):
if task is None:
task = asyncio.current_task()
return getattr(task, _log_extra_key, None)
def log(text):
extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
msg = f'text=""{text}""\t{extra_tskv}'
print(msg)
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
log(f'no explicit extra')
asyncio.run(feedback_handler_async())
# output:
# text="no explicit extra" handler=some-handler user_id=some-guid
Однако такой подход не будет работать, если при выполнении запроса мы запустим ещё одну таску. В качестве примера попробуем параллельно сделать запрос в два микросервиса:
# ...
async def call_foo():
# ...
log(f'service_foo')
async def call_bar():
# ...
log(f'service_bar')
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.gather(
call_foo(),
call_bar(),
)
log(f'no explicit extra')
asyncio.run(feedback_handler_example())
# output:
# text="service_foo"
# text="service_bar"
# text="no explicit extra" handler=some-handler user_id=some-guid
Дело в том, что вызов asyncio.gather порождает новую таску, для которой никто не вызывал set_log_extra, и, как следствие, у неё нет атрибута, хранящего extra-информацию. К счастью, asyncio позволяет кастомизировать механизм создания тасок для event-loop’а при помощи loop.set_task_factory. Если при создании новой таски мы будем присваивать ей атрибут от родительской задачи, мы решим проблему «теряющегося» extra:
# ...
def log_extra_factory():
default_task_factory = asyncio.Task
@functools.wraps(default_task_factory)
def custom_task_factory(loop, coro):
child_task = default_task_factory(coro, loop=loop)
if not loop.is_running():
return child_task
current_extra = get_log_extra()
set_log_extra(current_extra, task=child_task)
return child_task
return custom_task_factory
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.gather(
call_foo(),
call_bar(),
)
log(f'no explicit extra')
async def main():
asyncio.get_event_loop().set_task_factory(log_extra_factory())
await feedback_handler_example()
asyncio.run(main())
# output:
# text="fetch_data_from_db" handler=some-handler user_id=some-guid
# text="fetch_data_from_service" handler=some-handler user_id=some-guid
# text="no explicit extra" handler=some-handler user_id=some-guid
В целом мы уже получили достаточно интересное решение. Однако благодаря модулю contextvars, который появился в стандартной библиотеке Python 3.7, больше не нужно вручную задавать атрибуты тасок: желаемого результата можно добиться и без потенциально опасной работы с атрибутами.
Contextvars
В каком-то смысле контекстные переменные похожи на threadlocal-переменные: значение threading.local-переменной зависит от потока, а значение contextvars.ContextVar — от активного сейчас контекста (инстанса contextvars.Context), коих может быть несколько в рамках одного потока:
from contextvars import ContextVar, Context
ctx_var = ContextVar('ctx_var')
def get_name():
# значение ctx_var будет зависеть от контекста
return ctx_var.get()
for name in ['Alice', 'Bob']:
ctx = Context()
# ctx.run() запускает некоторый callable внутри контекста ctx
# устанавливаем значение ctx_var внутри контекста ctx
ctx.run(ctx_var.set, name)
# запускаем get_name внутри ctx
print(ctx.run(get_name))
# output:
# Alice
# Bob
У contextvars есть два важных свойства, которые помогут нам решить задачу с log_extra:
Встроенная интеграция с asyncio
Начиная с Python 3.7 методы Loop.call_{at,later,soon} (а также Future.add_done_callback), ответственные за выполнение корутин в рамках event-loop’а, принимают опциональный параметр context, в рамках которого потом будет выполняться корутина:
# asyncio pseudocode
def call_soon(self, callback, *args, context=None):
if context is None:
# use copy of current context by default
context = contextvars.copy_context()
# ... later
context.run(callback, *args)
Кроме того, теперь каждая таска по умолчанию выполняет все корутины в «своём» контексте, который по умолчанию равен копии текущего контекста:
# asyncio pseudocode
class Task:
def __init__(self, coro):
...
# Get the current context snapshot.
self._context = contextvars.copy_context()
self._loop.call_soon(self._step, context=self._context)
def _step(self, exc=None):
...
# Every advance of the wrapped coroutine is done in
# the task's context.
self._loop.call_soon(self._step, context=self._context)
...
Другими словами:
- Все корутины в рамках таски выполняются в едином контексте.
- Дочерние таски выполняются в контексте-копии текущей таски. Дети имеют доступ ко всем контекстным переменным родителя, однако модификация происходит исключительно в рамках локального контекста, без влияния на соседей.
Вот как это выглядит на практике:
import asyncio, contextvars
ctx_var = contextvars.ContextVar('ctx_var')
async def main():
ctx_var.set('aaa')
print(f'ctx_var in main: {ctx_var.get()}')
await same_task()
print(f'ctx_var in main after same_task call: {ctx_var.get()}')
await asyncio.Task(other_task())
print(f'ctx_var in main after asyncio.Task(other_task()) call: {ctx_var.get()}')
async def same_task():
print(f'ctx_var in same_task: {ctx_var.get()}')
ctx_var.set('bbb')
print(f'ctx_var in same_task after set: {ctx_var.get()}')
async def other_task():
print(f'ctx_var in other_task before set: {ctx_var.get()}')
ctx_var.set('ccc')
print(f'ctx_var in other_task after set: {ctx_var.get()}')
asyncio.run(main())
# output:
# ctx_var in main: aaa
# ctx_var in same_task: aaa
# ctx_var in same_task after set: bbb
# ctx_var in main after same_task call: bbb # (1)
# ctx_var in other_task before set: bbb # (2)
# ctx_var in other_task after set: ccc
# ctx_var in main after asyncio.Task(other_task()) call: bbb # (3)
Обратите внимание на три важных аспекта:
- Корутина same_task, выполняющаяся в одной таске с main, повлияла на значение ctx_var для них обеих, так как они выполняются в одном контексте.
- Корутина other_task, выполняющаяся в дочерней таске от main, унаследовала значение ctx_var, установленное в same_task.
- Однако модификация ctx_var внутри other_task не повлияла на значение внутри main, так как дочерняя таска копирует родительский контекст, что делает их независимыми.
Копирование контекста за O(1)
Поскольку каждое создание таски теперь приводит к копированию контекста, важно сделать копирование легковесным. Для этого модуль contextvars хранит элементы с помощью immutable-структуры данных HAMT (hash array mapped trie), для которой асимптотическая сложность копирования не зависит от размера контекста:
import contextvars, time
class Timer:
def __enter__(self):
self.start = time.perf_counter()
def __exit__(self, *args):
print(time.perf_counter() - self.start)
LOOP_COUNT = 1000
CTXVAR_COUNT = 42000
print(f'Context with {len(contextvars.copy_context())} variables')
with Timer():
for _ in range(LOOP_COUNT):
contextvars.copy_context()
cvars = [contextvars.ContextVar(f'cvar_{i}', default=i) for i in range(CTXVAR_COUNT)]
for v in cvars:
v.set('somevalue')
print(f'Context with {len(contextvars.copy_context())} variables')
with Timer():
for _ in range(LOOP_COUNT):
contextvars.copy_context()
# output
# Context with 0 variables
# 0.00015302500000000108
# Context with 42000 variables
# 0.00015314099999999835
Применяем contextvars к extra
Поскольку нам больше не нужно вручную задавать атрибуты для тасок, мы можем избавиться от кастомной фактори — функции get_log_extra и set_log_extra становятся обёртками над методами контекстной переменной:
import asyncio
import functools
import contextvars
_log_extra_data = contextvars.ContextVar('_log_extra_data')
def set_log_extra(log_extra):
_log_extra_data.set(log_extra)
def get_log_extra():
return _log_extra_data.get()
def log(text):
extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
msg = f'text="{text}"\t{extra_tskv}'
print(msg)
async def call_foo():
log(f'service_foo')
async def call_bar():
log(f'service_bar')
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.gather(
call_foo(),
call_bar(),
)
log(f'no explicit extra')
async def main():
await feedback_handler_example()
asyncio.run(main())
# output
# text="service_foo" handler=some-handler user_id=some-guid
# text="service_bar" handler=some-handler user_id=some-guid
# text="no explicit extra" handler=some-handler user_id=some-guid
Выглядит сильно лучше. Однако перед внедрением в продакшен необходима пара финальных штрихов.
Изменить extra для конкретной таски
Важно помнить, что «копирование» контекста не влечёт за собой глубокого копирования объектов. Это значит, что изменение контекстных переменных мутабельных типов будет влиять на родительский контекст. Представим, что после обращения к стороннему сервису мы хотим дополнительно логировать response_id в качестве extra-информации:
async def external_call():
# ...
get_log_extra()['response_id'] = 'some_external_response_id'
log('log from external call')
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.Task(external_call())
log('log from parent task ')
async def main():
await feedback_handler_example()
asyncio.run(main())
# output
# text="log from external call" handler=some-handler user_id=some-guid response_id=some_external_response_id
# text="log from parent task" handler=some-handler user_id=some-guid response_id=some_external_response_id
Видно, что дочерняя таска повлияла на родителя. Зачастую такое поведение нежелательно, лучше модифицировать экстру через глубокое копирование:
import copy
# ...
def get_log_extra(should_copy=False):
extra = _log_extra_data.get()
if should_copy:
return copy.deepcopy(extra)
return extra
def update_log_extra(update):
current_extra = get_log_extra(should_copy=True)
set_log_extra({**current_extra, **update})
async def external_call():
# ...
update_log_extra({'response_id': 'some_external_response_id'})
log('log from external call')
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.Task(external_call())
log(' log from parent task)
async def main():
await feedback_handler_example()
asyncio.run(main())
# output
# text="log from external call" handler=some-handler user_id=some-guid response_id=some_external_response_id
# text="log from parent task" handler=some-handler user_id=some-guid
Контекстные переменные и тредпул
Периодически при написании асинхронных приложений требуется отдать какую-то функцию на выполнение в тредпул (например, из-за отсутствия асинхронного аналога у библиотеки). Здесь нас ждёт неприятный сюрприз: потоки из пула по умолчанию работают в «собственном» контексте, в котором может и не быть созданных в «основном» потоке переменных:
import asyncio
import functools
import contextvars
_log_extra_data = contextvars.ContextVar('_log_extra_data')
# ...
def log(text):
extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items())
msg = f'text="{text}"\t{extra_tskv}'
print(msg)
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.get_event_loop().run_in_executor(None, log, 'some-info')
log(f'no explicit extra')
async def main():
await feedback_handler_example()
asyncio.run(main())
# output:
# Traceback (most recent call last):
# ...
# File "contextvars_threadpool.py", line 11, in get_log_extra
# return _log_extra_data.get()
# LookupError: <ContextVar name='_log_extra_data' at 0x1101a63b0>
В потоке из пула переменной _log_extra_data ни разу не присваивалось значение, из-за чего метод get() поднимает исключение LookupError. Для нашей задачи такое поведение кажется сомнительным: по умолчанию хотелось бы, чтобы потоки в пуле вели себя так же, как и дочерние таски, т. е. наследовали контекст родителя. Определим свой собственный пул, выполняющий target-функцию в копии текущего контекста, и зададим его пулом по умолчанию для event-loop’а:
import asyncio
import concurrent.futures
import contextvars
import functools
import typing
class CtxCopyThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
def submit(
self, fn: typing.Callable, *args, **kwargs,
) -> concurrent.futures.Future:
ctx = contextvars.copy_context()
return super().submit(
ctx.run,
functools.partial(fn, *args, **kwargs),
)
# ...
async def feedback_handler_example():
set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'})
await asyncio.get_event_loop().run_in_executor(None, log, 'some-info')
log(f'no explicit extra')
async def main():
asyncio.get_event_loop().set_default_executor(CtxCopyThreadPoolExecutor())
await feedback_handler_example()
asyncio.run(main())
# text="from threadpool" handler=some-handler user_id=some-guid
# text="no explicit extra" handler=some-handler user_id=some-guid
Итоги
На практике внедрение «автоматического» extra привело к следующему: мы больше не теряем логи при расследовании инцидентов в продакшене, не таскаем ненужный аргумент сквозь сигнатуры методов и не тратим силы на отлов его отсутствия на ревью.
Некоторые сомневаются насчёт контекстных переменных: фактически они представляют собой глобальные переменные на стероидах, ещё и с магией в виде неявного копирования контекста между корутинами.
Мы считаем, что практическая польза этого инструмента перевешивает идеологические недостатки, однако будем рады услышать ваши рассуждения на эту тему в комментариях.
===========
Источник:
habr.com
===========
Похожие новости:
- [Сетевые технологии, Беспроводные технологии, Разработка систем связи, Научно-популярное, IT-компании] Всё о проекте «Спутниковый интернет Starlink». Часть 4. Абонентский терминал
- [Python, Java] Удав укрощает Graal VM
- [Разработка веб-сайтов] Теперь я не могу сделать даже маленький сайт
- [Информационная безопасность, Системное администрирование, Сетевые технологии, Сетевое оборудование, Интервью] Консилиум с D-Link: базовая настройка управляемого сетевого оборудования
- [Разработка веб-сайтов, Программирование, IT-инфраструктура, Управление проектами, DevOps] Веб-разработка с нуля: руководство для молодых команд по созданию инфраструктуры CI/CD и процесса разработки
- [Разработка веб-сайтов, JavaScript, Программирование] Заметка о перебираемых объектах
- [Научно-популярное, Биотехнологии, Мозг] Как воспоминания переживают ампутацию, метаморфозы, и передаются через инъекции (перевод)
- [Сетевые технологии, Беспроводные технологии, Разработка систем связи, Научно-популярное, IT-компании] Всё о проекте «Спутниковый интернет Starlink». Часть 3. Наземный комплекс
- [Семантика, Программирование, Prolog, Бизнес-модели] Проектируем мульти-парадигменный язык программирования. Часть 3 — Обзор языков представления знаний
- [Развитие стартапа, Киберпанк, Биотехнологии, Нанотехнологии, Будущее здесь] 10 высокотехнологичных продуктов, которые мы будем есть в будущем (перевод)
Теги для поиска: #_razrabotka_vebsajtov (Разработка веб-сайтов), #_python, #_python, #_logi (логи), #_jandeks (яндекс), #_komanda_jandeks.taksi (команда яндекс.такси), #_blog_kompanii_jandeks (
Блог компании Яндекс
), #_razrabotka_vebsajtov (
Разработка веб-сайтов
), #_python
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:22
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Мы в Яндекс.Такси любим писать логи. Ещё больше мы любим, когда логи помогают нам расследовать проблемы в продакшене. При нагрузке в десятки тысяч RPS просто набора лог-записей мало. Хочется уметь фильтровать логи по пользователю, видеть последовательность вызовов клиентского API, а также углубляться в логи запроса. Для реализации такого интерфейса каждая лог-запись в обработчике сопровождается метаинформацией: id заказа, пользователя, запроса. Однако иногда разработчики забывают добавить метаинформацию при логировании. Из логов хорошо понятно, что именно пошло не так Проблема Представим себе такой обработчик: @route('/feedback')
async def feedback_handler(request): log_extra = { 'handler': 'some-handler', 'user_id': request.user_id, } ... order = await db.feedbacks.find_one(order_id=request.order_id) if order is None: logger.warning(f'user has no active orders in db', extra=log_extra) return process_without_order(request) # output: # text="user has no active orders in db" handler=some-handler user_id=some-guid Хендлер запроса POST /feedback?order_id=d34db33f пытается найти указанный в query-параметре заказ, если заказа в БД нет — рапортует об этом в логе. Вызов логера сопровождается явной передачей аргумента extra. В нём содержится метаинформация, которую нужно вывести вместе с текстом лога. У такого подхода есть проблемы:
# handler.py
async def handler(request): log_extra = derive_extra(request) # ... await foo.some_logic1(arg, kwarg=kwarg, log_extra=log_extra) # ... # foo.py async def some_logic1(..., log_extra=None): # ... await bar.some_logic2(..., log_extra=log_extra) # ... # bar.py async def some_logic2(..., log_extra=None): # ... await qux.some_logic3(..., log_extra=log_extra) # ... # qux.py async def some_logic3(..., log_extra=None): # ... if something: logger.warning('Something has happened', extra=log_extra) # ... На практике разработчики периодически забывают добавлять метаинформацию к логированию, из-за чего поведение некоторых хендлеров тяжело объяснить с точки зрения логов. Из логов не понять причину 404 кода Решения Пропатчить логер Зададимся целью сделать параметр extra обязательным для логирующих методов. Стандартная библиотека предоставляет возможность задать кастомный класс логера для потребителей функции logging.getLogger(). Воспользуемся этим и реализуем класс, для логинг-методов которого параметр extra будет обязательным. Например, такSPLimport logging
class MyLogger(logging.getLoggerClass()): sentinel = object() def __init__(self, name): logging.Logger.__init__(self, name=name) def _log(self, *args, extra=sentinel, **kwargs): if extra == self.sentinel: raise ValueError('Extra missed in logger call') super()._log(*args, **kwargs) logging.setLoggerClass(MyLogger) logger = logging.getLogger(__name__) logger.error('With extra', extra={'meta': 'data'}) logger.error('Explicit no extra', extra=None) logger.error('Error message without extra') # output: # With extra # Explicit no extra # Traceback (most recent call last): # ... # ValueError: Extra missed in logger call При запуске тестов приложение будет падать на каждом вызове логера без extra. Такая реализация выглядит опасно. Если мы не покроем тестами все строчки с логингом, мы рискуем получить исключение при логировании в продакшене. Да, неприятно попадать в непокрытую тестами строчку на бою, однако вряд ли бизнес согласен пятисотить при непереданном extra. Попробуем развить идею форсирования далее. Статический анализ Попробуем автоматически отслеживать наличие extra в вызовах логера до запуска приложения, основываясь на исходном коде. Кажется, что такое делается едва ли не грепом, но раз уж мы пишем на питоне, воспользуемся модулем ast. За несколько минут можно сделать наколеночный скрипт, и он в простейшем случае обнаружит все вызовы объекта logger, которым не был передан параметр extra. Пример простейшей реализацииSPL# linter.py
import ast, sys code = open(sys.argv[1]).read() for node in ast.walk(ast.parse(code)): # remember: not production-ready, just proof of concept if isinstance(node, ast.Call) and not isinstance(node.func, ast.Name): is_logger = node.func.value.id == 'logger' if is_logger and 'extra' not in [k.arg for k in node.keywords]: print(f'Missing extra at line {node.lineno}') # lint_example.py def foobar(): log_extra = {'meta': 'data'} bar() logger.warning('Has no extra') baz() logger.warning('Has extra', extra=log_extra) logger.warning('No extra again') # $ python3.7 linter.py lint_example.py # Missing extra at line 4 # Missing extra at line 7 Вложив в него ещё несколько часов разработки, можно получить приемлемое для внедрения в CI решение. Однако у такого подхода есть два недостатка. Во-первых, существуют лог-записи без метаинформации (например, логи фреймворка до начала обработки запроса). Для таких вызовов разработчику придётся либо явно передавать extra=None, либо добавлять ignore-строку. Во-вторых, разработчики всё ещё вынуждены тащить log_extra сквозь сигнатуры всех методов. Итог: вариант с линтером приемлем, но не идеален. Автоматическое добавление extra Обсудив проблему, мы решили попробовать автоматически добавлять метаинформацию к логированию, сняв эту обязанность с разработчика. Посмотрим, какие у нас есть варианты в зависимости от подхода к написанию приложений. Для простоты заменим обращения к логеру вызовами функции log, обёртки над print. Синхронное однопоточное приложение Первое решение, которое приходит в голову, — глобальная переменная, мы сохраняем в неё контекст в начале обработки запроса и достаём его при необходимости что-то залогировать. LOG_EXTRA = {}
def set_log_extra(extra): global LOG_EXTRA LOG_EXTRA = extra def get_log_extra(): return LOG_EXTRA def log(text): extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items()) msg = f'text="{text}"\t{extra_tskv}' print(msg) @app.route('/feedback') def feedback_handler_sync_singlethreaded(request): set_log_extra({ ... }) order = db.feedbacks.find_one(order_id=request.order_id) if order is None: log('user has no active orders in db') return process_without_order(request) # output: # text="user has no active orders in db" handler=some-handler user_id=some-guid Такой подход будет действовать, если каждый воркер веб-сервера работает в отдельном процессе. Однако многие синхронные веб-серверы поддерживают работу в многопоточном режиме, а для них решение с глобальной переменной не подходит из-за гонок. Представим следующую ситуациюSPL
Получается, что «всего лишь» глобальной переменной недостаточно для многопоточных серверов. Многопоточное приложение Проблема с гонками имеет тривиальное решение: будем хранить глобальную переменную не в общей для потоков памяти, а в локальной памяти каждого треда. В UNIX для этого есть готовый механизм thread-local storage, а Python предоставляет аналогичную по смыслу функциональность через threading.local: import threading
LOG_EXTRA = threading.local() LOG_EXTRA.data = {} def set_log_extra(extra): global LOG_EXTRA LOG_EXTRA.data = extra def get_log_extra(): return LOG_EXTRA.data # ... В таком случае каждый поток будет работать со своим собственным значением поля LOG_EXTRA.data. Однако в последнее время большинство новых бэкендов пишутся на асинхронных фреймворках вроде aiohttp. В мире event-loop’ов все запросы обрабатываются в разных корутинах, но в рамках одного потока. Это значит, что для асинхронных приложений threading.local не решит проблему: с точки зрения корутин мы продолжаем использовать «единую» глобальную переменную. Асинхронное приложение А если бы у нас был асинхронный аналог threading.local, то есть нечто, что может хранить состояние «выполняемой в данном контексте» задачи. В случае с asyncio некоторым аналогом тредов являются таски — инстансы asyncio.Task, исполняющие корутины поверх event-loop’а. В используемом нами фреймворке aiohttp при выполнении конкретного запроса запускается много корутин, но по умолчанию все они будут выполняться в рамках одной asyncio.Task (если мы, конечно, не породим новые таски самостоятельно). Кажется, что этим фактом можно воспользоваться: asyncio предоставляет интерфейс для получения текущей таски через asyncio.current_task, и мы можем сохранить контекст выполнения нашего запроса в ней через определение собственного атрибута: import asyncio
_log_extra_key = '_log_extra' def set_log_extra(log_extra, task=None): if task is None: task = asyncio.current_task() setattr(task, _log_extra_key, log_extra) def get_log_extra(task=None): if task is None: task = asyncio.current_task() return getattr(task, _log_extra_key, None) def log(text): extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items()) msg = f'text=""{text}""\t{extra_tskv}' print(msg) async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) log(f'no explicit extra') asyncio.run(feedback_handler_async()) # output: # text="no explicit extra" handler=some-handler user_id=some-guid Однако такой подход не будет работать, если при выполнении запроса мы запустим ещё одну таску. В качестве примера попробуем параллельно сделать запрос в два микросервиса: # ...
async def call_foo(): # ... log(f'service_foo') async def call_bar(): # ... log(f'service_bar') async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.gather( call_foo(), call_bar(), ) log(f'no explicit extra') asyncio.run(feedback_handler_example()) # output: # text="service_foo" # text="service_bar" # text="no explicit extra" handler=some-handler user_id=some-guid Дело в том, что вызов asyncio.gather порождает новую таску, для которой никто не вызывал set_log_extra, и, как следствие, у неё нет атрибута, хранящего extra-информацию. К счастью, asyncio позволяет кастомизировать механизм создания тасок для event-loop’а при помощи loop.set_task_factory. Если при создании новой таски мы будем присваивать ей атрибут от родительской задачи, мы решим проблему «теряющегося» extra: # ...
def log_extra_factory(): default_task_factory = asyncio.Task @functools.wraps(default_task_factory) def custom_task_factory(loop, coro): child_task = default_task_factory(coro, loop=loop) if not loop.is_running(): return child_task current_extra = get_log_extra() set_log_extra(current_extra, task=child_task) return child_task return custom_task_factory async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.gather( call_foo(), call_bar(), ) log(f'no explicit extra') async def main(): asyncio.get_event_loop().set_task_factory(log_extra_factory()) await feedback_handler_example() asyncio.run(main()) # output: # text="fetch_data_from_db" handler=some-handler user_id=some-guid # text="fetch_data_from_service" handler=some-handler user_id=some-guid # text="no explicit extra" handler=some-handler user_id=some-guid В целом мы уже получили достаточно интересное решение. Однако благодаря модулю contextvars, который появился в стандартной библиотеке Python 3.7, больше не нужно вручную задавать атрибуты тасок: желаемого результата можно добиться и без потенциально опасной работы с атрибутами. Contextvars В каком-то смысле контекстные переменные похожи на threadlocal-переменные: значение threading.local-переменной зависит от потока, а значение contextvars.ContextVar — от активного сейчас контекста (инстанса contextvars.Context), коих может быть несколько в рамках одного потока: from contextvars import ContextVar, Context
ctx_var = ContextVar('ctx_var') def get_name(): # значение ctx_var будет зависеть от контекста return ctx_var.get() for name in ['Alice', 'Bob']: ctx = Context() # ctx.run() запускает некоторый callable внутри контекста ctx # устанавливаем значение ctx_var внутри контекста ctx ctx.run(ctx_var.set, name) # запускаем get_name внутри ctx print(ctx.run(get_name)) # output: # Alice # Bob У contextvars есть два важных свойства, которые помогут нам решить задачу с log_extra: Встроенная интеграция с asyncio Начиная с Python 3.7 методы Loop.call_{at,later,soon} (а также Future.add_done_callback), ответственные за выполнение корутин в рамках event-loop’а, принимают опциональный параметр context, в рамках которого потом будет выполняться корутина: # asyncio pseudocode
def call_soon(self, callback, *args, context=None): if context is None: # use copy of current context by default context = contextvars.copy_context() # ... later context.run(callback, *args) Кроме того, теперь каждая таска по умолчанию выполняет все корутины в «своём» контексте, который по умолчанию равен копии текущего контекста: # asyncio pseudocode
class Task: def __init__(self, coro): ... # Get the current context snapshot. self._context = contextvars.copy_context() self._loop.call_soon(self._step, context=self._context) def _step(self, exc=None): ... # Every advance of the wrapped coroutine is done in # the task's context. self._loop.call_soon(self._step, context=self._context) ... Другими словами:
Вот как это выглядит на практике: import asyncio, contextvars
ctx_var = contextvars.ContextVar('ctx_var') async def main(): ctx_var.set('aaa') print(f'ctx_var in main: {ctx_var.get()}') await same_task() print(f'ctx_var in main after same_task call: {ctx_var.get()}') await asyncio.Task(other_task()) print(f'ctx_var in main after asyncio.Task(other_task()) call: {ctx_var.get()}') async def same_task(): print(f'ctx_var in same_task: {ctx_var.get()}') ctx_var.set('bbb') print(f'ctx_var in same_task after set: {ctx_var.get()}') async def other_task(): print(f'ctx_var in other_task before set: {ctx_var.get()}') ctx_var.set('ccc') print(f'ctx_var in other_task after set: {ctx_var.get()}') asyncio.run(main()) # output: # ctx_var in main: aaa # ctx_var in same_task: aaa # ctx_var in same_task after set: bbb # ctx_var in main after same_task call: bbb # (1) # ctx_var in other_task before set: bbb # (2) # ctx_var in other_task after set: ccc # ctx_var in main after asyncio.Task(other_task()) call: bbb # (3) Обратите внимание на три важных аспекта:
Копирование контекста за O(1) Поскольку каждое создание таски теперь приводит к копированию контекста, важно сделать копирование легковесным. Для этого модуль contextvars хранит элементы с помощью immutable-структуры данных HAMT (hash array mapped trie), для которой асимптотическая сложность копирования не зависит от размера контекста: import contextvars, time
class Timer: def __enter__(self): self.start = time.perf_counter() def __exit__(self, *args): print(time.perf_counter() - self.start) LOOP_COUNT = 1000 CTXVAR_COUNT = 42000 print(f'Context with {len(contextvars.copy_context())} variables') with Timer(): for _ in range(LOOP_COUNT): contextvars.copy_context() cvars = [contextvars.ContextVar(f'cvar_{i}', default=i) for i in range(CTXVAR_COUNT)] for v in cvars: v.set('somevalue') print(f'Context with {len(contextvars.copy_context())} variables') with Timer(): for _ in range(LOOP_COUNT): contextvars.copy_context() # output # Context with 0 variables # 0.00015302500000000108 # Context with 42000 variables # 0.00015314099999999835 Применяем contextvars к extra Поскольку нам больше не нужно вручную задавать атрибуты для тасок, мы можем избавиться от кастомной фактори — функции get_log_extra и set_log_extra становятся обёртками над методами контекстной переменной: import asyncio
import functools import contextvars _log_extra_data = contextvars.ContextVar('_log_extra_data') def set_log_extra(log_extra): _log_extra_data.set(log_extra) def get_log_extra(): return _log_extra_data.get() def log(text): extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items()) msg = f'text="{text}"\t{extra_tskv}' print(msg) async def call_foo(): log(f'service_foo') async def call_bar(): log(f'service_bar') async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.gather( call_foo(), call_bar(), ) log(f'no explicit extra') async def main(): await feedback_handler_example() asyncio.run(main()) # output # text="service_foo" handler=some-handler user_id=some-guid # text="service_bar" handler=some-handler user_id=some-guid # text="no explicit extra" handler=some-handler user_id=some-guid Выглядит сильно лучше. Однако перед внедрением в продакшен необходима пара финальных штрихов. Изменить extra для конкретной таски Важно помнить, что «копирование» контекста не влечёт за собой глубокого копирования объектов. Это значит, что изменение контекстных переменных мутабельных типов будет влиять на родительский контекст. Представим, что после обращения к стороннему сервису мы хотим дополнительно логировать response_id в качестве extra-информации: async def external_call():
# ... get_log_extra()['response_id'] = 'some_external_response_id' log('log from external call') async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.Task(external_call()) log('log from parent task ') async def main(): await feedback_handler_example() asyncio.run(main()) # output # text="log from external call" handler=some-handler user_id=some-guid response_id=some_external_response_id # text="log from parent task" handler=some-handler user_id=some-guid response_id=some_external_response_id Видно, что дочерняя таска повлияла на родителя. Зачастую такое поведение нежелательно, лучше модифицировать экстру через глубокое копирование: import copy
# ... def get_log_extra(should_copy=False): extra = _log_extra_data.get() if should_copy: return copy.deepcopy(extra) return extra def update_log_extra(update): current_extra = get_log_extra(should_copy=True) set_log_extra({**current_extra, **update}) async def external_call(): # ... update_log_extra({'response_id': 'some_external_response_id'}) log('log from external call') async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.Task(external_call()) log(' log from parent task) async def main(): await feedback_handler_example() asyncio.run(main()) # output # text="log from external call" handler=some-handler user_id=some-guid response_id=some_external_response_id # text="log from parent task" handler=some-handler user_id=some-guid Контекстные переменные и тредпул Периодически при написании асинхронных приложений требуется отдать какую-то функцию на выполнение в тредпул (например, из-за отсутствия асинхронного аналога у библиотеки). Здесь нас ждёт неприятный сюрприз: потоки из пула по умолчанию работают в «собственном» контексте, в котором может и не быть созданных в «основном» потоке переменных: import asyncio
import functools import contextvars _log_extra_data = contextvars.ContextVar('_log_extra_data') # ... def log(text): extra_tskv = '\t'.join(f'{k}={v}' for k, v in get_log_extra().items()) msg = f'text="{text}"\t{extra_tskv}' print(msg) async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.get_event_loop().run_in_executor(None, log, 'some-info') log(f'no explicit extra') async def main(): await feedback_handler_example() asyncio.run(main()) # output: # Traceback (most recent call last): # ... # File "contextvars_threadpool.py", line 11, in get_log_extra # return _log_extra_data.get() # LookupError: <ContextVar name='_log_extra_data' at 0x1101a63b0> В потоке из пула переменной _log_extra_data ни разу не присваивалось значение, из-за чего метод get() поднимает исключение LookupError. Для нашей задачи такое поведение кажется сомнительным: по умолчанию хотелось бы, чтобы потоки в пуле вели себя так же, как и дочерние таски, т. е. наследовали контекст родителя. Определим свой собственный пул, выполняющий target-функцию в копии текущего контекста, и зададим его пулом по умолчанию для event-loop’а: import asyncio
import concurrent.futures import contextvars import functools import typing class CtxCopyThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): def submit( self, fn: typing.Callable, *args, **kwargs, ) -> concurrent.futures.Future: ctx = contextvars.copy_context() return super().submit( ctx.run, functools.partial(fn, *args, **kwargs), ) # ... async def feedback_handler_example(): set_log_extra({'handler': 'some-handler', 'user_id': 'some-guid'}) await asyncio.get_event_loop().run_in_executor(None, log, 'some-info') log(f'no explicit extra') async def main(): asyncio.get_event_loop().set_default_executor(CtxCopyThreadPoolExecutor()) await feedback_handler_example() asyncio.run(main()) # text="from threadpool" handler=some-handler user_id=some-guid # text="no explicit extra" handler=some-handler user_id=some-guid Итоги На практике внедрение «автоматического» extra привело к следующему: мы больше не теряем логи при расследовании инцидентов в продакшене, не таскаем ненужный аргумент сквозь сигнатуры методов и не тратим силы на отлов его отсутствия на ревью. Некоторые сомневаются насчёт контекстных переменных: фактически они представляют собой глобальные переменные на стероидах, ещё и с магией в виде неявного копирования контекста между корутинами. Мы считаем, что практическая польза этого инструмента перевешивает идеологические недостатки, однако будем рады услышать ваши рассуждения на эту тему в комментариях. =========== Источник: habr.com =========== Похожие новости:
Блог компании Яндекс ), #_razrabotka_vebsajtov ( Разработка веб-сайтов ), #_python |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:22
Часовой пояс: UTC + 5