0%

大数据存储框架之HBase(1) 概述

认识HBase

首先,HBaseHadoop集群环境下的一个是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。

HBase的特点

说道HBase的特点,其实也很好说:

  • 海量存储
  • 列式存储
  • 极易扩展: Hbase的扩展性主要体现在两个方面,一个是基于上层处理能力(RegionServer)的扩展,一个是基于存储的扩展(HDFS)。通过横向添加RegionSever的机器,进行水平扩展,提升Hbase 上层的处理能力,提升Hbsae服务更多Region的能力。
    备注:RegionServer 的作用是管理region、承接业务的访问,这个后面会详细的介绍通过横向添加Datanode的机器,进行存储层扩容,提升Hbase 的数据存储能力和提升后端存储的读写能力。
  • 高并发: 这里说的高并发,主要是在并发的情况下,Hbase的单个IO 延迟下降并不多。能获得高并发、低延迟的服务。
  • 稀疏 :稀疏主要是针对Hbase 列的灵活性,在列族中,你可以指定任意多的列,在列数据为空的情况下,是不会占用存储空间的。

HBase的架构

HBase

从上图,我们可以看出,一个标准的HBase的组成应该包含:Client , Zookeeper , Hmaster,HregionServer,HDFS,下面来介绍下这几个组件之间的相关功能:

  • Client

    Client包含了访问HBase的接口,另外Client还维护对应的cache来加速Hbase的访问,比如cachemeta元数据的信息。

  • Zookeeper

    HBase通过 zookeeper来做 master 的高可用、 RegionServer的监控、元数据的入口以及
    集群配置的维护等工作。具体工作如下:
    通过Zoopkeeper来保证集群中只有 1 个 master在运行,如果 master异常,会通过竞争
    机制产生新的 master 提供服务通过Zoopkeeper 来监控 RegionServer 的状态,当 RegionSevrer有异常的时候,通过回调的形式通知 Master RegionServer上下 线 的信息
    通过Zoopkeeper存储元数据的统一入口地址

  • Hmaster

    • Regionserver分配Region
    • 维护整个集群的负载均衡
    • 维护集群的元数据信息
    • 发现失效的Region,并将失效的Region分配到正常的RegionServer
    • RegionServer失效的时候,协调对应的HLog的拆分
  • HregionServer

    • 管理master为其分配的Region
    • 处理来自客户端的读写请求
    • 负责和底层HDFS的交互,存储数据到HDFS
  • HDFS

    • 提供元数据和表数据的底层分布式存储服务

    • 数据多副本,保证高可靠和高可用性

HBase中的角色

HMaster

  • 监控 RegionServer

  • 处理 RegionServer 故障转移

  • 处理元数据的变更

  • 处理 region 的分配或 转 移

  • 在空闲时间进行数据的负载均衡

  • 通过 Zook eeper 发布自己的位置给客户端

RegionServer

  • 负责存储 HBase 的实际数据
  • 处理分配给它的 Region
  • 刷新缓存到 HDFS
  • 维护 H l og
  • 执行压缩
  • 负责处理 Region 分片

其他组件

Write-Ahead logs

HBase的修改记录,当对 HBase 读写数据的时候,数据不是直接写进磁盘,它会在内存中保留一段时间(时间以及数据量阈值可以设定)。但把数据保存在内存中可能有更高的概率引起数据丢失,为了解决这个问题,数据会先写在一个叫做 Write Ahead log file 的文件中,然后再写入内存中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。

Region

Hbase表的分片, HBase表会根据 RowKey值被切分成不同的 region存储在 RegionServer
中,在一个 RegionServer中可以有多个不同的 region

Store

HFile存储在 Store中,一个 Store对应 HBase表中的一个列族。

Memstore

顾名思义,就是内存存储,位于内存中,用来保存当前的数据操作,所以当数据保存在WAL中 之后, RegsionServer会在内存中存储键值对。

HFile

这是在磁盘上保存原始数据的实际的物理文件,是实际的存储文件。StoreFile是以 Hfile
的形式存储在 HDFS的。

HBase的安装

Note: HBase启动前必须保证,Zookeeper,Hadoop已经启动了。

