可视化编排的数据集成和分发开源框架Nifi轻松入门-上

发布时间 2023-03-22 19:06:53作者: itxiaoshen

@

概述

定义

Nifi 官网地址 https://nifi.apache.org/

Nifi 官网文档 https://nifi.apache.org/docs.html

Nifi GitHub源码地址 https://github.com/apache/nifi

Apache NiFi是一个易于使用、功能强大且可靠的系统,用于处理和分发数据,可以自动化管理系统间的数据流。最新版本为1.19.1

简单来说,NiFi是用来处理数据集成场景的数据分发。NiFi是基于Java的,使用Maven支持包的构建管理。 NiFi基于Web方式工作,后台在服务器上进行调度。用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。

dataflow面临挑战

  • 系统失败:网络故障,磁盘故障,软件崩溃,人为犯错。
  • 数据访问超过了消费能力:有时,给定数据源的速度可能超过处理或交付链的某些部分,而只需要某一个环节出现问题,整个流程都会受到影响。
  • 超出边界问题:总是会得到太大、太小、太快、太慢、损坏、错误或格式错误的数据。
  • 现实业务或需求变化快:设计新的数据处理流程或者修改已有的数据处理流程必须要够敏捷。
  • 难以在测试环境模拟生产环境数据。

特性

Apache NiFi支持数据路由、转换和系统中介逻辑的强大且可伸缩的有向图。

  • 基于浏览器的用户界面:设计、控制、反馈和监控的无缝体验。
  • 数据来源跟踪:完整从开始到结束跟踪信息。
  • 丰富的配置
    • 容错和保证交付
    • 低延迟,高吞吐量
    • 动态优先级
    • 流配置的运行时修改
    • 背压控制
  • 可扩展的设计
    • 定制处理器和服务的组件体系结构
    • 快速开发和迭代测试
  • 安全通信
    • HTTPS,具有可配置的身份验证策略
    • 多租户授权和策略管理
    • 用于加密通信的标准协议,包括TLS和SSH

核心概念

  • FlowFile:表示在系统中移动的每个对象,对于每个对象,NiFi跟踪键/值对属性字符串的映射及其零或多字节的相关内容。
    • 每一块“用户数据”(即用户带入NiFi进行处理和分发的数据)都被称为一个FlowFile。
    • 一个FlowFile由两部分组成:属性和内容。内容就是用户数据本身。属性是与用户数据相关联的键值对
  • FlowFile Processor:处理器实际执行工作,处理器是在系统之间进行数据路由、转换或中介的某种组合。处理器可以访问给定的FlowFile及其内容流的属性。处理器可以在给定的工作单元中操作零个或多个flowfile,并提交该工作或回滚。
    • 处理器是NiFi组件,负责创建、发送、接收、转换、路由、拆分、合并和处理流文件。它是NiFi用户用于构建数据流的最重要的构建块。
  • Connection:连接提供处理器之间的实际链接。充当队列,允许各种进程以不同的速率进行交互。这些队列可以动态地划分优先级,并且可以设置负载上限,从而启用背压。
  • Flow Controller:流控制器维护进程如何连接,并管理所有进程使用的线程及其分配。流控制器充当了促进处理器之间流文件交换的代理。
  • Process Group:进程组是一组特定的进程及其连接,这些进程可以通过输入端口接收数据,通过输出端口发送数据。通过这种方式,流程组允许通过简单地组合其他组件来创建全新的组件。

这种设计模型帮助NiFi成为构建强大且可伸缩的数据流的非常有效的平台,其好处如下:

  • 很好地用于处理器有向图的可视化创建和管理。
  • 本质上是异步的,允许非常高的吞吐量和自然缓冲,即使处理和流速率波动。
  • 提供了一个高度并发的模型,开发人员不必担心并发性的典型复杂性。
  • 促进内聚和松散耦合组件的开发,这些组件可以在其他上下文中重用,并促进可测试单元的开发。
  • 资源受限的连接使得诸如回压和压力释放等关键功能非常自然和直观。
  • 错误处理变得像快乐之路一样自然,而不是粗粒度的一刀切。
  • 数据进入和退出系统的点以及它如何流经系统都很容易理解和跟踪。

