python-3.x - How to concatenate gathered data using the mpi4py library in Python

Tags: python-3.x parallel-processing mpi cluster-computing mpi4py

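I used mpi4py to append data to a list, and I am trying to save the data in sequence on the source (root == 0) node.

Following Alan22's suggestion, I modified the code and it runs, but the script does not concatenate the data correctly, so I get the output file shown in Figure 01.

Can anyone help me resolve the error message? Also, what I have written in the Python script [shown below] is probably not the best way to solve the problem.

[Figure 01: saved output array]

Is there an efficient way to solve this kind of problem? Any help is greatly appreciated.

The Python script is given below: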

import numpy as np
from scipy import signal
from mpi4py import MPI  
import random
import cmath, math
import matplotlib.pyplot as plt
import time

#File storing path
save_results_to = 'File storing path'
count_day = 1
count_hour = 1

arr_x = [0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0, 8.49, 12.0]
arr_y = [0, 8.49, 12.0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0]
M = len(arr_x)
N = len(arr_y)

np.random.seed(12345)
total_rows = 50000
raw_data=np.reshape(np.random.rand(total_rows*N),(total_rows,N))

# Function of CSD:: Using For Loop
fs = 500;       # Sampling frequency
def csdMat(data):
    dat, cols = data.shape   # For 2D data
    total_csd = []
    for i in range(cols):
        col_csd =[]
        for j in range( cols):
            freq, Pxy = signal.csd(data[:,i], data[:, j], fs=fs, window='hann', nperseg=100, noverlap=70, nfft=5000) 
            col_csd.append(Pxy)  
        total_csd.append(col_csd)
    pxy = np.array(total_csd)  # build the array once, after the outer loop
    return freq, pxy

# Finding cross spectral density (CSD)
t0 = time.time()
freq, csd = csdMat(raw_data)
print('The shape of the csd data', csd.shape)
print('Time required {} seconds to execute CSD--For loop'.format(time.time()-t0))

kf=1*2*np.pi/10
resolution = 50 # This is important:: the HIGHER the Resolution, the higher the execution time!!!
grid_size = N * resolution
kx = np.linspace(-kf, kf, grid_size)  # space vector
ky = np.linspace(-kf, kf, grid_size)  # space vector

def DFT2D(data):
    P=len(kx)
    Q=len(ky)
    dft2d = np.zeros((P,Q), dtype=complex)
    for k in range(P):
        for l in range(Q):
            sum_log = []
            mat2d = np.zeros((M,N))
            sum_matrix = 0.0
            for m in range(M):
                for n in range(N):
                    e = cmath.exp(-1j*((((dx[m]-dx[n])*kx[l])/1) + (((dy[m]-dy[n])*ky[k])/1)))
                    sum_matrix += data[m, n] * e
            dft2d[k,l] = sum_matrix
    return dft2d

dx = arr_x[:]; dy = arr_y[:]


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
data = []
start_freq = 100
end_freq   = 109
freq_range = np.arange(start_freq,end_freq)
no_of_freq = len(freq_range)

for fr_count in range(start_freq, end_freq):
    if fr_count % size == rank:
        dft = np.zeros((grid_size, grid_size))
        spec_csd = csd[:,:, fr_count]
        dft = DFT2D(spec_csd)  # Call the DFT2D function
        spec = np.array(np.real(dft))  # Spectrum or 2D_DFT of data[real part]
        print('Shape of spec', spec.shape)
        data.append(spec)
        #data = np.append(data,spec)
        np.seterr(invalid='ignore')
data = comm.gather(data, root =0)
#    comm.Allreduce(MPI.IN_PLACE,data,op=MPI.MAX)
print("Rank: ", rank, ". Spectrum shape is:\n", spec.shape)


if rank == 0:
    output_data = np.concatenate(data, axis = 0)
    #output_data = np.c_(data, axis = 0)
    dft_tot = np.array((output_data), dtype='object')
    res = np.zeros((grid_size, grid_size))
    for k in range(size):
        for i in range(no_of_freq):

            jj = np.around(freq[freq_range[i]], decimals = 2)

            #print('The shape of data after indexing', data1.shape)
            #data_final=data1.reshape(data1.shape[0]*data1.shape[1], data1.shape[2])
            res[i * size + k] = dft_tot[k][i] #np.array(data[k])
            data = np.array(res)
            #print('The shape of the dft at root node', data.shape)
            np.savetxt(save_results_to + f'Day_{count_day}_hour_{count_hour}_f_{jj}_hz.txt', data.view(float))

I use the following bash script (my_file.sh) to run the script, and I submit the job with the command sbatch my_file.sh

#! /bin/bash -l
#SBATCH -J testmvapich2
#SBATCH -N 1 ## Maximum 04 nodes
#SBATCH --ntasks=10
#SBATCH --cpus-per-task=1        # cpu-cores per task
#SBATCH --mem-per-cpu=3000MB
#SBATCH --time=00:20:00
#SBATCH -p para
#SBATCH --output="stdout.txt"
#SBATCH --error="stderr.txt"
#SBATCH -A camk
##SBATCH --mail-type=ALL
##SBATCH --chdir=/work/cluster_computer/my_name/data_work/MMC331/


eval "$(conda shell.bash hook)"
conda activate myenv
#conda activate fast-mpi4py

cd $SLURM_SUBMIT_DIR

#module purge
#module add mpi/mvapich2-2.2-x86_64

mpirun python3 mpi_test.py

Best answer

You can try this after "data = comm.gather(data, root=0)":

if rank == 0:
    print('Type of data:', type(data))
    dft_tot = np.array((data))#, dtype='object')
    print('shape of DATA array:', dft_tot.shape)
    #print('Type of dft array:', type(dft_tot))
    res = np.zeros((450,450))
    for k in range(size):
#            for i in range(len(data[rank])):
        for i in range(no_of_freq):

            jj = np.around(freq[freq_range[k]], decimals = 2)
            #data1 = np.array(dft_tot[k])
            res[i * size + k] = data[k]
            data = np.array(res)#.reshape(data1.shape[0]*data1.shape[1], data1.shape[2])
            print('The shape of the dft at root node', data.shape)
            np.savetxt(save_results_to + f'Day_{count_day}_hour_{co
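
A possible alternative to index arithmetic on the root, offered as a minimal sketch rather than a tested fix for the script above: let every rank tag each result with its frequency index, gather the tagged lists, and sort on the root. comm.gather returns one list per rank on the root (and None elsewhere), so ranks that had no frequency assigned simply contribute an empty list. The helper names my_indices and flatten_in_order are hypothetical, and the gathered list is simulated here so the example runs without MPI.

```python
def my_indices(start, end, size, rank):
    """Frequency indices handled by `rank` under round-robin assignment."""
    return [i for i in range(start, end) if i % size == rank]


def flatten_in_order(gathered):
    """Merge the per-rank lists returned by comm.gather into index order.

    `gathered` holds one list per rank; every item is an (index, payload)
    pair. Ranks that had no work contribute an empty list.
    """
    pairs = [pair for per_rank in gathered for pair in per_rank]
    pairs.sort(key=lambda pair: pair[0])
    return pairs


# Stand-in for `comm.gather(local, root=0)` with 4 ranks and indices 100..108.
# With mpi4py, each rank would build only its own `local` list of
# (fr_count, spec) pairs and the root would receive the outer list.
size = 4
gathered = [
    [(i, f"spectrum for index {i}") for i in my_indices(100, 109, size, rank)]
    for rank in range(size)
]

ordered = flatten_in_order(gathered)
print([index for index, _ in ordered])
```

In the real script the payload would be the (grid_size, grid_size) spectrum, so the root could loop over `ordered` and write one file per frequency index without needing to know which rank produced it.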

Here is the link; I hope it helps: mpi4py on HPC: comm.gather
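
On the efficiency part of the question, most of the run time is likely spent in the four nested Python loops of DFT2D. Since the phase factor separates into a term in m times the conjugate of the same term in n, the double sum can be written with NumPy broadcasting and one matrix product. The sketch below assumes real positions and wavenumbers and a square (M, M) input; dft2d_loop and dft2d_vectorized are hypothetical names, and the output is indexed [ky index, kx index], matching how the question's loop uses ky[k] and kx[l]. A small grid is used so the loop version can serve as a check.

```python
import numpy as np


def dft2d_loop(data, dx, dy, kx, ky):
    """Reference version: the same sums as the question's DFT2D."""
    out = np.zeros((len(ky), len(kx)), dtype=complex)
    for k in range(len(ky)):
        for l in range(len(kx)):
            total = 0.0
            for m in range(len(dx)):
                for n in range(len(dx)):
                    phase = (dx[m] - dx[n]) * kx[l] + (dy[m] - dy[n]) * ky[k]
                    total += data[m, n] * np.exp(-1j * phase)
            out[k, l] = total
    return out


def dft2d_vectorized(data, dx, dy, kx, ky):
    """Same result, using exp(-i(a_m - a_n)) = u_m * conj(u_n)."""
    dx, dy = np.asarray(dx, dtype=float), np.asarray(dy, dtype=float)
    kx, ky = np.asarray(kx, dtype=float), np.asarray(ky, dtype=float)
    # u[k, l, m] = exp(-1j * (dx[m] * kx[l] + dy[m] * ky[k]))
    phase = ky[:, None, None] * dy + kx[None, :, None] * dx
    u = np.exp(-1j * phase)
    # (u @ data)[k, l, n] = sum over m of u[k, l, m] * data[m, n]
    return np.sum((u @ data) * np.conj(u), axis=-1)


arr_x = [0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0, 8.49, 12.0]
arr_y = [0, 8.49, 12.0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0]
kf = 2 * np.pi / 10
kx = np.linspace(-kf, kf, 5)   # small grid, for the comparison only
ky = np.linspace(-kf, kf, 4)

rng = np.random.default_rng(0)
data = rng.random((9, 9)) + 1j * rng.random((9, 9))  # stand-in for one CSD slice

slow = dft2d_loop(data, arr_x, arr_y, kx, ky)
fast = dft2d_vectorized(data, arr_x, arr_y, kx, ky)
print(fast.shape, np.allclose(slow, fast))
```

With a 450 x 450 grid and 9 sensors the intermediate array u holds about 1.8 million complex values, which should fit comfortably in the memory requested per task in the job script.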

Regarding "python-3.x - How to concatenate gathered data using the mpi4py library in Python", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/72522846/
