Apache NiFi对接FusionInsight

适用场景

Apache NiFi 1.9.2 ↔ FusionInsight HD 6.5 (HDFS/HBase/Hive/Spark/Kafka/Hetu)

Apache NiFi 1.12.0 ↔ FusionInsight MRS 8.0 (HDFS/HBase/Hive/Spark/Kafka/Hetu)

MRS 8.0 对接说明

说明:对接FusionInsight MRS 8.0需替换jar包如下

hive:
替换路径:/opt/nifi/nifi-1.12.0/work/nar/extensions/nifi-hive-nar-1.12.0.nar-unpacked/NAR-INF/bundled-dependencies
需要把zookeeper-3.5.6-hw-ei-302002.jar,zookeeper-jute-3.5.6-hw-ei-302002.jar拷贝到该路径,把原来的zookeeper-3.4.6.jar注释掉

hbase:
替换路径:/opt/nifi/nifi-1.12.0/work/nar/extensions/nifi-hbase_2-client-service-nar-1.12.0.nar-unpacked/NAR-INF/bundled-dependencies
需要把zookeeper-3.5.6-hw-ei-302002.jar,zookeeper-jute-3.5.6-hw-ei-302002.jar拷贝到该路径,把原来的zookeeper-3.4.10.jar注释掉
配置hbase需要选用HBase_2_ClientService

kafka:
替换路径:/opt/nifi/nifi-1.12.0/work/nar/extensions/nifi-kafka-2-0-nar-1.12.0.nar-unpacked/NAR-INF/bundled-dependencies
需要把kafka-clients-2.4.0-hw-ei-302002.jar导入到该路径下,并把原来的kafka-clients-2.0.0.jar注释掉
需要把zookeeper-3.5.6-hw-ei-302002.jar拷贝到该路径

安装Apache NiFi

环境:172.16.2.121

操作场景

安装Apache NiFi 1.9.2

前提条件

  • 已完成FusionInsight HD和客户端的安装。

操作步骤

  • 执行source命令到客户端,获取java配置信息
    source /opt/hadoopclient/bigdata_env
    echo $JAVA_HOME
    

20200703_145508_96

  • 安装NiFi,在网址https://nifi.apache.org/download.html下载安装包,使用WinSCP导入主机并用命令unzip nifi-1.9.2-bin.zip解压安装生成nifi-1.9.2目录,安装目录为/opt/nifi/nifi-1.9.2

  • 执行vi /opt/nifi/nifi-1.9.2/conf/nifi.properties配置NiFi服务器ip和端口如下:

    nifi.web.http.host=172.16.2.121
    nifi.web.http.port=8085
    

    20200703_150226_61

  • 启动和停止NiFi

    cd /opt/nifi/nifi-1.9.2
    bin/nifi.sh start
    bin/nifi.sh stop
    

    20200703_150434_63

  • 运行NiFi

    bin/nifi.sh start

    20200703_150655_19

NiFi配置Kerberos认证

操作场景

NiFi配置并保存Kerberos认证信息,供以后使用

前提条件

  • 已经完成Nifi 1.9.2的安装

  • 已完成FusionInsight HD和客户端的安装并创建测试用户developuser (参考产品文档->应用开发指南->安全模式->安全认证)

操作步骤

  • 在FusionInsight HD Manager上下载认证用户的配置文件user.keytabkrb5.conf,并一起存入路径/opt/developuser

  • 执行命令vi /opt/nifi/nifi-1.9.2/conf/nifi.properties配置Kerberos认证

    具体配置:
    nifi.kerberos.krb5.file=/opt/developuser/krb5.conf
    nifi.kerberos.service.principal=developuser
    nifi.kerberos.service.keytab.location=/opt/developuser/user.keytab
    

    20200703_151258_33

  • 重启NiFi

  • 登录NiFi网页界面,右键选择**Configure**

    20200703_151508_51

    点击**加号**按钮添加服务

    20200703_151604_13

    选择KeytabCredentialsService,点击**ADD**添加

    20200703_151645_95

    点击**齿轮**图标进行配置

    20200703_151719_97

    20200703_151804_61

    点击**闪电**图标生效并保存KeytabCredentialsService

    20200703_152008_49

    20200703_151926_36

  • 完成

    20200703_152039_61

NiFi连接HDFS

操作场景

NiFi中配置HDFS相关处理器,对接HDFS

