## 实现并行梯形数值积分的 MPI 算法

lf ask(lf a, lf b, lf f(lf x), lf e)
{
	// Composite trapezoid rule with fixed step e: accumulate
	// f(left) + f(right) for each sub-interval, scale by e/2 once at the end.
	// NOTE(review): when (b - a) is not a multiple of e the loop integrates
	// slightly past b on the last step.
	lf sum = 0, left = f(a);
	while (a < b)
	{
		a += e;
		lf right = f(a);
		sum += left + right;
		left = right;
	}
	return sum * e * 0.5;
}


### 用 MPI 的点对点通信函数完成梯形数值积分的并行算法

	if (!id)
	{
		// Rank 0 collects each worker's partial sum and accumulates it into
		// ans (rank 0's own share was already stored in ans at initialization).
		for (int i = 1; i < numThreads; ++i)
		{
			MPI_Recv(
				&myAns,
				1,
				MPI_DOUBLE,
				i,
				0,
				MPI_COMM_WORLD,
				MPI_STATUS_IGNORE); // fixed: MPI_STATUES_IGNORE is a typo and not an MPI symbol
			ans += myAns;
		}
	}
	else
		// Every non-root rank sends its partial sum to rank 0 with tag 0.
		MPI_Send(
			&myAns,
			1,
			MPI_DOUBLE,
			0,
			0,
			MPI_COMM_WORLD);


### 用 MPI 的集合通信函数完成梯形数值积分的并行算法

	MPI_Reduce( // collective reduction: sum every rank's partial integral into ans on rank 0
&myAns, // send buffer: this rank's partial trapezoid sum
&ans, // recv buffer: meaningful only on the root rank
1,
MPI_DOUBLE,
MPI_SUM,
0, // root rank
MPI_COMM_WORLD);


### 将上面的算法对瑕积分扩展

• f(x)=exp(bx)/sqrt(1+exp(cx)), 0<x<=L;
• 这时积分区间[0,L]划分不能是等长的，每个任务的小区间需要传递，并且参数 b，c，L 也需要传递。

lf f(lf x)
{
	// Integrand for the improper-integral variant: e^x / sqrt(1 + e^x).
	lf ex = exp(x);
	return ex / sqrt(1 + ex);
}
lf simpson(lf a, lf b, lf f(lf x))
{
	lf mid = (a + b) * 0.5, h = b - a;
	// Near a == 0 fall back to the midpoint rule so f is never evaluated at
	// the left endpoint (the integrand is only specified for 0 < x <= L).
	if (fabs(a) < EPS)
		return f(mid) * h;
	// Standard Simpson's rule on [a, b].
	return (f(a) + 4 * f(mid) + f(b)) * h / 6;
}
lf ask(lf a, lf b, lf f(lf x), lf e)
{
	// Adaptive Simpson with Richardson correction: split [a, b] at the
	// midpoint, compare the two halves against the whole, and recurse with a
	// halved tolerance when the estimated error delta is too large.
	if (e < EPS)
		return simpson(a, b, f); // tolerance exhausted: stop recursing
	lf mid = (a + b) * 0.5;
	lf left = simpson(a, mid, f);
	lf right = simpson(mid, b, f);
	lf whole = simpson(a, b, f);
	lf delta = (left + right - whole) / 15;
	if (fabs(delta) < e)
		return left + right + delta;
	return ask(a, mid, f, e * 0.5) + ask(mid, b, f, e * 0.5);
}


$ time mpiexec -n 1 ./integral 9
3022859422404135022056366724809190601572944773201862114369138391962853640177743445854646916709852400063199838208.000000 with 1 proces, problem size 512.
real    0m0.058s
user    0m0.000s
sys     0m0.063s
$ time mpiexec -n 1 ./integral 10
-nan with 1 proces, problem size 1024.
real    0m0.058s
user    0m0.000s
sys     0m0.047s


### 填表

#### 运行时间 T（单位 s）

