DA project 总结
coconutnut

目标

基于UDP Socket,实现Perfect Links -> Uniform Reliable Broadcast -> FIFO Broadcast application和Localized Causal Broadcast

Perfect Links层

点对点的通信,测试时跑n个process,给其中一个发m条消息。

需要满足的性质:

  • Reliable delivery -> 如果发送、接收方都正确,每一条发出的消息最终都会被接收方deliver
  • No duplication -> 不重复deliver
  • No creation -> 不凭空产生消息

    主要的几个类

  1. PerfectLinks
    • 调用SocketClient发消息
    • 检查并deliver,向上(URB)indicate
  2. SocketClient
    • 创建并发送Socket(包括普通消息和ACK)
  3. SocketServer(单独的线程)
    • 监听Socket,收到消息时,交给SocketServerHandler处理
  4. SocketServerHandler(单独的线程)
    • 处理收到的消息,向上(PerfectLinks)indicate
  5. MessageResender(单独的线程)
    • 定时检查并重发消息

      保证Reliable delivery

      通过ACK实现。PerfectLinks发送消息后,维持一个pending集合,用于存所有已经发送、但还没有收到ACK的消息,收到ACK后将消息移出。MessageResender定时检查这个集合,重发其中的所有消息。

      保证No duplication

      维持一个delivered集合即可。

      线程池

      SocketServerHandler用到线程池,防止同时收到消息太多时炸掉。

      并发控制

      可能会出现冲突的是pending和delivered,用ConcurrentHashMap实现。

      SEQ细节

      上层的消息有一个SEQ,创建之后不会改变,且在后面的广播中,发送给每一个process的SEQ都是一样的。而PL层需要区分发给不同process的SEQ,于是加入了PSEQ。PerfectLinks有一个AtomicInteger,每次创建消息时getAndIncrement得到PSEQ。

      Uniform Reliable Broadcast层

      每个process对其他所有process广播m条消息,需满足的性质:
  • Validity -> 如果一个正确的process广播了一条消息,它最终会deliver这条消息
  • No duplication
  • No creation
  • Agreement -> 如果某个正确的process deliver了一条消息,最终所有的process都会deliver这条消息

    思路

    UniformReliableBroadcast类调用PerfectLinks对每一个process发送消息。

    保证Agreement

    测试环境保证了至少一半的process是正确的,基于这个假设,只要确保大多数process收到消息即可。

每次broadcast或收到一条新消息后,加入pending。收到新消息还需要转发,即再次广播这条消息,此时SEQ是不变的,消息中需要含有creator id和sender id。

对pending中的每一条消息,统计从哪些不同的sender处收到了这条消息(此处收到消息,类似PL层收到ACK),如果总数过半,即可deliver。

实现细节

上述统计用到BitSet实现。对pending中的每条消息,用一个BitSet记录从哪些id收到了这条消息。

当总数过半,deliver之后,消息可以从pending中移出。转而加入delivered中,此时不需要保留之前的统计信息,记录一个SEQ就够了。

FIFO Broadcast application层

性质:在URB的基础上,还需保证按序deliver

思路

用一个AtomicInteger数组next,记录对于每一个process,当前以及deliver的消息的SEQ。一开始用了PriorityQueue,每次比较顶部和next。后面发现PriorityQueue并发得加锁,于是还是改成了用set记录收到的SEQ,可并发的set通过ConcurrentHashMap.newKeySet()得到。

流量控制

用一个AtomicInteger统计当前process发出、但还没有deliver的消息数量。该数量超过阈值时,暂停发送新消息。

检查deliver细节

每次收到消息后,检查pending中来自这条消息creator的,有没有符合要求可以deliver的消息。用到这样一句:

1
2
3
4
while(currentPending.remove(currentNext.get())){
// ...
currentNext.getAndIncrement();
}

currentPending是一个ConcurrentSet,只有remove成功时才会返回true,可以避免并发访问时的冲突。

至此,从PerfectLinks到FIFO,都不用加synchronized。

UML activity graph 1

Localized Causal Broadcast层

性质:在URB的基础上,还需保证causal order
例如,1依赖2、3,则当1deliver了2、3的消息m1、m2后,再broadcast新消息m3,那么其它所有process在deliver m3之前,都必须已经deliver了2、3的消息m1、m2。
无依赖的消息仍需保证FIFO。

思路

用2个AtomicInteger数组,一个是vector clock,当deliver一条消息后,对应的位置+1;另一个是dependency clock,当deliver依赖的消息后,对应的位置+1。发送消息时,同时发送当前的dependency clock。收到消息时,需先解析出消息附带的clock,判断这条消息的所有依赖是否满足,是才可以deliver,否则加入pending。

2个clock

区分不同的vector clock和dependency clock,是考虑到以下情况:
对于某一个process i,可能broadcast了很多条消息,但都还没有deliver,那么此时vector clock还没有更新。但是对于其它接收消息的process,它们在deliver来自i的消息时,要保证顺序。通过dependency clock可以统一的处理顺序和依赖。
(实际上只用一个vector clock记录也行,但是打包和更新有点麻烦,于是干脆就分开了,也只占用process总数个AtomicInteger,可以接受)

检查deliver

类似FIFO,仍然是在收到每条消息后,检查对应的creator,是否有来自改creator的消息可以deliver,不同的是需要比较dependency。
这里用深度优先遍历的方法,从左到右比较消息的clock和当前的vector clock,如果全部满足,deliver该消息并返回true。如果缺少依赖,先看能否解决依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean checkDeliver(int createrId){
ConcurrentHashMap<Integer, URBMessage> currentPending = pending.get(createrId);
boolean hasDeilivered = false;

// traverse according to SEQ of this creator
while(currentPending.containsKey(vectorClock[createrId-1].get()+1)){
URBMessage urbMessage = currentPending.get(vectorClock[createrId-1].get()+1);
// check dependency
int i;
boolean flag = true;
for(i=1; i<=totalHost; i++){
if(vectorClock[i-1].get() < urbMessage.vectorClock[i-1]){
flag = false;
break;
}
}

if(flag==false){
// has dependency
System.out.println("depend on "+i);
// check if dependency can be solved
if(createrId==myId || checkDeliver(i)==false){
// nothing delivered
break;
}
}else{
deliver(urbMessage);
hasDeilivered = true;
}
}

return hasDeilivered;
}

UML activity graph 2