0%

这篇文章将为大家介绍如何使用Go语言自己实现一个简单的bitmap。

一.Bitmap简介

1
2
bitmap也就是位图.它的作用一般是用来标记对应元素是否存在的.下面我们来举一个简单的例子:
假设我现在在我的系统当中插入了1,3,7,8,11这五个数字.我需要很快速的判断这五个数字存在与否,那么我们可以使用位图来判断.如下图:

1
2
3
4
5
6
7
8
9
10
11
12
13
如上图所示,我们将1,3,7,8,11首先通过hash函数映射可以得到他们的位置:
hash(1) = 1 % 11 = 1
hash(3) = 3 % 11 = 3
hash(7) = 7 % 11 = 7
hash(8) = 8 % 11 = 8
hash(11) = 11 % 11 = 0
于是我们把对应位置标记为1,因此比如下次我们要查看1是否存在,我们使用bitmap(hash(1)) = 1;
可知其是存在的.但是这种方案存在一种弊端就是可能出现误判,比如我们判断12是否存在,那么
bitmap(hash(12)) = 1;会判断12也存在.这种现象叫做假阳性.所以我们需要知道一点就是位图可
以用于判断一个元素不存在(也就是bitmap对应位置为0,那么久必然不存在),但是无法准确判断一个元素
一定存在.假阳性存在的本质是由于hash冲突导致,为了减小假阳性发生的概率,可以考虑设计好的hash函数
和根据对假阳性率要求设定位图大小.这些详细的优化有兴趣的读者请参考:
https://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf 本文不做深入解读.

二. 动手实现一个简单的Bitmap

