Menu

并行与分布式计算(5)

题目要求

现实世界的很多场景中,需要计算最短路径,如导航中的路径规划等,但是如何在大规模路径中快速找出最短路径,是一个具有挑战性的问题。本项目要求利用MPI+OpenMP,从一个至少包含1万个节点,10万条边的图中,寻找最短路径,边上的权重可随机产生。统一测试程序的执行时间,进行排名,根据排名计算成绩。

几次更新之后数据达到了两万顶点,一亿四千万条边…

使用说明

以下指令均需要先cd进文件所在目录。样例数据10001x10001GraphExamples.txt实在是太大,就没有一起上传了,需要在运行参数上指明其地址,并且代码对文件名不做检查,如果文件不存在会发生段错误。

编译

  • mpicc使用mpicc作为编译器
  • -O3编译优化
1
$ mpicc -O3 -ompi_dijkstra mpi_dijkstra.c

运行并测试时间

由于我只有一台双核四进程的机器,因此本地测试的时候只测试了4个进程。考虑到最终测试环境有3台主机,希望TA大大在测试的时候调整为12个进程同时运行,3台主机每台主机上运行4个进程;并且每一台主机上都要有输入文件。

程序由第一个运行参数指明并行的进程数,并将结果输出到屏幕上,例如,下面10001x10001GraphExamples.txt是输入文件名,测试时需要放进同目录。

1
2
3
4
5
6
$ time mpirun -n 4 ./mpi_dijkstra 10001x10001GraphExamples.txt
2
20000<-826<-0
real    0m1.484s
user    0m3.984s
sys     0m1.313s

第1行是最短路径长度,第2行是(从右向左方向)路径上各点,再后面是运行时间。由于做了挺多的优化(见下),在我的机器上1.484s内就可以跑完。考虑到最终测试环境有3台主机,12进程同时并行的运行时间仍然是值得期待的。

当然,由于每个进程都要读取文件,而机械硬盘的读取实际上是单线程的,同一台机器上多进程同时读取文件会使硬盘频繁地进行随机读取,从而导致读取性能下降。我的程序还支持各进程的并发读取控制。如下,当接受更多的参数时,我的程序会把它作为每个进程的运行主机标识,相同标识的进程之间读取是不能够并发的。关于并发读取控制部分的详细解释在后面。

由于我的机器是固态硬盘,并发读取之后性能没有下降,开启并发读取控制之后,时间反而有了比较大的增加,甚至慢于一个进程运行的结果(主要是我的机器是固态硬盘,文件读取速度较快;如果是在随机读取性能较差的机械硬盘上,并发读取控制可能会有比较好的效果;此外,在多次运行本程序时也同样不需要并发读取控制,因为这时候mmap是直接对文件在内存中的映射做操作,不涉及硬盘上的随机读取,也有更佳的时间表现)。

1
2
3
4
5
6
$ time mpirun -n 4 ./mpi_dijkstra 10001x10001GraphExamples.txt 1 1 1 1
2
20000<-826<-0
real    0m4.696s
user    0m13.047s
sys     0m5.000s

以下是一个进程的运行结果,供对比。

1
2
3
4
5
6
$ time mpirun -n 1 ./mpi_dijkstra 10001x10001GraphExamples.txt
2
20000<-13915<-0
real    0m4.105s
user    0m3.250s
sys     0m0.688s

如果测试环境是3台主机,第1台主机负责0~3号进程,第2台主机负责4~7号进程,第三台主机负责8~11号进程,可以像这样配置并发读取控制:

1
./mpi_dijkstra 10001x10001GraphExamples.txt 1 1 1 1 2 2 2 2 3 3 3 3

实现原理

简单的来说,我的实现方案是,每个进程(进程)负责读入文件的一部分,构造并维护原图的一个子图。在Dijkstra算法的每一轮中,每个进程用各自的图上的边去更新答案;每一轮结束后,各进程「选举」出下一轮考虑的顶点,直到选举出终点T的时候退出。

数据结构的选择

图的数据结构选择了邻接表。如果使用邻接矩阵的话,20000*20000大小的int数组占用内存就已经达到了1.6GB。如果每个进程维护自己的子图的话内存上是很难受的,而共用同一个矩阵又势必在读入的时候引入临界区的问题,也就是要加锁。经过测试,加锁之后我的机器上的运行时间达到了4s。邻接表虽然需要维护大量的指针变量,但是边数是已经知道的(约一亿四千万条边)。当然,边的数量再增加的时候,也只需要修改源码中的宏常量E即可。这样,每个进程的图之间是相互独立的,避开了读入时的临界区问题,不需要加锁。

1
#define E 160000009		//边数

经典的串行Dijkstra算法通常会使用堆来维护剩余顶点中到起点最近的点,从而把单次选举的复杂度从O(N)降到O(log N)。在我实现的并行的版本中,每个进程维护一个自己的堆,而堆中用于比较的权值是这个顶点的答案被本进程更新后的答案。这样做的道理是,每个进程在每一轮中更新后的答案是会有区别的,因此堆里的权值很容易就会失效,如果在失效的时候就从堆里删除这个点,那么运行时的效率就会大大下降。因此,在同步完每一轮更新的答案之后,各进程单独检查一下堆顶的元素是否失效,如果失效则删除堆顶元素直到合法或堆空。

