[Тестирование IT-систем, Java, Apache] Сбор данных и отправка в Apache Kafka

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
15-Ноя-2020 21:31

Введение
Для анализа потоковых данных необходимы источники этих данных. Так же важна сама информация, которая предоставляется источниками. А источники с текстовой информацией, к примеру, еще и редки.
Из интересных источников можно выделить следующие: twitter, vk. Но эти источники подходят не под все задачи.
Есть источники с нужными данными, но эти источники не потоковые. Здесь можно привести следующее ссылки: public-apis.
При решении задач, связанных с потоковыми данными, можно воспользоваться старым способом.
Скачать данные и отправить в поток.
Для примера можно воспользоваться следующим источником: imdb.
Следует отметить, что imdb предоставляет данные самостоятельно. См. IMDb Datasets. Но можно принять, что данные собранные напрямую содержат более актуальную информацию.
Язык: Java 1.8.
Библиотеки: kafka 2.6.0, jsoup 1.13.1.
Сбор данных
Сбор данных представляет из себя сервис, который по входным данным загружает html-страницы, ищет нужную информацию и преобразует в набор объектов.
Итак источник данных: imdb. Информация будет собираться о фильмах и будет использован следующий запрос: https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s
Где 1, 2 параметр – это даты. 3 параметр – страны.
Для лучшего понимания источника данных можно обратится к следующему ресурсу: imdb-extensive-dataset.
Интерфейс для сервиса:
public interface MovieDirectScrapingService {
    Collection<Movie> scrap();
}

Класс Movie – это класс, которые содержит информацию об одном фильме (или о шоу и т.п.).
class Movie {
    public final String titleId;
    public final String titleUrl;
    public final String title;
    public final String description;
    public final Double rating;
    public final String genres;
    public final String runtime;
    public final String baseUrl;
    public final String baseNameUrl;
    public final String baseTitleUrl;
    public final String participantIds;
    public final String participantNames;
    public final String directorIds;
    public final String directorNames;

Анализ данных на одной странице.
Информация собирается следующим образом. Данные закачиваются с помощью jsoup. Далее ищутся нужные html-элементы и трансформируются в экземпляры для фильмов.
String scrap(String url, List<Movie> items) {
    Document doc = null;
    try {
        doc = Jsoup.connect(url).header("Accept-Language", language).get();
    } catch (IOException e) {
        e.printStackTrace();
    }
    if (doc != null) {
        collectItems(doc, items);
        return nextUrl(doc);
    }
    return "";
}

Поиск ссылки на следующею страницу.
String nextUrl(Document doc) {
    Elements nextPageElements = doc.select(".next-page");
    if (nextPageElements.size() > 0) {
        Element hrefElement = nextPageElements.get(0);
        return baseUrl + hrefElement.attributes().get("href");
    }
    return "";
}

Тогда основной метод будет таким. Формируется начальная строка поиска. Закачиваются данные по одной странице. Если есть следующая страница, то идет переход к ней. По окончании передаются накопленные данные.
@Override
public Collection<Movie> scrap() {
    String url = String.format(
            baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
            startDate, endDate, countries
    );
    List<Movie> items = new ArrayList<>();
    String nextUrl = url;
    while (true) {
        nextUrl = scrap(nextUrl, items);
        if ("".equals(nextUrl)) {
            break;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
    return items;
}

Подробности по остальным методам можно найти в ссылках на ресурсы.
Отправка данных в топик
Формируется следующий сервис: MovieProducer. Здесь будет один единственный публичный метод: run.
Создается продюсер для кафки. Загружаются данные из источника. Трансформируются и отправляются в топик.
public void run() {
    try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
            bootstrapServers, clientId, topic)) {
        Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
        List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
        for (Data.Movie move : movies) {
            Map<String, String> map = new HashMap<>();
            map.put("title_id", move.titleId);
            map.put("title_url", move.titleUrl);
            …
            String value = JSONObject.toJSONString(map);
            String key = UUID.randomUUID().toString();
            kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
        }
        producer.produce(kvList);
    }
}

Теперь все вместе
Формируются нужные параметры для поиска. Загружаются данные и отправляются в топик.
Для этого понадобится еще один класс: MovieDirectScrapingExecutor. С одним публичным методом: run.
В цикле создаются данные для поиска из текущей даты. Происходит загрузка и отправка данных в топик.
public void run() {
    int countriesCounter = 0;
    List<String> countriesSource = Arrays.asList("us");
    while (true) {
        try {
            LocalDate localDate = LocalDate.now();
            int year = localDate.getYear();
            int month = localDate.getMonthValue();
            int day = localDate.getDayOfMonth();
            String monthString = month < 9 ? "0" + month : Integer.toString(month);
            String dayString = day < 9 ? "0" + day : Integer.toString(day);
            String startDate = year + "-" + monthString + "-" + dayString;
            String endDate = startDate;
            String language = "en";
            String countries = countriesSource.get(countriesCounter);
            execute(language, startDate, endDate, countries);
            Thread.sleep(1000);
            countriesCounter += 1;
            if (countriesCounter >= countriesSource.size()) {
                countriesCounter = 0;
            }
        } catch (InterruptedException e) {
        }
    }
}

Для запуска потребуется экземпляр класса MovieDirectScrapingExecutor, который можно запустить с нужными параметрами, к примеру, из метода main.
Пример отправляемых данных для одного фильма.
{
  "base_name_url": "https:\/\/www.imdb.com\/name",
  "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
  "title_id": "tt13121702",
  "rating": "0.0",
  "base_url": "https:\/\/www.imdb.com",
  "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
  "runtime": "",
  "title": "The Christmas Edition",
  "director_ids": "nm0838289~",
  "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
  "director_names": "Peter Sullivan~",
  "genres": "Drama, Romance",
  "base_title_url": "https:\/\/www.imdb.com\/title",
  "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}

Подробности можно найти в ссылках на ресурсы.
Тесты
Для тестирования основной логики, которая связана с отправкой данных, можно воспользоваться юнит-тестами. В тестах предварительно создается kafka-сервер.
См. Apache Kafka и тестирование с Kafka Server.
Сам тест: MovieProducerTest.
public class MovieProducerTest {
    @Test
    void simple() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String zooKeeperHost = "127.0.0.1";
        int zooKeeperPort = 22183;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String topic = "q-data";
        String clientId = "simple";
        try (KafkaServerService kafkaServerService = new KafkaServerService(
                brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
        )
        ) {
            kafkaServerService.start();
            kafkaServerService.createTopic(topic);
            MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
                    new Data.Movie(…)
            );
            MovieProducer movieProducer =
                    new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
            movieProducer.run();
            kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
                assertTrue(records.count() > 0);
                ConsumerRecord<String, String> record = records.iterator().next();
                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = null;
                try {
                    jsonObject = (JSONObject) jsonParser.parse(record.value());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                assertNotNull(jsonObject);
        …
            });
            Thread.sleep(5000);
        }
    }
}

Заключение
Конечно, описанный здесь способ получения источника потоковых данных, строго потоковым не является. Но для исследований и прототипов вполне может сойти.
Ссылки и ресурсы
Исходный код.
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_testirovanie_itsistem (Тестирование IT-систем), #_java, #_apache, #_java, #_java_8, #_kafka, #_apache_kafka, #_junit, #_jsoup, #_scraping, #_streaming, #_testirovanie_itsistem (
Тестирование IT-систем
)
, #_java, #_apache
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 17:14
Часовой пояс: UTC + 5