架构

NiFi的设计目的是充分利用它所运行的底层主机系统的功能,对IO、CPU、RAM高效使用,这种资源最大化在CPU和磁盘方面表现得尤为突出,详细信息在管理指南中的最佳实践和配置技巧中。

image-20221213150613979

NiFi在主机操作系统上的JVM中执行,JVM上NiFi的主要组件如下:

  • Web Server:web服务器的目的是承载NiFi基于http的命令和控制API。
  • Flow Controller:流量控制器是操作的大脑,它为要运行的扩展提供线程,并管理扩展何时接收要执行的资源的调度。
  • Extensions:各种类型的NiFi扩展,这里的关键点是扩展在JVM中操作和执行。
  • FlowFile Repository:流文件存储库是NiFi跟踪当前流中活动的给定流文件状态的地方。存储库的实现是可插入的。默认方法是位于指定磁盘分区上的持久预写日志。
  • Content Repository:内容存储库是一个给定的FlowFile的实际内容字节所在的地方。存储库的实现是可插入的。默认的方法是一种相当简单的机制,即在文件系统中存储数据块。可以指定多个文件系统存储位置,以便使用不同的物理分区,以减少任何单个卷上的争用。
  • Provenance Repository:源头存储库是存储所有源头事件数据的地方。存储库结构是可插入的,默认实现是使用一个或多个物理磁盘卷。在每个位置中,事件数据都被索引并可搜索。

NiFi也能够在集群中运行,NiFi 采用了零领导者集群,NiFi集群中的每个节点在数据上执行相同的任务,但每个节点操作不同的数据集。Apache ZooKeeper选择一个节点作为Cluster Coordinator,故障转移由ZooKeeper自动处理。所有集群节点都向集群协调器报告心跳和状态信息。集群协调器负责断开和连接节点。此外,每个集群都有一个主节点,也由ZooKeeper选举产生。作为DataFlow管理器,可通过任何节点的用户界面(UI)与NiFi集群交互,操作更改复制到集群中的所有节点,允许多个入口点。

image-20221213151506874

高级概述

