[Python, Проектирование и рефакторинг] Мониторинг демон на Asyncio + Dependency Injector — руководство по применению dependency injection

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
09-Авг-2020 11:31

Привет,
Я создатель Dependency Injector. Это dependency injection фреймворк для Python.
Это еще одно руководство по построению приложений с помощью Dependency Injector.
Сегодня хочу показать как можно построить асинхронный демон на базе модуля asyncio.
Руководство состоит из таких частей:

Завершенный проект можно найти на Github.
Для старта желательно иметь:
  • Начальные знания по asyncio
  • Общее представление о принципе dependency injection

Что мы будем строить?
Мы будем строить мониторинг демон, который будет следить за доступом к веб-сервисам.
Демон будет посылать запросы к example.com и httpbin.org каждые несколько секунд. При получении ответа он будет записывать в лог такие данные:
  • Код ответа
  • Количество байт в ответе
  • Время, затраченное на выполнение запроса


Проверка инструментов
Мы будем использовать Docker и docker-compose. Давайте проверим, что они установлены:
docker --version
docker-compose --version

Вывод должен выглядеть приблизительно так:
Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31

Если Docker или docker-compose не установлены, их нужно установить перед тем как продолжить. Следуйте этим руководствам:

Инструменты готовы. Переходим к структуре проекта.
Структура проекта
Создаем папку проекта и переходим в нее:
mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial

Теперь нам нужно создать начальную структуру проекта. Создаем файлы и папки следуя структуре ниже. Все файлы пока будут пустыми. Мы наполним их позже.
Начальная структура проекта:
./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Начальная структура проекта готова. Мы расширим ее с следующих секциях.
Дальше нас ждет подготовка окружения.
Подготовка окружения
В этом разделе мы подготовим окружение для запуска нашего демона.
Для начала нужно определить зависимости. Мы будем использовать такие пакеты:
  • dependency-injector — dependency injection фреймворк
  • aiohttp — веб фреймворк (нам нужен только http клиент)
  • pyyaml — библиотека для парсинга YAML файлов, используется для чтения конфига
  • pytest — фреймворк для тестирования
  • pytest-asyncio — библиотека-помогатор для тестирования asyncio приложений
  • pytest-cov — библиотека-помогатор для измерения покрытия кода тестами

Добавим следующие строки в файл requirements.txt:
dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov

И выполним в терминале:
pip install -r requirements.txt

Далее создаем Dockerfile. Он будет описывать процесс сборки и запуска нашего демона. Мы будем использовать python:3.8-buster в качестве базового образа.
Добавим следующие строки в файл Dockerfile:
FROM python:3.8-buster
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY . /code/
RUN apt-get install openssl \
&& pip install --upgrade pip \
&& pip install -r requirements.txt \
&& rm -rf ~/.cache
CMD ["python", "-m", "monitoringdaemon"]

Последним шагом определим настройки docker-compose.
Добавим следующие строки в файл docker-compose.yml:
version: "3.7"
services:
  monitor:
    build: ./
    image: monitoring-daemon
    volumes:
      - "./:/code"

Все готово. Давайте запустим сборку образа и проверим что окружение настроено верно.
Выполним в терминале:
docker-compose build

Процесс сборки может занять несколько минут. В конце вы должны увидеть:
Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest

После того как процесс сборки завершен запустим контейнер:
docker-compose up

Вы увидите:
Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0

Окружение готово. Контейнер запускается и завершает работу с кодом 0.
Следующим шагом мы настроим логирование и чтение файла конфигурации.
Логирование и конфигурация
В этом разделе мы настроим логирование и чтение файла конфигурации.
Начнем с добавления основной части нашего приложения — контейнера зависимостей (дальше просто контейнера). Контейнер будет содержать все компоненты приложения.
Добавим первые два компонента. Это объект конфигурации и функция настройки логирования.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""
    config = providers.Configuration()
    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.
Сначала используем, потом задаем значения.

Настройки логирования будут содержаться в конфигурационном файле.
Отредактируем config.yml:
log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

Теперь определим функцию, которая будет запускать наш демон. Её обычно называют main(). Она будет создавать контейнер. Контейнер будет использован для чтения конфигурационного файла и вызова функции настройки логирования.
Отредактируем __main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
    """Run the application."""
    container = ApplicationContainer()
    container.config.from_yaml('config.yml')
    container.configure_logging()
if __name__ == '__main__':
    main()

Контейнер — первый объект в приложении. Он используется для получения всех остальных объектов.

Логирование и чтение конфигурации настроено. В следующем разделе мы создадим диспетчер мониторинговых задач.
Диспетчер
Пришло время добавить диспетчер мониторинговых задач.
Диспетчер будет содержать список мониторинговых задач и контролировать их выполнение. Он будет выполнять каждую задачу в соответствии с расписанием. Класс Monitor — базовый класс для мониторинговых задач. Для создания конкретных задач нужно добавлять дочерние классы и реализовывать метод check().

