[Open source, Apache, Big Data] Как писать свой процессор, или расширяем функционал в NiFi

Автор Сообщение
news_bot ®

Стаж: 2 года 8 месяцев
Сообщений: 46580

Создавать темы news_bot ® написал(а)
02-Июн-2020 14:36

Все большую популярность набирает NiFi и с каждым новым релизом он получает все больше инструментов для работы с данными. Тем не менее, может появиться необходимость в собственном инструменте для решения какой-то специфичной задачи.

оригинал
Apache Nifi имеет в базовой поставке более 300 процессоров.
NiFi Processor это основной строительный блок для создания dataflow в экосистеме NiFi. Процессоры предоставляют интерфейс, через который NiFi обеспечивает доступ к flowfile, его атрибутам и содержимому. Собственный кастомный процессор позволит сэкономить силы, время и внимание пользователей, так как вместо множества простейших элементов-процессоров будет отображаться в интерфейсе и выполняться всего один (ну или сколько напишете). Так же, как и стандартные процессоры, кастомный процессор позволяет выполнять различные операции и обрабатывать содержимое flowfile. Сегодня мы поговорим о стандартных инструментах для расширения функционала.
ExecuteScript
ExecuteScript – это универсальный процессор, который предназначен для реализации бизнес- логики на языке программирования (Groovy, Jython, Javascript, JRuby). Такой подход позволяет быстро получить нужную функциональность. Для обеспечения доступа к компонентам NiFi в скрипте есть возможность использовать следующие переменные:
Session: переменная типа org.apache.nifi.processor.ProcessSession. Переменная позволяет выполнять операции с flowfile, такими как create(), putAttribute() и Transfer(), а также read() и write().
Context: org.apache.nifi.processor.ProcessContext. Его можно использовать для получения свойств процессора, отношений, служб контроллера и StateManager.
REL_SUCCESS: Отношение «успех».
REL_FAILURE: Отношение «сбой»
Dynamic Properties: Динамические свойства, определенные в ExecuteScript, передаются механизму сценариев в виде переменных, установленные как объект PropertyValue. Это позволяет получить значение свойства, привести значение к соответствующему типу данных, например, логическому и т. д.
Для использования достаточно выбрать Script Engine и указать расположение файла Script File с нашим скриптом или сам скрипт Script Body.

Рассмотрим парочку примеров:
получить один файл потока из очереди
flowFile = session.get()
if(!flowFile) return

Сгенерировать новый FlowFile
flowFile = session.create()
// Additional processing here

Добавить атрибут к FlowFile
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Извлечь и обработать все атрибуты.
flowFile = session.get() if(!flowFile) return
flowFile.getAttributes().each { key,value ->
// Do something with the key/value pair
}

Логгер
log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

В ExecuteScript есть можно использовать расширенные возможности, подробнее об этом можно почитать в статье ExecuteScript Cookbook.
ExecuteGroovyScript
ExecuteGroovyScript имеет такой же функционал, что и ExecuteScript, но вместо зоопарка допустимых языков можно использовать только один – groovy. Главное преимущество этого процессора – это более удобное использование служб сервисов. Помимо стандартного набора переменных Session, Context и т.д. можно определить динамические свойства с префиксом CTL и SQL. Начиная с версии 1.11 появилась поддержка RecordReader и Record Writer. Все свойства представляют собой HashMap, у которого в качестве ключа используется «Имя сервиса», а значение – это конкретный объект в зависимости от свойства:
RecordWriter          HashMap<String,RecordSetWriterFactory>
RecordReader         HashMap<String,RecordReaderFactory>
SQL                       HashMap<String,groovy.sql.Sql>
CTL                       HashMap<String,ControllerService>

Данная информация уже облегчает жизнь, т.к. мы можем заглянуть в исходники или найти документацию по конкретному классу.
Работа с базой данных
Если определить свойство SQL.DB и связать DBCPService, то мы получим доступ к свойству из кода SQL.DB.rows('select * from table')

Процессор автоматически принимает соединение от службы dbcp перед выполнением и обрабатывает транзакцию. Транзакции базы данных автоматически откатываются при возникновении ошибки и фиксируются в случае успеха. В ExecuteGroovyScript можно перехватывать события start и stop, реализовав соответствующие статические методы.
import org.apache.nifi.processor.ProcessContext
...
static onStart(ProcessContext context){
// your code
}
static onStop(ProcessContext context){
// your code
}
REL_SUCCESS << flowFile