Nifi高级概述包括流管理、易用性、安全性、可扩展的体系结构和灵活的伸缩模型。

  • 流量管理
    • 保证交付:NiFi的核心理念是,即使规模非常大,也必须保证交付。这是通过有效使用专门构建的持久预写日志和内容存储库来实现的。它们一起被设计成这样一种方式,允许非常高的事务率、有效的负载分散、写时复制,并发挥传统磁盘读/写的优势。
    • 带背压和压力释放的数据缓冲:NiFi支持对所有排队的数据进行缓冲,并在这些队列达到指定的限制时提供回压,或者在数据达到指定的年龄(其值已经消亡)时使其老化。
    • 优先队列:NiFi允许为如何从队列中检索数据设置一个或多个优先级方案。默认情况下是最早的先提取,但有时应该先提取最新的数据,先提取最大的数据,或者其他一些自定义方案。
    • 特定于流的QoS(延迟v吞吐量,损失容忍度等):在数据流中,有些点的数据是绝对关键的,并且是不能容忍损失的,需要实时处理和交付,才能具有任何价值,NiFi支持这些关注点的细粒度流特定配置。
  • 易用性
    • 可视化管理:数据流可能变得相当复杂。能够可视化这些流程并以可视化的方式表达它们可以极大地帮助降低复杂性,并确定需要简化的区域。NiFi不仅可以可视化地建立数据流,而且可以实时地实现。对数据流进行更改立即生效。
    • 流模板:数据流往往是高度面向模式的,模板允许主题专家构建和发布他们的流设计,并让其他人从中受益和协作。
    • 数据源:当对象流经系统时,NiFi自动记录、索引并提供来源数据,即使是在扇入、扇出、转换等过程中也是如此。这些信息对于支持遵从性、故障排除、优化和其他场景非常重要。
    • 恢复/记录细粒度历史的滚动缓冲区:NiFi的内容存储库被设计成历史的滚动缓冲区。数据只有在内容存储库老化或需要空间时才会被删除。这与数据来源功能相结合,形成了一个非常有用的基础,可以在对象生命周期(甚至可以跨越几代)的特定点上实现点击内容、下载内容和重播。
  • 安全
    • 系统到系统:数据流需要安全保障,数据流中的每个点上的NiFi通过使用带有加密协议(如2-way SSL)提供安全交换。此外,NiFi使流能够加密和解密内容,并在发送方/接收方等式的任何一方使用共享密钥或其他机制。
    • 系统用户:NiFi支持双向SSL身份验证,并提供可插拔授权,以便在特定级别(只读、数据流管理器、管理)正确控制用户的访问。如果用户将敏感属性(如密码)输入到流中,它将立即在服务器端加密,并且即使以加密形式也不会再次在客户端公开。
    • 多租户授权:给定数据流的权限级别应用于每个组件,允许admin用户拥有细粒度级别的访问控制。这意味着每个NiFi集群都能够处理一个或多个组织的需求。与孤立的拓扑相比,多租户授权支持数据流管理的自助服务模型,允许每个团队或组织管理流,同时充分了解他们无法访问的其余流。
  • 可扩展体系结构
    • 扩展:NiFi的核心是为扩展而构建的,因此它是一个平台,数据流进程可以在其上以可预测和可重复的方式执行和交互。扩展点包括:处理器、控制器服务、报告任务、优先级和客户用户界面。
    • 类加载器隔离:对于任何基于组件的系统,依赖关系问题都可能很快发生。NiFi通过提供自定义类加载器模型来解决这个问题,确保每个扩展包只暴露给非常有限的一组依赖项。
    • 点到点通信协议:NiFi实例之间的首选通信协议是NiFi Site-to-Site (S2S)协议。S2S可以轻松地将数据从一个NiFi实例传输到另一个NiFi实例,轻松、高效、安全。NiFi客户端库可以很容易地构建并捆绑到其他应用程序或设备中,通过S2S与NiFi通信。在S2S中,基于套接字的协议和HTTP(S)协议都被支持作为底层传输协议,这使得在S2S通信中嵌入代理服务器成为可能。
  • 灵活缩放模型
    • 水平扩展(聚类):NiFi被设计为通过使用如上所述的群集多个节点来向外扩展。如果将单个节点配置为每秒处理数百MB,则可以将普通集群配置为每秒处理GB。
    • 扩缩容:NiFi还被设计成以非常灵活的方式扩大和缩小,从NiFi框架的角度来看,在配置时,可以在Scheduling选项卡下增加处理器上并发任务的数量。

安装

部署

# 下载最新版本1.19.1的nifi
wget --no-check-certificate https://dlcdn.apache.org/nifi/1.19.1/nifi-1.19.1-bin.zip
# 由于下载很慢我就直接下载源码安装了,最低建议JDK 11.0.16、Apache Maven 3.8.6,最新需求是JDK 8 Update 251Apache Maven 3.6.0
wget 
# 解压源码包
tar -xvf nifi-1.19.1.tar.gz
# 进入源码根目录
cd nifi-rel-nifi-1.19.1

image-20221213162425748

# 执行编译命令
mvn clean install -DskipTests

等待编译完成

image-20221213172354202

编译好的目录和包目录如下

image-20221213172526037

# 复制编译好的安装包nifi-1.19.1-bin.zip
cp -rf nifi-1.19.1-bin.zip /home/commons/
cd /home/commons/
# 解压编译好的安装包
unzip nifi-1.19.1-bin.zip
# 进入安装目录
cd nifi-1.19.1

nifi主要配置文件在conf/nifi.properties,默认的https的端口为8443,修改host为本机IP地址

image-20221213173827957

# 启动nifi
./bin/nifi.sh start
# 得等一小会时间后查看nifi进程状态
./bin/nifi.sh status
# 查看授权的密码信息
grep Generated logs/nifi-app*log
# 可以使用自定义凭证替换随机用户名和密码,使用如下命令
./bin/nifi.sh set-single-user-credentials <username> <password>
# 其他命令如下,停止nifi ./bin/nifi.sh stop,重启nifi./bin/nifi.sh restart

image-20221213173159977

在web浏览器中打开以下链接以访问NiFi:https://192.168.50.95:8443/nifi ,看到登录页面后输入上面的用户名和密码就可以进入nifi的首页。