解压HBase的压缩包到安装目录后,开始对配置文件进行修改:

  • hbase-env.sh
1
2
3
export JAVA_HOME=/opt/module/jdk1.8.0_201
# Tell HBase whether it should manage it's own instance of Zookeeper or not.
export HBASE_MANAGES_ZK=false
  • hbase-site.xml这里我添加了 Phoinex的服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop101:9000/hbase-1.3.5</value>
</property>

<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>

<!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>

<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop101</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>

<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.4.10/dataDir</value>
</property>
<!--
二级索引
phoenix master 配置参数
-->
<property>
<name>hbase.master.loadbalancer.class</name>
<value>org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer</value>
</property>

<property>
<name>hbase.coprocessor.master.classes</name>
<value>org.apache.phoenix.hbase.index.master.IndexMasterObserver</value>
</property>

<!-- 二级索引
添加如下配置到Hbase的Hregionserver节点的hbase-site.xml
-->
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>hbase.region.server.rpc.scheduler.factory.class</name>
<value>org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory</value>
<description>Factory to create the Phoenix RPC Scheduler that uses separate queues for index and metadata updates</description>
</property>

<property>
<name>hbase.rpc.controllerfactory.class</name>
<value>org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory</value>
<description>Factory to create the Phoenix RPC Scheduler that uses separate queues for index and metadata updates</description>
</property>
</configuration>
  • regionserver
1
2
3
hadoop101
hadoop102
hadoop103
  • 软连接hadoop配置文件到hbase
1
2
ln s /opt/ hadoop 2.7.2 /etc/hadoop/core site.xml /opt/module/https://sun-iot.coding.net/p/hexo-image/git/raw/master/bigdata/hbaseconf/core site.xml
ln s /opt/ hadoop 2.7.2 / hadoop/hdfs site.xml /opt/module/https://sun-iot.coding.net/p/hexo-image/git/raw/master/bigdata/hbaseconf/hdfs site.xml
  • 分发出去后,将其他节点上的机器中的hbase-site.xml文件中的phoinex-maxter配置删了,只保留master配置。
  • 在集群时间同步的情况下,我们可以启动服务
1
2
3
bin/start hbase.sh

bin/stop hbase.sh
  • hbase集群时间的超时时间设置,在时间同步时可以搭配使用
1
2
3
4
5
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
<description>Time difference of regionserver from master</description>
</property>

HBase的Shell操作

  • 进入客户端
1
bin/hbase shell
  • 查看帮助命令
1
help
  • 查看当前数据库中有哪些表
1
list

表的操作

  • 创建表
1
2
create [table] [families]
create 'student',"info"
  • 插入数据到表
1
2
put [table] [rowkey] [family:column] [value]
put 'student','1001','info:sex','male'
  • 扫描表数据
1
2
3
4
5
6
scan [table] 
scan 'student'

scan [table] {STARTROW => condition1 , stoprow => condition2 }
scan 'student',{STARTROW => '1001', STOPROW =>'1001'}
scan 'student',{STARTROW => '1001'}
  • 查看表结构
1
2
describe [table]
describe 'student'
  • 更新指定字段数据
1
2
put [table] [rowkey] [family:column] [value]
put 'student','1001','info:name','Nick'
  • 查看“指定行”或“指定列族:列”的数据
1
2
3
4
5
get [table] [rowkey]
get 'student' '1001'

get [table] [rowkey] [famile:column]
get 'student' '1001' 'info:name'
  • 统计表数据行数
1
2
count [table] 
count 'student'
  • 删除数据
1
2
3
4
5
6
-- 删除某 rowkey 的全部数据
deleteall [table] [rowkey]
deleteall 'student' '1001'
-- 删除 rowkey 的某一列数据
delete [table] [rowkey] [family:columa]
delete 'student','1001','info:name'
  • 清空表数据
1
2
truncate [table]
truncate 'student'
  • 删除表
1
2
3
4
5
6
-- 删除表之前,先停止表的状态
disable [table]
drop [table]

disable 'student'
drop 'student'
  • 变更表信息
1
2
3
4
alter [table] {NAME=>FAMILY,VERSION>NEW_NUMBER}
alter 'student',{NAME=>'info',VERSIONS=>3}
-- 查看一下
get 'student' ,'1001' ,{COLUMN=>'info:name' , VERSIONS=>3}