前提条件

  • 已经完成NiFi 1.9.2的安装

  • 已完成FusionInsight HD和客户端的安装,包含HDFS组件

  • 已完成 NiFi Kerberos认证配置

PutHDFS 操作步骤

  • 将FusionInsight HD客户端中关于HDFS的配置文件hdfs-site.xmlcore-site.xml导入路径/opt/nifi/nifi-1.9.2/conf

  • 修改hdfs-site.xml内容,删除如下配置项

    <property>
    <name>dfs.client.failover.proxy.provider.hacluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.AdaptiveFailoverProxyProvider</value>
    </property>
    

  • 修改core-site.xml内容,修改如下配置项中hacluster改为namenode主节点ip加端口号

    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://172.16.4.123:25000</value>
    </property>
    

  • 将FI客户端的hadoop-plugins-1.0.jar拷贝到nifi hadoop相关依赖路径

    cp /opt/125_651hdclient/hadoopclient/HDFS/hadoop/share/hadoop/common/lib/hadoop-plugins-1.0.jar /opt/nifi/nifi-1.9.2/work/nar/extensions/nifi-hadoop-nar-1.9.2.nar-unpacked/NAR-INF/bundled-dependencies

    说明:否则会报依赖错误

  • 整个过程的流程图所示:

20200703_154937_73

  • 处理器GetFile的配置如下:

20200703_155019_42

具体配置:
1: /home/dataset
- 处理器PutHDFS配置如下

20200703_155105_46

20200703_155146_35

具体配置:
1: /opt/nifi/nifi-1.9.2/conf/hdfs-site.xml,/opt/nifi/nifi-1.9.2/conf/core-site.xml
2: 选择NiFi配置Kerberos认证这一节中创建的 KeytabCredentialsService
3: /tmp/nifitest
  • 两个处理器的连接配置如下:

20200703_155330_74

  • 测试前将测试文件nifiHDFS.csv放入路径/home/dataset

20200703_155450_69

文件内容如下:

1;EcitQU
2;Hyy6RC
3;zju1jR
4;R9fex9
5;EU2mVq
- 测试后

20200703_161040_42

登录集群HDFS文件系统查看测试结果

hdfs dfs -cat /tmp/nifitest/nifiHDFS.csv

20200703_161511_96

GetHDFS 操作步骤

  • 整个过程的流程如图所示:

20200703_164231_28

  • 处理器GetHDFS配置如下

20200703_164512_54

具体配置:
1: /opt/nifi/nifi-1.9.2/conf/hdfs-site.xml,/opt/nifi/nifi-1.9.2/conf/core-site.xml
2: 选择NiFi配置Kerberos认证这一节中创建的 KeytabCredentialsService
3: /tmp/nifitest/HDFS
- 处理器PutFile配置如下

20200703_164612_26

具体配置:
1: /home/dataset/HDFS
  • 测试后

20200703_164756_32

登录安装 FusionInsight HD客户端主机路径/home/dataset/HDFS查看结果

20200703_164828_36

NiFi连接Hive

操作场景

NiFi中配置JDBC解析器,对的FI HD Hive接口

前提条件

  • 已经完成NiFi 1.9.2的安装

  • 已完成FusionInsight HD和客户端的安装,包含Hive组件

  • 已完成 NiFi Kerberos认证配置

HiveConnectionPool 配置操作步骤

  • 登录NiFi网页界面,右键选择**Configure**

  • 点击**加号**按钮添加服务

  • 选择HiveConnectionPool,点击**ADD**添加

20200703_170414_24

  • 点击**齿轮**图标进行配置

20200703_170503_83

20200703_170810_95

具体配置为
1: jdbc:hive2://172.16.4.121:24002,172.16.4.122:24002,172.16.4.123:24002/;serviceDiscoveryMode=zooKeeper;principal=hive/hadoop.hadoop.com@HADOOP.COM
2: KeytabCredentialsService
- 点击**闪电**图标点击闪电图标生效并保存HiveConnectionPool

20200703_172644_40

20200703_172719_10

  • 完成

20200703_172742_43

  • 在路径/opt/nifi/nifi-1.9.2/conf下创建jaas.conf文件内容如下:
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/opt/developuser/user.keytab"
    principal="developuser"
    useTicketCache=false
    storeKey=true
    debug=true;
    };
    
  • 执行命令vi /opt/nifi/nifi-1.9.2/conf/bootstrap.conf配置bootstrap.conf文件如下:

20200703_173151_16

java.arg.17=-Djava.security.auth.login.config=/opt/nifi/nifi-1.9.2/conf/jaas.conf
java.arg.18=-Dsun.security.krb5.debug=false
  • 执行命令vi /opt/nifi/nifi-1.9.2/conf/nifi.properties配置nifi.properties文件如下:

20200703_173408_12

nifi.zookeeper.auth.type=sasl
nifi.zookeeper.kerberos.removeHostFromPrincipal=true
nifi.zookeeper.kerberos.removeRealmFromPrincipal=true
  • 执行命令cd /opt/nifi/nifi-1.9.2/work/nar/extensions/nifi-hive-nar-1.9.2.nar-unpacked/NAR-INF/bundled-dependencies到NiFi Hive类库中,将原有的zookeeper-3.4.6.jar替换为FusionInsight HD客户端中的zookeeper-3.5.1.jar

20200703_173735_37

SelectHiveQL 读取Hive表 操作步骤

  • 整个过程的流程如图所示:

20200706_160341_65

  • 处理器SelectHiveQL配置如下:

20200706_160541_24

20200706_160619_96

 具体配置如下:
 1: HiveConnectionPool
 2: select * from default.student
 3. CSV
  • 处理器PutFile配置如下:

20200706_160820_97

  • 运行前登录集群查看hive表student:

20200706_160906_10

  • 运行后:

20200706_163851_10

登录路径/home/dataset/HIVE查看结果:

20200706_163924_56

PutHiveQL 整表导入 操作步骤

  • 整个过程的流程如图所示:

20200706_165144_36

  • 处理器GetFile的配置如下

20200706_165223_72

  具体配置如下:
  1: /home/dataset/
  2: iris.txt

数据文件iris.txt的内容如下:

1,5.1,3.5,1.4,0.2,setosa
2,4.9,3,1.4,0.2,setosa
3,4.7,3.2,1.3,0.2,setosa
4,4.6,3.1,1.5,0.2,setosa
5,5,3.6,1.4,0.2,setosa
6,5.4,3.9,1.7,0.4,setosa
7,4.6,3.4,1.4,0.3,setosa
8,5,3.4,1.5,0.2,setosa
9,4.4,2.9,1.4,0.2,setosa
10,4.9,3.1,1.5,0.1,setosa

  • 处理器PutHDFS的配置如下

20200706_165308_97

  具体配置如下:
  1: /opt/nifi/nifi-1.9.2/conf/hdfs-site.xml,/opt/nifi/nifi-1.9.2/conf/core-site.xml
  2: KeytabCredentialsService
  3: /tmp/nifitest/loadhive
  • 处理器ReplaceText配置如下