因此,最终期待的时间复杂度为O(E/P+Nlog N),其中E是边数,N是顶点数,P是并行时的进程数(每一条边至多被一个进程读入并访问一次用于更新答案;每个进程都要维护一个堆来选举下一个顶点)。

边的存储

如果不加说明一下的话,边的存储结构可能看起来会有一些奇怪。

1
2
3
4
5
struct Edge
{
	int from;
	short to, len;
};

struct Edge同时用在邻接表中的边、记录答案的ans数组、每个进程维护的堆,下面一点点来说明。

在邻接表中,struct Edge是邻接链表的每个节点。from指向了链表的上一条边,由于边数超过了16位数据结构的限制,因此from是32位的;to指向这条边的终点;len是边长。

在ans数组中,from变量用来记录到达这个顶点最短路上的前一个点,to这里没有用上,len记录到达这一点的最短路。可能你已经注意到了,struct Edge中的变量类型有一些奇怪。这样做的好处是,struct Edge的大小恰好相当于MPI_INT64_T,并且最高位是从len的最高位开始的,在MPI_Allreduce中可以直接使用MPI_MIN来同步最优的答案以及选举下一个顶点(还做了一个特殊处理,当距离相等时,优先选举T作为答案)。

1
2
3
4
5
6
7
8
9
10
11
12
cur = qSiz && q[0].len < ans[T].len ? q[0] : ans[T];
if (cur.to++ == T) //优先选择T
	cur.to = 0;
MPI_Allreduce(
	MPI_IN_PLACE,
	&cur,
	1,
	MPI_INT64_T,
	MPI_MIN,
	MPI_COMM_WORLD);
if (!cur.to--)
	cur.to = T;

实际情况下,有大概一万五千个顶点到起点的最短路长都小于2,用16位short去存tolen是完全足够的,还减小了内存消耗。

在堆中,我们仍然需要记录进堆时的答案,因此和ans数组的用法一样,只是要用到to用来标记了顶点的序号。

堆的实现

由于想保持语言的纯粹性,没有用C++,所以自己实现了STL里面的push_heappop_heap两个操作。

1
2
3
4
5
6
7
8
9
10
11
// 将ans[e[j].to]入堆
int i = qSiz++;
while (i > 0)
{
	int p = (i - 1) / 2;
	if (q[p].len <= ans[e[j].to].len)
		break;
	q[i] = q[p];
	i = p;
}
q[i] = ans[e[j].to];
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//将q[0]出堆
--qSiz;
int i = 0;
while (i < qSiz / 2)
{
	int ch = i * 2 + 1;
	if (ch + 1 < qSiz && q[ch].len > q[ch + 1].len)
		++ch;
	if (q[qSiz].len <= q[ch].len)
		break;
	q[i] = q[ch];
	i = ch;
}
q[i] = q[qSiz];
1
#define HEAP_SIZE 65535

经过测试,堆的大小设置成这么大已经足够,即使只有一个进程也不会溢出。

初始化

给起点的距离赋值-INF,而省去了把距离数组全部初始化成INF的操作。最终输出结果时加上这个INF即可。

读入部分的优化

由于输入文件达到了1.74G,对读入部分优化的效果最明显。

最开始我使用了标准库的函数fread,将输入文件整体读进内存之后做并行化字符串切割。这里遇到了一个问题,加上存储图的结构,进程的内存占用超过了2G,因此编译时要加上-mcmodel=medium解除内存模型限制。然而即使在NVMe固态硬盘上(所用机器是VAIO Z Flip 2016,硬盘读取速度可以达到3G/s),单次fread的时间都超过了0.5s。

经过查询资料,我换用了Linux系统下提供的接口函数mmap。这个函数可以将文件直接映射到内存,不用经过内核缓冲区。并且使用mmap在MPI下还有一个额外的好处:将mmap参数设置为PROT_READ(只读)、MAP_SHARED(共享访问)后,文件实际上只被加载到内存中一次,各个进程访问的是内存中的同一块地址,既加快了文件读入,也减少了多进程时额外的内存占用。

在运行的开始,文件被mmap进内存后,各进程根据自己的进程id计算自己的读取偏移量。具体做法是先直接按比例计算出位置,随后定位到最近的一个换行符。

各进程计算出自己读入起点的偏移量后,需要获得下一个进程的偏移量作为自己的读入终点。这里使用MPI_Sendrecv_replace实现偏移量的传递。

然而,这样做会引来一个新的问题,就是随机读取时的性能下降问题。由于我用的机器是固态硬盘,带来的影响较小。在机械硬盘上,多进程同时读取文件的不同位置会使硬盘的磁头不断地重新寻址,从而导致随机读取的性能大幅下降。换言之,假如测试的机器没有做raid 0的话,其读文件的过程其实是单线程的。因此,我这样实现了一个并发读取控制:每个进程读取文件前需要先从同一台主机的之前的已经读取过的进程接受消息;读取文件之后向本主机没有读取文件的进程发送就绪消息。这样,同一台主机在同一时刻只会有一个进程在访问硬盘,并且是顺序读入的,避免了频繁重新寻址造成的读入性能下降。