image-20221213174051210

常见处理器

想到创建数据流必须了解可供使用的处理器类型,NiFi包含许多开箱即用的不同处理器,这些处理器提供了从许多不同系统摄取数据、路由、转换、处理、分割和聚合数据以及将数据分发到许多系统的功能。几乎在每一个NiFi发行版中,可用的处理器数量都会增加。因此将不尝试为每个可用的处理器命名,下面重点介绍一些最常用的处理器,并根据它们的功能对它们进行分类。

  • 数据转换
    • CompressContent:压缩或解压缩内容。
    • ConvertCharacterSet:将用于对内容进行编码的字符集转换为另一个字符集。
    • EncryptContent:加密或解密内容。
    • ReplaceText:使用正则表达式修改文本内容。
    • TransformXml:对XML内容应用XSLT转换。
    • JoltTransformJSON:应用JOLT规范转换JSON内容
  • 路由和中介
    • ControlRate:限制数据通过流的一部分的速率。
    • DetectDuplicate:基于一些用户定义的标准,监控重复的flowfile。通常与HashContent一起使用。
    • DistributeLoad:通过仅将一部分数据分发到每个用户定义的关系来实现负载平衡或示例数据。
    • MonitorActivity:当用户定义的一段时间过去了,没有任何数据通过流中的特定点时,发送一个通知。可以选择在数据流恢复时发送通知。
    • RouteOnAttribute:基于属性th的路由流文件。
    • ScanAttribute:扫描FlowFile上的用户定义属性集,检查是否有任何属性与用户定义字典中的术语匹配。
    • RouteOnContent:搜索FlowFile的内容,看它是否匹配任何用户定义的正则表达式。如果是,则将FlowFile路由到配置的Relationship。
    • ScanContent:根据用户定义的字典和路由中存在或不存在的术语搜索FlowFile的内容。字典可以由文本项或二进制项组成。
    • ValidateXml:根据XML模式验证XML内容;根据用户定义的XML模式,根据FlowFile的内容是否有效来路由FlowFile。
  • 数据库访问
    • ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令,然后传递给PutSQL处理器。
    • ExecuteSQL:执行用户定义的SQL SELECT命令,将结果以Avro格式写入FlowFile。
    • PutSQL:通过执行由FlowFile内容定义的SQL DDM语句来更新数据库。
    • SelectHiveQL:对Apache Hive数据库执行用户自定义的HiveQL SELECT命令,将结果以Avro或CSV格式写入FlowFile。
    • PutHiveQL:通过执行由FlowFile的内容定义的HiveQL DDM语句来更新Hive数据库。
  • 属性提取
    • EvaluateJsonPath:用户提供JSONPath表达式(类似于XPath,用于XML解析/提取),然后根据JSON内容计算这些表达式,以替换FlowFile内容或将值提取到用户命名的属性中。
    • EvaluateXPath:用户提供XPath表达式,然后根据XML内容计算这些表达式,以替换FlowFile内容或将值提取到用户命名的属性中。
    • EvaluateXQuery:用户提供一个XQuery查询,然后根据XML内容计算该查询,以替换FlowFile内容或将值提取到用户命名的属性中。
    • ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile的文本内容计算正则表达式,然后将提取的值作为用户命名的属性添加。
  • 系统交互
    • ExecuteProcess:执行用户自定义的操作系统命令。流程的StdOut被重定向,这样写入StdOut的内容就变成了出站FlowFile的内容。这个处理器是一个源处理器——它的输出预计会生成一个新的FlowFile,而系统调用预计不会接收任何输入。为了向流程提供输入,请使用ExecuteStreamCommand处理器。
    • ExecuteStreamCommand:执行用户自定义的操作系统命令。FlowFile的内容可选地流到进程的StdIn中。写入StdOut的内容成为出站FlowFile的内容。
  • 数据摄取
    • GetFile:将本地磁盘(或网络连接磁盘)中的文件内容流到NiFi中,然后删除原始文件。此处理器预计将文件从一个位置移动到另一个位置,而不是用于复制数据。
    • GetFTP:通过FTP将远程文件的内容下载到NiFi,然后删除原始文件。此处理器预计将数据从一个位置移动到另一个位置,而不是用于复制数据。
    • GetHDFS:监控HDFS中用户指定的目录。每当有新文件进入HDFS时,它就会被复制到NiFi中,然后从HDFS中删除。此处理器预计将文件从一个位置移动到另一个位置,而不是用于复制数据。如果在集群中运行,这个处理器也只能在主节点上运行。为了从HDFS复制数据并保留数据,或者从集群中的多个节点传输数据,请参阅ListHDFS处理器。
    • GetKafka:从Apache Kafka中获取消息,特别是对于0.8。x版本。消息可以作为每条消息的FlowFile发出,也可以使用用户指定的分隔符将消息批处理在一起。
    • GetMongo:对MongoDB执行用户指定的查询,并将内容写入新的FlowFile。
  • 数据发送
    • PutFile:将FlowFile的内容写入本地(或网络连接)文件系统上的目录。
    • PutFTP:将FlowFile的内容复制到远程FTP服务器。
    • PutKafka:将FlowFile的内容作为消息发送给Apache Kafka,特别是0.8。x版本。FlowFile可以作为单个消息或分隔符发送,例如可以指定一个新行,以便为单个FlowFile发送多个消息。
    • PutMongo:将FlowFile的内容作为INSERT或UPDATE发送到Mongo。
  • 拆分和聚合
    • SplitText:SplitText接收一个包含文本内容的FlowFile,并根据配置的行数将其拆分为1个或多个FlowFile。例如,处理器可以被配置为将一个FlowFile分割成许多个FlowFile,每个FlowFile只有1行。
    • SplitJson:允许用户将一个由数组或许多子对象组成的JSON对象拆分为每个JSON元素的FlowFile。
    • MergeContent:这个处理器负责将多个FlowFile合并为一个FlowFile。可以通过将它们的内容连同可选的页眉、页脚和分界符连接在一起,或者通过指定归档格式(如ZIP或TAR)来合并flowfile。

