Hbase 2 Java Api 使用详解

位置:首页>文章>详情   分类: 教程分享 > Java教程   阅读(765)   2024-04-17 12:33:20

Hbase2环境搭建

参考:Hbase2.x docker环境-左搜 (leftso.com)

项目创建

创建一个基于spring boot的项目(为了方便一些操作),并引入hbase2的client api包
项目创建

 

项目源码下载:demo-boot-hbase2.zip(9987)


maven pom.xml依赖引入:

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.1.3</version>
</dependency>

 

创建Hbase相关配置类HbaseProperties

@Data
//@Component
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {
    /**
     * 配置
     */
    private Map<String, String> config;
    /**
     * hbase home目录
     * 如果系统配置了环境变量HADOOP_HOME则不需要配置这里
     */
    private String hadoopHome;
}

 

在application.yml添加hbase相关配置

hbase:
  config:
    hbase:
      master: 192.168.79.132:16000
      zookeeper:
        property:
          clientPort: 2181
        quorum: 192.168.79.132
    zookeeper:
      znode:
        parent: /hbase
  hadoop-home: D:\software\HADOOP_HOME

hadoop-home配置目录,Windows请下载源码,找的里面的hadoop-common-2.7.0-bin.zip,解压后就是这个配置的目录

创建hbase配置类HbaseConfig

@org.springframework.context.annotation.Configuration
@EnableConfigurationProperties(HbaseProperties.class)
public class HbaseConfig {

    private final HbaseProperties prop;

    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    @Bean
    public Configuration configuration() {
        //配置
        if (StringUtils.isNotBlank(prop.getHadoopHome())){
            System.setProperty("hadoop.home.dir",prop.getHadoopHome());
        }
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
//        configuration.set("hbase.zookeeper.quorum", prop.getConfig().get("zookeeper.quorum"));
//        configuration.set("zookeeper.znode.parent", "/hbase");
//        configuration.set("hbase.master", prop.getConfig().get("zookeeper.quorum")+":16000");
        config.forEach(configuration::set);
        return configuration;
    }

    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}

 

创建Hbase2 操作api接口IHbaseService

public interface IHbaseService {
    /**
     * 创建表
     * @param tableName 表名
     * @param columnFamily 族名
     * @throws IOException
     */
    void createTable(String tableName, List<String> columnFamily);

    /**
     * 预分区创建表
     * @param tableName 表名
     * @param columnFamily  族名
     * @param splitKeys 预分区region
     */
    void createTableBySplitKeys(String tableName, List<String> columnFamily,byte[][]splitKeys);

    /**
     * 创建预分区region key
     * @param keys 预分区region key
     */
    byte[][] createSplitKeys(String[] keys);

    /**
     * 创建16进制预分区region key
     * @param startKey 开始key
     * @param endKey 结束key
     * @param regionNum 分区数量
     * @return
     */
    byte[][] createHexSplitKeys(String startKey,String endKey,int regionNum);

    /**
     * 删除表
     * @param tableName
     * @throws IOException
     */
    void deleteTable(String tableName);

    /**
     * 列出所有表
     * @return
     * @throws IOException
     */
    List<String> listTables();

    /**
     * 添加数据
     * @param tableName 表名
     * @param rowKey key
     * @param columnFamily
     * @param columns
     * @param values
     */
    void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values);

    /***
     * 添加数据
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param map
     */
    void insertOrUpdate(String tableName, String rowKey, String columnFamily, LinkedHashMap<String, String> map);

    /**
     * 根据rowKey获取行数据
     * @param tableName
     * @param rowKey
     * @return
     */
    LinkedHashMap<String,String> getRowData(String tableName, String rowKey,String columnFamily);

    /***
     * 根据rowKey获取行数据,并指定返回行的那些字段(columns)
     * @param tableName 表名
     * @param rowKey 行id
     * @param columnFamily cf
     * @param columns 指定返回字段名称
     * @return
     */
    LinkedHashMap<String,String> getRowData(String tableName, String rowKey,String columnFamily,  List<byte[]> columns);
    /***
     * 删除行
     * @param tableName
     * @param rowKey
     * @return
     */
    boolean deleteRow(String tableName, String rowKey);

    /**
     *
     * @param tableName
     * @param rowKey
     * @param familyName
     * @param column
     * @param value
     */
    void setColumnValue(String tableName, String rowKey, String familyName, String column, String value);

    /***
     * 删除列族
     * @param tableName 表名
     * @param rowKey 行id
     * @param columnFamily 列族名
     */
    void deleteColumnFamily(String tableName, String rowKey, String columnFamily);

    /***
     * 删除列
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param columnName
     * @return
     */
    boolean deleteColumn(String tableName, String rowKey, String columnFamily, String columnName);


    /**
     * 获取表所有数据
     * @param tableName
     * @return 返回map说明
     * 第一层
     *  key-> rowKey
     *  value-> 数据
     *
     * 第二层
     *  key   -> columnFamily:columnName
     *  value -> 具体数据值
     */
    Map<String, Map<String, String>> findAll(String tableName);

    /**
     * 根据 startKey-stopKey查询之间的数据
     * @param tableName 表名
     * @param startRowKey 开始行id
     * @param stopRowKey 结束行id
     * @return
     */
    Map<String, Map<String, String>> findList(String tableName,String startRowKey,String stopRowKey);
    /**
     * 根据 startKey-stopKey并指定columnFamily查询之间的数据
     * @param tableName 表名
     * @param columnFamily 列族名
     * @param startRowKey 开始行id
     * @param stopRowKey 结束行id
     * @return
     */
    Map<String, Map<String, String>> findList(String tableName,String columnFamily,String startRowKey,String stopRowKey);


