客快物流大数据项目(四十二):Java代码操作Kudu

举报
Lansonli 发表于 2022/02/20 00:40:21 2022/02/20
【摘要】 目录 Java代码操作Kudu 一、构建maven工程 二、导入依赖 三、​​​​​​​创建包结构 四、​​​​​​​初始化方法 五、​​​​​​​创建表 六、​​​​​​​插入数据 七、​​​​​​​查询数据 八、修改数据 九、​​​​​​​删除数据 十、​​​​​​​修改表 十一、​​​​​​​删除表 ...

目录

Java代码操作Kudu

一、构建maven工程

二、导入依赖

三、​​​​​​​创建包结构

四、​​​​​​​初始化方法

五、​​​​​​​创建表

六、​​​​​​​插入数据

七、​​​​​​​查询数据

八、修改数据

九、​​​​​​​删除数据

十、​​​​​​​修改表

十一、​​​​​​​删除表


Java代码操作Kudu

一、​​​​​​​构建maven工程

二、导入依赖


  
  1. <repositories>
  2. <repository>
  3. <id>cloudera</id>
  4. <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  5. </repository>
  6. </repositories>
  7. <dependencies>
  8. <dependency>
  9. <groupId>org.apache.kudu</groupId>
  10. <artifactId>kudu-client</artifactId>
  11. <version>1.9.0-cdh6.2.1</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>junit</groupId>
  15. <artifactId>junit</artifactId>
  16. <version>4.12</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.kudu</groupId>
  20. <artifactId>kudu-client-tools</artifactId>
  21. <version>1.9.0-cdh6.2.1</version>
  22. </dependency>
  23. <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
  24. <dependency>
  25. <groupId>org.apache.kudu</groupId>
  26. <artifactId>kudu-spark2_2.11</artifactId>
  27. <version>1.9.0-cdh6.2.1</version>
  28. </dependency>
  29. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
  30. <dependency>
  31. <groupId>org.apache.spark</groupId>
  32. <artifactId>spark-sql_2.11</artifactId>
  33. <version>2.1.0</version>
  34. </dependency>
  35. </dependencies>

三、​​​​​​​创建包结构

包名

说明

cn.it

代码所在的包目录

 

四、​​​​​​​初始化方法


  
  1. package cn.it;
  2. import org.apache.kudu.ColumnSchema;
  3. import org.apache.kudu.Type;
  4. import org.apache.kudu.client.KuduClient;
  5. import org.junit.Before;
  6. public class TestKudu {
  7. //定义KuduClient客户端对象
  8. private static KuduClient kuduClient;
  9. //定义表名
  10. private static String tableName = "person";
  11. /**
  12. * 初始化方法
  13. */
  14. @Before
  15. public void init() {
  16. //指定master地址
  17. String masterAddress = "node2.cn";
  18. //创建kudu的数据库连接
  19. kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
  20. }
  21. //构建表schema的字段信息
  22. //字段名称 数据类型 是否为主键
  23. public ColumnSchema newColumn(String name, Type type, boolean isKey) {
  24. ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
  25. column.key(isKey);
  26. return column.build();
  27. }
  28. }