20200706_165421_62

  具体配置如下:
  1: CREATE TABLE IF NOT EXISTS iris_createdBy_NiFi ( ID string, sepallength FLOAT, sepalwidth FLOAT, petallength FLOAT, petalwidth FLOAT, species string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;LOAD DATA INPATH "hdfs:///tmp/nifitest/loadhive/iris.txt" into table iris_createdBy_NiFi;
  • 处理器PutHiveQL配置如下

20200706_165505_36

  • 运行前将数据文件iris.txt导入路径/home/dataset/

20200706_165609_11

  • 运行后:

20200706_170022_60

登录HIVE查看测试结果:

20200706_170036_69

NiFi连接HBase

操作场景

NiFi中配置HBase解析器,对的FI HD HBase接口

前提条件

  • 已经完成NiFi 1.9.2的安装

  • 已完成FusionInsight HD和客户端的安装,包含HBase组件

  • 已完成 NiFi Kerberos认证配置

HBase_1_1_2_ClientService 配置操作步骤

  • 将FusionInsight HD客户端中关于HBase的配置文件hbase-site.xml导入路径/opt/nifi/nifi-1.9.2/conf

  • 更换路径/opt/nifi/nifi-1.9.2/work/nar/extensions/nifi-hbase_1_1_2-client-service-nar-1.9.2.nar-unpacked/NAR-INF/bundled-dependencies下面的zookeeper-3.4.6.jar为FusionInsight HD客户端自带的zookeeper-3.5.1.jar

    20200714_114611_68

  • 登录NiFi网页界面,右键选择**Configure**

    20200714_114833_84

  • 点击**加号**按钮添加服务

    20200714_114935_76

  • 选择HBase_1_1_2_ClientService,点击**ADD**添加

    20200714_115020_88

  • 点击**齿轮**图标进行配置

    20200714_115405_27

    20200714_115222_37

  具体配置如下:
  1: /opt/nifi/nifi-1.9.2/conf/hbase-site.xml,/opt/nifi/nifi-1.9.2/conf/core-site.xml
  2: KeytabCredentialsService
  • 点击**闪电**图标点击闪电图标生效并保存HBase_1_1_2_ClientService

    20200714_115636_52

    20200714_115656_25

  • 完成

    20200714_115733_18

PutHBaseJSON 向HBase导入表

  • 整个过程的流程如图所示:

20200714_142654_43

  • 处理器GetFile的配置如下:

20200714_141810_29

数据文件hbase_test.csv的内容如下:

1,5.1,3.5,setosa
2,6.1,3.6,versicolor
3,7.1,3.7,virginica

  • 处理器InverAvroSchema配置如下:

20200714_142029_50

  具体配置如下:
  1: flowfile-attribute
  2: csv
  3: false
  4: hbase_test_data
  • 处理器ConvertCSVToAvro配置如下:

20200714_142249_97

${inferred.avro.schema}

  • 处理器ConvertAvroToJSON配置如下:

20200714_142359_10

  • 处理器SplitJson配置如下:

20200714_142440_23

  • 处理器PutHBaseJSON配置如下:

20200714_142621_98

  具体配置如下:
  1: HBase_1_1_2_ClientService
  2: hbase_test
  3: ${UUID()}
  4: data
  • 测试前需要将数据文件hbase_test.csv导入路径/home/dataset/HBASE

20200714_142733_61

并且需要在集群里面建一个hbase表,执行命令

hbase shell
create 'HBase_test','data'
  • 运行后:

20200714_142911_49

登录集群查看结果:

20200714_143101_10

GetHbase 操作步骤

  • 整个过程的流程如图所示:

20200714_143508_61

  • 驱动器GetHBase的配置如下:

20200714_143555_30

  • 驱动器PutFile的配置如下:

20200714_143618_96

  • 测试后

20200714_143802_57

登录到路径/home/dataset/GetHBase_test查看结果:

20200714_143925_73

NiFi连接Spark

操作场景

NiFi中配置Livy解析器,对的FI HD HBase接口

前提条件

  • 已经完成NiFi 1.7.1的安装

  • 已完成FusionInsight HD和客户端的安装,包含Spark2x组件

  • 已完成 NiFi Kerberos认证配置

  • 已完成Apache Livy 0.5.0的安装 (Livy可安装在FI HD客户端主机,也可以安装在其他主机但是需要保证安装Livy主机能够和FI HD客户端主机以及集群网络互通)

可参考《Apache Livy对接FusionInsight》对接文档完成Apache Livy的安装

配置LivySessionController操作步骤

  • 登录NiFi网页界面,右键选择**Configure**

  • 点击**加号**按钮添加服务

  • 选择LivySessionController,点击**ADD**添加

  • 点击**齿轮**图标进行配置

具体配置如下:
1: 172.21.3.43 (已安装Apache Livy的主机ip)
2: 8998 (Livy默认端口,可更改)
3: spark
4:KeytabCredentialsService
  • 继续点击**加号**按钮添加服务

  • 选择LivySessionController,点击**ADD**添加

  • 点击**齿轮**图标进行配置

更改Controller名字为 LivySessionController_PySpark

具体配置如下:
1: 172.21.3.43 (已安装Apache Livy的主机ip)
2: 8998 (Livy默认端口,可更改)
3: pysaprk
4:KeytabCredentialsService
  • 继续点击**加号**按钮添加服务

  • 选择LivySessionController,点击**ADD**添加

  • 点击**齿轮**图标进行配置

更改Controller名字为 LivySessionController_SparkR

具体配置如下:
1: 172.21.3.43 (已安装Apache Livy的主机ip)
2: 8998 (Livy默认端口,可更改)
3: sparkr
4:KeytabCredentialsService
  • 点击**闪电**图标选择Service and referencing components生效并保存LivySessionController,LivySessionController_PySpark,LivySessionController_SparkR

  • 完成

运行Spark样例操作步骤

  • 整个过程的流程如图所示:

  • 处理器GetFile配置如下:

具体配置如下:
1: /home/dataset/sparkTest
2: code1.txt

代码内容文件code1.txt的内容如下:

1+2

  • 处理器ExtractText配置如下:

需要点击**加号**按钮,Property项命名为code1,Value项赋值为$

  • 处理器ExecuteSparkInteractive配置为:

具体配置如下:
1: LivySessionController
2: ${code1}
  • 测试前将代码文件code1.txt上传至安装nifi主机的路径/home/dataset/sparkTest下:

在已安装Livy的主机上启动Livy:

  • 测试后:

登录Livy sever查看测试结果

运行PySpark样例操作步骤

  • 整个过程的流程如图所示:

  • 处理器GetFile配置如下:

具体配置如下:
1: /home/dataset/sparkTest
2: code2.txt

代码内容文件code2.txt的内容如下:

import random
NUM_SAMPLES = 100000
def sample(p):
  x, y = random.random(), random.random()
  return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

  • 处理器ExtractText配置如下:

需要点击**加号**按钮,Property项命名为code2,Value项赋值为$

  • 处理器ExecuteSparkInteractive配置为:

具体配置如下:
1: LivySessionController_PySpark
2: ${code2}
  • 测试前将代码文件code2.txt上传至安装nifi主机的路径/home/dataset/sparkTest下:

在已安装Livy的主机上启动Livy

  • 测试后

登录Livy sever查看测试结果

运行SparkR样例操作步骤

  • 整个过程的流程如图所示:

注意:在测试过程中如果与Spark,PySpark样例不完全一样

  • 处理器GetFile配置如下:

    具体配置如下:
    1: /home/dataset/sparkTest
    2: code3.txt
    

    代码内容文件code3.txt的内容如下:

    piR <- function(N) {
        x <- runif(N)
        y <- runif(N)
        d <- sqrt(x^2 + y^2)
        return(4 * sum(d < 1.0) / N)
    }
    
    set.seed(5)
    cat("Pi is roughly ",piR(1000000) )
    

  • 处理器ExecuteSparkInteractive配置为:

    具体配置如下:
    1: /home/dataset/sparkTest
    2: code3.txt里的代码内容
    

  • 测试前将代码文件code3.txt上传至安装nifi主机的路径/home/dataset/sparkTest下:

    在已安装Livy的主机上启动Livy

  • 测试后

登录Livy sever查看测试结果

NiFi连接Kafka普通模式

操作场景

NiFi中配置kafka解析器,对的FI HD kafka 21005端口

前提条件

  • 已经完成NiFi 1.7.1的安装

  • 已完成FusionInsight HD和客户端的安装,包含kafka组件

  • 已完成 NiFi Kerberos认证配置

GetHTTP & PutKafka 操作步骤

  • 整个过程的流程如图所示:

  • 驱动器GetHTTP的配置如下:

具体配置如下:
1: http://vincentarelbundock.github.io/Rdatasets/csv/datasets/iris.csv
2: iris.csv
- 驱动器PutKafka配置如下:

具体配置如下:
1: 172.21.3.102:21005,172.21.3.101:21005,172.21.3.103:21005
2: nifi-kafka-test-demo
3: nifi
  • 测试前:

登录FI客户端kafak组件,创建Topic nifi-kafka-test-demo

  cd /opt/hadoopclient/Kafka/kafka/bin
  kafka-topics.sh --create --topic nifi-kafka-test-demo --zookeeper 172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --partitions 1 --replication-factor 1

  • 测试后:

登录FI客户端kafak组件,查看结果:

cd /opt/hadoopclient/Kafka/kafka/bin
kafka-console-consumer.sh --zookeeper 172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --topic nifi-kafka-test-demo --from-beginning

PublishKafka_0_11样例操作步骤

  • 整个工作流为:

  • 驱动器GetHTTP的配置如下:

具体配置如下:
1: http://vincentarelbundock.github.io/Rdatasets/csv/datasets/iris.csv
2: iris.csv
- 驱动器PublishKafka_0_11的配置如下:

1: 172.16.6.10:21005,172.16.6.11:21005,172.16.6.12:21005
2: SASL_PLAINTEXT
3: wikipedia21005
6: Guarantee Replicated Delivery
  • 启动整个工作流:

  • 登陆FI HD kafka客户端检查结果:

ConsumeKafka_0_11 操作步骤

  • 整个过程的流程如图所示:

  • 处理器ConsumeKafka_0_11配置如下:

1: 172.21.3.101:21005,172.21.3.102:21005,172.21.3.103:21005
2: PLAINTEXT
3: KeytabCredentialsService
4: Kafka
5: example-metric1
6: DemoConsumer
  • 处理器PutFile配置如下:

  • 测试前:

    用eclipse打开客户端自带的kafka样例代码kafka-examples,调试使得样例代码能够正常运行NewProducer.java

    由于ConsumeKafka是实时获取日志信息的,所以在测试的时候需要先运行NewProducer.java往Kafka上传日志文件,再同时开启nifi的驱动器ConsumeKafka_0_11进行读取日志的测试

  • 测试后:

    登录路径/home/dataset/Kafka查看测试结果:

NiFi连接Kafka安全模式

操作场景

NiFi中配置kafka解析器,对的FI HD kafka 21007端口

前提条件

  • 已经完成NiFi 1.9.2的安装

  • 已完成FusionInsight HD和客户端的安装,包含kafka组件

  • 已完成 NiFi Kerberos认证配置

  • nifi主机ip: 172.16.2.121, FI HD三节点ip: 172.16.4.121-123

认证相关操作步骤

  • 在nifi主机/opt/nifi/nifi-1.9.2/conf/jaas.conf文件下新增内容:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
principal="developuser@HADOOP.COM"
keyTab="/opt/developuser/user.keytab"
useTicketCache=false
serviceName="kafka"
storeKey=true
debug=true;
};

