Hbase 2 Java Api 使用详解

教程分享 > Java教程 (1128) 2024-04-17 12:33:20

Hbase2环境搭建

参考:Hbase2.x docker环境-左搜 (leftso.com)

项目创建

创建一个基于spring boot的项目(为了方便一些操作),并引入hbase2的client api包
项目创建

 

项目源码下载:demo-boot-hbase2.zip(9987)


maven pom.xml依赖引入:

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.1.3</version>
</dependency>

 

创建Hbase相关配置类HbaseProperties

@Data
//@Component
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {
    /**
     * 配置
     */
    private Map<String, String> config;
    /**
     * hbase home目录
     * 如果系统配置了环境变量HADOOP_HOME则不需要配置这里
     */
    private String hadoopHome;
}

 

在application.yml添加hbase相关配置

hbase:
  config:
    hbase:
      master: 192.168.79.132:16000
      zookeeper:
        property:
          clientPort: 2181
        quorum: 192.168.79.132
    zookeeper:
      znode:
        parent: /hbase
  hadoop-home: D:\software\HADOOP_HOME

hadoop-home配置目录,Windows请下载源码,找的里面的hadoop-common-2.7.0-bin.zip,解压后就是这个配置的目录

创建hbase配置类HbaseConfig

@org.springframework.context.annotation.Configuration
@EnableConfigurationProperties(HbaseProperties.class)
public class HbaseConfig {

    private final HbaseProperties prop;

    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    @Bean
    public Configuration configuration() {
        //配置
        if (StringUtils.isNotBlank(prop.getHadoopHome())){
            System.setProperty("hadoop.home.dir",prop.getHadoopHome());
        }
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
//        configuration.set("hbase.zookeeper.quorum", prop.getConfig().get("zookeeper.quorum"));
//        configuration.set("zookeeper.znode.parent", "/hbase");
//        configuration.set("hbase.master", prop.getConfig().get("zookeeper.quorum")+":16000");
        config.forEach(configuration::set);
        return configuration;
    }

    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}

 

创建Hbase2 操作api接口IHbaseService

public interface IHbaseService {
    /**
     * 创建表
     * @param tableName 表名
     * @param columnFamily 族名
     * @throws IOException
     */
    void createTable(String tableName, List<String> columnFamily);

    /**
     * 预分区创建表
     * @param tableName 表名
     * @param columnFamily  族名
     * @param splitKeys 预分区region
     */
    void createTableBySplitKeys(String tableName, List<String> columnFamily,byte[][]splitKeys);

    /**
     * 创建预分区region key
     * @param keys 预分区region key
     */
    byte[][] createSplitKeys(String[] keys);

    /**
     * 创建16进制预分区region key
     * @param startKey 开始key
     * @param endKey 结束key
     * @param regionNum 分区数量
     * @return
     */
    byte[][] createHexSplitKeys(String startKey,String endKey,int regionNum);

    /**
     * 删除表
     * @param tableName
     * @throws IOException
     */
    void deleteTable(String tableName);

    /**
     * 列出所有表
     * @return
     * @throws IOException
     */
    List<String> listTables();

    /**
     * 添加数据
     * @param tableName 表名
     * @param rowKey key
     * @param columnFamily
     * @param columns
     * @param values
     */
    void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values);

    /***
     * 添加数据
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param map
     */
    void insertOrUpdate(String tableName, String rowKey, String columnFamily, LinkedHashMap<String, String> map);

    /**
     * 根据rowKey获取行数据
     * @param tableName
     * @param rowKey
     * @return
     */
    LinkedHashMap<String,String> getRowData(String tableName, String rowKey,String columnFamily);

    /***
     * 根据rowKey获取行数据,并指定返回行的那些字段(columns)
     * @param tableName 表名
     * @param rowKey 行id
     * @param columnFamily cf
     * @param columns 指定返回字段名称
     * @return
     */
    LinkedHashMap<String,String> getRowData(String tableName, String rowKey,String columnFamily,  List<byte[]> columns);
    /***
     * 删除行
     * @param tableName
     * @param rowKey
     * @return
     */
    boolean deleteRow(String tableName, String rowKey);

    /**
     *
     * @param tableName
     * @param rowKey
     * @param familyName
     * @param column
     * @param value
     */
    void setColumnValue(String tableName, String rowKey, String familyName, String column, String value);

    /***
     * 删除列族
     * @param tableName 表名
     * @param rowKey 行id
     * @param columnFamily 列族名
     */
    void deleteColumnFamily(String tableName, String rowKey, String columnFamily);

    /***
     * 删除列
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param columnName
     * @return
     */
    boolean deleteColumn(String tableName, String rowKey, String columnFamily, String columnName);


    /**
     * 获取表所有数据
     * @param tableName
     * @return 返回map说明
     * 第一层
     *  key-> rowKey
     *  value-> 数据
     *
     * 第二层
     *  key   -> columnFamily:columnName
     *  value -> 具体数据值
     */
    Map<String, Map<String, String>> findAll(String tableName);

    /**
     * 根据 startKey-stopKey查询之间的数据
     * @param tableName 表名
     * @param startRowKey 开始行id
     * @param stopRowKey 结束行id
     * @return
     */
    Map<String, Map<String, String>> findList(String tableName,String startRowKey,String stopRowKey);
    /**
     * 根据 startKey-stopKey并指定columnFamily查询之间的数据
     * @param tableName 表名
     * @param columnFamily 列族名
     * @param startRowKey 开始行id
     * @param stopRowKey 结束行id
     * @return
     */
    Map<String, Map<String, String>> findList(String tableName,String columnFamily,String startRowKey,String stopRowKey);