10.4240.5131.92823.536357.396706.6441441.164
20.4260.5531.40311.794182.182354.155706.369
40.4660.5421.0196.23092.010179.062353.818
80.5430.4870.7163.66746.01492.823177.530
160.5410.5760.7192.17723.25346.10293.670
320.8980.7470.8261.66312.63323.16945.404
641.4641.0671.1161.6386.81212.32423.790
1281.1101.0761.1761.4164.1056.95212.525
2561.5341.1881.1961.3192.8484.1106.936
5121.8021.7041.6951.7542.2102.6263.478

• 随着问题规模增加，同进程数下运行时间不断增加
• 问题规模较小时，进程越多，并行带来的通信与启动开销越大，运行时间反而可能变长
• 问题规模较大时，运行时间随进程数增加而显著减少

#### 加速比 S

11.001.001.001.001.001.001.00
21.000.931.372.001.962.002.04
40.910.951.893.783.883.944.07
80.781.052.696.417.777.618.12
160.780.892.6810.8115.3715.3315.39
320.470.692.3314.1528.2930.5031.74
640.290.481.7214.3852.4757.3460.58
1280.380.481.6316.6287.06101.65115.06
2560.280.431.6117.84125.49171.93207.78
5120.240.301.1413.42161.71269.10414.37

• 随着问题规模增加，同进程数下加速比不断增加
• 问题规模较小时，加速比随进程数增加而下降；问题规模较大时，加速比随进程数增加而持续上升

#### 效率 E

11.001.001.001.001.001.001.00
20.500.930.681.000.981.001.02
40.230.240.470.950.970.991.02
80.100.130.340.810.970.951.02
160.050.060.170.680.960.960.96
320.010.020.070.440.880.950.99
640.0050.0080.030.220.820.900.95
1280.0030.0040.010.130.680.790.90
2560.0010.0020.0060.070.490.670.81
5120.00050.00060.0020.030.320.530.81

• 随着问题规模增加，同进程数下效率不断增加
• 随着进程数增加，同问题规模下效率不断减少

### 源代码integral.c

