保持与 Redis 相同的 TTL 操作

Redis 中没有 Hbase 中时间版本的概念与机制,也可以认为只有当前版本,并且 TTL 是针对整个 Key 的,而 Hbase 的 TTL 可以基于列族或者单元设定,不是基于整个行键的,如果我们将 Redis 中的数据迁移到 Hbase 中,对应的 TTL 就是一个问题,原来基于 Redis 机制的应用中使用也是一个问题,所以为了保持旧的应用兼容(在不对原有应用做大量重构测试的情况下,只在 API 层面兼容修改),只能手动针对 Hbase 的单元的 TTL 进行设置。

假设现在 API 层面,需要将原有的 Redis 访问替换为 Hbase 的访问,需要做一些 API 层面的映射。首先抛开所使用的底层缓存方案,将缓存抽象:每个缓存数据都属于一张表,这张表中缓存的是某个特定领域,特定应用的数据,访问时,需要指定表名,有两种基本的结构(和 Redis 保持一致) Key-Value, Key-HashKey-Value, 这样就可以为开发人员提供一个抽象层的 API, 开发人员并不需要特别关注底层缓存的方案,如果后面更换缓存方案,可以提供 API 层的兼容。

Hbase 中单元有时间版本的概念(VERSIONS), 一般设置 1 就够了,还有最小版本(MIN_VERSIONS), 可以设置为 0, 保证早于 TTL 的时间版本的数据不会返回。如果我们多次对一个单元进行写,即使设置了最大版本为 1, 如果最近的版本因为 TTL 过期,之前的版本没有过期,而且因为没有合并,没有被删除,还是会被查询出来,这显示对于原来使用 Redis 的应用来说,是不可接受的,所以只能在设置 TTL 时,手动删除之前的版本,而且, TTL 一般来说只是影响数据保存的时间长短,我们是为了清理数据设置的 TTL, 所以这个操作可以异步来进行。即使某次操作失败了,也可以等待下一次的操作进行设置。

这是一个同步的测试,使用 checkAndMutate 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package test.bigdata

import java.time.{Duration, Instant, LocalDateTime, ZoneId}
import java.util
import java.util.Date

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, Get, Put, RowMutations, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes.toBytes

/**
* @author guo
* @date 2022/7/24
*/
object HbaseExpireTest {

def main(args: Array[String]): Unit = {
val configuration = HBaseConfiguration.create()

// hbase pseudo distributed cluster on nas docker
configuration.set("hbase.zookeeper.quorum", "harisekhon-hbase1")
configuration.set("hbase.zookeeper.property.clientPort", "2181")

var connection = ConnectionFactory.createConnection(configuration)
var table: Table = connection.getTable(TableName.valueOf("Test:user"))

try {
connection = ConnectionFactory.createConnection(configuration)
table = connection.getTable(TableName.valueOf("Test:user"))
val rowKey = toBytes("r14")
val family = toBytes("info")
val qualifier = toBytes("name")
val value = toBytes("hello world")
val ttl = Duration.ofMinutes(10).toMillis

// 先不设置 TTL, 直接 Put
val put = new Put(rowKey).addColumn(family, qualifier, value)
table.put(put)

val get = new Get(rowKey).addColumn(family, qualifier)
println("after first set:" + Bytes.toString(table.get(get).getValue(family, qualifier)))

// 设置 TTL, 先检查值是否相等,再进行原子操作: 删除,新增并设置 TTL
// 如果不相等,说明其它实例可能修改了单元的值,所以此次不再设置,等待其它实例设置即可
val time = new Date().getTime
val firstDel = new Delete(rowKey, time).addColumns(family, qualifier)
val secondPut = new Put(rowKey, time + 100).addColumn(family, qualifier, value).setTTL(ttl)
val delAndPut = RowMutations.of(util.Arrays.asList(firstDel, secondPut))

val bool = table.checkAndMutate(rowKey, family)
.qualifier(qualifier)
.ifEquals(value)
.thenMutate(delAndPut)
println("check and execute result:" + bool)

val limitMinutes = 5
val end = LocalDateTime.now().plusMinutes(limitMinutes)
while (LocalDateTime.now().isBefore(end)) {
val get = new Get(rowKey).addColumn(family, qualifier).readAllVersions
val cells = table.get(get).rawCells()
if (cells != null && cells.nonEmpty) {
for (cell <- cells) {
val timestamp = cell.getTimestamp
val family = Bytes.toString(CellUtil.cloneFamily(cell))
val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
val value = Bytes.toString(CellUtil.cloneValue(cell))
val row = Bytes.toString(CellUtil.cloneRow(cell))
println(s"${LocalDateTime.now()} row:$row, family: $family, qualifier:$qualifier, timestamp:$timestamp, value:$value, ${Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime}")
}
} else {
println("cells null or empty")
}
Thread.sleep(1000)
}
} finally {
table.close()
connection.close()
}
}
}