20200714_152157_31

  • 使用命令bin/nifi.sh stop停止nifi

  • 在FI HD的kafka客户端中找到对应的kafka client jar包,比如/opt/125_651hdclient/hadoopclient/Kafka/kafka/libs/kafka-clients-1.1.0.jar

20200714_144552_55

  • 将nifi主机下/opt/nifi/nifi-1.9.2/work/nar/extensions/nifi-kafka-1-0-nar-1.9.2.nar-unpacked/NAR-INF/bundled-dependencies路径中原来的kafka client jar包kafka-clients-1.0.2.jar 使用重命名命令命名为 kafka-clients-1.0.2.jar.org 并且把上一步在 FI HD kafka客户端中找到的kafka-clients-1.1.0.jar复制到此路径下:

20200714_145341_90

  • 登陆nifi主机,修改配置文件/opt/nifi/nifi-1.9.2/conf/bootstrap.conf添加新的jvm参数,保存:

20200714_152418_40

  • 使用命令bin/nifi.sh start启动nifi:

PublishKafka_1_0样例操作步骤

  • 整个工作流为:

20200714_153914_50

  • 驱动器GetFile的配置如下:

20200714_153956_25

具体配置如下:
1: /home/dataset/KAFKA
2: test.txt

test.txt文件内容