//#define WK_SIMPSON
//#define WK_P2P
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
typedef double lf;
const lf EPS = 1e-5;
#ifdef WK_SIMPSON
lf f(lf x)
{
	// Integrand for the improper-integral (WK_SIMPSON) build:
	// e^x / sqrt(1 + e^x).
	lf ex = exp(x);
	return ex / sqrt(1 + ex);
}
lf simpson(lf a, lf b, lf f(lf x))
{
	lf mid = (a + b) * 0.5, h = b - a;
	// Near a == 0 fall back to the midpoint rule so f is never evaluated at
	// the left endpoint (the integrand is only specified for 0 < x <= L).
	if (fabs(a) < EPS)
		return f(mid) * h;
	// Standard Simpson's rule on [a, b].
	return (f(a) + 4 * f(mid) + f(b)) * h / 6;
}
lf ask(lf a, lf b, lf f(lf x), lf e)
{
	// Adaptive Simpson with Richardson correction: split [a, b] at the
	// midpoint, compare the two halves against the whole, and recurse with a
	// halved tolerance when the estimated error delta is too large.
	if (e < EPS)
		return simpson(a, b, f); // tolerance exhausted: stop recursing
	lf mid = (a + b) * 0.5;
	lf left = simpson(a, mid, f);
	lf right = simpson(mid, b, f);
	lf whole = simpson(a, b, f);
	lf delta = (left + right - whole) / 15;
	if (fabs(delta) < e)
		return left + right + delta;
	return ask(a, mid, f, e * 0.5) + ask(mid, b, f, e * 0.5);
}
#else
lf f(lf x)
{
// Identity integrand used for the benchmark runs: the integral of x over
// [0, n] has a closed form (n^2 / 2), so the printed result is easy to check.
return x;
}
lf ask(lf a, lf b, lf f(lf x), lf e)
{
	// Composite trapezoid rule with fixed step e: accumulate
	// f(left) + f(right) for each sub-interval, scale by e/2 once at the end.
	// NOTE(review): when (b - a) is not a multiple of e the loop integrates
	// slightly past b on the last step.
	lf sum = 0, left = f(a);
	while (a < b)
	{
		a += e;
		lf right = f(a);
		sum += left + right;
		left = right;
	}
	return sum * e * 0.5;
}
#endif
int main(int argc, char **argv)
{
	// Parallel trapezoid integration: split [0, n] into numThreads equal
	// sub-intervals, integrate locally, then combine the partial sums either
	// with point-to-point messages (WK_P2P) or a collective MPI_Reduce.
	MPI_Init(&argc, &argv);
	int id, numThreads; // fixed: rank and size were used but never declared
	MPI_Comm_rank(MPI_COMM_WORLD, &id);
	MPI_Comm_size(MPI_COMM_WORLD, &numThreads); // fixed: size was never queried
	lf n = pow(2, atoi(argv[1])), // problem size 2^argv[1]
	   len = n / numThreads,      // fixed: per-process sub-interval length was undefined
	   myL = id * len,            // left endpoint of this rank's sub-interval
	   myAns = ask(myL, myL + len, f, 1e-2),
	   ans = myAns; // rank 0's own share; others overwrite via Reduce/Recv
#ifdef WK_P2P
	if (!id)
	{
		// Rank 0 collects each worker's partial sum.
		for (int i = 1; i < numThreads; ++i)
		{
			MPI_Recv(
				&myAns,
				1,
				MPI_DOUBLE,
				i,
				0,
				MPI_COMM_WORLD,
				MPI_STATUS_IGNORE); // fixed: MPI_STATUES_IGNORE is a typo
			ans += myAns;
		}
	}
	else
		MPI_Send(
			&myAns,
			1,
			MPI_DOUBLE,
			0,
			0,
			MPI_COMM_WORLD);
#else
	// Collective version: sum every rank's partial integral into ans on rank 0.
	MPI_Reduce(
		&myAns,
		&ans,
		1,
		MPI_DOUBLE,
		MPI_SUM,
		0,
		MPI_COMM_WORLD);
#endif
	if (!id)
		printf("%f with %d proces, problem size %.0f.", ans, numThreads, n);
	MPI_Finalize();
}


### 作业脚本integral.pbs

#PBS -N integral
#PBS -l nodes=16:ppn=32
#PBS -j oe

source /public/software/profile.d/mpi_openmpi-intel-2.1.2.sh
mpicc integral.c -o integral -std=c11 -lm
for logN in 14 18 22 26 30 31 32
do
time mpiexec -machinefile $PBS_NODEFILE ./integral$logN
done


### 输出文件integral.o1761

• integral.o1795对应 256 核（nodes=8:ppn=32）
• integral.o1796对应 128 核（nodes=4:ppn=32）
• integral.o1797对应 64 核（nodes=2:ppn=32）
• integral.o1798对应 32 核（nodes=1:ppn=32）
• integral.o1799对应 16 核（nodes=1:ppn=16）
• integral.o1800对应 8 核（nodes=1:ppn=8）
• integral.o1801对应 4 核（nodes=1:ppn=4）
• integral.o1802对应 2 核（nodes=1:ppn=2）
• integral.o1803对应 1 核（nodes=1:ppn=1）
134218391.043055 with 512 proces, problem size 16384.
real	0m1.802s
user	0m2.328s
sys	0m6.054s
34359872522.271759 with 512 proces, problem size 262144.
real	0m1.704s
user	0m2.188s
sys	0m5.706s
8796101094338.814453 with 512 proces, problem size 4194304.
real	0m1.695s
user	0m2.281s
sys	0m5.844s
2251800146489160.500000 with 512 proces, problem size 67108864.
real	0m1.754s
user	0m4.053s
sys	0m5.900s
576461292861516288.000000 with 512 proces, problem size 1073741824.
real	0m2.210s
user	0m20.316s
sys	0m5.308s
2305845201815247872.000000 with 512 proces, problem size 2147483648.
real	0m2.626s
user	0m36.019s
sys	0m4.861s
9223215905289959424.000000 with 512 proces, problem size 4294967296.
real	0m3.478s
user	0m59.168s
sys	0m5.120s


