使用Flume消费kafka topic数据并存储到HBase
操作场景:
Flume消费kafka数据存储到HBase中。
前提条件:
已创建混合集群或者流式和分析集群(集群间网络互通,如果开启kerberos,则需配置跨集群互信https://support.huaweicloud.cn/usermanual-mrs/mrs_01_0354.html)。
操作步骤:
1. 从HBase客户端拷贝配置文件hbase-site.xml到Flume server所在节点(流式core节点)的配置目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 下。通常可以在分析集群Master节点HBase客户端安装目录如“/opt/client/HBase/hbase/conf/”下找到hbase-site.xml文件。 拷贝完成后文件需要修改文件属组为omm:wheel。
2. 从HBase集群下载用户的认证凭据。
a. 在MRS Manager,单击“系统设置”。
b. 在“权限配置”区域,单击“用户管理”。
c. 在用户列表中选择需要的用户,单击后面的“更多”下载用户凭据。
d. 解压下载的用户凭据文件,获取krb5.conf和user.keytab文件。
3. 将上一步获得的krb5.conf和user.keytab拷贝到Flume server所在节点(流式core节点)的配置目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 下。文件属组需要修改为omm:wheel。
4. 修改配置文件jaas.conf,文件所在目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 。
修改参数“keyTab”定义的用户认证文件完整路径即步骤3中保存用户认证文件的目录。
5. 修改配置文件flume-env.sh,文件所在目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 。
在 “-XX:+UseCMSCompactAtFullCollection”后面,增加以下内容:-Djava.security.krb5.conf=/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/krb5.conf -Djava.security.auth.login.config=/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/jaas.conf -Dzookeeper.request.timeout=120000
例如:"-XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=/opt/Bigdata/MRS_2.1.0/1_6_Flume/etc/krb5.conf -Djava.security.auth.login.config=/opt/Bigdata/MRS_2.1.0/1_6_Flume/etc/jaas.conf -Dzookeeper.request.timeout=120000"
请根据实际情况,修改配置文件路径,然后保存并退出配置文件。
6. 将HBase集群的“/etc/hosts”文件中host匹配相关内容添加到Flume server节点的“/etc/hosts”文件。
7. 重启该节点的flume实例。
8. 修改Flume配置文件“properties.properties”。
vi /opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/properties.properties
将以下内容保存到文件“properties.properties”中:
server.sources = kafka_source
server.channels = flume
server.sinks = hbase
server.sources.kafka_source.channels = flume
server.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
server.sources.kafka_source.topics = flume_test
server.sources.kafka_source.groupId = flume_group
server.sources.kafka_source.batchSize = 1000
server.sources.kafka_source.kafka.security.protocol = SASL_PLAINTEXT
server.sources.kafka_source.kafka.kerberos.domain.name = hadoop.XXX.com
server.sources.kafka_source.kafka.bootstrap.servers=XXX.XXX.XXX.XXX:21007,XXX.XXX.XXX.XXX:21007,XXX.XXX.XXX.XXX:21007
server.channels.flume.type = memory
server.channels.flume.capacity=100000
server.channels.flume.transactionCapacity=10000
server.channels.flume.channelfullcount = 10
server.channels.flume.keep-alive = 3
server.channels.flume.byteCapacityBufferPercentage = 20
server.sinks.hbase.channel = flume
server.sinks.hbase.type = hbase
server.sinks.hbase.table = hbase_name
server.sinks.hbase.columnFamily= info
server.sinks.hbase.batchSize = 1000
server.sinks.hbase.kerberosPrincipal = admin
server.sinks.hbase.kerberosKeytab = /opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/user.keytab
请根据实际情况,修改以下参数,然后保存并退出。
kafka.bootstrap.servers
默认情况下,安全集群对应端口21007,普通集群对应端口9092。
kafka.security.protocol
安全集群请配置为SASL_PLAINTEXT,普通集群请配置为PLAINTEXT。
kafka.kerberos.domain.name
普通集群无需配置此参数。安全集群对应此参数的值为Kafka集群中“kerberos.domain.name”对应的值。具体可到Borker实例所在节点上查看“/opt/Bigdata/MRS_Current/1_X_Broker/etc/server.properties”文件中配置项“kerberos.domain.name”对应的值,仅安全集群需要配置,其中“X”为随机生成的数字,请根据实际情况修改。
kerberosPrincipal
安全集群需要配置,用户名
kerberosKeytab
安全集群需要配置,用户认证文件,需要写绝对路径。
- 点赞
- 收藏
- 关注作者
评论(0)