五、​​​​​​​创建表


  
  1. /** 使用junit进行测试
  2. *
  3. * 创建表
  4. * @throws KuduException
  5. */
  6. @Test
  7. public void createTable() throws KuduException {
  8. //设置表的schema
  9. List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
  10. columns.add(newColumn("CompanyId", Type.INT32, true));
  11. columns.add(newColumn("WorkId", Type.INT32, false));
  12. columns.add(newColumn("Name", Type.STRING, false));
  13. columns.add(newColumn("Gender", Type.STRING, false));
  14. columns.add(newColumn("Photo", Type.STRING, false));
  15. Schema schema = new Schema(columns);
  16. //创建表时提供的所有选项
  17. CreateTableOptions tableOptions = new CreateTableOptions();
  18. //设置表的副本和分区规则
  19. LinkedList<String> list = new LinkedList<String>();
  20. list.add("CompanyId");
  21. //设置表副本数
  22. tableOptions.setNumReplicas(1);
  23. //设置range分区
  24. //tableOptions.setRangePartitionColumns(list);
  25. //设置hash分区和分区的数量
  26. tableOptions.addHashPartitions(list, 3);
  27. try {
  28. kuduClient.createTable("person", schema, tableOptions);
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. kuduClient.close();
  33. }

 

六、​​​​​​​插入数据


  
  1. /**
  2. * 向表中加载数据
  3. * @throws KuduException
  4. */
  5. @Test
  6. public void loadData() throws KuduException {
  7. //打开表
  8. KuduTable kuduTable = kuduClient.openTable(tableName);
  9. //创建KuduSession对象 kudu必须通过KuduSession写入数据
  10. KuduSession kuduSession = kuduClient.newSession();
  11. //采用flush方式 手动刷新
  12. kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
  13. kuduSession.setMutationBufferSpace(3000);
  14. //准备数据
  15. for(int i=1; i<=10; i++){
  16. Insert insert = kuduTable.newInsert();
  17. //设置字段的内容
  18. insert.getRow().addInt("CompanyId",i);
  19. insert.getRow().addInt("WorkId",i);
  20. insert.getRow().addString("Name","lisi"+i);
  21. insert.getRow().addString("Gender","male");
  22. insert.getRow().addString("Photo","person"+i);
  23. kuduSession.flush();
  24. kuduSession.apply(insert);
  25. }
  26. kuduSession.close();
  27. kuduClient.close();
  28. }

七、​​​​​​​查询数据


  
  1.  /**
  2. * 查询表数据
  3. * @throws KuduException
  4. */
  5. @Test
  6. public void queryData() throws KuduException {
  7. //打开表
  8. KuduTable kuduTable = kuduClient.openTable(tableName);
  9. //获取scanner扫描器
  10. KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
  11. KuduScanner scanner = scannerBuilder.build();
  12. //遍历
  13. while(scanner.hasMoreRows()){
  14. RowResultIterator rowResults = scanner.nextRows();
  15. while (rowResults.hasNext()){
  16. RowResult result = rowResults.next();
  17. int companyId = result.getInt("CompanyId");
  18. int workId = result.getInt("WorkId");
  19. String name = result.getString("Name");
  20. String gender = result.getString("Gender");
  21. String photo = result.getString("Photo");
  22. System.out.print("companyId:"+companyId+" ");
  23. System.out.print("workId:"+workId+" ");
  24. System.out.print("name:"+name+" ");
  25. System.out.print("gender:"+gender+" ");
  26. System.out.println("photo:"+photo);
  27. }
  28. }
  29. //关闭
  30. scanner.close();
  31. kuduClient.close();
  32. }

 

八、修改数据


  
  1. /**
  2. * 修改数据
  3. * @throws KuduException
  4. */
  5. @Test
  6. public void upDATEData() throws KuduException {
  7. //打开表
  8. KuduTable kuduTable = kuduClient.openTable(tableName);
  9. //构建kuduSession对象
  10. KuduSession kuduSession = kuduClient.newSession();
  11. //设置刷新数据模式,自动提交
  12. kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
  13. //更新数据需要获取UpDATE对象
  14. UpDATE upDATE = kuduTable.newUpDATE();
  15. //获取row对象
  16. PartialRow row = upDATE.getRow();
  17. //设置要更新的数据信息
  18. row.addInt("CompanyId",1);
  19. row.addString("Name","kobe");
  20. //操作这个upDATE对象
  21. kuduSession.apply(upDATE);
  22. kuduSession.close();
  23. }

 

九、​​​​​​​删除数据


  
  1. /**
  2. * 删除表中的数据
  3. */
  4. @Test
  5. public void deleteData() throws KuduException {
  6. //打开表
  7. KuduTable kuduTable = kuduClient.openTable(tableName);
  8. KuduSession kuduSession = kuduClient.newSession();
  9. //获取Delete对象
  10. Delete delete = kuduTable.newDelete();
  11. //构建要删除的行对象
  12. PartialRow row = delete.getRow();
  13. //设置删除数据的条件
  14. row.addInt("CompanyId",2);
  15. kuduSession.flush();
  16. kuduSession.apply(delete);
  17. kuduSession.close();
  18. kuduClient.close();
  19. }

 

十、​​​​​​​修改表


  
  1. package cn.it.kudu;
  2. import org.apache.kudu.ColumnSchema;
  3. import org.apache.kudu.Type;
  4. import org.apache.kudu.client.*;
  5. import org.junit.Before;
  6. import org.junit.Test;
  7. import java.util.List;
  8. /**
  9. * 修改表操作
  10. */
  11. public class AlterTable {
  12. //定义kudu的客户端对象
  13. private static KuduClient kuduClient;
  14. //定义一张表名称
  15. private static String tableName = "person";
  16. /**
  17. * 初始化操作
  18. */
  19. @Before
  20. public void init() {
  21. //指定kudu的master地址
  22. String masterAddress = "node2.cn";
  23. //创建kudu的数据库连接
  24. kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
  25. }
  26. /**
  27. * 添加列
  28. */
  29. @Test
  30. public void alterTableAddColumn() {
  31. AlterTableOptions alterTableOptions = new AlterTableOptions();
  32. alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
  33. try {
  34. kuduClient.alterTable(tableName, alterTableOptions);
  35. } catch (KuduException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. /**
  40. * 删除列
  41. */
  42. @Test
  43. public void alterTableDeleteColumn(){
  44. AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
  45. try {
  46. kuduClient.alterTable(tableName, alterTableOptions);
  47. } catch (KuduException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. /**
  52. * 添加分区列
  53. */
  54. @Test
  55. public void alterTableAddRangePartition(){
  56. int lowerValue = 110;
  57. int upperValue = 120;
  58. try {
  59. KuduTable kuduTable = kuduClient.openTable(tableName);
  60. List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
  61. boolean flag = true;
  62. for (Partition rangePartition : rangePartitions) {
  63. int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
  64. if(startKey == lowerValue){
  65. flag = false;
  66. }
  67. }
  68. if(flag) {
  69. PartialRow lower = kuduTable.getSchema().newPartialRow();
  70. lower.addInt("Id", lowerValue);
  71. PartialRow upper = kuduTable.getSchema().newPartialRow();
  72. upper.addInt("Id", upperValue);
  73. kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
  74. }else{
  75. System.out.println("分区已经存在,不能重复创建!");
  76. }
  77. } catch (KuduException e) {
  78. e.printStackTrace();
  79. } catch (Exception exception) {
  80. exception.printStackTrace();
  81. }
  82. }
  83. /**
  84. * 删除表
  85. * @throws KuduException
  86. */
  87. @Test
  88. public void dropTable() throws KuduException {
  89. kuduClient.deleteTable(tableName);
  90. }
  91. }

十一、​​​​​​​删除表


  
  1. /**
  2. * 删除表
  3. */
  4. @Test
  5. public void dropTable() throws KuduException {
  6. //删除表
  7. DeleteTableResponse response = kuduClient.deleteTable(tableName);
  8. //关闭客户端连接
  9. kuduClient.close();
  10. }


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

 

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123012993

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。