在数据库系统当中,当我们对某一列做聚合操作的时候,我们有时候需要快速判断某一行值是否为NULL来做特殊处理,这里就会使用到位图。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Bitmap struct {
Len int
Any bool
Data []byte
}
/*这里位图设计细节是一个bit位来标记一行,而不是使用一个字节标记的,这样浪费空间了
比如我有16行,我只需要两个字节就够了
byte[0] 0 0 0 0 0 0 0 0 (代表0-7行)
byte[1] 0 0 0 0 0 0 0 0 (代表8-15行)
比如标记第7行的结果为NULL,那么结果就是:
byte[0] 0 0 0 0 0 0 0 1 (代表0-7行)
byte[1] 0 0 0 0 0 0 0 0 (代表8-15行)
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package bitmap

import "fmt"
//假如我们有n行需要判断,就设计大小为n
func New(n int) *Bitmap {
return &Bitmap{
Len: n,
Any: false,
Data: make([]byte, (n-1)/8+1),
}
}

func (n *Bitmap) IsEmpty() bool {
return !n.Any
}
//row>>3等价于row/8, row & 0x7 等价于 row%8
func (n *Bitmap) Add(row uint64) {
n.Data[row>>3] |= 1 << (row & 0x7)
n.Any = true
}

func (n *Bitmap) Del(row uint64) {
n.Data[row>>3] &= ^(uint8(1) << (row & 0x7))
}

func (n *Bitmap) Contains(row uint64) bool {
return (n.Data[row>>3] & (1 << (row & 0x7))) != 0
}

func (n *Bitmap) Filter(sels []int64) *Bitmap {
m := New(n.Len)
for i, sel := range sels {
if n.Contains(uint64(sel)) {
m.Add(uint64(i))
}
}
return m
}

func (n *Bitmap) Rows() []uint64 {
var rows []uint64

for i, j := uint64(0), uint64(n.Len); i < j; i++ {
if n.Contains(i) {
rows = append(rows, i)
}
}
return rows
}

func (n *Bitmap) String() string {
return fmt.Sprintf("%v", n.Rows())
}
1
2
3
优化点:
1.Bitmap在实际使用当中利用内存池分配和释放
2.修改Bitmap结构保证删除时更新Any

原理设计

由于考虑到使用单独map来做会导致并发性能非常大,比如当并发写的情况下,由于使用锁机制,这样并发写的性能就会受到很大的影响。于是我们设计并发map时考虑采取多个map的方案,按照上图那样我们会根据key值进行hash分区,来判断用于哪一个innermap,这样就会对并发性能由很大的提升。

代码复现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package concurrent_map

import (
"sync"
)

type innerMap struct {
m map[interface{}]interface{}
lock sync.RWMutex
}

//这个接口用于给用户自己设计key,hash方法也可以自己设计
type Concurrent_Key interface {
RawKey() interface{} //原始的key值
KeyAfterPartitioned() int64 //原始key值经过hash运算后得到的结果
}

type ConcurrentMap struct {
partitions_map []*innerMap
numsOfPartitions int64 //记录分区数量
}

func CreatConcurrentMap(numsOfPartitions int64) *ConcurrentMap {
var partitions_map []*innerMap
for i := 0; i < int(numsOfPartitions); i++ {
partitions_map = append(partitions_map, &innerMap{
m: make(map[interface{}]interface{}),
})
}
return &ConcurrentMap{
partitions_map: partitions_map,
numsOfPartitions: numsOfPartitions,
}
}

//下面是concurrentMap的一些基本使用方式,由于innermap是内
//部map,因此它的操作方法都应该首字母小写
func (im *innerMap) get(key Concurrent_Key) (interface{}, bool) {
im.lock.RLock()
val, ok := im.m[key.RawKey()]
im.lock.RUnlock()
return val, ok
}

func (im *innerMap) set(key Concurrent_Key, val interface{}) {
im.lock.Lock()
im.m[key.RawKey()] = val
im.lock.Unlock()
}

func (im *innerMap) del(key Concurrent_Key) {
im.lock.Lock()
delete(im.m, key.RawKey())
im.lock.Unlock()
}

//这个方法用于获取分区
func (cm *ConcurrentMap) getPartition(key Concurrent_Key) *innerMap {
return cm.partitions_map[key.KeyAfterPartitioned()%cm.numsOfPartitions]
}

//下面的方法需要大写首字母
func (cm *ConcurrentMap) Get(key Concurrent_Key) (interface{}, bool) {
im := cm.getPartition(key)
return im.get(key)
}

func (cm *ConcurrentMap) Set(key Concurrent_Key, val interface{}) {
im := cm.getPartition(key)
im.set(key, val)
}

func (cm *ConcurrentMap) Del(key Concurrent_Key) {
im := cm.getPartition(key)
im.del(key)
}

我们要使用上面的并发map,需要自己实现Concurrent_Key接口,下面是一个案例

1
2
3
4
5
6
7
8
9
10
11
type StringPartitionedKey struct {
val string
}

func (key *StringPartitionedKey) RawKey() interface{} {
return key.val
}

func (key *StringPartitionedKey) KeyAfterPartitioned() int64{
return hash(key.val)
}

关于hash的实现可以参考我的JackTan25/MurmurHashTest3

这篇文章主要介绍经典的分布式快照算法——Chandy Lamport Algorithm.

引入

比如在我们上面这张图里面,我们可以很容易知晓各国领导人的当前状态,因为无论从时间上还是空间的角度而言,他们都是同步好了,也就是说我们知道了在这一时刻每个领导人的状态。

但是如果这些领导人他们各自在家,我们有该如何去获取这样一个快照呢?而且同时我们实际需要的快照会更加复杂些,比如目前领导者之前通话说的内容也需要记载。

因此我们需要一个全局的快照来记录一个全局的状态。

对于一个分布式系统而言,会有许多应用程序分布在不同的物理机上。这些应用程序进程之间的交互则是使用channel(通道),那么基于这样的一个抽象的模型,我们定义我们的全局快照如下:

1
一个全局快照需要记录每一个进程的本地状态(例如程序变量等)以及每一个channel的状态。

那么我们用这个快照的好处有哪些呢?

1
2
3
4
5
1.用作检查点:当出现应用程序失败时可以重启恢复
2.垃圾收集(GC):可以讲不再用到的数据删除
3.死锁检测:可以检测当前应用程序状态是否出现死锁
4.用于调试:使用快照讲更优于打印的方式来进行调试
.......

那么快照的生成我们需要的各个进程的同一时刻的状态,且要是全局的,但是我们怎么去保证这样一个同一时刻呢?因为存在时钟倾斜的问题,我们很难保证做到这一点,也就是说同步快照是很难做到的。

1
2
由于无法做到"同时",因此在时间差上就可能有状态的变化,任何进程接受或者发送消息或者做了
其它什么事情状态都将被认为改变了。

Chandy Lamport Algorithm

为了解决这个时钟的问题,于是我们有了这个算法。

1
2
3
4
5
6
7
8
9
10
11
12
系统模型:
待解决的问题:
记录一个全局的快照(记录每个进程和每个channel的状态)
模型:
1.系统当中有N个进程,并且都不会失败
2.在每个Process对之间都会有两个单向的FIFO的通道(P1--->P2 和 P2---->P1)
3.所有的消息都会到达,并且完好无损(intact),没有重复
要求:
1.生成快照不应该影响系统的正常运行
2.每个进程都可以自己记录自己的状态
3.可以分布式地去搜集这些状态
4.任何进程都可以初始化一个快照

初始化快照(快照的开始点)

1
2
3
对于进程Pi而言,它会记录自己的状态同时也会准备一个特殊的标记消息(marker message),然
后它会将这个标记消息发送给其它N-1个节点.接下来就会去记录所有从其它节点(也就是Cji,i!=j)
过来的消息(因为在channel里面的消息也是快照的一部分,这些消息暂留在channel)

传播快照

1
2
3
4
5
6
7
8
9
对于所有的进程它会对于收到的消息做一个识别.当它发现自己收到了一个marker message就会触发快照的
的执行.下面为了方便描述,我们考虑Pj,它的对应marker message的channel记作Ckj,代表数据从k流向j
如果收到marker消息时(这个标记消息是我们第一次收到的,如果重复收就不是这个条件了):
1.Pj记录自己的状态并标记Ckj为空
2.发送该marker message给所有其它N-1个节点
3.接下来开始去记录在Clj里面的暂留的消息(l!=j,l!=k,关于这里其实也没有那么严格
其实后那个例子在这一点也不是这么严格的,即使不遵守只要保证幂等也可以)
否则就将Ctj(t!=j)通道里面过来的消息记录到状态中(注意这里的记录状态按照我的理解应该
指的是普通的日志记录)

中断快照

1
2
当所有的进程都收到了marker message后并且自己的状态也记录好了,这时就会有一个
中央服务器来收集部分状态构建出一个全局的快照

算法例子

如上图时最开始的起始状系统状态,首先我们P1作为快照起始点,它会先记录自己的本地状态,如下,在记录完本地状态后就成为下图了(蓝色代表状态记录完毕)

在达到上面这个图的系统状态后,在下图当中,此时P1发送了一条marker message,同时P2也发了一条消息到通道当中。

在下图当中,此时P1就会开始记录通道的消息(发送完marker message后),而P2在收到marker message后也将这个marker message广播出去了

在上图当中P2收到了标记消息(第一次)就会记录本地状态,也会将通道C12标记为empty,同时记录C12里面滞留的消息(由于上C12上面实际上啥也没有了,所以实际没做啥),现在就变成了上面的系统状态了(蓝色区域代表都已经生成好的状态)

在上图当中,P1由于发完了标记消息后也会记录C21的消息,所以M1也被记录到状态当中,然后C21也被标记为empty

最后的结果就是上面蓝色部分整体构成一个全局的快照。

算法正确性证明

首先我们这个算法的目的是解决时钟倾斜的问题的,而实际上本算法是通过保证了一个因果一致性来另辟蹊径的。因此我们需要证明它的因果一致性。

1
2
3
4
5
	1.如果事件发生在任意一个进程的快照之前,就称之为预快照(presnapshot),也就是
说这个事件也应当最后在我们的全局快照当中
2.如果一个事件发生任意一个进程快照之后,那么就是称之为后快照(postsnapshot),也
就是说这个事件不应该出现在我们的全局快照当中
3.如果A发生在预快照B之前,那么A也是预快照

下面开始证明:

1
2
3
4
5
6
7
8
1.如果A和B事件都发生在同一个进程当中,那么我们无需担心在生成快照时出问题
2.如果时不同的进程,那么假设我们在进程P上发送事件是A,而在Q上相应的接受事件是B,
如果说B是presnapshot,那么说明Q就还没有收到marker message(收到marker mess
age就代表快照开始了),那么P肯定也还没有发送marker message(发送marker mess
age就代表快照开始了),这说明了A肯定也是一个presnapshot
3.当B是postsnapshot同样可以证明A也是postsnapshot

综上所述我们证明了快照的正确性