InvokeScriptedProcessor
Еще один интересный процессор. Для его использования нужно объявить класс, который реализовывает интерфейс implements, и определить переменную processor. Можно определить любые PropertyDescriptor или Relationship, также получить доступ к родительскому ComponentLog'у и определить методы void onScheduled(ProcessContext context) и void onStopped(ProcessContext context). Эти методы будут вызваны при наступлении события запуска по расписанию в NiFi (onScheduled) и при остановке (onScheduled).
class GroovyProcessor implements Processor {
          @Override
          void initialize(ProcessorInitializationContext context) { log = context.getLogger()
          }
          @Override
          void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) thr
          @Override
          Collection<ValidationResult> validate(ValidationContext context) { return null
          @Override
          PropertyDescriptor getPropertyDescriptor(String name) { return null  }
          @Override
          void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String n
          @Override
          List<PropertyDescriptor> getPropertyDescriptors() { return null }
          @Override
          String getIdentifier() { return null }
}

Логику надо реализовать в методе void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory)
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throw s ProcessException {
            final ProcessSession session = sessionFactory.createSession(); def
            flowFile = session.create()
           if (!flowFile) return
                 // your code
           try
                 { session.commit();
           } catch (final Throwable t) {
                 session.rollback(true);
                 throw t;
           }
}

Описывать все методы, декларированные в интерфейсе излишне, так что обойдемся одним абстрактным классом, в котором объявим следующий метод:
abstract void executeScript(ProcessContext context, ProcessSession session)

Метод мы будем вызывать в
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory)

import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.components.ValidationContext import org.apache.nifi.components.ValidationResult import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
abstract class BaseGroovyProcessor implements Processor {
  public ComponentLog log
  public Set<Relationship> relationships;
  @Override
  void initialize(ProcessorInitializationContext context) { log = context.getLogger()
  }
  @Override
  void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) thr final ProcessSession session = sessionFactory.createSession();
  try {
    executeScript(context, session);
    session.commit();
  } catch (final Throwable t) {
    session.rollback(true);
    throw t;
  }
}
  abstract void executeScript(ProcessContext context, ProcessSession session) thro
  @Override
  Collection<ValidationResult> validate(ValidationContext context) { return null }
  @Override
  PropertyDescriptor getPropertyDescriptor(String name) { return null  }
  @Override
  void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String n
  @Override
  List<PropertyDescriptor> getPropertyDescriptors() { return null }
  @Override
  String getIdentifier() { return null }
}

Теперь объявим класс-наследник BaseGroovyProcessorи опишем наш executeScript, также добавим Relationship RELSUCCESS и RELFAILURE.
import org.apache.commons.lang3.tuple.Pair
import org.apache.nifi.expression.ExpressionLanguageScope import org.apache.nifi.processor.util.StandardValidators import ru.rt.nifi.common.BaseGroovyProcessor
import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.ProcessContext import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.exception.ProcessException import org.quartz.CronExpression
import java.sql.Connection
import java.sql.PreparedStatement import java.sql.ResultSet
import java.sql.SQLException import java.sql.Statement
class InvokeScripted extends BaseGroovyProcessor {
  public static final REL_SUCCESS = new Relationship.Builder()
    .name("success")
    .description("If the cache was successfully communicated with it will be rou
    .build()
  public static final REL_FAILURE = new Relationship.Builder()
    .name("failure")
    .description("If unable to communicate with the cache or if the cache entry
    .build()
  @Override
  void executeScript(ProcessContext context, ProcessSession session) throws Proces   def flowFile = session.create()
    if (!flowFile) return
    try {
        // your code
      session.transfer(flowFile, REL_SUCCESS)
    } catch(ProcessException | SQLException e) {
      session.transfer(flowFile, REL_FAILURE)
      log.error("Unable to execute SQL select query {} due to {}. No FlowFile
    }
  }
}


В конец кода добавим processor = new InvokeScripted()
Такой подход похож на создание кастомного процессора.
Заключение
Создание кастомного процессора не самая простая вещь – в первый раз придется поднапрячься, чтобы разобраться, зато польза от этого действия неоспорима.
Пост подготовлен Командой управления данными Ростелекома
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_open_source, #_apache, #_big_data, #_nifi, #_apache, #_opensource, #_apache_nifi, #_groovy, #_jython, #_javascript, #_jruby, #_executescript, #_komanda_upravlenija_dannymi_rtk (Команда управления данными РТК), #_blog_kompanii_rostelekom (
Блог компании Ростелеком
)
, #_open_source, #_apache, #_big_data
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 29-Окт 14:51
Часовой пояс: UTC + 5