HBase数据结构

Rowkey

相当于我们关系数据库中的主键,全表唯一。我们访问HBase中的行,一般有三种方式:

  • 通过单个Rowkey
  • 通过Rowkeyrange(正则)
  • 全表扫描

Rowkey 设计原则

  • 长度原则
  • 散列原则
  • 唯一原则

Rowkey 的设计方案

  • 随机数
  • Hash
  • 散列值
  • 字符串反转
  • Column Family

列族:HBASE表中的每个列,都归属于某个列族。列族是表的 schema的一部 分( 而列不是 ),必须在使用表之前定义。列名都以列族作为前缀。例如courses:history courses:math都属于 courses 这个列族。

Cell

{rowkey, column Family columu , version} 唯一确定的单元。 cell中的数据是没有类型的,全部是字节码形式存贮。

关键字:无类型、字节码

Time Stamp

HBase中通过rowkey和columns确定的为一个存贮单元称为cell。每个cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是64位整型。时间戳可以由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统。时间戳也可以是由客户显示赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳,每个cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。

为了避免数据存在过多版本造成的管理(包括存贮和索引)负担,HBase提供了两种数据版本回收方式,一是保存数据的最后 n 个版本,而是保存最近一段时间的版本(比如近七天),用户可以针对每个列族进行设置。

命名空间

命名空间结构:

1572590350006

  • Table表: 所有的表都是命名空间的成员,即表必属于某个命名空间,如果没有指定,则在default默认的命名空间中;
  • RegionServer Group: 一个命名空间包含默认的RegionServer Group;
  • Permission: 权限,命名空间能够让我们来定义访问控制列表 ACL(Access Control List) , 比如创建表,读取表等;
  • Quota: 限额,可以强制一个命名空间可包含的region的数量

HBase的原理

读流程

1572590720917

  • Client先访问 Zookeeper,从meta表读取region的位置,然后读取meta表中的数据。meta中有存储用户表的 region信息
  • 根据 namespace,表名和rowkeymeta表中找到对应的region信息
  • 找到这个region对应的regionserver
  • 先从MemStore找到数据,如果没有,再到BlockCache里面读
  • BlockCache还没有,再到StoreFile上读(为了读取的效率)
  • 如果是从StroFile里面读取的数据,不是直接返回客户端,而是先写入BlockCache,再返回给客户端。

写流程

