DA project 2 validation tool
coconutnut

Update

‼️ Dependency bug fixed. (10/12)

‼️ 不测试FIFO

‼️ 最多支持9个进程(若要测试10个及以上,需修改代码中拼接输出文件名的部分)


为了测试submission2(Localized Causal Broadcast)的输出写了个简陋工具,仅有限情况测试
仅!供!参!考!不!保!证!对!

思路

例如,process 1 依赖 2、3,与 4、5 无关:先找出所有 process 的输出中与 1、2、3 有关的消息,然后统计 1 broadcast 每条消息时的 vector clock,以及其它 process deliver 这些消息时的 vector clock,后者的每个分量都必须 >= 前者。1 broadcast 的消息可以比其它 process deliver 的多,此时多出的部分不考虑,只比较前面的部分。

使用

基本和给的validate_fifo.py差不多
把代码、config、output放在同一个文件夹内(即stress.py的所有输出,有多余的没关系)
带参数 --proc_num 运行

流程示例

  1. cd到tools
  2. 把validate_lcausal.py放到tools中
  3. run stress
    python stress.py -r ../template_java/run.sh -t lcausal -l output -p 4 -m 100

    // VM
    python3 stress.py -r ../template_java/run.sh -t lcausal -l output -p 4 -m 100
  4. run validate
    python validate_lcausal.py --proc_num 4

    // VM
    python3 validate_lcausal.py --proc_num 4

代码

import argparse

# Path of the configuration file parsed by read_config().
config_path = './output/config'
# Common prefix of the per-process output files; the functions below append
# "<id>.output" to it, so the hard-coded "0" limits process ids to one digit.
output_path = './output/proc0' # append id.output later, id<=9

def read_config(path=None):
    """Parse the dependency configuration file.

    The first line of the file holds the number of messages each process
    broadcasts and is skipped.  Every following non-blank line has the form
    ``sender dep1 dep2 ...``.

    Args:
        path: Configuration file to read.  Defaults to the module-level
            ``config_path`` when omitted.

    Returns:
        A dict mapping each sender id to the list of ids found on its line.
        The sender itself is the first entry of its own list.

    Raises:
        ValueError: If a token on a dependency line is not an integer.
    """
    if path is None:
        path = config_path

    dependencies = {}
    with open(path, "r") as f:
        for line_no, line in enumerate(f):
            # split() without a separator tolerates repeated spaces, tabs and
            # the trailing newline, and yields [] for blank lines.
            fields = line.split()
            if line_no == 0 or not fields:
                continue
            sender = int(fields[0])
            # dependency list (including the sender itself)
            dependencies.setdefault(sender, []).extend(int(tok) for tok in fields)
    return dependencies

# find all 'd sender seq' lines in the output of process `id` whose sender is in the dependency set
def getDependentList(id, dset, path=None):
    """Collect the delivery lines of one process that concern ``dset``.

    Args:
        id: Id of the process whose output file is read.
        dset: Container of sender ids to keep.
        path: Output file to read.  Defaults to
            ``output_path + str(id) + '.output'`` when omitted.

    Returns:
        The matching ``d sender seq`` lines, unmodified and in file order.
    """
    if path is None:
        path = output_path + str(id) + '.output'

    sequence = []
    with open(path, "r") as f:
        for line in f:
            fields = line.split()
            # only consider deliveries whose sender is in the dependency set;
            # blank or truncated lines are skipped instead of raising
            if len(fields) >= 2 and fields[0] == "d" and int(fields[1]) in dset:
                sequence.append(line)
    print(str(id), '#msg:', len(sequence))
    return sequence

# find all 'b seq' lines and the relevant delivered messages in the output of process `id`
def getDependentListOfCreator(id, dset, path=None):
    """Collect the broadcast lines and relevant delivery lines of one process.

    Args:
        id: Id of the process whose output file is read.
        dset: Container of sender ids whose deliveries are kept.
        path: Output file to read.  Defaults to
            ``output_path + str(id) + '.output'`` when omitted.

    Returns:
        Every ``b seq`` line plus every ``d sender seq`` line whose sender is
        in ``dset``, unmodified and in file order.
    """
    if path is None:
        path = output_path + str(id) + '.output'

    sequence = []
    with open(path, "r") as f:
        for line in f:
            fields = line.split()
            if not fields:
                # blank line: nothing to classify
                continue
            if fields[0] == "b":
                sequence.append(line)
            elif fields[0] == "d" and len(fields) >= 2 and int(fields[1]) in dset:
                sequence.append(line)
    print(str(id), '#msg:', len(sequence))
    return sequence

# get the vector clock associated with each message created by `id`, as seen in `sequence`
def getVectorClock(sequence, id, proc_num):
    """Compute the clock at each delivery of a message sent by ``id``.

    Args:
        sequence: Lines of the form ``d sender seq``.
        id: Sender whose deliveries produce a snapshot.
        proc_num: Total number of processes; the clock has ``proc_num + 1``
            slots and is indexed directly by sender id.

    Returns:
        One clock snapshot (a list of ints) per delivered message from
        ``id``.  Each snapshot already counts the message being delivered.
    """
    clock = [0] * (proc_num + 1)
    msg_clock_list = []
    for msg in sequence:
        sender = int(msg.split()[1])
        clock[sender] += 1
        if sender == id:
            # copy so later increments do not alter the recorded snapshot
            msg_clock_list.append(clock.copy())
    return msg_clock_list