Hbase api操作实现类IHbaseService

@Slf4j
@Service
public class HbaseServiceImpl implements IHbaseService {

    @Resource
    Connection connection;

    /**
     * 创建表
     * - 只有一个列族
     * @param tableName 表名
     * @param columnFamily 列族
     * @throws IOException
     */
    public void createTable(String tableName, List<String> columnFamily) {
        TableName table = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            if (admin.tableExists(table)) {
                log.warn("表[{}]已存在!", tableName);
                return;
            }
            //列族column family
            List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(cf -> {
                cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                        Bytes.toBytes(cf)).build());
            });
            TableDescriptor tableDesc = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf(tableName))
                    .setColumnFamilies(cfDesc).build();
            admin.createTable(tableDesc);
        }catch (Exception e){
            log.error("创建表错误:",e);
        }
    }

    @Override
    public void createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
        TableName table = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            if (admin.tableExists(table)) {
                log.warn("表[{}]已存在!", tableName);
                return;
            }
            //列族column family
            List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(cf -> {
                cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                        Bytes.toBytes(cf)).build());
            });
            TableDescriptor tableDesc = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf(tableName))
                    .setColumnFamilies(cfDesc).build();
            admin.createTable(tableDesc,splitKeys);
        }catch (Exception e){
            log.error("创建表错误:",e);
        }
    }

    @Override
    public byte[][] createSplitKeys(String[] keys) {
        if(keys==null){
            //默认为3个分区
            keys = new String[] { "1", "2", "3"};
        }
        byte[][] splitKeys = new byte[keys.length][];
        //升序排序
        TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for(String key : keys){
            rows.add(Bytes.toBytes(key));
        }
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int i=0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            splitKeys[i] = tempRow;
            i++;
        }
        return splitKeys;
    }

    @Override
    public byte[][] createHexSplitKeys(String startKey, String endKey, int regionNum) {
        byte[][] splits = new byte[regionNum-1][];
        BigInteger lowestKey = new BigInteger(startKey, 16);
        BigInteger highestKey = new BigInteger(endKey, 16);
        BigInteger range = highestKey.subtract(lowestKey);
        BigInteger regionIncrement = range.divide(BigInteger.valueOf(regionNum));
        lowestKey = lowestKey.add(regionIncrement);
        for(int i=0; i < regionNum-1;i++) {
            BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
            byte[] b = String.format("%016x", key).getBytes();
            splits[i] = b;
        }
        return splits;
    }

    /**
     * 删除表
     * @param tableName 表名称
     */
    public void deleteTable(String tableName) {
        TableName tName = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            if (admin.tableExists(tName)) {
                admin.disableTable(tName);
                admin.deleteTable(tName);
            } else {
                log.error("表 {} 不存在!", tableName);
                return;
            }
        }catch (Exception e){
            log.error("删除表错误:",e);
        }
    }

    /**
     * 列出hbase中所有的表
     */
    public List<String> listTables() {
        List<String> tables = new ArrayList<>(8);
        try (HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
            TableName[] tableNames = admin.listTableNames();
            for (TableName tableName : tableNames) {
                tables.add(tableName.getNameAsString());
            }
            return tables;
        }catch (Exception e){
            log.error("获取表信息错误:",e);
            return new ArrayList<>();
        }
    }

    @Override
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values) {
        try (Table table = connection.getTable(TableName.valueOf(tableName));){
            Put put=new Put(Bytes.toBytes(rowKey));
            if (columns!=null && values!=null&&columns.length==values.length){
                for (int i=0;i<columns.length;i++){
                    if (columns[i] != null && values[i] != null) {
                        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                    } else {
                        throw new NullPointerException(MessageFormat.format(
                                "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
                    }
                }

                table.put(put);
                log.debug("putData add or update data Success,rowKey:" + rowKey);
            }
        }catch (Exception e){
            log.error(MessageFormat.format(
                    "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
                    tableName, rowKey, columnFamily), e);
        }
    }


    @Override
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, LinkedHashMap<String, String> map) {

        try (Table table = connection.getTable(TableName.valueOf(tableName))){
            //设置rowkey
            Put put = new Put(Bytes.toBytes(rowKey));
            if(!CollectionUtils.isEmpty(map)) {
                for(Map.Entry<String, String> cell : map.entrySet()) {
                    String key = cell.getKey();
                    String value = cell.getValue();
                    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(key), Bytes.toBytes(value));
                }
                table.put(put);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public LinkedHashMap<String, String> getRowData(String tableName, String rowKey,String columnFamily) {
        return getRowData(tableName,rowKey, columnFamily,null);
    }

    @Override
    public LinkedHashMap<String, String> getRowData(String tableName, String rowKey, String columnFamily, List<byte[]> columns) {
        System.err.println("表:"+tableName+"   roeKey:"+rowKey +"  查询列个数:"+columns);
        //返回的键值对
        LinkedHashMap<String,String> result = new LinkedHashMap<>();

        Get get = new Get(Bytes.toBytes(rowKey));
        get.addFamily(columnFamily.getBytes());
        if (Objects.nonNull(columns) && columns.size() > 0){
            for (int i = 0; i < columns.size(); i++) {
                get.addColumn(columnFamily.getBytes(),columns.get(i));
            }
        }
        try ( Table table= connection.getTable(TableName.valueOf(tableName))){
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {

                for (Cell cell : hTableResult.listCells()) {
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        }catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("hbase查询异常");
        }
        System.err.println("当行记录:"+result);
        return result;
    }

    @Override
    public boolean deleteRow(String tableName, String rowKey) {
        try(Admin admin = connection.getAdmin();Table table=connection.getTable(TableName.valueOf(tableName))) {
            if(admin.tableExists(TableName.valueOf(tableName))){
                // 获取表
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                table.delete(delete);
            }
        }catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    @Override
    public void setColumnValue(String tableName, String rowKey, String familyName, String column, String value) {
        try(Table table=connection.getTable(TableName.valueOf(tableName))) {
            // 设置rowKey
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column), Bytes.toBytes(value));

            table.put(put);
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))){
            Delete delete = new Delete(rowKey.getBytes());
            delete.addFamily(Bytes.toBytes(columnFamily));
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean deleteColumn(String tableName, String rowKey, String columnFamily, String columnName) {
        try(Admin  admin = connection.getAdmin();Table table=connection.getTable(TableName.valueOf(tableName))) {

            if(admin.tableExists(TableName.valueOf(tableName))){
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                // 设置待删除的列
                delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
                table.delete(delete);
            }

        }catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return false;
    }

    /**
     * 获取表数据
     * @param tableName 表名
     * @return 返回map说明
     * 第一层
     *  key-> rowKey
     *  value-> 数据
     *
     * 第二层
     *  key   -> columnFamily:columnName
     *  value -> 具体数据值
     */
    private LinkedHashMap<String, Map<String, String>> queryData(String tableName, Scan scan) {
        // <rowKey,对应的行数据>
        LinkedHashMap<String, Map<String,String>> result = new LinkedHashMap<>();
        try(Table table=connection.getTable(TableName.valueOf(tableName)); ResultScanner rs=table.getScanner(scan)) {
            for (Result r : rs) {
                String rowKey = null;
                // 行键,列族和列限定符一起确定一个单元(Cell)
                for (Cell cell : r.listCells()) {
                    String columnFamily = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
                    Map<String,String> rowMap = result.get(rowKey);
                    if (rowKey == null) {
                        rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    }
                    if (Objects.isNull(rowMap)){
                        rowMap=new LinkedHashMap<>();
                        result.put(rowKey,rowMap);
                    }
                    String columnName=Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String columnValue=Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    String key=columnFamily+":"+columnName;
                    System.out.println(key+"  :  "+columnValue);
                    rowMap.put(key,columnValue);
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}", tableName), e);
        }
        return result;
    }

    @Override
    public Map<String, Map<String, String>> findAll(String tableName) {
        Scan scan = new Scan();
        return this.queryData(tableName, scan);
    }

    @Override
    public Map<String, Map<String, String>> findList(String tableName, String startRowKey, String stopRowKey) {
        Scan scan=new Scan();
        if (StringUtils.isNotBlank(startRowKey)&&StringUtils.isNotBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }
        return this.queryData(tableName, scan);
    }

    @Override
    public Map<String, Map<String, String>> findList(String tableName, String columnFamily, String startRowKey, String stopRowKey) {
        Scan scan=new Scan();
        scan.addFamily(Bytes.toBytes(columnFamily));
        if (StringUtils.isNotBlank(startRowKey)&&StringUtils.isNotBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }
        return this.queryData(tableName, scan);
    }

 

接下来就是演示调用service对hbase进行操作。

@SpringBootTest
class DemoBootHbase2ApplicationTests {

    @Resource
    IHbaseService hbaseService;

    @Test
    public void tableCreate(){
        hbaseService.createTable("test_base", Arrays.asList("a", "back"));
    }


    @Test
    public void listTable(){
        List<String> strings = hbaseService.listTables();
        System.out.println(String.join(",",strings));
    }

    @Test
    public void delTable(){
        hbaseService.deleteTable("test_base");
    }

    @Test
    public void testPutData(){
        hbaseService.insertOrUpdate("test_base", "000001", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.insertOrUpdate("test_base", "000002", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
                "0.0000", "9.8679", "null"});
        hbaseService.insertOrUpdate("test_base", "000003", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "education", "0.8984",
                "0.0000", "25.5649", "null"});

        Map<String, Map<String,String>> test_base = hbaseService.findAll("test_base");
        System.out.println(0);
    }

    @Test
    public void putMapData(){
        LinkedHashMap<String,String> dataMap=new LinkedHashMap<>();
        int tmp= 20211201;
        for (int i=1;i<20;i++) {
            dataMap.put("userName","李"+i);
            dataMap.put("userAge",String.valueOf(20+i));
            hbaseService.insertOrUpdate("test_base","10000_"+tmp,"a",dataMap);
            dataMap.clear();
            tmp++;
        }
    }

    @Test
    public void findList(){
        Map<String, Map<String, String>> test_base = hbaseService.findList("test_base", "10000_20211201", "10000_20211201");
        System.out.println(test_base);
    }


    @Test
    public void setCol(){
        hbaseService.setColumnValue("test_base","000001","a","姓名","aa");
        LinkedHashMap<String, String> rowData = hbaseService.getRowData("test_base", "000","a");
        System.out.println(rowData);
        hbaseService.setColumnValue("test_base","000001","a","姓名","abb");
        rowData = hbaseService.getRowData("test_base", "000","back",Arrays.asList("姓名".getBytes()));
        System.out.println(rowData);
    }

    @Test
    public void getRowData(){
        LinkedHashMap<String, String> rowData = hbaseService.getRowData("test_base", "000001","a");
        System.out.println(JSON.toJSONString(rowData));
        rowData= hbaseService.getRowData("test_base", "000001","back");
        System.out.println(JSON.toJSONString(rowData));
    }

    @Test
    public void queryAll(){
        Map<String, Map<String, String>> test_base = hbaseService.findAll("test_base");
         System.out.println(JSON.toJSONString(test_base));
    }
}


 
地址:https://www.leftso.com/article/1045.html

相关阅读

Hbase2环境搭建参考:Hbase2.x docker环境-左搜 (leftso.com)项目创建 创建一个基于spring boot的项目(为了方便一些操作),并引入hbase2的clien...
拉取镜像sudo docker pull harisekhon/hbase 运行容器sudo docker run -d -h myhbase -p 2181:2181 -p 8080:8...
Java编程之spring boot FastDFS Java client使用,Java编程,FastDFS Java客户端
Java连接redis启动报错Error redis clients jedis HostAndPort cant resolve localhost address
java编程中使用nodejs的apidoc工具生成Java api美观的HTML文档,apidoc可以根据代码注释生成web api文档,支持大部分主流语言
java WEB中Cookie的操作和使用,java,cookie,web
Java编程中纯jdk java方式编写webservice服务(server)和客服端(client)
备受期待的Java Enterprise Edition 8发布了两个令人兴奋的新API(JSON-Binding 1.0和Java EE Security 1.0)并改进了当前的API(JAX...
Java 数据库连接 (JDBC)是标准应用程序编程接口 (API) 的 JavaSoft 规范,它允许 Java 程序访问数据库管理系统
使用OAuth2安全的Spring REST API,Secure Spring REST API using OAuth2(含demo代码下载)