[Python, Проектирование и рефакторинг] Мониторинг демон на Asyncio + Dependency Injector — руководство по применению dependency injection
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Привет,
Я создатель Dependency Injector. Это dependency injection фреймворк для Python.
Это еще одно руководство по построению приложений с помощью Dependency Injector.
Сегодня хочу показать как можно построить асинхронный демон на базе модуля asyncio.
Руководство состоит из таких частей:
- Что мы будем строить?
- Проверка инструментов
- Структура проекта
- Подготовка окружения
- Логирование и конфигурация
- Диспетчер
- Мониторинг example.com
- Мониторинг httpbin.org
- Тесты
- Заключение
Завершенный проект можно найти на Github.
Для старта желательно иметь:
- Начальные знания по asyncio
- Общее представление о принципе dependency injection
Что мы будем строить?
Мы будем строить мониторинг демон, который будет следить за доступом к веб-сервисам.
Демон будет посылать запросы к example.com и httpbin.org каждые несколько секунд. При получении ответа он будет записывать в лог такие данные:
- Код ответа
- Количество байт в ответе
- Время, затраченное на выполнение запроса
Проверка инструментов
Мы будем использовать Docker и docker-compose. Давайте проверим, что они установлены:
docker --version
docker-compose --version
Вывод должен выглядеть приблизительно так:
Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31
Если Docker или docker-compose не установлены, их нужно установить перед тем как продолжить. Следуйте этим руководствам:
Инструменты готовы. Переходим к структуре проекта.
Структура проекта
Создаем папку проекта и переходим в нее:
mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial
Теперь нам нужно создать начальную структуру проекта. Создаем файлы и папки следуя структуре ниже. Все файлы пока будут пустыми. Мы наполним их позже.
Начальная структура проекта:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Начальная структура проекта готова. Мы расширим ее с следующих секциях.
Дальше нас ждет подготовка окружения.
Подготовка окружения
В этом разделе мы подготовим окружение для запуска нашего демона.
Для начала нужно определить зависимости. Мы будем использовать такие пакеты:
- dependency-injector — dependency injection фреймворк
- aiohttp — веб фреймворк (нам нужен только http клиент)
- pyyaml — библиотека для парсинга YAML файлов, используется для чтения конфига
- pytest — фреймворк для тестирования
- pytest-asyncio — библиотека-помогатор для тестирования asyncio приложений
- pytest-cov — библиотека-помогатор для измерения покрытия кода тестами
Добавим следующие строки в файл requirements.txt:
dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov
И выполним в терминале:
pip install -r requirements.txt
Далее создаем Dockerfile. Он будет описывать процесс сборки и запуска нашего демона. Мы будем использовать python:3.8-buster в качестве базового образа.
Добавим следующие строки в файл Dockerfile:
FROM python:3.8-buster
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY . /code/
RUN apt-get install openssl \
&& pip install --upgrade pip \
&& pip install -r requirements.txt \
&& rm -rf ~/.cache
CMD ["python", "-m", "monitoringdaemon"]
Последним шагом определим настройки docker-compose.
Добавим следующие строки в файл docker-compose.yml:
version: "3.7"
services:
monitor:
build: ./
image: monitoring-daemon
volumes:
- "./:/code"
Все готово. Давайте запустим сборку образа и проверим что окружение настроено верно.
Выполним в терминале:
docker-compose build
Процесс сборки может занять несколько минут. В конце вы должны увидеть:
Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest
После того как процесс сборки завершен запустим контейнер:
docker-compose up
Вы увидите:
Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0
Окружение готово. Контейнер запускается и завершает работу с кодом 0.
Следующим шагом мы настроим логирование и чтение файла конфигурации.
Логирование и конфигурация
В этом разделе мы настроим логирование и чтение файла конфигурации.
Начнем с добавления основной части нашего приложения — контейнера зависимостей (дальше просто контейнера). Контейнер будет содержать все компоненты приложения.
Добавим первые два компонента. Это объект конфигурации и функция настройки логирования.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.
Сначала используем, потом задаем значения.
Настройки логирования будут содержаться в конфигурационном файле.
Отредактируем config.yml:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
Теперь определим функцию, которая будет запускать наш демон. Её обычно называют main(). Она будет создавать контейнер. Контейнер будет использован для чтения конфигурационного файла и вызова функции настройки логирования.
Отредактируем __main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
if __name__ == '__main__':
main()
Контейнер — первый объект в приложении. Он используется для получения всех остальных объектов.
Логирование и чтение конфигурации настроено. В следующем разделе мы создадим диспетчер мониторинговых задач.
Диспетчер
Пришло время добавить диспетчер мониторинговых задач.
Диспетчер будет содержать список мониторинговых задач и контролировать их выполнение. Он будет выполнять каждую задачу в соответствии с расписанием. Класс Monitor — базовый класс для мониторинговых задач. Для создания конкретных задач нужно добавлять дочерние классы и реализовывать метод check().
Добавим диспетчер и базовый класс мониторинговой задачи.
Создадим dispatcher.py и monitors.py в пакете monitoringdaemon:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Добавим следующие строки в файл monitors.py:
"""Monitors module."""
import logging
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
и в файл dispatcher.py:
""""Dispatcher module."""
import asyncio
import logging
import signal
import time
from typing import List
from .monitors import Monitor
class Dispatcher:
def __init__(self, monitors: List[Monitor]) -> None:
self._monitors = monitors
self._monitor_tasks: List[asyncio.Task] = []
self._logger = logging.getLogger(self.__class__.__name__)
self._stopping = False
def run(self) -> None:
asyncio.run(self.start())
async def start(self) -> None:
self._logger.info('Starting up')
for monitor in self._monitors:
self._monitor_tasks.append(
asyncio.create_task(self._run_monitor(monitor)),
)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)
await asyncio.gather(*self._monitor_tasks, return_exceptions=True)
self.stop()
def stop(self) -> None:
if self._stopping:
return
self._stopping = True
self._logger.info('Shutting down')
for task, monitor in zip(self._monitor_tasks, self._monitors):
task.cancel()
self._logger.info('Shutdown finished successfully')
@staticmethod
async def _run_monitor(monitor: Monitor) -> None:
def _until_next(last: float) -> float:
time_took = time.time() - last
return monitor.check_every - time_took
while True:
time_start = time.time()
try:
await monitor.check()
except asyncio.CancelledError:
break
except Exception:
monitor.logger.exception('Error executing monitor check')
await asyncio.sleep(_until_next(last=time_start))
Диспетчер нужно добавить в контейнер.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
Каждый компонент добавляется в контейнер.
В завершении нам нужно обновить функцию main(). Мы получим диспетчер из контейнера и вызовем его метод run().
Отредактируем __main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
dispatcher = container.dispatcher()
dispatcher.run()
if __name__ == '__main__':
main()
Теперь запустим демон и проверим его работу.
Выполним в терминале:
docker-compose up
Вывод должен выглядеть так:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0
Все работает верно. Диспетчер запускается и выключается так как мониторинговых задач нет.
К концу этого раздела каркас нашего демона готов. В следующем разделе мы добавим первую мониторинговую задачу.
Мониторинг example.com
В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.
Мы начнем с расширения нашей модели классов новым типом мониторинговой задачи HttpMonitor.
HttpMonitor это дочерний класс Monitor. Мы реализуем метод check(). Он будет отправлять HTTP запрос и логировать полученный ответ. Детали выполнения HTTP запроса будут делегированы классу HttpClient.
Сперва добавим HttpClient.
Создадим файл http.py в пакете monitoringdaemon:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
И добавим в него следующие строки:
"""Http client module."""
from aiohttp import ClientSession, ClientTimeout, ClientResponse
class HttpClient:
async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
async with ClientSession(timeout=ClientTimeout(timeout)) as session:
async with session.request(method, url) as response:
return response
Далее нужно добавить HttpClient в контейнер.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
Теперь мы готовы добавить HttpMonitor. Добавим его в модуль monitors.
Отредактируем monitors.py:
"""Monitors module."""
import logging
import time
from typing import Dict, Any
from .http import HttpClient
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
class HttpMonitor(Monitor):
def __init__(
self,
http_client: HttpClient,
options: Dict[str, Any],
) -> None:
self._client = http_client
self._method = options.pop('method')
self._url = options.pop('url')
self._timeout = options.pop('timeout')
super().__init__(check_every=options.pop('check_every'))
@property
def full_name(self) -> str:
return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)
async def check(self) -> None:
time_start = time.time()
response = await self._client.request(
method=self._method,
url=self._url,
timeout=self._timeout,
)
time_end = time.time()
time_took = time_end - time_start
self.logger.info(
'Response code: %s, content length: %s, request took: %s seconds',
response.status,
response.content_length,
round(time_took, 3)
)
У нас все готово для добавления проверки http://example.com. Нам нужно сделать два изменения в контейнере:
- Добавить фабрику example_monitor.
- Передать example_monitor в диспетчер.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
),
)
Провайдер example_monitor имеет зависимость от значений конфигурации. Давайте добавим эти значения:
Отредактируем config.yml:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
example:
method: "GET"
url: "http://example.com"
timeout: 5
check_every: 5
Все готово. Запускаем демон и проверяем работу.
Выполняем в терминале:
docker-compose up
И видим подобный вывод:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.067 seconds
monitor_1 |
monitor_1 | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.073 seconds
Наш демон может следить за наличием доступа к http://example.com.
Давайте добавим мониторинг https://httpbin.org.
Мониторинг httpbin.org
В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.
Добавление мониторинговой задачи для https://httpbin.org будет сделать легче, так как все компоненты уже готовы. Нам просто нужно добавить новый провайдер в контейнер и обновить конфигурацию.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
Отредактируем config.yml:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
example:
method: "GET"
url: "http://example.com"
timeout: 5
check_every: 5
httpbin:
method: "GET"
url: "https://httpbin.org/get"
timeout: 5
check_every: 5
Запустим демон и проверим логи.
Выполним в терминале:
docker-compose up
И видим подобный вывод:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.077 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.18 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.066 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.126 seconds
Функциональная часть завершена. Демон следит за наличием доступа к http://example.com и https://httpbin.org.
В следующем разделе мы добавим несколько тестов.
Тесты
Было бы неплохо добавить несколько тестов. Давайте сделаем это.
Создаем файл tests.py в пакете monitoringdaemon:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ ├── monitors.py
│ └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
и добавляем в него следующие строки:
"""Tests module."""
import asyncio
import dataclasses
from unittest import mock
import pytest
from .containers import ApplicationContainer
@dataclasses.dataclass
class RequestStub:
status: int
content_length: int
@pytest.fixture
def container():
container = ApplicationContainer()
container.config.from_dict({
'log': {
'level': 'INFO',
'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
},
'monitors': {
'example': {
'method': 'GET',
'url': 'http://fake-example.com',
'timeout': 1,
'check_every': 1,
},
'httpbin': {
'method': 'GET',
'url': 'https://fake-httpbin.org/get',
'timeout': 1,
'check_every': 1,
},
},
})
return container
@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
caplog.set_level('INFO')
http_client_mock = mock.AsyncMock()
http_client_mock.request.return_value = RequestStub(
status=200,
content_length=635,
)
with container.http_client.override(http_client_mock):
example_monitor = container.example_monitor()
await example_monitor.check()
assert 'http://fake-example.com' in caplog.text
assert 'response code: 200' in caplog.text
assert 'content length: 635' in caplog.text
@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
caplog.set_level('INFO')
example_monitor_mock = mock.AsyncMock()
httpbin_monitor_mock = mock.AsyncMock()
with container.example_monitor.override(example_monitor_mock), \
container.httpbin_monitor.override(httpbin_monitor_mock):
dispatcher = container.dispatcher()
event_loop.create_task(dispatcher.start())
await asyncio.sleep(0.1)
dispatcher.stop()
assert example_monitor_mock.check.called
assert httpbin_monitor_mock.check.called
Для запуска тестов выполним в терминале:
docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon
Должен получиться подобный результат:
platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items
monitoringdaemon/tests.py .. [100%]
----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name Stmts Miss Cover
----------------------------------------------------
monitoringdaemon/__init__.py 0 0 100%
monitoringdaemon/__main__.py 9 9 0%
monitoringdaemon/containers.py 11 0 100%
monitoringdaemon/dispatcher.py 43 5 88%
monitoringdaemon/http.py 6 3 50%
monitoringdaemon/monitors.py 23 1 96%
monitoringdaemon/tests.py 37 0 100%
----------------------------------------------------
TOTAL 129 18 86%
Обратите внимание как в тесте test_example_monitor мы подменяем HttpClient моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.
Такие же действия выполняются в тесте test_dispatcher для подмены моками мониторинговых задач.
Заключение
Мы построили мониторинг демон на базе asyncio применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка.
Преимущество, которое вы получаете с Dependency Injector — это контейнер.
Контейнер начинает окупаться, когда вам нужно понять или изменить структуру приложения. С контейнером это легко, потому что все компоненты приложения и их зависимости в одном месте:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
Контейнер как карта вашего приложения. Вы всегда знайте что от чего зависит.
Что дальше?
- Узнайте больше о Dependency Injector на GitHub
- Ознакомтесь с документацией на Read the Docs
- Есть вопрос или нашли баг? Откройте issue на Github
===========
Источник:
habr.com
===========
Похожие новости:
- [Python, Django] Шесть Python-пакетов, которые рекомендуется использовать в каждом веб-приложении на Django (перевод)
- [API, Python] Реализация offline режима для Yandex.Music
- Facebook представил Pysa, статический анализатор для языка Python
- [Системное администрирование, Серверное администрирование, DevOps, Kubernetes] New Relic меняет бизнесь модель — открывает код агентов и инструментария (перевод)
- [Космонавтика, DIY или Сделай сам] Целимся и общаемся со спутниками: Часть первая — целимся программно
- [Программирование, Разработка мобильных приложений, Проектирование и рефакторинг, Управление разработкой] Какие навыки можно прокачать на проекте c большой кодовой базой
- [Python, Искусственный интеллект, Natural Language Processing] Итоговые проекты курса Deep Learning in Natural Language Processing (by DeepPavlov Lab)
- [Python, Nginx] Рецепты uWSGI: преобразование документов с использованием LibreOffice
- [PostgreSQL] Павел Труханов. Мониторинг Postgres по USE и RED. Расшифровка с PGConf.Russia
- [Ruby, Python, Программирование] Как работают профайлеры в Ruby и Python?
Теги для поиска: #_python, #_proektirovanie_i_refaktoring (Проектирование и рефакторинг), #_python, #_python3, #_asyncio, #_dependency_injection, #_inversion_of_control, #_refactoring, #_object_oriented_design, #_tests, #_unit_tests, #_pytest, #_daemon, #_monitoring, #_python, #_proektirovanie_i_refaktoring (
Проектирование и рефакторинг
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 19:44
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Привет, Я создатель Dependency Injector. Это dependency injection фреймворк для Python. Это еще одно руководство по построению приложений с помощью Dependency Injector. Сегодня хочу показать как можно построить асинхронный демон на базе модуля asyncio. Руководство состоит из таких частей:
Завершенный проект можно найти на Github. Для старта желательно иметь:
Что мы будем строить? Мы будем строить мониторинг демон, который будет следить за доступом к веб-сервисам. Демон будет посылать запросы к example.com и httpbin.org каждые несколько секунд. При получении ответа он будет записывать в лог такие данные:
Проверка инструментов Мы будем использовать Docker и docker-compose. Давайте проверим, что они установлены: docker --version
docker-compose --version Вывод должен выглядеть приблизительно так: Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31 Если Docker или docker-compose не установлены, их нужно установить перед тем как продолжить. Следуйте этим руководствам:
Инструменты готовы. Переходим к структуре проекта. Структура проекта Создаем папку проекта и переходим в нее: mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial Теперь нам нужно создать начальную структуру проекта. Создаем файлы и папки следуя структуре ниже. Все файлы пока будут пустыми. Мы наполним их позже. Начальная структура проекта: ./
├── monitoringdaemon/ │ ├── __init__.py │ ├── __main__.py │ └── containers.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt Начальная структура проекта готова. Мы расширим ее с следующих секциях. Дальше нас ждет подготовка окружения. Подготовка окружения В этом разделе мы подготовим окружение для запуска нашего демона. Для начала нужно определить зависимости. Мы будем использовать такие пакеты:
Добавим следующие строки в файл requirements.txt: dependency-injector
aiohttp pyyaml pytest pytest-asyncio pytest-cov И выполним в терминале: pip install -r requirements.txt
Далее создаем Dockerfile. Он будет описывать процесс сборки и запуска нашего демона. Мы будем использовать python:3.8-buster в качестве базового образа. Добавим следующие строки в файл Dockerfile: FROM python:3.8-buster
ENV PYTHONUNBUFFERED=1 WORKDIR /code COPY . /code/ RUN apt-get install openssl \ && pip install --upgrade pip \ && pip install -r requirements.txt \ && rm -rf ~/.cache CMD ["python", "-m", "monitoringdaemon"] Последним шагом определим настройки docker-compose. Добавим следующие строки в файл docker-compose.yml: version: "3.7"
services: monitor: build: ./ image: monitoring-daemon volumes: - "./:/code" Все готово. Давайте запустим сборку образа и проверим что окружение настроено верно. Выполним в терминале: docker-compose build
Процесс сборки может занять несколько минут. В конце вы должны увидеть: Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest После того как процесс сборки завершен запустим контейнер: docker-compose up
Вы увидите: Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done Attaching to monitoring-daemon-tutorial_monitor_1 monitoring-daemon-tutorial_monitor_1 exited with code 0 Окружение готово. Контейнер запускается и завершает работу с кодом 0. Следующим шагом мы настроим логирование и чтение файла конфигурации. Логирование и конфигурация В этом разделе мы настроим логирование и чтение файла конфигурации. Начнем с добавления основной части нашего приложения — контейнера зависимостей (дальше просто контейнера). Контейнер будет содержать все компоненты приложения. Добавим первые два компонента. Это объект конфигурации и функция настройки логирования. Отредактируем containers.py: """Application containers module."""
import logging import sys from dependency_injector import containers, providers class ApplicationContainer(containers.DeclarativeContainer): """Application container.""" config = providers.Configuration() configure_logging = providers.Callable( logging.basicConfig, stream=sys.stdout, level=config.log.level, format=config.log.format, ) Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.
Сначала используем, потом задаем значения. Настройки логирования будут содержаться в конфигурационном файле. Отредактируем config.yml: log:
level: "INFO" format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s" Теперь определим функцию, которая будет запускать наш демон. Её обычно называют main(). Она будет создавать контейнер. Контейнер будет использован для чтения конфигурационного файла и вызова функции настройки логирования. Отредактируем __main__.py: """Main module."""
from .containers import ApplicationContainer def main() -> None: """Run the application.""" container = ApplicationContainer() container.config.from_yaml('config.yml') container.configure_logging() if __name__ == '__main__': main() Контейнер — первый объект в приложении. Он используется для получения всех остальных объектов.
Логирование и чтение конфигурации настроено. В следующем разделе мы создадим диспетчер мониторинговых задач. Диспетчер Пришло время добавить диспетчер мониторинговых задач. Диспетчер будет содержать список мониторинговых задач и контролировать их выполнение. Он будет выполнять каждую задачу в соответствии с расписанием. Класс Monitor — базовый класс для мониторинговых задач. Для создания конкретных задач нужно добавлять дочерние классы и реализовывать метод check(). Добавим диспетчер и базовый класс мониторинговой задачи. Создадим dispatcher.py и monitors.py в пакете monitoringdaemon: ./
├── monitoringdaemon/ │ ├── __init__.py │ ├── __main__.py │ ├── containers.py │ ├── dispatcher.py │ └── monitors.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt Добавим следующие строки в файл monitors.py: """Monitors module."""
import logging class Monitor: def __init__(self, check_every: int) -> None: self.check_every = check_every self.logger = logging.getLogger(self.__class__.__name__) async def check(self) -> None: raise NotImplementedError() и в файл dispatcher.py: """"Dispatcher module."""
import asyncio import logging import signal import time from typing import List from .monitors import Monitor class Dispatcher: def __init__(self, monitors: List[Monitor]) -> None: self._monitors = monitors self._monitor_tasks: List[asyncio.Task] = [] self._logger = logging.getLogger(self.__class__.__name__) self._stopping = False def run(self) -> None: asyncio.run(self.start()) async def start(self) -> None: self._logger.info('Starting up') for monitor in self._monitors: self._monitor_tasks.append( asyncio.create_task(self._run_monitor(monitor)), ) asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop) asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop) await asyncio.gather(*self._monitor_tasks, return_exceptions=True) self.stop() def stop(self) -> None: if self._stopping: return self._stopping = True self._logger.info('Shutting down') for task, monitor in zip(self._monitor_tasks, self._monitors): task.cancel() self._logger.info('Shutdown finished successfully') @staticmethod async def _run_monitor(monitor: Monitor) -> None: def _until_next(last: float) -> float: time_took = time.time() - last return monitor.check_every - time_took while True: time_start = time.time() try: await monitor.check() except asyncio.CancelledError: break except Exception: monitor.logger.exception('Error executing monitor check') await asyncio.sleep(_until_next(last=time_start)) Диспетчер нужно добавить в контейнер. Отредактируем containers.py: """Application containers module."""
import logging import sys from dependency_injector import containers, providers from . import dispatcher class ApplicationContainer(containers.DeclarativeContainer): """Application container.""" config = providers.Configuration() configure_logging = providers.Callable( logging.basicConfig, stream=sys.stdout, level=config.log.level, format=config.log.format, ) dispatcher = providers.Factory( dispatcher.Dispatcher, monitors=providers.List( # TODO: add monitors ), ) Каждый компонент добавляется в контейнер.
В завершении нам нужно обновить функцию main(). Мы получим диспетчер из контейнера и вызовем его метод run(). Отредактируем __main__.py: """Main module."""
from .containers import ApplicationContainer def main() -> None: """Run the application.""" container = ApplicationContainer() container.config.from_yaml('config.yml') container.configure_logging() dispatcher = container.dispatcher() dispatcher.run() if __name__ == '__main__': main() Теперь запустим демон и проверим его работу. Выполним в терминале: docker-compose up
Вывод должен выглядеть так: Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1 monitor_1 | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully monitoring-daemon-tutorial_monitor_1 exited with code 0 Все работает верно. Диспетчер запускается и выключается так как мониторинговых задач нет. К концу этого раздела каркас нашего демона готов. В следующем разделе мы добавим первую мониторинговую задачу. Мониторинг example.com В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com. Мы начнем с расширения нашей модели классов новым типом мониторинговой задачи HttpMonitor. HttpMonitor это дочерний класс Monitor. Мы реализуем метод check(). Он будет отправлять HTTP запрос и логировать полученный ответ. Детали выполнения HTTP запроса будут делегированы классу HttpClient. Сперва добавим HttpClient. Создадим файл http.py в пакете monitoringdaemon: ./
├── monitoringdaemon/ │ ├── __init__.py │ ├── __main__.py │ ├── containers.py │ ├── dispatcher.py │ ├── http.py │ └── monitors.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt И добавим в него следующие строки: """Http client module."""
from aiohttp import ClientSession, ClientTimeout, ClientResponse class HttpClient: async def request(self, method: str, url: str, timeout: int) -> ClientResponse: async with ClientSession(timeout=ClientTimeout(timeout)) as session: async with session.request(method, url) as response: return response Далее нужно добавить HttpClient в контейнер. Отредактируем containers.py: """Application containers module."""
import logging import sys from dependency_injector import containers, providers from . import http, dispatcher class ApplicationContainer(containers.DeclarativeContainer): """Application container.""" config = providers.Configuration() configure_logging = providers.Callable( logging.basicConfig, stream=sys.stdout, level=config.log.level, format=config.log.format, ) http_client = providers.Factory(http.HttpClient) dispatcher = providers.Factory( dispatcher.Dispatcher, monitors=providers.List( # TODO: add monitors ), ) Теперь мы готовы добавить HttpMonitor. Добавим его в модуль monitors. Отредактируем monitors.py: """Monitors module."""
import logging import time from typing import Dict, Any from .http import HttpClient class Monitor: def __init__(self, check_every: int) -> None: self.check_every = check_every self.logger = logging.getLogger(self.__class__.__name__) async def check(self) -> None: raise NotImplementedError() class HttpMonitor(Monitor): def __init__( self, http_client: HttpClient, options: Dict[str, Any], ) -> None: self._client = http_client self._method = options.pop('method') self._url = options.pop('url') self._timeout = options.pop('timeout') super().__init__(check_every=options.pop('check_every')) @property def full_name(self) -> str: return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url) async def check(self) -> None: time_start = time.time() response = await self._client.request( method=self._method, url=self._url, timeout=self._timeout, ) time_end = time.time() time_took = time_end - time_start self.logger.info( 'Response code: %s, content length: %s, request took: %s seconds', response.status, response.content_length, round(time_took, 3) ) У нас все готово для добавления проверки http://example.com. Нам нужно сделать два изменения в контейнере:
Отредактируем containers.py: """Application containers module."""
import logging import sys from dependency_injector import containers, providers from . import http, monitors, dispatcher class ApplicationContainer(containers.DeclarativeContainer): """Application container.""" config = providers.Configuration() configure_logging = providers.Callable( logging.basicConfig, stream=sys.stdout, level=config.log.level, format=config.log.format, ) http_client = providers.Factory(http.HttpClient) example_monitor = providers.Factory( monitors.HttpMonitor, http_client=http_client, options=config.monitors.example, ) dispatcher = providers.Factory( dispatcher.Dispatcher, monitors=providers.List( example_monitor, ), ) Провайдер example_monitor имеет зависимость от значений конфигурации. Давайте добавим эти значения: Отредактируем config.yml: log:
level: "INFO" format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s" monitors: example: method: "GET" url: "http://example.com" timeout: 5 check_every: 5 Все готово. Запускаем демон и проверяем работу. Выполняем в терминале: docker-compose up
И видим подобный вывод: Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1 monitor_1 | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up monitor_1 | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check monitor_1 | GET http://example.com monitor_1 | response code: 200 monitor_1 | content length: 648 monitor_1 | request took: 0.067 seconds monitor_1 | monitor_1 | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check monitor_1 | GET http://example.com monitor_1 | response code: 200 monitor_1 | content length: 648 monitor_1 | request took: 0.073 seconds Наш демон может следить за наличием доступа к http://example.com. Давайте добавим мониторинг https://httpbin.org. Мониторинг httpbin.org В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com. Добавление мониторинговой задачи для https://httpbin.org будет сделать легче, так как все компоненты уже готовы. Нам просто нужно добавить новый провайдер в контейнер и обновить конфигурацию. Отредактируем containers.py: """Application containers module."""
import logging import sys from dependency_injector import containers, providers from . import http, monitors, dispatcher class ApplicationContainer(containers.DeclarativeContainer): """Application container.""" config = providers.Configuration() configure_logging = providers.Callable( logging.basicConfig, stream=sys.stdout, level=config.log.level, format=config.log.format, ) http_client = providers.Factory(http.HttpClient) example_monitor = providers.Factory( monitors.HttpMonitor, http_client=http_client, options=config.monitors.example, ) httpbin_monitor = providers.Factory( monitors.HttpMonitor, http_client=http_client, options=config.monitors.httpbin, ) dispatcher = providers.Factory( dispatcher.Dispatcher, monitors=providers.List( example_monitor, httpbin_monitor, ), ) Отредактируем config.yml: log:
level: "INFO" format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s" monitors: example: method: "GET" url: "http://example.com" timeout: 5 check_every: 5 httpbin: method: "GET" url: "https://httpbin.org/get" timeout: 5 check_every: 5 Запустим демон и проверим логи. Выполним в терминале: docker-compose up
И видим подобный вывод: Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1 monitor_1 | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up monitor_1 | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check monitor_1 | GET http://example.com monitor_1 | response code: 200 monitor_1 | content length: 648 monitor_1 | request took: 0.077 seconds monitor_1 | monitor_1 | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check monitor_1 | GET https://httpbin.org/get monitor_1 | response code: 200 monitor_1 | content length: 310 monitor_1 | request took: 0.18 seconds monitor_1 | monitor_1 | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check monitor_1 | GET http://example.com monitor_1 | response code: 200 monitor_1 | content length: 648 monitor_1 | request took: 0.066 seconds monitor_1 | monitor_1 | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check monitor_1 | GET https://httpbin.org/get monitor_1 | response code: 200 monitor_1 | content length: 310 monitor_1 | request took: 0.126 seconds Функциональная часть завершена. Демон следит за наличием доступа к http://example.com и https://httpbin.org. В следующем разделе мы добавим несколько тестов. Тесты Было бы неплохо добавить несколько тестов. Давайте сделаем это. Создаем файл tests.py в пакете monitoringdaemon: ./
├── monitoringdaemon/ │ ├── __init__.py │ ├── __main__.py │ ├── containers.py │ ├── dispatcher.py │ ├── http.py │ ├── monitors.py │ └── tests.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt и добавляем в него следующие строки: """Tests module."""
import asyncio import dataclasses from unittest import mock import pytest from .containers import ApplicationContainer @dataclasses.dataclass class RequestStub: status: int content_length: int @pytest.fixture def container(): container = ApplicationContainer() container.config.from_dict({ 'log': { 'level': 'INFO', 'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s', }, 'monitors': { 'example': { 'method': 'GET', 'url': 'http://fake-example.com', 'timeout': 1, 'check_every': 1, }, 'httpbin': { 'method': 'GET', 'url': 'https://fake-httpbin.org/get', 'timeout': 1, 'check_every': 1, }, }, }) return container @pytest.mark.asyncio async def test_example_monitor(container, caplog): caplog.set_level('INFO') http_client_mock = mock.AsyncMock() http_client_mock.request.return_value = RequestStub( status=200, content_length=635, ) with container.http_client.override(http_client_mock): example_monitor = container.example_monitor() await example_monitor.check() assert 'http://fake-example.com' in caplog.text assert 'response code: 200' in caplog.text assert 'content length: 635' in caplog.text @pytest.mark.asyncio async def test_dispatcher(container, caplog, event_loop): caplog.set_level('INFO') example_monitor_mock = mock.AsyncMock() httpbin_monitor_mock = mock.AsyncMock() with container.example_monitor.override(example_monitor_mock), \ container.httpbin_monitor.override(httpbin_monitor_mock): dispatcher = container.dispatcher() event_loop.create_task(dispatcher.start()) await asyncio.sleep(0.1) dispatcher.stop() assert example_monitor_mock.check.called assert httpbin_monitor_mock.check.called Для запуска тестов выполним в терминале: docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon
Должен получиться подобный результат: platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code plugins: asyncio-0.14.0, cov-2.10.0 collected 2 items monitoringdaemon/tests.py .. [100%] ----------- coverage: platform linux, python 3.8.3-final-0 ----------- Name Stmts Miss Cover ---------------------------------------------------- monitoringdaemon/__init__.py 0 0 100% monitoringdaemon/__main__.py 9 9 0% monitoringdaemon/containers.py 11 0 100% monitoringdaemon/dispatcher.py 43 5 88% monitoringdaemon/http.py 6 3 50% monitoringdaemon/monitors.py 23 1 96% monitoringdaemon/tests.py 37 0 100% ---------------------------------------------------- TOTAL 129 18 86% Обратите внимание как в тесте test_example_monitor мы подменяем HttpClient моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.
Такие же действия выполняются в тесте test_dispatcher для подмены моками мониторинговых задач. Заключение Мы построили мониторинг демон на базе asyncio применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка. Преимущество, которое вы получаете с Dependency Injector — это контейнер. Контейнер начинает окупаться, когда вам нужно понять или изменить структуру приложения. С контейнером это легко, потому что все компоненты приложения и их зависимости в одном месте: """Application containers module."""
import logging import sys from dependency_injector import containers, providers from . import http, monitors, dispatcher class ApplicationContainer(containers.DeclarativeContainer): """Application container.""" config = providers.Configuration() configure_logging = providers.Callable( logging.basicConfig, stream=sys.stdout, level=config.log.level, format=config.log.format, ) http_client = providers.Factory(http.HttpClient) example_monitor = providers.Factory( monitors.HttpMonitor, http_client=http_client, options=config.monitors.example, ) httpbin_monitor = providers.Factory( monitors.HttpMonitor, http_client=http_client, options=config.monitors.httpbin, ) dispatcher = providers.Factory( dispatcher.Dispatcher, monitors=providers.List( example_monitor, httpbin_monitor, ), ) Контейнер как карта вашего приложения. Вы всегда знайте что от чего зависит. Что дальше?
=========== Источник: habr.com =========== Похожие новости:
Проектирование и рефакторинг ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 19:44
Часовой пояс: UTC + 5