Добавим диспетчер и базовый класс мониторинговой задачи.
Создадим dispatcher.py и monitors.py в пакете monitoringdaemon:
./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Добавим следующие строки в файл monitors.py:
"""Monitors module."""
import logging
class Monitor:
    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)
    async def check(self) -> None:
        raise NotImplementedError()

и в файл dispatcher.py:
""""Dispatcher module."""
import asyncio
import logging
import signal
import time
from typing import List
from .monitors import Monitor
class Dispatcher:
    def __init__(self, monitors: List[Monitor]) -> None:
        self._monitors = monitors
        self._monitor_tasks: List[asyncio.Task] = []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._stopping = False
    def run(self) -> None:
        asyncio.run(self.start())
    async def start(self) -> None:
        self._logger.info('Starting up')
        for monitor in self._monitors:
            self._monitor_tasks.append(
                asyncio.create_task(self._run_monitor(monitor)),
            )
        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)
        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)
        self.stop()
    def stop(self) -> None:
        if self._stopping:
            return
        self._stopping = True
        self._logger.info('Shutting down')
        for task, monitor in zip(self._monitor_tasks, self._monitors):
            task.cancel()
        self._logger.info('Shutdown finished successfully')
    @staticmethod
    async def _run_monitor(monitor: Monitor) -> None:
        def _until_next(last: float) -> float:
            time_took = time.time() - last
            return monitor.check_every - time_took
        while True:
            time_start = time.time()
            try:
                await monitor.check()
            except asyncio.CancelledError:
                break
            except Exception:
                monitor.logger.exception('Error executing monitor check')
            await asyncio.sleep(_until_next(last=time_start))

Диспетчер нужно добавить в контейнер.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""
    config = providers.Configuration()
    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )
    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )

Каждый компонент добавляется в контейнер.

В завершении нам нужно обновить функцию main(). Мы получим диспетчер из контейнера и вызовем его метод run().
Отредактируем __main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
    """Run the application."""
    container = ApplicationContainer()
    container.config.from_yaml('config.yml')
    container.configure_logging()
    dispatcher = container.dispatcher()
    dispatcher.run()
if __name__ == '__main__':
    main()

Теперь запустим демон и проверим его работу.
Выполним в терминале:
docker-compose up

Вывод должен выглядеть так:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0

Все работает верно. Диспетчер запускается и выключается так как мониторинговых задач нет.
К концу этого раздела каркас нашего демона готов. В следующем разделе мы добавим первую мониторинговую задачу.
Мониторинг example.com
В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.
Мы начнем с расширения нашей модели классов новым типом мониторинговой задачи HttpMonitor.
HttpMonitor это дочерний класс Monitor. Мы реализуем метод check(). Он будет отправлять HTTP запрос и логировать полученный ответ. Детали выполнения HTTP запроса будут делегированы классу HttpClient.

Сперва добавим HttpClient.
Создадим файл http.py в пакете monitoringdaemon:
./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

И добавим в него следующие строки:
"""Http client module."""
from aiohttp import ClientSession, ClientTimeout, ClientResponse
class HttpClient:
    async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
        async with ClientSession(timeout=ClientTimeout(timeout)) as session:
            async with session.request(method, url) as response:
                return response

Далее нужно добавить HttpClient в контейнер.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""
    config = providers.Configuration()
    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )
    http_client = providers.Factory(http.HttpClient)
    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )

Теперь мы готовы добавить HttpMonitor. Добавим его в модуль monitors.
Отредактируем monitors.py:
"""Monitors module."""
import logging
import time
from typing import Dict, Any
from .http import HttpClient
class Monitor:
    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)
    async def check(self) -> None:
        raise NotImplementedError()
class HttpMonitor(Monitor):
    def __init__(
            self,
            http_client: HttpClient,
            options: Dict[str, Any],
    ) -> None:
        self._client = http_client
        self._method = options.pop('method')
        self._url = options.pop('url')
        self._timeout = options.pop('timeout')
        super().__init__(check_every=options.pop('check_every'))
    @property
    def full_name(self) -> str:
        return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)
    async def check(self) -> None:
        time_start = time.time()
        response = await self._client.request(
            method=self._method,
            url=self._url,
            timeout=self._timeout,
        )
        time_end = time.time()
        time_took = time_end - time_start
        self.logger.info(
            'Response code: %s, content length: %s, request took: %s seconds',
            response.status,
            response.content_length,
            round(time_took, 3)
        )

У нас все готово для добавления проверки http://example.com. Нам нужно сделать два изменения в контейнере:
  • Добавить фабрику example_monitor.
  • Передать example_monitor в диспетчер.

Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""
    config = providers.Configuration()
    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )
    http_client = providers.Factory(http.HttpClient)
    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )
    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
        ),
    )

Провайдер example_monitor имеет зависимость от значений конфигурации. Давайте добавим эти значения:
Отредактируем config.yml:
log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

