[Python] aio api crawler
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Всем пример. Я начал работать над библиотекой для выдергивания данных из разных json api. Также она может использоваться для тестирования api.
Апишки описываются в виде классов, например
class Categories(JsonEndpoint):
url = "http://127.0.0.1:8888/categories"
params = {"page": range(100), "language": "en"}
headers = {"User-Agent": get_user_agent}
results_key = "*.slug"
categories = Categories()
class Posts(JsonEndpoint):
url = "http://127.0.0.1:8888/categories/{category}/posts"
params = {"page": range(100), "language": "en"}
url_params = {"category": categories.iter_results()}
results_key = "posts"
async def comments(self, post):
comments = Comments(
self.session,
url_params={"category": post.url.params["category"], "id": post["id"]},
)
return [comment async for comment in comments]
posts = Posts()
В params и url_params могут быть функции(как здесь get_user_agent — возвращает случайный useragent), range, итераторы, awaitable и асинхронные итераторы(таким образом можно увязать их между собой).
В параметрах headers и cookies тоже могут быть функции и awaitable.
Апи категорий в примере выше возвращает массив объектов, у которых есть slug, итератор будет возвроащать именно их. Подсунув этот итератор в url_params постов, итератор пройдется рекурсивно по всем категориям и по всем страницам в каждой. Он прервется когда наткнется на 404 или какую-то другую ошибку и перейдет к следующей категории.
А репозитории есть пример aiohttp сервера для этих классов чтобы всё можно было протестировать.
Помимо get параметров можно передавать их как data или json и задать другой method.
results_key разбивается по точке и будет пытаться выдергивать ключи из результатов. Например «comments.*.text» вернет текст каждого комментария из массива внутри comments.
Результаты оборачиваются во wrapper у которого есть свойства url и params. url это производное строки, у которой тоже есть params. Таким образом можно узнать какие параметры использовались для получения данного результата Это демонстрируется в методе comments.
Также там есть базовый класс Sink для обработки результатов. Например, складывания их в mq или базу данных. Он работает в отдельных тасках и получает данные через asyncio.Queue.
class LoggingSink(Sink):
def transform(self, obj):
return repr(obj)
async def init(self):
from loguru import logger
self.logger = logger
async def process(self, obj):
self.logger.info(obj)
return True
sink = LoggingSink(num_tasks=1)
Пример простейшего Sink. Метод transform позволяет провести какие-то манипуляции с объектом и вернуть None, если он нам не подходит. т.е. в тем также можно сделать валидацию.
Sink это асинхронный contextmanager, который при выходе по-идее будет ждать пока все объекты в очереди будут обработаны, потом отменит свои таски.
Ну и, наконец, для связки этого всего вместе я сделал класс Worker. Он принимает один endpoint и несколько sink`ов. Например,
worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()
run запустит asyncio.run_until_complete для pipeline`а worker`а. У него также есть метод transform.
Ещё есть класс WorkerGroup который позволяет создать сразу несколько воркеров и сделать asyncio.gather для них.
В коде есть пример сервера, который генерит данные через faker и обработчиков для его endpoint`ов. Думаю это нагляднее всего.
Всё это на ранней стадии развития и я пока что часто менял api. Но сейчас вроде пришел к тому как это должно выглядеть. Буду раз merge request`ам и комментариям к моему коду.
===========
Источник:
habr.com
===========
Похожие новости:
- Выпуск Pyston 2, реализации языка Python с JIT-компилятором
- [Python, Big Data, Машинное обучение] AutoVIML: Автоматизированное машинное обучение (перевод)
- [Python, Алгоритмы, Машинное обучение, Data Engineering] Расширение возможностей алгоритмов Машинного Обучения с помощью библиотеки daal4py
- [Python, JavaScript, Java] Разбор вступительных задач Школы Программистов hh.ru
- [Разработка веб-сайтов, Python, Django] Что происходит, когда вы выполняете manage.py test? (перевод)
- [Python, Программирование, Математика, Научно-популярное, Физика] Изучаем распространение радиосигналов в ионосфере с помощью SDR
- [Python, Информационная безопасность] Сказ о том, как я токен в Линуксе хранил
- [Python] Мелкая питонячая радость #11: реактивное программирование, парсинг страниц и публикация моделей машинного обучения
- [Высокая производительность, Python, Программирование, Машинное обучение] Deep Learning Inference Benchmark — измеряем скорость работы моделей глубокого обучения
- [C++, Алгоритмы, Программирование, Процессоры, Разработка веб-сайтов] Исключительно быстрая валидация UTF-8 (перевод)
Теги для поиска: #_python, #_aiohttp, #_python_3, #_json, #_python
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 23:24
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Всем пример. Я начал работать над библиотекой для выдергивания данных из разных json api. Также она может использоваться для тестирования api. Апишки описываются в виде классов, например class Categories(JsonEndpoint):
url = "http://127.0.0.1:8888/categories" params = {"page": range(100), "language": "en"} headers = {"User-Agent": get_user_agent} results_key = "*.slug" categories = Categories() class Posts(JsonEndpoint): url = "http://127.0.0.1:8888/categories/{category}/posts" params = {"page": range(100), "language": "en"} url_params = {"category": categories.iter_results()} results_key = "posts" async def comments(self, post): comments = Comments( self.session, url_params={"category": post.url.params["category"], "id": post["id"]}, ) return [comment async for comment in comments] posts = Posts() В params и url_params могут быть функции(как здесь get_user_agent — возвращает случайный useragent), range, итераторы, awaitable и асинхронные итераторы(таким образом можно увязать их между собой). В параметрах headers и cookies тоже могут быть функции и awaitable. Апи категорий в примере выше возвращает массив объектов, у которых есть slug, итератор будет возвроащать именно их. Подсунув этот итератор в url_params постов, итератор пройдется рекурсивно по всем категориям и по всем страницам в каждой. Он прервется когда наткнется на 404 или какую-то другую ошибку и перейдет к следующей категории. А репозитории есть пример aiohttp сервера для этих классов чтобы всё можно было протестировать. Помимо get параметров можно передавать их как data или json и задать другой method. results_key разбивается по точке и будет пытаться выдергивать ключи из результатов. Например «comments.*.text» вернет текст каждого комментария из массива внутри comments. Результаты оборачиваются во wrapper у которого есть свойства url и params. url это производное строки, у которой тоже есть params. Таким образом можно узнать какие параметры использовались для получения данного результата Это демонстрируется в методе comments. Также там есть базовый класс Sink для обработки результатов. Например, складывания их в mq или базу данных. Он работает в отдельных тасках и получает данные через asyncio.Queue. class LoggingSink(Sink):
def transform(self, obj): return repr(obj) async def init(self): from loguru import logger self.logger = logger async def process(self, obj): self.logger.info(obj) return True sink = LoggingSink(num_tasks=1) Пример простейшего Sink. Метод transform позволяет провести какие-то манипуляции с объектом и вернуть None, если он нам не подходит. т.е. в тем также можно сделать валидацию. Sink это асинхронный contextmanager, который при выходе по-идее будет ждать пока все объекты в очереди будут обработаны, потом отменит свои таски. Ну и, наконец, для связки этого всего вместе я сделал класс Worker. Он принимает один endpoint и несколько sink`ов. Например, worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run() run запустит asyncio.run_until_complete для pipeline`а worker`а. У него также есть метод transform. Ещё есть класс WorkerGroup который позволяет создать сразу несколько воркеров и сделать asyncio.gather для них. В коде есть пример сервера, который генерит данные через faker и обработчиков для его endpoint`ов. Думаю это нагляднее всего. Всё это на ранней стадии развития и я пока что часто менял api. Но сейчас вроде пришел к тому как это должно выглядеть. Буду раз merge request`ам и комментариям к моему коду. =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 23:24
Часовой пояс: UTC + 5