id,name,class_id
1,"Tom",1
2,"Sandy",2
3,"Benny",3
4,"Tina",1
2,"Sandy",2
3,"Benny",3
4,"Tina",1
  • 驱动器PublishKafka_1_0的配置如下:

20200714_153304_50

1: 172.16.4.121:21007,172.16.4.122:21007,172.16.4.123:21007
2: SASL_PLAINTEXT
3: kafka
4: KeytabCredentialsService
5: testtopic_01
6: Guarantee Replicated Delivery
  • 运行整个工作流:

20200714_154149_84

  • 去集群kafka客户端检查:

20200714_154923_62

ConsumeKafka_1_0样例操作步骤

  • 整个工作流为:

  • 驱动器ConsumeKafka_1_0的配置如下:

20200714_155250_13

1: 172.16.4.121:21007,172.16.4.122:21007,172.16.4.123:21007
2: SASL_PLAINTEXT
3: kafka
4: KeytabCredentialsService
5: testtopic01
6: Demo
  • 驱动器PutFile配置如下:

20200714_155513_64

  • 启动整个工作流

20200714_155630_71

  • 使用FI HD producer传数据:

20200714_155712_24

  • 登陆druid主机检查结果

20200714_155801_46

NiFi连接Hetu

相关配置参考配置文档,下面介绍hetu相关配置

  • 配置bootstrap.conf文件

增加如下内容