## 完成正则采样排序 PSRS 的 MPI 算法

### psrs.c

//#define WK_GEN_DATA
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
typedef long int ll;
/* Sign of a: -1, 0, or +1. */
int sgn(ll a) { return a < 0 ? -1 : a ? 1 : 0; }
/* qsort comparator for ll. Compares directly instead of via subtraction:
 * the original (*a - *b) is signed-overflow UB when the operands are far
 * apart (e.g. LONG_MAX - (-1)) and then reports the wrong order. */
int cmpll(const void *a, const void *b)
{
	const ll x = *(const ll *)a, y = *(const ll *)b;
	return (x > y) - (x < y);
}
int main(int argc, char **argv)
{
MPI_Init(&argc, &argv);
double start = MPI_Wtime();
MPI_Comm_rank(MPI_COMM_WORLD, &id);
#ifdef WK_GEN_DATA
ll n = ((ll)1) << atoi(argv[1]),
*a = (ll *)malloc(len * sizeof(ll));
#else
ll n;
MPI_File fh;
MPI_File_open(
MPI_COMM_WORLD,
argv[1],
MPI_MODE_RDONLY,
MPI_INFO_NULL,
&fh);
fh,
0,
&n,
1 * sizeof(ll), //1,
MPI_BYTE,		//MPI_LONG_INT,
MPI_STATUSES_IGNORE);
beg = id * blockSize,
end = beg + blockSize < n ? beg + blockSize : n,
len = end - beg,
*a = (ll *)malloc(len * sizeof(ll));
fh,
sizeof(ll) * (1 + beg),
a,
len * sizeof(ll), //len,
MPI_BYTE,		  //MPI_LONG_INT,
MPI_STATUSES_IGNORE);
MPI_File_close(&fh);
MPI_Barrier(MPI_COMM_WORLD);
if (!id)
printf("Finish reading %ld datas at %fs with %d procs.\n", n, MPI_Wtime() - start, numThreads);
#endif
qsort(a, len, sizeof(ll), cmpll);
//局部排序
ll *sample = (ll *)malloc(numThreads * sizeof(ll)),
for (int i = 0; i < numThreads; ++i)
sample[i] = a[len / numThreads * i];

//采样
MPI_Gather(
sample,
MPI_BYTE,				 //MPI_LONG_INT,
sampleGather,
MPI_BYTE,				 //MPI_LONG_INT,
0,
MPI_COMM_WORLD);

if (!id)
{
for (int i = 0; i < numThreads; ++i)
sample[i] = sampleGather[i * numThreads]; //获得主元
#ifndef WK_GEN_DATA
printf("Get privot at %fs with %d procs.\n", MPI_Wtime() - start, numThreads);
#endif
}

MPI_Bcast(
sample,
MPI_BYTE,				 //MPI_LONG_INT,
0,
MPI_COMM_WORLD); //分发主元

int *sendCounts = (int *)malloc(numThreads * sizeof(int)),
*recvCounts = (int *)malloc(numThreads * sizeof(int)),
*sdisp = (int *)malloc(numThreads * sizeof(int)),
*rdisp = (int *)malloc(numThreads * sizeof(int));

for (int i = 0; i < numThreads; ++i)
sendCounts[i] = 0;

for (int i = 0, j = 0; i < len; ++i)
{
while (j < numThreads && a[i] >= sample[j])
++j;
//++sendCounts[j - 1];
sendCounts[j - 1] += sizeof(ll);
}

MPI_Alltoall( //提前通知一下节点，各个节点要准备接收多少数据
sendCounts,
//1 * sizeof(int),
1,
//MPI_BYTE,
MPI_INT,
recvCounts,
//1 * sizeof(int),
1,
//MPI_BYTE,
MPI_INT,
MPI_COMM_WORLD);
#ifndef WK_GEN_DATA
if (!id)
printf("Send counts at %fs with %d procs.\n", MPI_Wtime() - start, numThreads);
#endif
sdisp[0] = rdisp[0] = 0;
for (int i = 1; i < numThreads; ++i)
{
sdisp[i] = sendCounts[i - 1] + sdisp[i - 1];
rdisp[i] = recvCounts[i - 1] + rdisp[i - 1];
}