1572590740409

  • ClientHRegionServer发送请求
  • HRegionServer将数据写到HLogWrite Ahead Log).为了数据的吃持久化和恢复;
  • HRegionServer将数据写入到内存(MemStore
  • 反馈Client写成功

数据Flush过程

  • MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;
  • 并将数据存储到HDFS
  • 在HLog

数据合并过程

  • 当数据块达到4 块,Hmaster 触发合并操作,Region 将数据块加载到本地,进行合并;
  • 当合并的数据超过256M,进行拆分,将拆分后的Region 分配给不同的HregionServer管理;
  • 当HregionServer 宕机后,将HregionServer 上的hlog 拆分,然后分配给不同的HregionServer加载,修改.META.;
  • 注意:HLog 会同步到HDFS。

HBase API

添加依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>

下面的HBase操作工具类,连接缓慢,不推荐使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
package com.ci123.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Copyright (c) 2018-2028 Corp-ci All Rights Reserved
* <p> HBase的操作的工具类
* Project: telecom-customer-service
* Package: com.ci123.hbase
* Version: 2.0
* <p> 比1.0 增加了 多线程的连接方式 使用方式:
* HBaseUtil build = HBaseUtil.create()
* .setZkUrl("ip")
* .setZkPort("2181")
* .setMasterUrl("ip:16000")
* .build();
* 后期的修改思路如下:面对多用户操作时,可考虑一个用户一个连接,并将 用户-连接 作为一组保存在一个集合里面,当用户下次再来访问的时候,直接从这个集合 中获取连接
* Created by SunYang on 2019/11/1 15:19
*/
public class HBaseUtil {
private static final Logger logger = LoggerFactory.getLogger(HBaseUtil.class);
private String zkUrl;
private String zkPort;
private String masterUrl;

private Configuration configuration;
private ExecutorService executor;
private Connection connection;

private HBaseUtil() {
}

private void init() {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", zkUrl);
configuration.set("hbase.zookeeper.property.clientPort", zkPort);
configuration.set("hbase.master", masterUrl);
executor = Executors.newFixedThreadPool(32);
try {
connection = ConnectionFactory.createConnection(configuration, executor);
} catch (IOException e) {
logger.error("HBase connect failed {}.", e.getMessage());
throw new RuntimeException(String.format("HBase connect failed {%s}.", e.getMessage()));
}
}

public static HBaseOperateBuilder create() {
return new HBaseOperateBuilder();
}

/**
* 判断 HBase中的表是否存在
*
* @param tableName
* @return true
*/
public boolean isTableExit(String tableName) {
HBaseAdmin admin = null;
try {
admin = (HBaseAdmin) connection.getAdmin();
return admin.tableExists(tableName);
} catch (IOException e) {
logger.error("server connect failed {}", e.getMessage());
return false;
} finally {
try {
admin.close();
} catch (IOException e) {
logger.error("admin close failed {}. ", e.getMessage());
}
}
}

/**
* 创建HBase表
*
* @param tableName
* @param families
* @return true
*/
public boolean createTable(String tableName, String... families) {
if (isTableExit(tableName)) {
logger.warn("the table is exit , nothing to do.");
return false;
} else if(null == families){
logger.error("table family must be not null {}." );
return false ;
}else {
HBaseAdmin admin = null;
// 创建表属性的对象,表名需要转字节
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
// 创建多个列族
for (String family : families) {
tableDescriptor.addFamily(new HColumnDescriptor(family));
}
// 根据表的配置,创建表
try {
admin = (HBaseAdmin) connection.getAdmin();
admin.createTable(tableDescriptor);
logger.info("the table create successful");
return true;
} catch (IOException e) {
logger.error("table create failed {}.", e.getMessage());
return false;
} finally {
try {
admin.close();
} catch (IOException e) {
logger.error("admin close failed {}. ", e.getMessage());
}
}
}
}

/**
* 删除 HBase中的表
*
* @param tableName
* @return true
*/
public boolean deleteTable(String tableName) {
if (isTableExit(tableName)) {
HBaseAdmin admin = null;
try {
admin = (HBaseAdmin) connection.getAdmin();
} catch (IOException e) {
logger.error("admin create failed {}. ", e.getMessage());
return false;
}
try {
admin.disableTable(tableName);
} catch (IOException e) {
logger.error("table disable failed {}.", e.getMessage());
return false;
}
try {
admin.deleteTable(tableName);
} catch (IOException e) {
logger.error("table delete failed {].", e.getMessage());
return false;
} finally {
try {
admin.close();
} catch (IOException e) {
logger.error("admin close failed {}. ", e.getMessage());
}
}
return true;
} else {
logger.error("table is not exits");
return false;
}
}

/**
* 插入一行数据
*
* @param tableName
* @param rowkey
* @param family
* @param qualifier
* @param value
* @return
*/
public boolean putRowData(String tableName, String rowkey, String family, String qualifier, String value) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
logger.error("get table failed {}.", e.getMessage());
return false;
}
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
try {
table.put(put);
return true;
} catch (IOException e) {
logger.error("table put failed {}.", e.getMessage());
return false;
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table close failed {}.", e.getMessage());
return false;
}
}
}
public boolean putList(String tableName , String rowKey, String family , String[] qualifiers , String[] values){
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
if (createTable(tableName , family)) {
logger.warn("table is not exit , but we create it auto");
}
}
List<Put> putList=new ArrayList<>();
Put put = new Put(Bytes.toBytes(rowKey));
for (int i = 0 ; i < values.length ; i++ ){
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifiers[i]), Bytes.toBytes(values[i]));
putList.add(put) ;
// 这个3177 是根据HBase的缓冲区大小设置的
if (putList.size() == 3177 ){
try {
table.put(putList);
putList.clear();
} catch (IOException e) {
logger.error("table putlist failed {}." , e.getMessage());
e.printStackTrace();
}
}
}

