[Python, Высокая производительность, Распределённые системы, Финансы в IT] Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Как я дошёл до жизни такой?
Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.
Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть… в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа — не очень… Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.
В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное — это то, что библиотека асинхронна.
Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность — типизированные данные для передачи в топик.
Что будем делать?
Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.
P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:
Требования к проекту
В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:
- Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow — за последний год) — регулярно
- Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) — регулярно
- Выгружать последние торговые данные — регулярно
- Выгружать настроенный список индикаторов для каждой ценной бумаги — регулярно
Как полагается, выбираем имя проекту с потолка: horton
Готовим инфраструктуру
Заголовок конечно сильный, однако, всё что нужно сделать — это написать небольшой конфиг для docker-compose с kafka (и zookeeper — в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 — порт zookeeper'а. По остальному, я думаю, ясно.
Готовим скелет проекта
В базовом варианте структура нашего проекта должна выглядеть так:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**
Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.
Начнём с зависимостей и мета о проекте — pyproject.toml
Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):
pip3 install poetry (если ещё не установлено)
poetry install
Теперь создадим config.yml — креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py — извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу — sitri.
По подключению с монго — совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.
Что будет дальше?
Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте — обещаю, что в следующей части будет экшн и графика.
Итак, а в этой самой следующей части мы:
- Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
- Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.
Код проекта
Код этой части
===========
Источник:
habr.com
===========
Похожие новости:
- [Развитие стартапа, Управление продуктом, Финансы в IT] Кайф трекерства в экспансии. Интервью с трекером Дмитрием Безнасюком
- [Python, Программирование, Визуальное программирование] Опыт проведения городской школьной олимпиады по программированию
- [Google Cloud Platform, Python, R, Профессиональная литература] Учимся обращаться к данным и запрашивать их при помощи Google BigQuery. С примерами на Python и R (перевод)
- [IT-компании, Информационная безопасность, Финансы в IT] Яндекс закрыл проект, который оценивал заёмщиков банков. Эксперты увидели в нем угрозу личным данным
- [Анализ и проектирование систем, Высокая производительность, Программирование, Промышленное программирование] Паттерн «сага» как способ обеспечения консистентности данных
- [Высокая производительность, Венчурные инвестиции, Распределённые системы, Криптовалюты] Взгляд изнутри на строительство компании в Кремниевой долине
- [Высокая производительность, Компьютерное железо, Настольные компьютеры, Процессоры] Объем или частота, сколько нужно оперативной памяти в 2020 году?
- [Big Data, Data Mining, Open source, Python] Crime, Race and Lethal Force in the USA — Part 2 (перевод)
- [Open source, Python, Data Mining] Crime, Race and Lethal Force in the USA — Part 1 (перевод)
- [Python, Анализ и проектирование систем] Многоканальные массовые рассылки на Redis
Теги для поиска: #_python, #_vysokaja_proizvoditelnost (Высокая производительность), #_raspredelennye_sistemy (Распределённые системы), #_finansy_v_it (Финансы в IT), #_faust, #_celery, #_python, #_mongodb, #_mongo, #_finansy (финансы), #_finance, #_aiohttp, #_async, #_asyncio, #_kafka, #_kafka_streams, #_python, #_vysokaja_proizvoditelnost (
Высокая производительность
), #_raspredelennye_sistemy (
Распределённые системы
), #_finansy_v_it (
Финансы в IT
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 11:23
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Как я дошёл до жизни такой? Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи. Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть… в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа — не очень… Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут. В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное — это то, что библиотека асинхронна. Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность — типизированные данные для передачи в топик. Что будем делать? Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения. P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так: Требования к проекту В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:
Как полагается, выбираем имя проекту с потолка: horton Готовим инфраструктуру Заголовок конечно сильный, однако, всё что нужно сделать — это написать небольшой конфиг для docker-compose с kafka (и zookeeper — в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида: version: '3'
services: db: container_name: horton-mongodb-local image: mongo:4.2-bionic command: mongod --port 20017 restart: always ports: - 20017:20017 environment: - MONGO_INITDB_DATABASE=horton - MONGO_INITDB_ROOT_USERNAME=admin - MONGO_INITDB_ROOT_PASSWORD=admin_password kafka-service: container_name: horton-kafka-local image: obsidiandynamics/kafka restart: always ports: - "2181:2181" - "9092:9092" environment: KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092" KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT" KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000" KAFKA_RESTART_ATTEMPTS: "10" KAFKA_RESTART_DELAY: "5" ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0" kafdrop: container_name: horton-kafdrop-local image: 'obsidiandynamics/kafdrop:latest' restart: always ports: - '9000:9000' environment: KAFKA_BROKERCONNECT: kafka-service:29092 depends_on: - kafka-service Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 — порт zookeeper'а. По остальному, я думаю, ясно. Готовим скелет проекта В базовом варианте структура нашего проекта должна выглядеть так: horton
├── docker-compose.yml └── horton ├── agents.py * ├── alphavantage.py * ├── app.py * ├── config.py ├── database │ ├── connect.py │ ├── cruds │ │ ├── base.py │ │ ├── __init__.py │ │ └── security.py * │ └── __init__.py ├── __init__.py ├── records.py * └── tasks.py * *Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.** Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии. Начнём с зависимостей и мета о проекте — pyproject.toml Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение): pip3 install poetry (если ещё не установлено)
poetry install Теперь создадим config.yml — креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py — извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу — sitri. По подключению с монго — совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям. Что будет дальше? Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте — обещаю, что в следующей части будет экшн и графика. Итак, а в этой самой следующей части мы:
Код проекта Код этой части =========== Источник: habr.com =========== Похожие новости:
Высокая производительность ), #_raspredelennye_sistemy ( Распределённые системы ), #_finansy_v_it ( Финансы в IT ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 11:23
Часовой пояс: UTC + 5