入门示例

我们使用演示一个从本地源文件夹拷贝到本地目的文件夹,主要使用到GetFile文件数据摄取处理器和PutFile文件发送处理器。

GetFile文件数据摄取处理器,详细属性可以在官方文档https://nifi.apache.org/docs.html的左边处理器菜单下找,例如GetFile处理器,从目录中的文件创建FlowFiles,NiFi将忽略它至少没有读权限的文件

image-20221213170135069

这里我们使用默认参数,主要配置输入目录,添加一个GetFile处理器

image-20221213174308391

"设置"中填写名称为my-first-get-file,属性填写输入目录。

image-20221213175332743

PutFile文件数据摄取处理器,将FlowFile的内容写入本地文件系统,详细属性可直接查阅官方文档

image-20221213171024225

添加一个PutFile处理器,"设置"中填写名称为my-first-put-file,属性填写目录

image-20221213175630748

# 创建上传文件目录,如果没有创建在my-first-put-file的会有感叹号提示信息
mkdir /home/commons/data/nifi/input

从my-first-get-file上点击拉动到my-first-put-file处理器形成连接,连接名称为first-connection

image-20221213180217975

为my-first-put-file设置终止关联关系

image-20221213181148267

分别点击my-first-get-file和my-first-put-file启动按钮,启动两个的处理器

image-20221213181348512

# 手工写入数据文件
echo "hello nifi" >> /home/commons/data/nifi/input/nifi.log

查看nifi上可以看到数据文件有复制数据

image-20221213181637680

查看本地的output文件夹下也有上面手工写入后转移的nifi.log文件数据(由于PutFile创建缺失的目录默认属性设置是true,也即是会自动创建目录)

image-20221213181723178

本地input文件转移就没有文件,重新执行上面写入一个重名的文件

echo "hello nifi" >> /home/commons/data/nifi/input/nifi.log

由于PutFile冲突解决的策略默认为false,所以同名文件不会放到输出目录下,就直接在页面出现警告信息,可设置为true就不会有警告信息了

image-20221213182149359

本篇只是简单入门,nifi的功能非常强大,针对数据采集和数据集成场景需求可以满足大多数的场景

本人博客网站IT小神 www.itxiaoshen.com