# get the vector clock associated with each message broadcast by `id`, as seen in `sequence`
def getVectorClockOfCreator(sequence, id, proc_num):
    """Compute the clock at each broadcast found in ``sequence``.

    Args:
        sequence: Lines of the form ``b seq`` or ``d sender seq``.
        id: Unused; kept so the signature parallels ``getVectorClock``.
        proc_num: Total number of processes; the clock has ``proc_num + 1``
            slots and is indexed directly by sender id.

    Returns:
        One clock snapshot (a list of ints) per non-delivery line, holding
        the per-sender delivery counts seen before that line.
    """
    clock = [0] * (proc_num + 1)
    msg_clock_list = []
    for msg in sequence:
        fields = msg.split()
        if fields[0] == 'd':
            # the sender field is only meaningful on delivery lines
            clock[int(fields[1])] += 1
        else:
            # copy so later deliveries do not alter the recorded snapshot
            msg_clock_list.append(clock.copy())
    return msg_clock_list

# check whether the dependencies of process `id` are satisfied in all other processes
def checkProcessId(id, dset, num_procs=None):
    """Validate the messages of one process against every other process.

    The clock recorded when ``id`` broadcasts its i-th message is compared
    with the clock each other process has when it delivers that message.
    The delivering clock must be component-wise greater than or equal to
    the broadcasting clock.

    Args:
        id: Process whose broadcasts are checked.
        dset: Container of sender ids that ``id`` depends on.
        num_procs: Total number of processes.  Defaults to the module-level
            ``proc_num`` when omitted.

    Returns:
        True if every other process satisfies the check, False otherwise.
    """
    if num_procs is None:
        # fall back to the module-level value set elsewhere in this file
        num_procs = proc_num

    print('checking process', str(id), ',depend on', dset)

    # sequence of the current process
    ref_sequence = getDependentListOfCreator(id, dset)
    ref_clock = getVectorClockOfCreator(ref_sequence, id, num_procs)
    print(id, '#clock:', len(ref_clock))

    for cur_id in range(1, num_procs + 1):
        if cur_id == id:
            continue

        # sequence of the other process
        sequence = getDependentList(cur_id, dset)
        # get vector clock
        clock = getVectorClock(sequence, id, num_procs)
        print(cur_id, '#clock:', len(clock))

        # number of delivered messages should be less than or equal to the
        # number of broadcast ones
        if len(clock) > len(ref_clock):
            print('Number exceeds!')
            return False

        # zip stops at the shorter list, so surplus broadcasts are ignored
        for expected, observed in zip(ref_clock, clock):
            if any(e > o for e, o in zip(expected, observed)):
                print('Clock not match!')
                print('ref_clock:', expected)
                print('clock :', observed)
                return False
    return True

# check the output of all processes
def checkProcess(proc_num):
    """Validate the output of every process from 1 to ``proc_num``.

    Args:
        proc_num: Total number of processes.

    Returns:
        True if every process passes ``checkProcessId``, False as soon as
        one fails.

    Raises:
        KeyError: If a process id has no entry in the parsed configuration.
    """
    # checkProcessId reads the process count from the module-level name
    # `proc_num`; publish it so this also works when the module is imported
    # rather than run as a script.
    globals()['proc_num'] = proc_num

    depend = read_config()
    print('dependency:', depend, '\n')

    for pid in range(1, proc_num + 1):
        if not checkProcessId(pid, depend[pid]):
            return False
        print('validate process', str(pid), 'OK\n')
    return True

if __name__ == "__main__":
    # Command-line entry point: validate the outputs of --proc_num processes.
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--proc_num",
        required=True,
        dest="proc_num",
        type=int,  # let argparse reject non-integer input with a usage error
        help="Total number of processes",
    )

    results = parser.parse_args()

    # Kept as a module-level name on purpose: checkProcessId reads it.
    proc_num = results.proc_num
    if checkProcess(proc_num):
        print("Validation OK")
    else:
        print("Validation failed!")

关于dependency

We say that a process x is affected by a process z if all the messages which process z broadcasts and which process x delivers become dependencies for all future messages broadcast by process x.

个人理解是,如果

  • p1 b 1
  • p2 d 1 1
  • p1 d 2 1
  • p1 d 3 1
  • p1 b 2
    此时p1的dependency是[1 1 1],p2在d 1 2时需要有这些dependency
    加入broadcast或deliver每条消息时对应的clock
  • p1 b 1 [0 0 0]
  • p2 d 1 1 [0 0 0]
  • p1 d 2 1
  • p1 d 3 1
  • p1 b 2 [1 1 1]
  • p2 d 3 1
  • p2 d 2 1
  • p2 d 1 2 [1 1 1]
    统计时,对reference需要统计broadcast时的clock,而其它需统计deliver时的clock