Hbase api操作实现类IHbaseService

@Slf4j
@Service
public class HbaseServiceImpl implements IHbaseService {

    @Resource
    Connection connection;

    /**
     * 创建表
     * - 只有一个列族
     * @param tableName 表名
     * @param columnFamily 列族
     * @throws IOException
     */
    public void createTable(String tableName, List<String> columnFamily) {
        TableName table = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            if (admin.tableExists(table)) {
                log.warn("表[{}]已存在!", tableName);
                return;
            }
            //列族column family
            List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(cf -> {
                cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                        Bytes.toBytes(cf)).build());
            });
            TableDescriptor tableDesc = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf(tableName))
                    .setColumnFamilies(cfDesc).build();
            admin.createTable(tableDesc);
        }catch (Exception e){
            log.error("创建表错误:",e);
        }
    }

    @Override
    public void createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
        TableName table = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            if (admin.tableExists(table)) {
                log.warn("表[{}]已存在!", tableName);
                return;
            }
            //列族column family
            List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(cf -> {
                cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                        Bytes.toBytes(cf)).build());
            });
            TableDescriptor tableDesc = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf(tableName))
                    .setColumnFamilies(cfDesc).build();
            admin.createTable(tableDesc,splitKeys);
        }catch (Exception e){
            log.error("创建表错误:",e);
        }
    }

    @Override
    public byte[][] createSplitKeys(String[] keys) {
        if(keys==null){
            //默认为3个分区
            keys = new String[] { "1", "2", "3"};
        }
        byte[][] splitKeys = new byte[keys.length][];
        //升序排序
        TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for(String key : keys){
            rows.add(Bytes.toBytes(key));
        }
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int i=0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            splitKeys[i] = tempRow;
            i++;
        }
        return splitKeys;
    }

    @Override
    public byte[][] createHexSplitKeys(String startKey, String endKey, int regionNum) {
        byte[][] splits = new byte[regionNum-1][];
        BigInteger lowestKey = new BigInteger(startKey, 16);
        BigInteger highestKey = new BigInteger(endKey, 16);
        BigInteger range = highestKey.subtract(lowestKey);
        BigInteger regionIncrement = range.divide(BigInteger.valueOf(regionNum));
        lowestKey = lowestKey.add(regionIncrement);
        for(int i=0; i < regionNum-1;i++) {
            BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
            byte[] b = String.format("%016x", key).getBytes();
            splits[i] = b;
        }
        return splits;
    }

    /**
     * 删除表
     * @param tableName 表名称
     */
    public void deleteTable(String tableName) {
        TableName tName = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            if (admin.tableExists(tName)) {
                admin.disableTable(tName);
                admin.deleteTable(tName);
            } else {
                log.error("表 {} 不存在!", tableName);
                return;
            }
        }catch (Exception e){
            log.error("删除表错误:",e);
        }
    }

    /**
     * 列出hbase中所有的表
     */
    public List<String> listTables() {
        List<String> tables = new ArrayList<>(8);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            TableName[] tableNames = admin.listTableNames();
            for (TableName tableName : tableNames) {
                tables.add(tableName.getNameAsString());
            }
            return tables;
        }catch (Exception e){
            log.error("获取表信息错误:",e);
            return new ArrayList<>();
        }
    }

    @Override
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values) {
        try (Table table = connection.getTable(TableName.valueOf(tableName));){
            Put put=new Put(Bytes.toBytes(rowKey));
            if (columns!=null && values!=null&&columns.length==values.length){
                for (int i=0;i<columns.length;i++){
                    if (columns[i] != null && values[i] != null) {
                        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                    } else {
                        throw new NullPointerException(MessageFormat.format(
                                "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
                    }
                }

                table.put(put);
                log.debug("putData add or update data Success,rowKey:" + rowKey);
            }
        }catch (Exception e){
            log.error(MessageFormat.format(
                    "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
                    tableName, rowKey, columnFamily), e);
        }
    }


    @Override
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, LinkedHashMap<String, String> map) {

        try (Table table = connection.getTable(TableName.valueOf(tableName))){
            //设置rowkey
            Put put = new Put(Bytes.toBytes(rowKey));
            if(!CollectionUtils.isEmpty(map)) {
                for(Map.Entry<String, String> cell : map.entrySet()) {
                    String key = cell.getKey();
                    String value = cell.getValue();
                    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(key), Bytes.toBytes(value));
                }
                table.put(put);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public LinkedHashMap<String, String> getRowData(String tableName, String rowKey,String columnFamily) {
        return getRowData(tableName,rowKey, columnFamily,null);
    }

    @Override
    public LinkedHashMap<String, String> getRowData(String tableName, String rowKey, String columnFamily, List<byte[]> columns) {
        System.err.println("表:"+tableName+"   roeKey:"+rowKey +"  查询列个数:"+columns);
        //返回的键值对
        LinkedHashMap<String,String> result = new LinkedHashMap<>();

        Get get = new Get(Bytes.toBytes(rowKey));
        get.addFamily(columnFamily.getBytes());
        if (Objects.nonNull(columns) && columns.size() > 0){
            for (int i = 0; i < columns.size(); i++) {
                get.addColumn(columnFamily.getBytes(),columns.get(i));
            }
        }
        try ( Table table= connection.getTable(TableName.valueOf(tableName))){
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {

                for (Cell cell : hTableResult.listCells()) {
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        }catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("hbase查询异常");
        }
        System.err.println("当行记录:"+result);
        return result;
    }

    @Override
    public boolean deleteRow(String tableName, String rowKey) {
        try(Admin admin = connection.getAdmin();Table table=connection.getTable(TableName.valueOf(tableName))) {
            if(admin.tableExists(TableName.valueOf(tableName))){
                // 获取表
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                table.delete(delete);
            }
        }catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    @Override
    public void setColumnValue(String tableName, String rowKey, String familyName, String column, String value) {
        try(Table table=connection.getTable(TableName.valueOf(tableName))) {
            // 设置rowKey
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column), Bytes.toBytes(value));

            table.put(put);
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))){
            Delete delete = new Delete(rowKey.getBytes());
            delete.addFamily(Bytes.toBytes(columnFamily));
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean deleteColumn(String tableName, String rowKey, String columnFamily, String columnName) {
        try(Admin  admin = connection.getAdmin();Table table=connection.getTable(TableName.valueOf(tableName))) {

            if(admin.tableExists(TableName.valueOf(tableName))){
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                // 设置待删除的列
                delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
                table.delete(delete);
            }

        }catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return false;
    }

    /**
     * 获取表数据
     * @param tableName 表名
     * @return 返回map说明
     * 第一层
     *  key-> rowKey
     *  value-> 数据
     *
     * 第二层
     *  key   -> columnFamily:columnName
     *  value -> 具体数据值
     */
    private LinkedHashMap<String, Map<String, String>> queryData(String tableName, Scan scan) {
        // <rowKey,对应的行数据>
        LinkedHashMap<String, Map<String,String>> result = new LinkedHashMap<>();
        try(Table table=connection.getTable(TableName.valueOf(tableName)); ResultScanner rs=table.getScanner(scan)) {
            for (Result r : rs) {
                String rowKey = null;
                // 行键,列族和列限定符一起确定一个单元(Cell)
                for (Cell cell : r.listCells()) {
                    String columnFamily = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
                    Map<String,String> rowMap = result.get(rowKey);
                    if (rowKey == null) {
                        rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    }
                    if (Objects.isNull(rowMap)){
                        rowMap=new LinkedHashMap<>();
                        result.put(rowKey,rowMap);
                    }
                    String columnName=Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String columnValue=Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    String key=columnFamily+":"+columnName;
                    System.out.println(key+"  :  "+columnValue);
                    rowMap.put(key,columnValue);
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}", tableName), e);
        }
        return result;
    }

    @Override
    public Map<String, Map<String, String>> findAll(String tableName) {
        Scan scan = new Scan();
        return this.queryData(tableName, scan);
    }

    @Override
    public Map<String, Map<String, String>> findList(String tableName, String startRowKey, String stopRowKey) {
        Scan scan=new Scan();
        if (StringUtils.isNotBlank(startRowKey)&&StringUtils.isNotBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }
        return this.queryData(tableName, scan);
    }

    @Override
    public Map<String, Map<String, String>> findList(String tableName, String columnFamily, String startRowKey, String stopRowKey) {
        Scan scan=new Scan();
        scan.addFamily(Bytes.toBytes(columnFamily));
        if (StringUtils.isNotBlank(startRowKey)&&StringUtils.isNotBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }
        return this.queryData(tableName, scan);
    }

 

接下来就是演示调用service对hbase进行操作。

@SpringBootTest
class DemoBootHbase2ApplicationTests {

    @Resource
    IHbaseService hbaseService;

    @Test
    public void tableCreate(){
        hbaseService.createTable("test_base", Arrays.asList("a", "back"));
    }


    @Test
    public void listTable(){
        List<String> strings = hbaseService.listTables();
        System.out.println(String.join(",",strings));
    }

    @Test
    public void delTable(){
        hbaseService.deleteTable("test_base");
    }

    @Test
    public void testPutData(){
        hbaseService.insertOrUpdate("test_base", "000001", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.insertOrUpdate("test_base", "000002", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
                "0.0000", "9.8679", "null"});
        hbaseService.insertOrUpdate("test_base", "000003", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "education", "0.8984",
                "0.0000", "25.5649", "null"});

        Map<String, Map<String,String>> test_base = hbaseService.findAll("test_base");
        System.out.println(0);
    }

    @Test
    public void putMapData(){
        LinkedHashMap<String,String> dataMap=new LinkedHashMap<>();
        int tmp= 20211201;
        for (int i=1;i<20;i++) {
            dataMap.put("userName","李"+i);
            dataMap.put("userAge",String.valueOf(20+i));
            hbaseService.insertOrUpdate("test_base","10000_"+tmp,"a",dataMap);
            dataMap.clear();
            tmp++;
        }
    }

    @Test
    public void findList(){
        Map<String, Map<String, String>> test_base = hbaseService.findList("test_base", "10000_20211201", "10000_20211201");
        System.out.println(test_base);
    }


    @Test
    public void setCol(){
        hbaseService.setColumnValue("test_base","000001","a","姓名","aa");
        LinkedHashMap<String, String> rowData = hbaseService.getRowData("test_base", "000","a");
        System.out.println(rowData);
        hbaseService.setColumnValue("test_base","000001","a","姓名","abb");
        rowData = hbaseService.getRowData("test_base", "000","back",Arrays.asList("姓名".getBytes()));
        System.out.println(rowData);
    }

    @Test
    public void getRowData(){
        LinkedHashMap<String, String> rowData = hbaseService.getRowData("test_base", "000001","a");
        System.out.println(JSON.toJSONString(rowData));
        rowData= hbaseService.getRowData("test_base", "000001","back");
        System.out.println(JSON.toJSONString(rowData));
    }

    @Test
    public void queryAll(){
        Map<String, Map<String, String>> test_base = hbaseService.findAll("test_base");
         System.out.println(JSON.toJSONString(test_base));
    }
}


 
https://www.leftso.com/article/1045.html

相关文章
Hbase2环境搭建参考:Hbase2.x docker环境-左搜 (leftso.com)项目创建 创建一个基于spring boot的项目(为了方便一些操作),并引入hbase2的clien...
拉取镜像sudo docker pull harisekhon/hbase 运行容器sudo docker run -d -h myhbase -p 2181:2181 -p 8080:8...
Java编程之spring boot FastDFS Java client使用,Java编程,FastDFS Java客户端
Java连接redis启动报错Error redis clients jedis HostAndPort cant resolve localhost address
使用OAuth2安全的Spring REST API,Secure Spring REST API using OAuth2(含demo代码下载)
Java编程中纯jdk java方式编写webservice服务(server)和客服端(client)
java WEB中Cookie的操作和使用,java,cookie,web
备受期待的Java Enterprise Edition 8发布了两个令人兴奋的新API(JSON-Binding 1.0和Java EE Security 1.0)并改进了当前的API(JAX...
spring boot 入门之security oauth2 jwt完美整合例子,Java编程中spring boot框架+spring security框架+spring security o...
创建REST API时,良好的文档是有帮助的。而且,API中的每一个变化都应该在参考文档中同时描述。手动完成这是一个乏味的操作,因此这个过程的自动化是不可避免的。
Java 数据库连接 (JDBC)是标准应用程序编程接口 (API) 的 JavaSoft 规范,它允许 Java 程序访问数据库管理系统
引言在这篇文章中,我们将讨论如何使用Spring Boot Security OAuth2保护REST API
前言       在本教程中,我们将了解Retrofit 的基础知识以和创建一个android HTTP client请求REST API
Java编程之Spring Boot 文件上传 REST风格API ajax方式
spring boot webflux client实战,webclient是spring webflux的一个小组件。对于Java的http通讯来说,webclient是非常简单易用的。