try {
table.put(putList);
return true ;
} catch (IOException e) {
logger.error("table putlist failed {}." , e.getMessage());
}

return false ;

}

/**
* 删除多行数据
*
* @param tableName
* @param rowkeys
* @return
*/
public boolean deleteMultiRow(String tableName, String... rowkeys) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
logger.error("get table failed {}.", e.getMessage());
return false;
}
List<Delete> deleteList = new ArrayList<Delete>();
for (String rowkey : rowkeys) {
deleteList.add(new Delete(Bytes.toBytes(rowkey)));
}
try {
table.delete(deleteList);
return true;
} catch (IOException e) {
logger.error("delete multi rowkey failed {}.", e.getMessage());
return false;
}
}

/**
* 拿到左右的数据
*
* @param tableName
* @return
*/
public List<Result> getAllRows(String tableName) {
Table table = null;
Scan scan = new Scan();
List<Result> resultList = new ArrayList<>();
table = getTable(tableName);
try {
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
resultList.add(result);
}
return resultList;
} catch (IOException e) {
logger.error("scan failed {}.", e.getMessage());
return null;
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table close failed");
}
}
}

/**
* 获取某一行的数据
*
* @param tableName
* @param rowkey
* @param showVersion 是否显示当前的版本信息
* @return
*/
public Result getRow(String tableName, String rowkey, boolean showVersion) {
Table table = null;
Get get = new Get(Bytes.toBytes(rowkey));
if (showVersion) {
get.setMaxVersions();
}
table = getTable(tableName);
try {
return table.get(get);
} catch (IOException e) {
logger.error("get failed {}.", e.getMessage());
return null;
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table close failed");
}
}
}

/**
* 根据当前的时间戳,显示某一行的所有的细腻,包括版本信息
*
* @param tableName
* @param rowkey
* @param timestamp 时间戳,显示指定时间戳的版本
* @return
*/
public Result getRow(String tableName, String rowkey, long timestamp) {
Table table = null;
Get get = new Get(Bytes.toBytes(rowkey));
get.setMaxVersions();
try {
get.setTimeStamp(timestamp);
} catch (IOException e) {
logger.error("get setTimestamp failed {].", e.getMessage());
return null;
}
table = getTable(tableName);
try {
return table.get(get);
} catch (IOException e) {
logger.error("get failed {}.", e.getMessage());
return null;
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table close failed");
}
}
}

/**
* 获取某一行指定 “列族:列”的数据
*
* @param tableName
* @param rowkey
* @param family
* @param qualifier
* @return
*/
public Result getQualifier(String tableName, String rowkey, String family, String qualifier) {
Table table = null;
Get get = new Get(Bytes.toBytes(tableName));
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
table = getTable(tableName);
try {
return table.get(get);
} catch (IOException e) {
logger.error("get qualifier failed {}.", e.getMessage());
return null;
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table close failed");
}
}
}

/**
* 获取一个HBase的表
*
* @param tableName
* @return
*/
private Table getTable(String tableName) {
try {
return connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
logger.error("get table failed {}.", e.getMessage());
return null;
}
}

// 将线程池与连接池关闭
public void close() {
try {
if (null != connection) {
connection.close();
}
if (null != executor) {
executor.shutdown();
}
} catch (IOException e) {
throw new RuntimeException(String.format("connection | executor closed failed {}%s.", e.getMessage()) ) ;
}
}

public static class HBaseOperateBuilder {
HBaseUtil hBaseUtil = null;

public HBaseOperateBuilder() {
hBaseUtil = new HBaseUtil();
}

public HBaseOperateBuilder setZkUrl(String zkUrl) {
hBaseUtil.zkUrl = zkUrl;
return this;
}

public HBaseOperateBuilder setZkPort(String zkPort) {
hBaseUtil.zkPort = zkPort;
return this;
}

public HBaseOperateBuilder setMasterUrl(String masterUrl) {
hBaseUtil.masterUrl = masterUrl;
return this;
}

public HBaseUtil build() {
hBaseUtil.init();
return hBaseUtil;
}
}

}

使用Phoenix构建HBase的SQL查询

笔者在之前的配置中涉及到了 Phoenix的二级索引的构建,这里补充下Phoenix的HBase的SQL查询的步骤。

