客快物流大数据项目(四十二):Java代码操作Kudu
【摘要】
目录
Java代码操作Kudu
一、构建maven工程
二、导入依赖
三、创建包结构
四、初始化方法
五、创建表
六、插入数据
七、查询数据
八、修改数据
九、删除数据
十、修改表
十一、删除表
...
目录
Java代码操作Kudu
一、构建maven工程
二、导入依赖
<!-- Cloudera repository: hosts the *-cdh6.2.1 Kudu artifacts referenced below. -->
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <!-- Kudu Java client: KuduClient, KuduSession, KuduScanner, ... -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- JUnit 4: provides the @Before / @Test annotations used by the examples. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <!-- NOTE(review): spark-sql 2.1.0 may not match the Spark version kudu-spark2
         1.9.0-cdh6.2.1 was built against; confirm against the CDH release in use. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>
三、创建包结构
| 包名 | 说明 |
| --- | --- |
| cn.it | 代码所在的包目录 |
四、初始化方法
-
package cn.it;
-
-
import org.apache.kudu.ColumnSchema;
-
import org.apache.kudu.Type;
-
import org.apache.kudu.client.KuduClient;
-
import org.junit.Before;
-
-
public class TestKudu {
-
//定义KuduClient客户端对象
-
private static KuduClient kuduClient;
-
//定义表名
-
private static String tableName = "person";
-
-
/**
-
* 初始化方法
-
*/
-
@Before
-
public void init() {
-
//指定master地址
-
String masterAddress = "node2.cn";
-
//创建kudu的数据库连接
-
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
-
}
-
-
//构建表schema的字段信息
-
//字段名称 数据类型 是否为主键
-
public ColumnSchema newColumn(String name, Type type, boolean isKey) {
-
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
-
column.key(isKey);
-
return column.build();
-
}
-
}
五、创建表
-
/** 使用junit进行测试
-
*
-
* 创建表
-
* @throws KuduException
-
*/
-
@Test
-
public void createTable() throws KuduException {
-
//设置表的schema
-
List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
-
columns.add(newColumn("CompanyId", Type.INT32, true));
-
columns.add(newColumn("WorkId", Type.INT32, false));
-
columns.add(newColumn("Name", Type.STRING, false));
-
columns.add(newColumn("Gender", Type.STRING, false));
-
columns.add(newColumn("Photo", Type.STRING, false));
-
Schema schema = new Schema(columns);
-
//创建表时提供的所有选项
-
CreateTableOptions tableOptions = new CreateTableOptions();
-
//设置表的副本和分区规则
-
LinkedList<String> list = new LinkedList<String>();
-
list.add("CompanyId");
-
//设置表副本数
-
tableOptions.setNumReplicas(1);
-
//设置range分区
-
//tableOptions.setRangePartitionColumns(list);
-
//设置hash分区和分区的数量
-
tableOptions.addHashPartitions(list, 3);
-
try {
-
kuduClient.createTable("person", schema, tableOptions);
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
kuduClient.close();
-
}
六、插入数据
-
/**
-
* 向表中加载数据
-
* @throws KuduException
-
*/
-
@Test
-
public void loadData() throws KuduException {
-
//打开表
-
KuduTable kuduTable = kuduClient.openTable(tableName);
-
//创建KuduSession对象 kudu必须通过KuduSession写入数据
-
KuduSession kuduSession = kuduClient.newSession();
-
//采用flush方式 手动刷新
-
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
-
kuduSession.setMutationBufferSpace(3000);
-
//准备数据
-
for(int i=1; i<=10; i++){
-
Insert insert = kuduTable.newInsert();
-
//设置字段的内容
-
insert.getRow().addInt("CompanyId",i);
-
insert.getRow().addInt("WorkId",i);
-
insert.getRow().addString("Name","lisi"+i);
-
insert.getRow().addString("Gender","male");
-
insert.getRow().addString("Photo","person"+i);
-
kuduSession.flush();
-
kuduSession.apply(insert);
-
}
-
kuduSession.close();
-
kuduClient.close();
-
}
七、查询数据
-
/**
-
* 查询表数据
-
* @throws KuduException
-
*/
-
@Test
-
public void queryData() throws KuduException {
-
//打开表
-
KuduTable kuduTable = kuduClient.openTable(tableName);
-
//获取scanner扫描器
-
KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
-
KuduScanner scanner = scannerBuilder.build();
-
//遍历
-
while(scanner.hasMoreRows()){
-
RowResultIterator rowResults = scanner.nextRows();
-
while (rowResults.hasNext()){
-
RowResult result = rowResults.next();
-
int companyId = result.getInt("CompanyId");
-
int workId = result.getInt("WorkId");
-
String name = result.getString("Name");
-
String gender = result.getString("Gender");
-
String photo = result.getString("Photo");
-
System.out.print("companyId:"+companyId+" ");
-
System.out.print("workId:"+workId+" ");
-
System.out.print("name:"+name+" ");
-
System.out.print("gender:"+gender+" ");
-
System.out.println("photo:"+photo);
-
}
-
}
-
//关闭
-
scanner.close();
-
kuduClient.close();
-
}
八、修改数据
-
/**
-
* 修改数据
-
* @throws KuduException
-
*/
-
@Test
-
public void upDATEData() throws KuduException {
-
//打开表
-
KuduTable kuduTable = kuduClient.openTable(tableName);
-
//构建kuduSession对象
-
KuduSession kuduSession = kuduClient.newSession();
-
//设置刷新数据模式,自动提交
-
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-
-
//更新数据需要获取UpDATE对象
-
UpDATE upDATE = kuduTable.newUpDATE();
-
//获取row对象
-
PartialRow row = upDATE.getRow();
-
//设置要更新的数据信息
-
row.addInt("CompanyId",1);
-
row.addString("Name","kobe");
-
//操作这个upDATE对象
-
kuduSession.apply(upDATE);
-
kuduSession.close();
-
}
九、删除数据
-
/**
-
* 删除表中的数据
-
*/
-
@Test
-
public void deleteData() throws KuduException {
-
//打开表
-
KuduTable kuduTable = kuduClient.openTable(tableName);
-
KuduSession kuduSession = kuduClient.newSession();
-
//获取Delete对象
-
Delete delete = kuduTable.newDelete();
-
//构建要删除的行对象
-
PartialRow row = delete.getRow();
-
//设置删除数据的条件
-
row.addInt("CompanyId",2);
-
kuduSession.flush();
-
kuduSession.apply(delete);
-
kuduSession.close();
-
kuduClient.close();
-
}
十、修改表
-
package cn.it.kudu;
-
-
import org.apache.kudu.ColumnSchema;
-
import org.apache.kudu.Type;
-
import org.apache.kudu.client.*;
-
import org.junit.Before;
-
import org.junit.Test;
-
-
import java.util.List;
-
-
/**
-
* 修改表操作
-
*/
-
public class AlterTable {
-
//定义kudu的客户端对象
-
private static KuduClient kuduClient;
-
//定义一张表名称
-
private static String tableName = "person";
-
-
/**
-
* 初始化操作
-
*/
-
@Before
-
public void init() {
-
//指定kudu的master地址
-
String masterAddress = "node2.cn";
-
//创建kudu的数据库连接
-
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
-
}
-
-
/**
-
* 添加列
-
*/
-
@Test
-
public void alterTableAddColumn() {
-
AlterTableOptions alterTableOptions = new AlterTableOptions();
-
alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
-
try {
-
kuduClient.alterTable(tableName, alterTableOptions);
-
} catch (KuduException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 删除列
-
*/
-
@Test
-
public void alterTableDeleteColumn(){
-
AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
-
try {
-
kuduClient.alterTable(tableName, alterTableOptions);
-
} catch (KuduException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 添加分区列
-
*/
-
@Test
-
public void alterTableAddRangePartition(){
-
int lowerValue = 110;
-
int upperValue = 120;
-
try {
-
KuduTable kuduTable = kuduClient.openTable(tableName);
-
List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
-
boolean flag = true;
-
for (Partition rangePartition : rangePartitions) {
-
int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
-
if(startKey == lowerValue){
-
flag = false;
-
}
-
}
-
if(flag) {
-
PartialRow lower = kuduTable.getSchema().newPartialRow();
-
lower.addInt("Id", lowerValue);
-
PartialRow upper = kuduTable.getSchema().newPartialRow();
-
upper.addInt("Id", upperValue);
-
kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
-
}else{
-
System.out.println("分区已经存在,不能重复创建!");
-
}
-
} catch (KuduException e) {
-
e.printStackTrace();
-
} catch (Exception exception) {
-
exception.printStackTrace();
-
}
-
}
-
-
/**
-
* 删除表
-
* @throws KuduException
-
*/
-
@Test
-
public void dropTable() throws KuduException {
-
kuduClient.deleteTable(tableName);
-
}
-
}
十一、删除表
-
/**
-
* 删除表
-
*/
-
@Test
-
public void dropTable() throws KuduException {
-
//删除表
-
DeleteTableResponse response = kuduClient.deleteTable(tableName);
-
//关闭客户端连接
-
kuduClient.close();
-
}
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/123012993
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)