目标
基于UDP Socket,实现Perfect Links -> Uniform Reliable Broadcast -> FIFO Broadcast application和Localized Causal Broadcast
Perfect Links层
点对点的通信,测试时跑n个process,给其中一个发m条消息。
需要满足的性质:
- Reliable delivery -> 如果发送、接收方都正确,每一条发出的消息最终都会被接收方deliver
- No duplication -> 不重复deliver
- No creation -> 不凭空产生消息
主要的几个类
- PerfectLinks
- 调用SocketClient发消息
- 检查并deliver,向上(URB)indicate
- SocketClient
- 创建并发送Socket(包括普通消息和ACK)
- SocketServer(单独的线程)
- 监听Socket,收到消息时,交给SocketServerHandler处理
- SocketServerHandler(单独的线程)
- 处理收到的消息,向上(PerfectLinks)indicate
- MessageResender(单独的线程)
- 定时检查并重发消息
保证Reliable delivery
通过ACK实现。PerfectLinks发送消息后,维持一个pending集合,用于存所有已经发送、但还没有收到ACK的消息,收到ACK后将消息移出。MessageResender定时检查这个集合,重发其中的所有消息。保证No duplication
维持一个delivered集合即可。线程池
SocketServerHandler用到线程池,防止同时收到消息太多时炸掉。并发控制
可能会出现冲突的是pending和delivered,用ConcurrentHashMap实现。SEQ细节
上层的消息有一个SEQ,创建之后不会改变,且在后面的广播中,发送给每一个process的SEQ都是一样的。而PL层需要区分发给不同process的SEQ,于是加入了PSEQ。PerfectLinks有一个AtomicInteger,每次创建消息时getAndIncrement得到PSEQ。Uniform Reliable Broadcast层
每个process对其他所有process广播m条消息,需满足的性质:
- 定时检查并重发消息
- Validity -> 如果一个正确的process广播了一条消息,它最终会deliver这条消息
- No duplication
- No creation
- Agreement -> 如果某个正确的process deliver了一条消息,最终所有的process都会deliver这条消息
思路
UniformReliableBroadcast类调用PerfectLinks对每一个process发送消息。保证Agreement
测试环境保证了至少一半的process是正确的,基于这个假设,只要确保大多数process收到消息即可。
每次broadcast或收到一条新消息后,加入pending。收到新消息还需要转发,即再次广播这条消息,此时SEQ是不变的,消息中需要含有creator id和sender id。
对pending中的每一条消息,统计从哪些不同的sender处收到了这条消息(此处收到消息,类似PL层收到ACK),如果总数过半,即可deliver。
实现细节
上述统计用到BitSet实现。对pending中的每条消息,用一个BitSet记录从哪些id收到了这条消息。
当总数过半,deliver之后,消息可以从pending中移出。转而加入delivered中,此时不需要保留之前的统计信息,记录一个SEQ就够了。
FIFO Broadcast application层
性质:在URB的基础上,还需保证按序deliver
思路
用一个AtomicInteger数组next,记录对于每一个process,当前以及deliver的消息的SEQ。一开始用了PriorityQueue,每次比较顶部和next。后面发现PriorityQueue并发得加锁,于是还是改成了用set记录收到的SEQ,可并发的set通过ConcurrentHashMap.newKeySet()得到。
流量控制
用一个AtomicInteger统计当前process发出、但还没有deliver的消息数量。该数量超过阈值时,暂停发送新消息。
检查deliver细节
每次收到消息后,检查pending中来自这条消息creator的,有没有符合要求可以deliver的消息。用到这样一句:
1 | while(currentPending.remove(currentNext.get())){ |
currentPending是一个ConcurrentSet,只有remove成功时才会返回true,可以避免并发访问时的冲突。
至此,从PerfectLinks到FIFO,都不用加synchronized。
UML activity graph 1
Localized Causal Broadcast层
性质:在URB的基础上,还需保证causal order
例如,1依赖2、3,则当1deliver了2、3的消息m1、m2后,再broadcast新消息m3,那么其它所有process在deliver m3之前,都必须已经deliver了2、3的消息m1、m2。
无依赖的消息仍需保证FIFO。
思路
用2个AtomicInteger数组,一个是vector clock,当deliver一条消息后,对应的位置+1;另一个是dependency clock,当deliver依赖的消息后,对应的位置+1。发送消息时,同时发送当前的dependency clock。收到消息时,需先解析出消息附带的clock,判断这条消息的所有依赖是否满足,是才可以deliver,否则加入pending。
2个clock
区分不同的vector clock和dependency clock,是考虑到以下情况:
对于某一个process i,可能broadcast了很多条消息,但都还没有deliver,那么此时vector clock还没有更新。但是对于其它接收消息的process,它们在deliver来自i的消息时,要保证顺序。通过dependency clock可以统一的处理顺序和依赖。
(实际上只用一个vector clock记录也行,但是打包和更新有点麻烦,于是干脆就分开了,也只占用process总数个AtomicInteger,可以接受)
检查deliver
类似FIFO,仍然是在收到每条消息后,检查对应的creator,是否有来自改creator的消息可以deliver,不同的是需要比较dependency。
这里用深度优先遍历的方法,从左到右比较消息的clock和当前的vector clock,如果全部满足,deliver该消息并返回true。如果缺少依赖,先看能否解决依赖。
1 | public boolean checkDeliver(int createrId){ |