Flink in Action (2): Flink Table API Operations

Maynor学长 · published 2022/10/27 15:15:42

2. Flink Table API Operations

The Flink Table API provides many connectors for integrating with various data sources, such as CSV, JSON, HDFS, HBase, Kafka, JDBC, and Hive. Data from any of these systems can be brought directly into Flink for processing, and the processed results can likewise be written back out to any of them.


Next, let's walk through how each of these input sources is used.

2.1. Reading collection data and registering a table for queries

Read data from an in-memory collection, register it as a table, and run queries against it.


import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkTableStandardStructure {

    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only planner left and is used by default, so this call is no longer needed
                //.inStreamingMode() // streaming mode is the default
                //.inBatchMode()
                .build();

        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 2. Create the source table: either 1) read an external table, or 2) build a table from a Table API / SQL query result
        Table projTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING())
                ),
                row(1, "zhangsan"),
                row(2L, "lisi")
        ).select($("id"), $("name"));

        // Register the table in the catalog (optional)
        tEnv.createTemporaryView("sourceTable", projTable);

        // 3. Create the sink table
        final Schema schema = Schema.newBuilder()
                .column("id", DataTypes.DECIMAL(10, 2))
                .column("name", DataTypes.STRING())
                .build();

        //https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/print/
        tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
                .schema(schema)
                .build());

        // 4. Run the query with the Table API (several queries may be run in sequence; intermediate tables may or may not be registered in the catalog)
        Table resultTable = tEnv.from("sourceTable").select($("id"), $("name"));
        // If sourceTable is not registered, the query can be written as:
        //Table resultTable = projTable.select($("id"), $("name"));

        // 5. Emit the result (this also triggers execution; no separate tEnv.execute("job") call is needed)
        resultTable.executeInsert("sinkTable");

    }
}
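
For quick inspection during development, the sink table can be skipped entirely: Table.execute().print() collects the result to the client and prints it. A minimal sketch of ours using the same Flink 1.14 API as above (the class name is hypothetical):

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkTableQuickPrint {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().build());

        Table projTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING())),
                row(1, "zhangsan"),
                row(2, "lisi"));

        // execute() runs the query and print() writes the rows to the client
        // console, so no sink table has to be registered
        projTable.select($("id"), $("name")).execute().print();
    }
}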

2.2. Flink Table built-in connectors

Flink's Table API & SQL connect to external systems through connectors and perform reads and writes in both batch and streaming mode. A rich set of connectors to external systems is provided; depending on whether a connector is used as a source or a sink, it supports different formats such as CSV, Avro, Parquet, or ORC. Note: when a connector is used as a sink, also check which output modes it supports (Append/Retract/Upsert); the sketch after the table below shows why this matters.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/

| Name | Supported versions | Source | Sink |
| --- | --- | --- | --- |
| Filesystem | | Bounded and unbounded scan; lookup | Streaming Sink, Batch Sink |
| Kafka | 0.10+ | Unbounded scan | Streaming Sink, Batch Sink |
| JDBC | | Bounded scan and lookup | Streaming Sink, Batch Sink |
| HBase | 1.4.x, 2.2.x | Bounded scan and lookup | Streaming Sink, Batch Sink |
| Hive | 1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 2.3, 3.1 | Unbounded scan, bounded scan, and lookup | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
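
The sink output mode matters in practice. Below is a minimal sketch of ours (same Flink 1.14 API as in 2.1): a grouped count emits update/retract rows, which the print sink displays as -U/+U, while an append-only sink such as the filesystem CSV sink would reject the same query in streaming mode.

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class SinkModeDemo {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        Table clicks = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("username", DataTypes.STRING()),
                        DataTypes.FIELD("clicks", DataTypes.INT())),
                row("zhangsan", 10),
                row("zhangsan", 20),
                row("lisi", 5));

        // A grouped count produces an updating (retract) stream: the print
        // sink shows +I/-U/+U rows; an append-only sink would fail here.
        clicks.groupBy($("username"))
              .select($("username"), $("clicks").count().as("cnt"))
              .execute().print();
    }
}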

Table formats

Data is stored in different storage systems in a variety of formats (CSV, Avro, Parquet, ORC, and so on). Flink defines formats to support reading data in these different encodings.

| Format | Supported connectors |
| --- | --- |
| CSV | Apache Kafka, Upsert Kafka, Filesystem |
| JSON | Apache Kafka, Upsert Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Upsert Kafka, Filesystem |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |
| Debezium | Apache Kafka |
| Canal | Apache Kafka |
| Maxwell | Apache Kafka |
| Raw | Apache Kafka |
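
Format-specific options are set in the WITH clause and are prefixed with the format name. A hedged sketch of ours for CSV (the option names 'csv.field-delimiter' and 'csv.ignore-parse-errors' come from the Flink 1.14 docs; the table and path are hypothetical examples):

        String csvSourceDDL = "CREATE TABLE csv_source (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'input/data.csv',\n" +         // hypothetical path
                "  'format' = 'csv',\n" +
                "  'csv.field-delimiter' = ',',\n" +       // default is ','
                "  'csv.ignore-parse-errors' = 'true'\n" + // skip malformed rows
                ")";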

2.3. Reading JSON data via the built-in connector and writing it to HDFS as CSV

Read JSON data using Flink's built-in filesystem connector.

Then use the Table API to write the data out to HDFS in CSV format.


import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class FlinkJson2HDFSCsv {

    public static void main(String[] args) {

        Logger.getLogger("org").setLevel(Level.ERROR);
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only planner left and is used by default
                //.inStreamingMode() // streaming mode is the default
                .inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        String source_sql = "CREATE TABLE json_table (\n" +
                "  id Integer,\n" +
                "  name STRING,\n" +
                "  email STRING,\n" +
                "  date_time STRING" +
                ") WITH (\n" +
                "  'connector'='filesystem',\n" +
                "  'path'='input/userbase.json',\n" +
                "  'format'='json'\n" +
                ")";


        String sink_sql = "CREATE TABLE sink_hdfs (\n" +
                "  id Integer,\n" +
                "  name STRING,\n" +
                "  email STRING,\n" +
                "  date_time STRING" +
                ") WITH ( \n " +
                " 'connector' = 'filesystem',\n" +
                " 'path' = 'hdfs://bigdata01:8020/output_csv/userbase.csv' , \n" +
                " 'format' = 'csv'\n" +
                ")";


        String insert_SQL = "insert into sink_hdfs select id, name, date_time, email from json_table";


        // Register the tables, then run the insert
        tableEnvironment.executeSql(source_sql);
        tableEnvironment.executeSql(sink_sql);
        tableEnvironment.executeSql(insert_SQL);

    }
}
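
As a quick sanity check, the following line of ours can be appended to main() above to read the source back and print it to the client; it assumes input/userbase.json contains one JSON object per line matching the schema, such as the records listed in section 2.7:

        // Print the parsed JSON rows to the client console
        tableEnvironment.executeSql("select id, name, email, date_time from json_table").print();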

2.4. Reading CSV data from HDFS via the built-in connector and writing it to HBase

Read the CSV data from HDFS (the output of section 2.3) with Flink's built-in connector, then write it into HBase.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/filesystem/

Enter the HBase shell and create the target table

[hadoop@bigdata01 ~]$ cd /opt/install/hbase-2.2.7/
[hadoop@bigdata01 hbase-2.2.7]$ bin/hbase shell
hbase(main):009:0> create 'hTable','f1'

Use the Table API to read the CSV file contents and write the data into HBase.


import org.apache.flink.table.api.*;

public class FlinkWithHDFSCSV2HBase {

    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only planner left and is used by default
                //.inStreamingMode() // streaming mode is the default
                //.inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        String source_sql = "CREATE TABLE source_hdfs (\n" +
                "  id Integer,\n" +
                "  name STRING,\n" +
                "  date_time STRING,\n" +
                "  email STRING" +
                ") WITH ( \n " +
                " 'connector' = 'filesystem',\n" +
                " 'path' = 'hdfs://bigdata01:8020//output_csv/userbase.csv/' , \n" +
                " 'format' = 'csv'\n" +
                ")";


        String sink_sql = "CREATE TABLE sink_table (\n" +
                " rowkey Integer,\n" +
                " f1 ROW<name STRING,email STRING,date_time STRING > ,\n" +
                " PRIMARY KEY (rowkey) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'hTable',\n" +
                " 'zookeeper.quorum' = 'bigdata01:2181,bigdata02:2181,bigdata03:2181'\n" +
                ") ";


        String execute_sql = "insert  into sink_table select id as rowkey,ROW(name,email,date_time)  from source_hdfs ";

        tableEnvironment.executeSql(source_sql);
        tableEnvironment.executeSql(sink_sql);
        tableEnvironment.executeSql(execute_sql);
    }
}
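
Each HBase column family maps to exactly one ROW-typed column, and the ROW's nested fields become the column qualifiers. A hedged sketch of ours for a table with two families (the table name 'hTable2' and the family 'f2' are hypothetical):

        String multiFamilyDDL = "CREATE TABLE hbase_two_families (\n" +
                " rowkey INT,\n" +
                " f1 ROW<name STRING, email STRING>,\n" +
                " f2 ROW<city STRING>,\n" +      // a second column family
                " PRIMARY KEY (rowkey) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'hTable2',\n" +
                " 'zookeeper.quorum' = 'bigdata01:2181,bigdata02:2181,bigdata03:2181'\n" +
                ")";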

2.6. Reading data from HBase via the built-in connector and writing it to Kafka

Read the HBase data with Flink's built-in connector, then write it to Kafka.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hbase/

Create the HBase table and insert the following rows

# Prepare the HBase dataset
hbase(main):003:0> create 'opt_log','f1'
# Insert sample rows
put 'opt_log','1','f1:username','郑剃'
put 'opt_log','1','f1:email','kyzqcd0686@vjikq.tng'
put 'opt_log','1','f1:date_time','2022-10-04 08:01:48'
put 'opt_log','2','f1:username','王曙介'
put 'opt_log','2','f1:email','axvcbj7vbo@ecyi1.4gw'
put 'opt_log','2','f1:date_time','2022-10-04 08:04:39'
put 'opt_log','3','f1:username','赖溯姆'
put 'opt_log','3','f1:email','ew1qu5sunz@caxtg.vtn'
put 'opt_log','3','f1:date_time','2022-10-04 08:00:19'

Create the Kafka topic and start a console consumer

# List topics
cd /opt/install/kafka_2.12-2.6.3/
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --list
# Create the topic
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --create --topic user_output --replication-factor 3 --partitions 3
# Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic user_output

Use the Table API to read the HBase table contents and write the data to Kafka.


import org.apache.flink.table.api.*;

public class FlinkTableWithHBase2Kafka {
    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only planner left and is used by default
                //.inStreamingMode() // streaming mode is the default
                //.inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);


        String source_table = "CREATE TABLE hTable (\n" +
                " rowkey STRING,\n" +
                " f1 ROW<username STRING,email STRING,date_time String> ,\n" +
                " PRIMARY KEY (rowkey) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'opt_log',\n" +
                " 'zookeeper.quorum' = 'bigdata01:2181,bigdata02:2181,bigdata03:2181'\n" +
                ") ";


        String sink_table = "CREATE TABLE KafkaTable (\n" +
                "  `username` STRING,\n" +
                "  `email` STRING,\n" +
                "  `date_time` STRING \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'user_output',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'format' = 'json'\n" +
                ")";

        String insert_sql = "insert into KafkaTable select username,email,date_time from hTable";


        tableEnvironment.executeSql(source_table);
        tableEnvironment.executeSql(sink_table);
        tableEnvironment.executeSql(insert_sql);
    }
}
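
Note that the plain 'kafka' connector is append-only; if the query produced an updating result (for example an aggregation), a keyed 'upsert-kafka' sink would be needed instead. A hedged sketch of ours (the topic name is hypothetical; 'key.format' and 'value.format' are required by upsert-kafka per the 1.14 docs):

        String upsertSinkDDL = "CREATE TABLE KafkaUpsertTable (\n" +
                "  `username` STRING,\n" +
                "  `email` STRING,\n" +
                "  PRIMARY KEY (username) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'upsert-kafka',\n" +
                "  'topic' = 'user_output_upsert',\n" +   // hypothetical topic
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'key.format' = 'json',\n" +
                "  'value.format' = 'json'\n" +
                ")";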

2.7. Reading data from Kafka via the built-in connector and writing it to MySQL

Read the Kafka data with Flink's built-in connector.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/

Create the Kafka topic and produce the following records

# Create the topic
bin/kafka-topics.sh --zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181 --create --topic usr_opt --replication-factor 3 --partitions 3
# Start a console producer
bin/kafka-console-producer.sh --broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic usr_opt
# Sample records to paste into the producer:
{"date_time":"2022-10-04 08:01:48","email":"kyzqcd0686@vjikq.tng","id":0,"name":"郑剃"}
{"date_time":"2022-10-04 08:06:31","email":"bvkqwbmgwi@lh80q.4ln","id":1,"name":"闾丘喜造"}
{"date_time":"2022-10-04 08:04:39","email":"axvcbj7vbo@ecyi1.4gw","id":2,"name":"王曙介"}
{"date_time":"2022-10-04 08:00:19","email":"ew1qu5sunz@caxtg.vtn","id":3,"name":"赖溯姆"}



General Kafka production and consumption commands

# List topics
cd /opt/install/kafka_2.12-2.6.3/
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --list
# Create topics
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --create --topic user_input --replication-factor 3 --partitions 3
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --create --topic user_output --replication-factor 3 --partitions 3
# Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic user_output
# Start a console producer
bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic user_input
# Example input records:
{"user":"Cidy","visit_url":"./home","op_time":"2022-02-03 12:00:00"}
{"user":"Lili","visit_url":"./index","op_time":"2022-02-03 13:30:50"}
{"user":"Tom","visit_url":"./detail","op_time":"2022-02-04 13:35:30"}


Define the MySQL tables

CREATE DATABASE /*!32312 IF NOT EXISTS*/`user_log` /*!40100 DEFAULT CHARACTER SET latin1 */;

USE `user_log`;

/*Table structure for table `clickcount` */

DROP TABLE IF EXISTS `clickcount`;

CREATE TABLE `clickcount` (
  `username` varchar(64) DEFAULT NULL,
  `result` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `clickcount` */

/*Table structure for table `clicklog` */

DROP TABLE IF EXISTS `clicklog`;

CREATE TABLE `clicklog` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(20) DEFAULT NULL,
  `email` varchar(100) DEFAULT NULL,
  `date_time` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

/*Data for the table `clicklog` */

insert  into `clicklog`(`id`,`username`,`email`,`date_time`) values (1,'zhangsan','kyzqcd0686@vjikq.tng','2022-10-04 08:01:48'),(2,'lisi','bvkqwbmgwi@lh80q.4ln','2022-10-04 08:06:31');

Use the Table API to read the Kafka records and write them into MySQL.


import org.apache.flink.table.api.*;

public class FlinkTableWithKafka2MySQL {

    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only planner left and is used by default
                //.inStreamingMode() // streaming mode is the default
                //.inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);


        String source_sql = "CREATE TABLE KafkaTable (\n" +
                "  id Integer,\n" +
                "  name STRING,\n" +
                "  email STRING,\n" +
                "  date_time STRING" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'usr_opt',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'properties.group.id' = 'user_opt_group',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "   'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")";


        String sink_sql = "CREATE TABLE mysql_sink (\n" +
                "  id Integer,\n" +
                "  name STRING,\n" +
                "  email STRING,\n" +
                "  date_time STRING" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/user_log?characterEncoding=utf-8&serverTimezone=GMT%2B8',\n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'table-name' = 'clicklog',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456'\n" +
                ")";
        String execute_sql = "insert into mysql_sink select id,name,email,date_time from KafkaTable";
        tableEnvironment.executeSql(source_sql);
        tableEnvironment.executeSql(sink_sql);
        tableEnvironment.executeSql(execute_sql);
    }
}
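
The JDBC sink buffers rows and writes them in batches. A hedged sketch of the tuning options (option names per the Flink 1.14 JDBC connector docs; the values here are illustrative) that could be added to the WITH clause of mysql_sink above:

        String jdbcTuningOptions =
                "  'sink.buffer-flush.max-rows' = '100',\n" +  // flush after this many buffered rows
                "  'sink.buffer-flush.interval' = '1s',\n" +   // ... or after this interval
                "  'sink.max-retries' = '3'\n";                // retry failed batches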

2.8. Reading and writing Hive data via the built-in connector

Read Hive data with Flink's built-in Hive connector.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/

Add the following property to hive-site.xml

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.52.120:9083</value>
</property>

Place hive-site.xml in the project directory (the hiveConfDir referenced below).


Start the Hive metastore service, then create the Hive database and tables

# Start the Hive metastore and hiveserver2 services
[hadoop@bigdata03 apache-hive-3.1.2]$ cd /opt/install/apache-hive-3.1.2/
[hadoop@bigdata03 apache-hive-3.1.2]$ bin/hive --service metastore
[hadoop@bigdata03 apache-hive-3.1.2]$ bin/hive --service hiveserver2

# Create a local data file
[hadoop@bigdata03 install]$ cd /opt/install/
[hadoop@bigdata03 install]$ vim userbase.csv
# File contents:
0,郑剃,"2022-10-04 08:01:48",kyzqcd0686@vjikq.tng
1,闾丘喜造,"2022-10-04 08:06:31",bvkqwbmgwi@lh80q.4ln
2,王曙介,"2022-10-04 08:04:39",axvcbj7vbo@ecyi1.4gw
3,赖溯姆,"2022-10-04 08:00:19",ew1qu5sunz@caxtg.vtn
4,钱泼奎,"2022-10-04 08:04:51",50xdhnfppw@vwreu.kxk
5,尉迟亏,"2022-10-04 08:02:25",h8ist2s54k@lorkp.79s
6,贾盏,"2022-10-04 08:05:22",hnzfdmnjgo@rsiq9.syx
7,蔡辟,"2022-10-04 08:03:53",apjlg5pyuo@lhs6l.oj4
8,蔡矛,"2022-10-04 08:05:35",cpqofnn5xd@7iknh.qc5
9,赖妖炬,"2022-10-04 08:05:03",0wg3nfjdv9@fomvu.2kb
10,毛溜孝,"2022-10-04 08:06:37",1kkaib5i4e@ecvb8.6cs


# Prepare the Hive data
# Create the test database
create database if not exists test location "/user/hive/warehouse/test";
use test;
# Create the Hive table
drop table userbase;
create table if not exists userbase
(id int,
name string,
date_time string,
email string 
)
row format delimited fields terminated by ","
stored as textfile;
# Load userbase.csv into the Hive table
load data local inpath '/opt/install/userbase.csv' into table userbase;
# Query the table
select * from userbase;

# Create the destination table for the aggregation result
create table user_count  (username string,count_result int) row format delimited fields terminated by '\t';

Use the Table API to read and aggregate the Hive data

import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;

public class FlinkTableWithHive {
    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 2. Create the HiveCatalog
        String name = "myCataLog";
        String defaultDatabase = "test";
        String hiveConfDir = "input/hiveconf/";
        // load the Hive built-in functions; the module name is arbitrary, "hive" is conventional
        tEnv.loadModule("hive", new HiveModule("3.1.2"));
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);

        // 3. Register the catalog
        tEnv.registerCatalog(name,hive);

        // 4. Set the catalog and database for the current session
        tEnv.useCatalog(name);
        tEnv.useDatabase(defaultDatabase);


        tEnv.executeSql("insert into user_count select username,count(1) as count_result from clicklog group by username");

    }
}
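
Two optional checks of ours can be appended to main(): list the tables visible through the HiveCatalog, then read back the aggregation result.

        // List the tables in the current (Hive) database
        tEnv.executeSql("show tables").print();

        // Verify the aggregated counts written into user_count
        tEnv.executeSql("select * from user_count").print();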