当然,操作系统会将访问的文件放入内存中缓存。因此,在多次运行本程序时就不需要并发读取控制了,因为这时候mmap是直接对文件在内存中的映射做操作,不涉及硬盘上的随机读写,也有更佳的时间表现。

程序源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <mpi.h>
#define N 20001			//点数
#define E 160000009		//边数
#define S 0				//起点
#define T 20000			//终点
#define INF 16383		//正无穷,且两个正无穷相加不会溢出
#define HEAP_SIZE 65535 // 堆的大小
struct Edge
{
	int from;
	short to, len;
} e[E],					//链表
	q[HEAP_SIZE],		//小根堆
	ans[N],				//本节点维护的答案
	cur = {S, S, -INF}; //当前访问到的点
int v[N],				//链表头
	id,					//当前进程号
	numThreads;			//当前进程总数
int main(int argc, char **argv)
{
	MPI_Init(&argc, &argv);
	MPI_Comm_rank(MPI_COMM_WORLD, &id);
	MPI_Comm_size(MPI_COMM_WORLD, &numThreads);
	int fd = open(argv[1], O_RDONLY), fs = lseek(fd, 0, SEEK_END), fi = fs / numThreads * id;
	const char *b = (const char *)mmap(NULL, fs, PROT_READ, MAP_SHARED, fd, 0);
	if (id)
		while (b[fi] != '\n')
			++fi;
	int offset = fi;
	MPI_Sendrecv_replace( //传递偏移量到offset
		&offset,
		1,
		MPI_INT,
		(id + numThreads - 1) % numThreads,
		0,
		(id + 1) % numThreads,
		0,
		MPI_COMM_WORLD,
		MPI_STATUSES_IGNORE);
	if (argc - 2 >= numThreads) //并发读入控制
		for (int i = 0, fe = id ? fi : fs; i < id; ++i)
			if (!strcmp(argv[2 + id], argv[2 + i])) //接受就绪信号
				MPI_Recv(&fe, 1, MPI_INT, i, i, MPI_COMM_WORLD, MPI_STATUSES_IGNORE);
	for (int fe = id + 1 < numThreads ? offset : fs, tot = 1, i = 0, a[3];; ++i)
	{
		while (fi < fe && b[fi] < '0')
			++fi;
		if (fi >= fe)
			break;
		for (a[i] = 0; fi < fe && b[fi] >= '0'; ++fi)
			a[i] = a[i] * 10 + b[fi] - '0';
		if (i == 2)
		{
			i = -1;
			e[tot].len = a[2];
			e[tot].to = a[1];
			e[tot].from = v[a[0]];
			v[a[0]] = tot++;
		}
	}
	if (argc - 2 >= numThreads) //并发读入控制
		for (int i = id + 1, fe = id ? fi : fs; i < numThreads; ++i)
			if (!strcmp(argv[2 + id], argv[2 + i])) //发送就绪信号
				MPI_Send(&offset, 1, MPI_INT, i, id, MPI_COMM_WORLD);
	for (int qSiz = 0; cur.to != T;) //开始跑最短路
	{
		ans[cur.to] = cur;
		for (int j = v[cur.to]; j; j = e[j].from)
			if (ans[e[j].to].len > cur.len + e[j].len)
			{
				ans[e[j].to].len = cur.len + e[j].len;
				ans[e[j].to].to = e[j].to;
				ans[e[j].to].from = cur.to;
				int i = qSiz++;
				while (i > 0)
				{
					int p = (i - 1) / 2;
					if (q[p].len <= ans[e[j].to].len)
						break;
					q[i] = q[p];
					i = p;
				}
				q[i] = ans[e[j].to];
			}
		while (qSiz && (q[0].to == cur.to || q[0].len > ans[q[0].to].len)) //从堆顶删除无效元素
		{
			--qSiz;
			int i = 0;
			while (i < qSiz / 2)
			{
				int ch = i * 2 + 1;
				if (ch + 1 < qSiz && q[ch].len > q[ch + 1].len)
					++ch;
				if (q[qSiz].len <= q[ch].len)
					break;
				q[i] = q[ch];
				i = ch;
			}
			q[i] = q[qSiz];
		}
		cur = qSiz && q[0].len < ans[T].len ? q[0] : ans[T];
		if (cur.to++ == T) //优先选择T
			cur.to = 0;
		MPI_Allreduce(
			MPI_IN_PLACE,
			&cur,
			1,
			MPI_INT64_T,
			MPI_MIN,
			MPI_COMM_WORLD);
		if (!cur.to--)
			cur.to = T;
	}
	if (!id)
		for (printf("%d\n%d", cur.len + INF, cur.to); cur.to != S; cur = ans[cur.from])
			printf("<-%d", cur.from);
	MPI_Finalize();
}