Flink in Practice (2): Flink Table API Operations
2. Flink Table API Operations
The Flink Table API provides many connectors for integrating external data sources such as CSV, JSON, HDFS, HBase, Kafka, JDBC and Hive. Data from these systems can be read directly into Flink for processing, and the processed results can be written back out to any of them.
The following sections walk through these input and output sources.
2.1 Reading collection data and registering a table for queries
Read data from an in-memory collection, register it as a table, and run queries against it.
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;
public class FlinkTableStandardStructure {
public static void main(String[] args) {
//1. Create the TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.useBlinkPlanner() // since Flink 1.14 only the Blink planner remains and it is the default
//.inStreamingMode() // streaming mode is the default
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2. Create the source table: 1) read an external table, or 2) create a table from a Table API or SQL query result
Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "zhangsan"),
row(2L, "lisi")
).select($("id"), $("name"));
//Register the table in the catalog (optional)
tEnv.createTemporaryView("sourceTable", projTable);
//3. Create the sink table
final Schema schema = Schema.newBuilder()
.column("id", DataTypes.DECIMAL(10, 2))
.column("name", DataTypes.STRING())
.build();
//https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/print/
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4. Run queries with the Table API (multiple queries are allowed; intermediate tables may or may not be registered in the catalog)
Table resultTable = tEnv.from("sourceTable").select($("id"), $("name"));
//If sourceTable is not registered, the query can be written directly against projTable:
//Table resultTable = projTable.select($("id"), $("name"));
//5. Emit the result (this also triggers execution; there is no need to call tEnv.execute("job") separately)
resultTable.executeInsert("sinkTable");
}
}
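Running this job writes both rows to the print connector; the console output should look roughly like the following (a subtask prefix may also appear, and the DECIMAL(10, 2) id is rendered with two decimal places):
+I[1.00, zhangsan]
+I[2.00, lisi]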
2.2 Flink Table built-in connectors
Flink's Table API & SQL connect to external systems through connectors and perform batch or streaming reads and writes. A rich set of connectors is available; depending on the source and sink, they support different formats such as CSV, Avro, Parquet or ORC. Note: when a connector is used as a sink, also check which output modes it supports (Append/Retract/Upsert).
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
Name | Supported versions | Source | Sink |
---|---|---|---|
Filesystem | | Bounded and unbounded scan, lookup | Streaming Sink, Batch Sink |
Kafka | 0.10+ | Unbounded scan | Streaming Sink, Batch Sink |
JDBC | | Bounded scan, lookup | Streaming Sink, Batch Sink |
HBase | 1.4.x, 2.2.x | Bounded scan, lookup | Streaming Sink, Batch Sink |
Hive | 1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 2.3, 3.1 | Unbounded scan, bounded scan, lookup | Streaming Sink, Batch Sink |
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
Table formats
Data is stored in many different formats (CSV, Avro, Parquet, ORC, etc.), and Flink defines formats so that these encodings can be read and written through the connectors. A minimal sketch of how a connector and a format are combined is given after the table below.
Format | Supported connectors |
---|---|
CSV | Apache Kafka, Upsert Kafka, Filesystem |
JSON | Apache Kafka, Upsert Kafka, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Upsert Kafka, Filesystem |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
Debezium | Apache Kafka |
Canal | Apache Kafka |
Maxwell | Apache Kafka |
Raw | Apache Kafka |
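As a minimal sketch, a connector and a format are combined in the WITH clause of a table definition (the table name, columns and path below are made up purely for illustration):
CREATE TABLE example_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'filesystem',          -- which external system to read/write
  'path' = 'file:///tmp/example_data', -- connector-specific option
  'format' = 'csv'                     -- how records are (de)serialized
);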
2.3 Using a built-in connector to read JSON data and write it to HDFS in CSV format
Use Flink's built-in filesystem connector to read JSON data; a sample of the input file is shown below.
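The input file input/userbase.json is assumed to contain one JSON object per line, with the same four fields as the source table defined below, for example:
{"id":0,"name":"郑剃","email":"kyzqcd0686@vjikq.tng","date_time":"2022-10-04 08:01:48"}
{"id":1,"name":"闾丘喜造","email":"bvkqwbmgwi@lh80q.4ln","date_time":"2022-10-04 08:06:31"}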
Use the Table API to read this JSON file and write the data to HDFS as CSV:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
public class FlinkJson2HDFSCsv {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
//1. Create the TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.useBlinkPlanner() // since Flink 1.14 only the Blink planner remains and it is the default
//.inStreamingMode() // streaming mode is the default
.inBatchMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
String source_sql = "CREATE TABLE json_table (\n" +
" id Integer,\n" +
" name STRING,\n" +
" email STRING,\n" +
" date_time STRING" +
") WITH (\n" +
" 'connector'='filesystem',\n" +
" 'path'='input/userbase.json',\n" +
" 'format'='json'\n" +
")";
String sink_sql = "CREATE TABLE sink_hdfs (\n" +
" id Integer,\n" +
" name STRING,\n" +
" date_time STRING,\n" + // column order id,name,date_time,email matches the insert below and the CSV read in 2.4
" email STRING" +
") WITH ( \n " +
" 'connector' = 'filesystem',\n" +
" 'path' = 'hdfs://bigdata01:8020/output_csv/userbase.csv' , \n" +
" 'format' = 'csv'\n" +
")";
String insert_SQL = "insert into sink_hdfs select id,name ,date_time,email from json_table ";
//Register the source and sink tables, then execute the insert
tableEnvironment.executeSql(source_sql);
tableEnvironment.executeSql(sink_sql);
tableEnvironment.executeSql(insert_SQL);
}
}
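After the job finishes, the CSV output on HDFS can be inspected with a standard HDFS command (the filesystem sink writes one or more part files under the target directory):
hdfs dfs -cat hdfs://bigdata01:8020/output_csv/userbase.csv/*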
2.4 Using a built-in connector to read CSV data from HDFS and write it to HBase
Use Flink's built-in connectors to read the CSV data written above from HDFS and write it into HBase.
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/filesystem/
Open the HBase shell and create the target table:
[hadoop@bigdata01 ~]$ cd /opt/install/hbase-2.2.7/
[hadoop@bigdata01 hbase-2.2.7]$ bin/hbase shell
hbase(main):009:0> create 'hTable','f1'
Use the Table API to read the CSV files and write the rows into HBase:
import org.apache.flink.table.api.*;
public class FlinkWithHDFSCSV2HBase {
public static void main(String[] args) {
//1. Create the TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.useBlinkPlanner() // since Flink 1.14 only the Blink planner remains and it is the default
//.inStreamingMode() // streaming mode is the default
//.inBatchMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
String source_sql = "CREATE TABLE source_hdfs (\n" +
" id Integer,\n" +
" name STRING,\n" +
" date_time STRING,\n" +
" email STRING" +
") WITH ( \n " +
" 'connector' = 'filesystem',\n" +
" 'path' = 'hdfs://bigdata01:8020/output_csv/userbase.csv/' , \n" +
" 'format' = 'csv'\n" +
")";
String sink_sql = "CREATE TABLE sink_table (\n" +
" rowkey Integer,\n" +
" f1 ROW<name STRING,email STRING,date_time STRING > ,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'hTable',\n" +
" 'zookeeper.quorum' = 'bigdata01:2181,bigdata02:2181,bigdata03:2181'\n" +
") ";
String execute_sql = "insert into sink_table select id as rowkey,ROW(name,email,date_time) from source_hdfs ";
tableEnvironment.executeSql(source_sql);
tableEnvironment.executeSql(sink_sql);
tableEnvironment.executeSql(execute_sql);
}
}
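To check the result, scan the table from the HBase shell (since the id is written as a binary-encoded integer rowkey, the keys may display as raw bytes):
scan 'hTable'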
2.6 Using a built-in connector to read data from HBase and write it to Kafka
Use Flink's built-in connectors to read HBase data and write it into Kafka.
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hbase/
Create the HBase table and load the following sample rows.
hbase(main):003:0> create 'opt_log','f1'
#Insert sample rows
put 'opt_log','1','f1:username','郑剃'
put 'opt_log','1','f1:email','kyzqcd0686@vjikq.tng'
put 'opt_log','1','f1:date_time','2022-10-04 08:01:48'
put 'opt_log','2','f1:username','王曙介'
put 'opt_log','2','f1:email','axvcbj7vbo@ecyi1.4gw'
put 'opt_log','2','f1:date_time','2022-10-04 08:04:39'
put 'opt_log','3','f1:username','赖溯姆'
put 'opt_log','3','f1:email','ew1qu5sunz@caxtg.vtn'
put 'opt_log','3','f1:date_time','2022-10-04 08:00:19'
Create the Kafka topic and start a console consumer.
#List topics
cd /opt/install/kafka_2.12-2.6.3/
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --list
#Create the topic
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --create --topic user_output --replication-factor 3 --partitions 3
#Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic user_output
Use the Table API to read the HBase table and write the rows into Kafka:
import org.apache.flink.table.api.*;
public class FlinkTableWithHBase2Kafka {
public static void main(String[] args) {
//1. Create the TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.useBlinkPlanner() // since Flink 1.14 only the Blink planner remains and it is the default
//.inStreamingMode() // streaming mode is the default
//.inBatchMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
String source_table = "CREATE TABLE hTable (\n" +
" rowkey STRING,\n" +
" f1 ROW<username STRING,email STRING,date_time String> ,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'opt_log',\n" +
" 'zookeeper.quorum' = 'bigdata01:2181,bigdata02:2181,bigdata03:2181'\n" +
") ";
String sink_table = "CREATE TABLE KafkaTable (\n" +
" `username` STRING,\n" +
" `email` STRING,\n" +
" `date_time` STRING \n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_output',\n" +
" 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
" 'format' = 'json'\n" +
")";
String insert_sql = "insert into KafkaTable select username,email,date_time from hTable";
tableEnvironment.executeSql(source_table);
tableEnvironment.executeSql(sink_table);
tableEnvironment.executeSql(insert_sql);
}
}
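With the console consumer from the previous step still running, the three HBase rows should arrive as JSON messages along the lines of:
{"username":"郑剃","email":"kyzqcd0686@vjikq.tng","date_time":"2022-10-04 08:01:48"}
{"username":"王曙介","email":"axvcbj7vbo@ecyi1.4gw","date_time":"2022-10-04 08:04:39"}
{"username":"赖溯姆","email":"ew1qu5sunz@caxtg.vtn","date_time":"2022-10-04 08:00:19"}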
2.7 Using a built-in connector to read data from Kafka and write it to MySQL
Use Flink's built-in Kafka connector to read the Kafka data and the JDBC connector to write it into MySQL.
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/
Create the Kafka topic and produce the following sample records.
#Create the topic
bin/kafka-topics.sh --zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181 --create --topic usr_opt --replication-factor 3 --partitions 3
#Start a console producer
bin/kafka-console-producer.sh --broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic usr_opt
Sample records:
{"date_time":"2022-10-04 08:01:48","email":"kyzqcd0686@vjikq.tng","id":0,"name":"郑剃"}
{"date_time":"2022-10-04 08:06:31","email":"bvkqwbmgwi@lh80q.4ln","id":1,"name":"闾丘喜造"}
{"date_time":"2022-10-04 08:04:39","email":"axvcbj7vbo@ecyi1.4gw","id":2,"name":"王曙介"}
{"date_time":"2022-10-04 08:00:19","email":"ew1qu5sunz@caxtg.vtn","id":3,"name":"赖溯姆"}
For reference, general Kafka commands for producing and consuming data (the user_input/user_output topics here are separate demo topics):
#List topics
cd /opt/install/kafka_2.12-2.6.3/
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --list
#Create topics
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --create --topic user_input --replication-factor 3 --partitions 3
bin/kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --create --topic user_output --replication-factor 3 --partitions 3
#Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic user_output
#Start a console producer
bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic user_input
#Example input records
{"user":"Cidy","visit_url":"./home","op_time":"2022-02-03 12:00:00"}
{"user":"Lili","visit_url":"./index","op_time":"2022-02-03 13:30:50"}
{"user":"Tom","visit_url":"./detail","op_time":"2022-02-04 13:35:30"}
Define the MySQL tables:
CREATE DATABASE /*!32312 IF NOT EXISTS*/`user_log` /*!40100 DEFAULT CHARACTER SET latin1 */;
USE `user_log`;
/*Table structure for table `clickcount` */
DROP TABLE IF EXISTS `clickcount`;
CREATE TABLE `clickcount` (
`username` varchar(64) DEFAULT NULL,
`result` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `clickcount` */
/*Table structure for table `clicklog` */
DROP TABLE IF EXISTS `clicklog`;
CREATE TABLE `clicklog` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(20) DEFAULT NULL,
`email` varchar(100) DEFAULT NULL,
`date_time` varchar(30) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
/*Data for the table `clicklog` */
insert into `clicklog`(`id`,`username`,`email`,`date_time`) values (1,'zhangsan','kyzqcd0686@vjikq.tng','2022-10-04 08:01:48'),(2,'lisi','bvkqwbmgwi@lh80q.4ln','2022-10-04 08:06:31');
Use the Table API to read the Kafka records and write them into MySQL:
import org.apache.flink.table.api.*;
public class FlinkTableWithKafka2MySQL {
public static void main(String[] args) {
//1. Create the TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.useBlinkPlanner() // since Flink 1.14 only the Blink planner remains and it is the default
//.inStreamingMode() // streaming mode is the default
//.inBatchMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
String source_sql = "CREATE TABLE KafkaTable (\n" +
" id Integer,\n" +
" name STRING,\n" +
" email STRING,\n" +
" date_time STRING" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'usr_opt',\n" +
" 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
" 'properties.group.id' = 'user_opt_group',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")";
String sink_sql = "CREATE TABLE mysql_sink (\n" +
" id Integer,\n" +
" username STRING,\n" + // field names must match the MySQL clicklog columns
" email STRING,\n" +
" date_time STRING,\n" +
" PRIMARY KEY (id) NOT ENFORCED" + // a primary key lets the JDBC sink upsert instead of failing on duplicate ids
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/user_log?characterEncoding=utf-8&serverTimezone=GMT%2B8',\n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'table-name' = 'clicklog',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456'\n" +
")";
String execute_sql = "insert into mysql_sink select id,name as username,email,date_time from KafkaTable";
tableEnvironment.executeSql(source_sql);
tableEnvironment.executeSql(sink_sql);
tableEnvironment.executeSql(execute_sql);
}
}
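Once records have been produced to the usr_opt topic, the rows written by the sink can be checked in MySQL, for example:
select id, username, email, date_time from user_log.clicklog;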
2.8 Reading from and writing to Hive with the built-in connector
Use Flink's Hive catalog and connector to read data from Hive and write results back to Hive.
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/
Add the following property to hive-site.xml:
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.52.120:9083</value>
</property>
Put hive-site.xml into the project (the code below expects it under input/hiveconf/).
Start the Hive metastore service, then create the Hive database and tables.
#Start the Hive metastore and hiveserver2 services
[hadoop@bigdata03 apache-hive-3.1.2]$ cd /opt/install/apache-hive-3.1.2/
[hadoop@bigdata03 apache-hive-3.1.2]$ bin/hive --service metastore
[hadoop@bigdata03 apache-hive-3.1.2]$ bin/hive --service hiveserver2
#Create a local data file
[hadoop@bigdata03 install]$ cd /opt/install/
[hadoop@bigdata03 install]$ vim userbase.csv
#File content:
0,郑剃,"2022-10-04 08:01:48",kyzqcd0686@vjikq.tng
1,闾丘喜造,"2022-10-04 08:06:31",bvkqwbmgwi@lh80q.4ln
2,王曙介,"2022-10-04 08:04:39",axvcbj7vbo@ecyi1.4gw
3,赖溯姆,"2022-10-04 08:00:19",ew1qu5sunz@caxtg.vtn
4,钱泼奎,"2022-10-04 08:04:51",50xdhnfppw@vwreu.kxk
5,尉迟亏,"2022-10-04 08:02:25",h8ist2s54k@lorkp.79s
6,贾盏,"2022-10-04 08:05:22",hnzfdmnjgo@rsiq9.syx
7,蔡辟,"2022-10-04 08:03:53",apjlg5pyuo@lhs6l.oj4
8,蔡矛,"2022-10-04 08:05:35",cpqofnn5xd@7iknh.qc5
9,赖妖炬,"2022-10-04 08:05:03",0wg3nfjdv9@fomvu.2kb
10,毛溜孝,"2022-10-04 08:06:37",1kkaib5i4e@ecvb8.6cs
Prepare the Hive data:
-- Create the test database
create database if not exists test location "/user/hive/warehouse/test";
use test;
-- Create the Hive source table
drop table userbase;
create table if not exists userbase
(id int,
name string,
date_time string,
email string
)
row format delimited fields terminated by ","
stored as textfile;
-- Load userbase.csv into the Hive table
load data local inpath '/opt/install/userbase.csv' into table userbase;
-- Query the table
select * from userbase;
-- Create the Hive table that will hold the results
create table user_count (username string,count_result int) row format delimited fields terminated by '\t';
Use the Table API to read the Hive data and write the aggregated result back to Hive:
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
public class FlinkTableWithHive {
public static void main(String[] args) {
//1. Create the TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2. Create the HiveCatalog
String name = "myCataLog";
String defaultDatabase = "test";
String hiveConfDir = "input/hiveconf/";
tEnv.loadModule("hive", new HiveModule("3.1.2")); // load Hive built-in functions ("hive" is the conventional module name)
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
HiveCatalog hive = new HiveCatalog(name,defaultDatabase,hiveConfDir);
//3. Register the catalog
tEnv.registerCatalog(name,hive);
//4. Use this catalog and database for the current session
tEnv.useCatalog(name);
tEnv.useDatabase(defaultDatabase);
tEnv.executeSql("insert into user_count select name as username,count(1) as count_result from userbase group by name");
}
}
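After the job completes, the aggregated result can be checked from the Hive CLI, for example:
select * from test.user_count;
-- one row per name from userbase with its occurrence count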