Все готово. Запускаем демон и проверяем работу.
Выполняем в терминале:
docker-compose up

И видим подобный вывод:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.067 seconds
monitor_1  |
monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.073 seconds

Наш демон может следить за наличием доступа к http://example.com.
Давайте добавим мониторинг https://httpbin.org.
Мониторинг httpbin.org
В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.
Добавление мониторинговой задачи для https://httpbin.org будет сделать легче, так как все компоненты уже готовы. Нам просто нужно добавить новый провайдер в контейнер и обновить конфигурацию.
Отредактируем containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""
    config = providers.Configuration()
    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )
    http_client = providers.Factory(http.HttpClient)
    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )
    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )
    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )

Отредактируем config.yml:
log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5
  httpbin:
    method: "GET"
    url: "https://httpbin.org/get"
    timeout: 5
    check_every: 5

Запустим демон и проверим логи.
Выполним в терминале:
docker-compose up

И видим подобный вывод:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.077 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.18 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.066 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.126 seconds

Функциональная часть завершена. Демон следит за наличием доступа к http://example.com и https://httpbin.org.
В следующем разделе мы добавим несколько тестов.
Тесты
Было бы неплохо добавить несколько тестов. Давайте сделаем это.
Создаем файл tests.py в пакете monitoringdaemon:
./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   ├── monitors.py
│   └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

и добавляем в него следующие строки:
"""Tests module."""
import asyncio
import dataclasses
from unittest import mock
import pytest
from .containers import ApplicationContainer
@dataclasses.dataclass
class RequestStub:
    status: int
    content_length: int
@pytest.fixture
def container():
    container = ApplicationContainer()
    container.config.from_dict({
        'log': {
            'level': 'INFO',
            'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
        },
        'monitors': {
            'example': {
                'method': 'GET',
                'url': 'http://fake-example.com',
                'timeout': 1,
                'check_every': 1,
            },
            'httpbin': {
                'method': 'GET',
                'url': 'https://fake-httpbin.org/get',
                'timeout': 1,
                'check_every': 1,
            },
        },
    })
    return container
@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
    caplog.set_level('INFO')
    http_client_mock = mock.AsyncMock()
    http_client_mock.request.return_value = RequestStub(
        status=200,
        content_length=635,
    )
    with container.http_client.override(http_client_mock):
        example_monitor = container.example_monitor()
        await example_monitor.check()
    assert 'http://fake-example.com' in caplog.text
    assert 'response code: 200' in caplog.text
    assert 'content length: 635' in caplog.text
@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
    caplog.set_level('INFO')
    example_monitor_mock = mock.AsyncMock()
    httpbin_monitor_mock = mock.AsyncMock()
    with container.example_monitor.override(example_monitor_mock), \
            container.httpbin_monitor.override(httpbin_monitor_mock):
        dispatcher = container.dispatcher()
        event_loop.create_task(dispatcher.start())
        await asyncio.sleep(0.1)
        dispatcher.stop()
    assert example_monitor_mock.check.called
    assert httpbin_monitor_mock.check.called

Для запуска тестов выполним в терминале:
docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon

Должен получиться подобный результат:
platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items
monitoringdaemon/tests.py ..                                    [100%]
----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name                             Stmts   Miss  Cover
----------------------------------------------------
monitoringdaemon/__init__.py         0      0   100%
monitoringdaemon/__main__.py         9      9     0%
monitoringdaemon/containers.py      11      0   100%
monitoringdaemon/dispatcher.py      43      5    88%
monitoringdaemon/http.py             6      3    50%
monitoringdaemon/monitors.py        23      1    96%
monitoringdaemon/tests.py           37      0   100%
----------------------------------------------------
TOTAL                              129     18    86%

Обратите внимание как в тесте test_example_monitor мы подменяем HttpClient моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.

Такие же действия выполняются в тесте test_dispatcher для подмены моками мониторинговых задач.

Заключение
Мы построили мониторинг демон на базе asyncio применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка.
Преимущество, которое вы получаете с Dependency Injector — это контейнер.
Контейнер начинает окупаться, когда вам нужно понять или изменить структуру приложения. С контейнером это легко, потому что все компоненты приложения и их зависимости в одном месте:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""
    config = providers.Configuration()
    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )
    http_client = providers.Factory(http.HttpClient)
    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )
    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )
    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )


Контейнер как карта вашего приложения. Вы всегда знайте что от чего зависит.
Что дальше?

===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_python, #_proektirovanie_i_refaktoring (Проектирование и рефакторинг), #_python, #_python3, #_asyncio, #_dependency_injection, #_inversion_of_control, #_refactoring, #_object_oriented_design, #_tests, #_unit_tests, #_pytest, #_daemon, #_monitoring, #_python, #_proektirovanie_i_refaktoring (
Проектирование и рефакторинг
)
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 23-Ноя 00:34
Часовой пояс: UTC + 5