//ll *local = (ll *)malloc((rdisp[numThreads - 1] + recvCounts[numThreads - 1]) * sizeof(ll));

MPI_Alltoallv(
a,
sendCounts,
sdisp,
MPI_BYTE, //MPI_LONG_INT,
local,
recvCounts,
rdisp,
MPI_BYTE, //MPI_LONG_INT,
MPI_COMM_WORLD);
#ifndef WK_GEN_DATA
if (!id)
printf("Alltoallv at %fs with %d procs.\n", MPI_Wtime() - start, numThreads);
#endif
qsort(local, (rdisp[numThreads - 1] + recvCounts[numThreads - 1]) / sizeof(ll), sizeof(ll), cmpll); //可优化为n路归并

if (!id)
printf("Finish sort %ld elements at %fs with %d procs.\n", n, MPI_Wtime() - start, numThreads);

free(local);
free(sendCounts);
free(recvCounts);
free(sdisp);
free(rdisp);

free(sample);
free(sampleGather);
free(a);

MPI_Finalize();
}


### psrs.pbs

#PBS -N psrs
#PBS -l nodes=16:ppn=32
#PBS -j oe

source /public/software/profile.d/mpi_openmpi-intel-2.1.2.sh
mpicc psrs.c -o psrs -std=c11
mpiexec -machinefile $PBS_NODEFILE ./psrs /public/home/shared_dir/psrs_data  ### psrs.o1529 作业脚本得到的输出文件。可以看到，这里一共使用了18.124634s就成功对4294967295个数进行了排序，其中有6.403623s花费在文件读入上，也就是总共花了不到十二秒就完成了排序，还是明显优于单机串行排序的。 Finish reading 4294967295 datas at 6.403623s with 512 procs. Get privot at 8.314104s with 512 procs. Send counts at 8.333872s with 512 procs. Alltoallv at 17.238057s with 512 procs. Finish all the works at 18.124634s with 512 procs.  ### 填表 去掉#define WK_GEN_DATA前的注释，进行测试（不再从输入文件中得到数据，各进程随机生成指定问题规模规模的数据）。 此外，由于并行正则采样排序的限制，排序元素的数量至少要是进程数的平方倍。因此这里问题规模从$2^18$开始（进程数最多有$512=2^9$个）。 由于数据规模过大，进程数少的时候运行时间超过一小时被调度器kill了，因此对应数据没有测出运行时间和相应加速比。 #### 运行时间 T（单位 s） 详见下面的输出文件部分。 进程数\问题规模$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32} 10.040.357.10936.30xx 20.040.233.10463.53xx 40.050.202.12229.26890.62x 80.050.181.56149.69592.83x 160.050.201.2280.37250.98490.23 320.090.220.9643.30176.67277.68 640.140.470.8921.77136.3090.79 1280.210.441.1317.8733.2144.52 2560.290.451.289.2217.4323.93 5120.380.571.475.108.3014.47 #### 加速比 S 加速比 S=同等规模下的串行时间/并行时间。 进程数\问题规模2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$11.000.351.001.00xx 21.001.522.292.02xx 40.801.753.354.08xx 80.801.944.556.25xx 160.801.755.8111.65xx 320.441.597.4021.62xx 640.290.747.9843.00xx 1280.190.806.2852.40xx 2560.140.785.55101.55xx 5120.110.614.83183.58xx #### 效率 E 运行效率 E=加速比 S/并行线程数。 进程数\问题规模$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}2^{32}\$
11.000.351.001.00xx
21.000.761.151.01xx
40.200.440.841.02xx
80.100.240.570.78xx
160.050.110.320.73xx
320.010.050.230.66xx
640.0050.010.120.67xx
1280.0010.0060.050.41xx
2560.00050.0030.020.40xx
5120.00020.0010.0010.36xx