首先我们需要有一个Phoenix的包,这个包需要和我们的HBase的版本保持一致(避免有其他麻烦)

笔者的HBase是1.1.2版本,所以笔者的Phoenix是4.7.0-Hbase-1.1.2。

我们开始复制我们的Phoenix的服务器和客户端的JAR包到我们的HBase的lib包下面

1
2
cp phoenix-4.7.0-HBase-1.1-server.jar /opt/module/hbase-1.1.2/lib/
cp phoenix-4.7.0-HBase-1.1-client.jar /opt/module/hbase-1.1.2/lib/

重启我们的HBase,在启动我们的phoenix,进入phoenix目录

1
bin/sqlline.py hadoop101:2181

然后,我们就可以在phoenix使用sql来操作我们的HBase了。

Phoenix的简单使用

1
2
3
4
5
6
7
8
9
10
11
12
-- 创建表
CREATE TABLE IF NOT EXISTS order_detail (
"__time" BIGINT NOT NULL,
"orderID" VARCHAR NOT NULL,
"userID" VARCHAR NOT NULL,
"skuID" VARCHAR ,
"skuPrice" BIGINT ,
"skuCount" BIGINT ,
"orderPrice" BIGINT ,
"payMethods" VARCHAR ,
"createTime" VARCHAR,
CONSTRAINT emp_pk PRIMARY KEY ("__time" , "orderID", "userID"))
  • 当我们的HBase里面有表的时候,我们怎么把表的数据映射到我们的Phoenix呢?
  • 我们都知道,在默认的情况下,我们在HBase里面创建的表,在我们的PHoenix里面是看不到,那我们现在就需要通过Phoenix去操作我们的HBase里面的数据表,则需要建立映射。
  • 映射分为视图映射和表映射

比如说:我现在phoenix里面的表如下:

1573782011532

在我们的Hbase里面,我们的表如下:

1573782125398

好的,我们看到,我们没有在HBse里面创建额外的表,好吧,那我先去创建一个,再看这个两个图:

1573782596991

phoenix的图还是一样的没变,每骗你们,可以自己实验的。

我们看一下这个telecom-customer-service的这个表结构是什么

1573782882018

我发现直接做describe telecom-customer-service 看起来不舒服,还是用了scan。我们可以看到,我里面的所有的字段都在一个 列簇info 下面。一共有 7个字段。那现在就是有两种方式去创建我们的映射关系了。

视图映射

视图映射,很好理解,就是将我们的HBase里面的表数据投放到我们的Phoenix里面,但是我们的看到的只是一个视图,就像海市蜃楼一样,我们可以看到,但是却无法对其操作,除了查询之外。依据上述,我们建立我们的映射:

1
2
3
4
5
6
7
8
9
10
-- 视图映射
CREATE VIEW "telecom-customer-service" (
"rowkey" VARCHAR PRIMARY KEY,
"info"."callA" VARCHAR,
"info"."phoneNumberA" VARCHAR,
"info"."callB" VARCHAR,
"info"."phoneNumberB" VARCHAR,
"info"."dateTime" VARCHAR,
"info"."timestamp" BIGINT,
"info"."duration" BIGINT);

这样我们的试图就算建好了。一起看下效果:

我们就找刚刚这个rowkey=15908402447_15801891027_1487887705000的数据。

1
select * from "telecom-customer-service" where "rowkey"='15908402447_15801891027_1487887705000';

1573784709972

这里的时间戳好像有点问题。

表映射

1
2
3
4
5
6
7
8
9
CREATE TABLE "telecom-customer-service" (
"rowkey" VARCHAR PRIMARY KEY,
"info"."callA" VARCHAR,
"info"."phoneNumberA" VARCHAR,
"info"."callB" VARCHAR,
"info"."phoneNumberB" VARCHAR,
"info"."dateTime" VARCHAR,
"info"."timestamp" BIGINT,
"info"."duration" BIGINT);

这样就行了,很简单。

1
2
3
4
5
6
7
select count(*) from "telecom-customer-service";

+-----------+
| COUNT(1) |
+-----------+
| 2699 |
+-----------+
这是打赏的地方...
---------Thanks for your attention---------