[Тестирование IT-систем, Java, Apache] Сбор данных и отправка в Apache Kafka
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Введение
Для анализа потоковых данных необходимы источники этих данных. Так же важна сама информация, которая предоставляется источниками. А источники с текстовой информацией, к примеру, еще и редки.
Из интересных источников можно выделить следующие: twitter, vk. Но эти источники подходят не под все задачи.
Есть источники с нужными данными, но эти источники не потоковые. Здесь можно привести следующее ссылки: public-apis.
При решении задач, связанных с потоковыми данными, можно воспользоваться старым способом.
Скачать данные и отправить в поток.
Для примера можно воспользоваться следующим источником: imdb.
Следует отметить, что imdb предоставляет данные самостоятельно. См. IMDb Datasets. Но можно принять, что данные собранные напрямую содержат более актуальную информацию.
Язык: Java 1.8.
Библиотеки: kafka 2.6.0, jsoup 1.13.1.
Сбор данных
Сбор данных представляет из себя сервис, который по входным данным загружает html-страницы, ищет нужную информацию и преобразует в набор объектов.
Итак источник данных: imdb. Информация будет собираться о фильмах и будет использован следующий запрос: https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s
Где 1, 2 параметр – это даты. 3 параметр – страны.
Для лучшего понимания источника данных можно обратится к следующему ресурсу: imdb-extensive-dataset.
Интерфейс для сервиса:
public interface MovieDirectScrapingService {
Collection<Movie> scrap();
}
Класс Movie – это класс, которые содержит информацию об одном фильме (или о шоу и т.п.).
class Movie {
public final String titleId;
public final String titleUrl;
public final String title;
public final String description;
public final Double rating;
public final String genres;
public final String runtime;
public final String baseUrl;
public final String baseNameUrl;
public final String baseTitleUrl;
public final String participantIds;
public final String participantNames;
public final String directorIds;
public final String directorNames;
…
Анализ данных на одной странице.
Информация собирается следующим образом. Данные закачиваются с помощью jsoup. Далее ищутся нужные html-элементы и трансформируются в экземпляры для фильмов.
String scrap(String url, List<Movie> items) {
Document doc = null;
try {
doc = Jsoup.connect(url).header("Accept-Language", language).get();
} catch (IOException e) {
e.printStackTrace();
}
if (doc != null) {
collectItems(doc, items);
return nextUrl(doc);
}
return "";
}
Поиск ссылки на следующею страницу.
String nextUrl(Document doc) {
Elements nextPageElements = doc.select(".next-page");
if (nextPageElements.size() > 0) {
Element hrefElement = nextPageElements.get(0);
return baseUrl + hrefElement.attributes().get("href");
}
return "";
}
Тогда основной метод будет таким. Формируется начальная строка поиска. Закачиваются данные по одной странице. Если есть следующая страница, то идет переход к ней. По окончании передаются накопленные данные.
@Override
public Collection<Movie> scrap() {
String url = String.format(
baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
startDate, endDate, countries
);
List<Movie> items = new ArrayList<>();
String nextUrl = url;
while (true) {
nextUrl = scrap(nextUrl, items);
if ("".equals(nextUrl)) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
return items;
}
Подробности по остальным методам можно найти в ссылках на ресурсы.
Отправка данных в топик
Формируется следующий сервис: MovieProducer. Здесь будет один единственный публичный метод: run.
Создается продюсер для кафки. Загружаются данные из источника. Трансформируются и отправляются в топик.
public void run() {
try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
bootstrapServers, clientId, topic)) {
Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
for (Data.Movie move : movies) {
Map<String, String> map = new HashMap<>();
map.put("title_id", move.titleId);
map.put("title_url", move.titleUrl);
…
String value = JSONObject.toJSONString(map);
String key = UUID.randomUUID().toString();
kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
}
producer.produce(kvList);
}
}
Теперь все вместе
Формируются нужные параметры для поиска. Загружаются данные и отправляются в топик.
Для этого понадобится еще один класс: MovieDirectScrapingExecutor. С одним публичным методом: run.
В цикле создаются данные для поиска из текущей даты. Происходит загрузка и отправка данных в топик.
public void run() {
int countriesCounter = 0;
List<String> countriesSource = Arrays.asList("us");
while (true) {
try {
LocalDate localDate = LocalDate.now();
int year = localDate.getYear();
int month = localDate.getMonthValue();
int day = localDate.getDayOfMonth();
String monthString = month < 9 ? "0" + month : Integer.toString(month);
String dayString = day < 9 ? "0" + day : Integer.toString(day);
String startDate = year + "-" + monthString + "-" + dayString;
String endDate = startDate;
String language = "en";
String countries = countriesSource.get(countriesCounter);
execute(language, startDate, endDate, countries);
Thread.sleep(1000);
countriesCounter += 1;
if (countriesCounter >= countriesSource.size()) {
countriesCounter = 0;
}
} catch (InterruptedException e) {
}
}
}
Для запуска потребуется экземпляр класса MovieDirectScrapingExecutor, который можно запустить с нужными параметрами, к примеру, из метода main.
Пример отправляемых данных для одного фильма.
{
"base_name_url": "https:\/\/www.imdb.com\/name",
"participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
"title_id": "tt13121702",
"rating": "0.0",
"base_url": "https:\/\/www.imdb.com",
"description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
"runtime": "",
"title": "The Christmas Edition",
"director_ids": "nm0838289~",
"title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
"director_names": "Peter Sullivan~",
"genres": "Drama, Romance",
"base_title_url": "https:\/\/www.imdb.com\/title",
"participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}
Подробности можно найти в ссылках на ресурсы.
Тесты
Для тестирования основной логики, которая связана с отправкой данных, можно воспользоваться юнит-тестами. В тестах предварительно создается kafka-сервер.
См. Apache Kafka и тестирование с Kafka Server.
Сам тест: MovieProducerTest.
public class MovieProducerTest {
@Test
void simple() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String zooKeeperHost = "127.0.0.1";
int zooKeeperPort = 22183;
String bootstrapServers = brokerHost + ":" + brokerPort;
String topic = "q-data";
String clientId = "simple";
try (KafkaServerService kafkaServerService = new KafkaServerService(
brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
)
) {
kafkaServerService.start();
kafkaServerService.createTopic(topic);
MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
new Data.Movie(…)
);
MovieProducer movieProducer =
new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
movieProducer.run();
kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
assertTrue(records.count() > 0);
ConsumerRecord<String, String> record = records.iterator().next();
JSONParser jsonParser = new JSONParser();
JSONObject jsonObject = null;
try {
jsonObject = (JSONObject) jsonParser.parse(record.value());
} catch (ParseException e) {
e.printStackTrace();
}
assertNotNull(jsonObject);
…
});
Thread.sleep(5000);
}
}
}
Заключение
Конечно, описанный здесь способ получения источника потоковых данных, строго потоковым не является. Но для исследований и прототипов вполне может сойти.
Ссылки и ресурсы
Исходный код.
===========
Источник:
habr.com
===========
Похожие новости:
- [Java] Обеспечение границ компонент чистой архитектуры с помощью Spring Boot и ArchUnit (перевод)
- [Программирование, Java] Обработка исключений в контроллерах Spring
- [C#, Промышленное программирование] Как я при помощи Google сделал OPC2WEB клиент
- [] Исследование возможных заимствований и нарушений условий лицензирования в Java-коде на GitHub
- [Программирование, Java, Scala] Java 15 глазами программиста Scala (перевод)
- [] Единороги на страже вашей безопасности: исследуем код Bouncy Castle
- [] Unicorns on Guard for Your Safety: Exploring the Bouncy Castle Code
- [Тестирование IT-систем, Тестирование веб-сервисов, Тестирование мобильных приложений, Карьера в IT-индустрии] Собеседование для QA: резюме, вопросы на интервью, переговоры о зарплате + полезные ссылки
- [Open source, Java, Apache] Релиз Apache Ignite 2.9.0 — что нового?
- [Тестирование IT-систем, Программирование, TDD, Профессиональная литература] Что необходимо учитывать при юнит-тестировании фронтенда (перевод)
Теги для поиска: #_testirovanie_itsistem (Тестирование IT-систем), #_java, #_apache, #_java, #_java_8, #_kafka, #_apache_kafka, #_junit, #_jsoup, #_scraping, #_streaming, #_testirovanie_itsistem (
Тестирование IT-систем
), #_java, #_apache
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 23:15
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Введение Для анализа потоковых данных необходимы источники этих данных. Так же важна сама информация, которая предоставляется источниками. А источники с текстовой информацией, к примеру, еще и редки. Из интересных источников можно выделить следующие: twitter, vk. Но эти источники подходят не под все задачи. Есть источники с нужными данными, но эти источники не потоковые. Здесь можно привести следующее ссылки: public-apis. При решении задач, связанных с потоковыми данными, можно воспользоваться старым способом. Скачать данные и отправить в поток. Для примера можно воспользоваться следующим источником: imdb. Следует отметить, что imdb предоставляет данные самостоятельно. См. IMDb Datasets. Но можно принять, что данные собранные напрямую содержат более актуальную информацию. Язык: Java 1.8. Библиотеки: kafka 2.6.0, jsoup 1.13.1. Сбор данных Сбор данных представляет из себя сервис, который по входным данным загружает html-страницы, ищет нужную информацию и преобразует в набор объектов. Итак источник данных: imdb. Информация будет собираться о фильмах и будет использован следующий запрос: https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s Где 1, 2 параметр – это даты. 3 параметр – страны. Для лучшего понимания источника данных можно обратится к следующему ресурсу: imdb-extensive-dataset. Интерфейс для сервиса: public interface MovieDirectScrapingService {
Collection<Movie> scrap(); } Класс Movie – это класс, которые содержит информацию об одном фильме (или о шоу и т.п.). class Movie {
public final String titleId; public final String titleUrl; public final String title; public final String description; public final Double rating; public final String genres; public final String runtime; public final String baseUrl; public final String baseNameUrl; public final String baseTitleUrl; public final String participantIds; public final String participantNames; public final String directorIds; public final String directorNames; … Анализ данных на одной странице. Информация собирается следующим образом. Данные закачиваются с помощью jsoup. Далее ищутся нужные html-элементы и трансформируются в экземпляры для фильмов. String scrap(String url, List<Movie> items) {
Document doc = null; try { doc = Jsoup.connect(url).header("Accept-Language", language).get(); } catch (IOException e) { e.printStackTrace(); } if (doc != null) { collectItems(doc, items); return nextUrl(doc); } return ""; } Поиск ссылки на следующею страницу. String nextUrl(Document doc) {
Elements nextPageElements = doc.select(".next-page"); if (nextPageElements.size() > 0) { Element hrefElement = nextPageElements.get(0); return baseUrl + hrefElement.attributes().get("href"); } return ""; } Тогда основной метод будет таким. Формируется начальная строка поиска. Закачиваются данные по одной странице. Если есть следующая страница, то идет переход к ней. По окончании передаются накопленные данные. @Override
public Collection<Movie> scrap() { String url = String.format( baseUrl + "/search/title/?release_date=%s,%s&countries=%s", startDate, endDate, countries ); List<Movie> items = new ArrayList<>(); String nextUrl = url; while (true) { nextUrl = scrap(nextUrl, items); if ("".equals(nextUrl)) { break; } try { Thread.sleep(50); } catch (InterruptedException e) { } } return items; } Подробности по остальным методам можно найти в ссылках на ресурсы. Отправка данных в топик Формируется следующий сервис: MovieProducer. Здесь будет один единственный публичный метод: run. Создается продюсер для кафки. Загружаются данные из источника. Трансформируются и отправляются в топик. public void run() {
try (SimpleStringStringProducer producer = new SimpleStringStringProducer( bootstrapServers, clientId, topic)) { Collection<Data.Movie> movies = movieDirectScrapingService.scrap(); List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>(); for (Data.Movie move : movies) { Map<String, String> map = new HashMap<>(); map.put("title_id", move.titleId); map.put("title_url", move.titleUrl); … String value = JSONObject.toJSONString(map); String key = UUID.randomUUID().toString(); kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value)); } producer.produce(kvList); } } Теперь все вместе Формируются нужные параметры для поиска. Загружаются данные и отправляются в топик. Для этого понадобится еще один класс: MovieDirectScrapingExecutor. С одним публичным методом: run. В цикле создаются данные для поиска из текущей даты. Происходит загрузка и отправка данных в топик. public void run() {
int countriesCounter = 0; List<String> countriesSource = Arrays.asList("us"); while (true) { try { LocalDate localDate = LocalDate.now(); int year = localDate.getYear(); int month = localDate.getMonthValue(); int day = localDate.getDayOfMonth(); String monthString = month < 9 ? "0" + month : Integer.toString(month); String dayString = day < 9 ? "0" + day : Integer.toString(day); String startDate = year + "-" + monthString + "-" + dayString; String endDate = startDate; String language = "en"; String countries = countriesSource.get(countriesCounter); execute(language, startDate, endDate, countries); Thread.sleep(1000); countriesCounter += 1; if (countriesCounter >= countriesSource.size()) { countriesCounter = 0; } } catch (InterruptedException e) { } } } Для запуска потребуется экземпляр класса MovieDirectScrapingExecutor, который можно запустить с нужными параметрами, к примеру, из метода main. Пример отправляемых данных для одного фильма. {
"base_name_url": "https:\/\/www.imdb.com\/name", "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~", "title_id": "tt13121702", "rating": "0.0", "base_url": "https:\/\/www.imdb.com", "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »", "runtime": "", "title": "The Christmas Edition", "director_ids": "nm0838289~", "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt", "director_names": "Peter Sullivan~", "genres": "Drama, Romance", "base_title_url": "https:\/\/www.imdb.com\/title", "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~" } Подробности можно найти в ссылках на ресурсы. Тесты Для тестирования основной логики, которая связана с отправкой данных, можно воспользоваться юнит-тестами. В тестах предварительно создается kafka-сервер. См. Apache Kafka и тестирование с Kafka Server. Сам тест: MovieProducerTest. public class MovieProducerTest {
@Test void simple() throws InterruptedException { String brokerHost = "127.0.0.1"; int brokerPort = 29092; String zooKeeperHost = "127.0.0.1"; int zooKeeperPort = 22183; String bootstrapServers = brokerHost + ":" + brokerPort; String topic = "q-data"; String clientId = "simple"; try (KafkaServerService kafkaServerService = new KafkaServerService( brokerHost, brokerPort, zooKeeperHost, zooKeeperPort ) ) { kafkaServerService.start(); kafkaServerService.createTopic(topic); MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton( new Data.Movie(…) ); MovieProducer movieProducer = new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl); movieProducer.run(); kafkaServerService.poll(topic, "simple", 1, 5, (records) -> { assertTrue(records.count() > 0); ConsumerRecord<String, String> record = records.iterator().next(); JSONParser jsonParser = new JSONParser(); JSONObject jsonObject = null; try { jsonObject = (JSONObject) jsonParser.parse(record.value()); } catch (ParseException e) { e.printStackTrace(); } assertNotNull(jsonObject); … }); Thread.sleep(5000); } } } Заключение Конечно, описанный здесь способ получения источника потоковых данных, строго потоковым не является. Но для исследований и прототипов вполне может сойти. Ссылки и ресурсы Исходный код. =========== Источник: habr.com =========== Похожие новости:
Тестирование IT-систем ), #_java, #_apache |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 23:15
Часовой пояс: UTC + 5