java.arg.20=-Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
java.arg.21=-Dzookeeper.sasl.clientconfig=Client
java.arg.22=-Dzookeeper.auth.type=kerberos
java.arg.23=-Djava.util.logging.config.file=/opt/hetuserver-client-logging.properties

其中hetuserver-client-logging.properties配置文件内容,该文件可从hetu客户端获取:

.level = INFO
java.util.logging.ConsoleHandler.level = SEVERE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
handlers = java.util.logging.ConsoleHandler
  • 找到路径/opt/nifi/nifi-1.9.2/work/nar/extensions/nifi-dbcp-service-nar-1.9.2.nar-unpacked/NAR-INF/bundled-dependencies,将hetu的驱动jar包presto-jdbc-316.jar放置到该路径下

注意: mrs8.0.2对应的hetu驱动jar包为:presto-jdbc-316-hw-ei-302002.jar

  • 重启nifi

  • 首先配置DBCPConnectionPool

1. jdbc:presto://172.16.4.161:24002,172.16.4.162:24002,172.16.4.163:24002/hive/default?serviceDiscoveryMode=zooKeeper&zooKeeperNamespace=hsbroker&deploymentMode=on_yarn&user=developuser&SSL=true&SSLTrustStorePath=/opt/hetuserver.jks&KerberosConfigPath=/opt/krb5.conf&KerberosPrincipal=developuser&KerberosKeytabPath=/opt/user.keytab&KerberosRemoteServiceName=HTTP&KerberosServicePrincipalPattern=%24%7BSERVICE%7D%40%24%7BHOST%7D
2. io.prestosql.jdbc.PrestoDriver

完成后enable:

  • 工作流如图:

  • ExecuteSQLRecord配置:

1. DBCPConnectionPool
2. select * from default.test
3. CSVRecordSetWriter (默认配置,不用做任何改动)
  • PutFile配置:

  • 启动工作流:

  • 去后台检查结果:

FAQ

问题: 连hive的时候遇到问题:

20200706_161301_86

2020-07-06 16:09:39,819 ERROR [Timer-Driven Process Thread-3] o.a.nifi.dbcp.hive.HiveConnectionPool HiveConnectionPool[id=13ec476a-0173-1000-e692-3b30a2f32700] Error getting Hive connection: org.apache.commons.dbcp.SQLNestedException: Cannot create JDBC driver of class 'org.apache.hive.jdbc.HiveDriver' for connect URL ' jdbc:hive2://172.16.4.121:24002,172.16.4.122:24002,172.16.4.123:24002/;serviceDiscoveryMode=zooKeeper;principal=hive/hadoop.hadoop.com@HADOOP.COM'
org.apache.commons.dbcp.SQLNestedException: Cannot create JDBC driver of class 'org.apache.hive.jdbc.HiveDriver' for connect URL ' jdbc:hive2://172.16.4.121:24002,172.16.4.122:24002,172.16.4.123:24002/;serviceDiscoveryMode=zooKeeper;principal=hive/hadoop.hadoop.com@HADOOP.COM'
        at org.apache.commons.dbcp.BasicDataSource.createConnectionFactory(BasicDataSource.java:1452)
        at org.apache.commons.dbcp.BasicDataSource.createDataSource(BasicDataSource.java:1371)
        at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044)
        at org.apache.nifi.dbcp.hive.HiveConnectionPool.lambda$getConnection$0(HiveConnectionPool.java:369)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
        at org.apache.nifi.dbcp.hive.HiveConnectionPool.getConnection(HiveConnectionPool.java:369)
        at org.apache.nifi.dbcp.DBCPService.getConnection(DBCPService.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:87)
        at com.sun.proxy.$Proxy153.getConnection(Unknown Source)
        at org.apache.nifi.processors.hive.SelectHiveQL.onTrigger(SelectHiveQL.java:339)
        at org.apache.nifi.processors.hive.SelectHiveQL.lambda$onTrigger$0(SelectHiveQL.java:285)
        at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
        at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:106)
        at org.apache.nifi.processors.hive.SelectHiveQL.onTrigger(SelectHiveQL.java:285)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: No suitable driver
        at java.sql.DriverManager.getDriver(DriverManager.java:315)
        at org.apache.commons.dbcp.BasicDataSource.createConnectionFactory(BasicDataSource.java:1437)
        ... 30 common frames omitted

仔细查看报错

20200706_163532_26

发现连接的url前面有一个空格,在界面上连接串前面多了个空格,去掉问题解决