parallel-processing - 如何使用并行 HDF5 按多个等级分块写入

标签 parallel-processing mpi hdf5

我有 20X20 的数据集。我想将其以 2X2 × 4 列的 block 形式并行写入。我正在使用并行 HDF5。现在每个等级有 25 个 block 要写入。我不明白如何为此编码,因为当我使用普通分块时,所有等级都会写入整个 20X20 数据集。当我使用hyperslab时,我不知道如何为每个等级写入的多个 block 设置它。有人给我指点吗?我真的被困住了。

最佳答案

我不太确定我完全理解你的要求。这就是我如何解释你的问题。

  • 全局域名为20x20
  • 4 MPI 排名
  • 分块为2x2

您不必设置分块,事实上我通常不会。


我会这样做。

  1. MPI 中的域分解。
  2. 生成局部矩阵。
  3. 创建内存超板(基于局部矩阵)。
  4. 创建一个文件 hyperslab(基于全局矩阵)。
  5. 创建数据集分块属性。
  6. 写入数据集。

它看起来是这样的。 HDF5_Parallel_Chunked_Writing


! Program to use MPI_Cart and Parallel HDF5
!
program hdf_pwrite

        use mpi
        use hdf5
        use kinds, only : r_dp

        implicit none

        ! Local array size with halo
        integer, parameter :: g_N   = 20
        integer, parameter :: ndims = 2
        integer, parameter :: halo  = 0

        integer :: argc         ! Number of command line arguments
        integer :: ierr         ! Error status
        integer :: id           ! My rank/ID
        integer :: np           ! Number of processors
        integer :: iunit        ! File descriptor
        integer :: i,j          ! Loop indexers
        integer :: n(ndims)     ! Local N for i and j directions
        integer :: total(ndims) ! Local total dimension size

        ! MPI IO/Lustre file striping
        integer :: lcount       ! Lustre count size
        integer :: lsize        ! Lustre stripe size
        character(len=1024) :: clcount, clsize ! Strings of LFS

        integer :: info                 ! MPI IO Info
        integer :: m_dims(ndims)        ! MPI cart dims
        integer :: coords(ndims)        ! Co-ords of procs in the grid
        logical :: is_periodic(ndims)   ! Periodic boundary conditions
        logical :: reorder              ! Reorder the MPI structure
        integer :: MPI_COMM_2D          ! New communicator

        character(len=1024) :: filename
        integer(kind=hid_t) :: p_id, f_id, x_id, d_id, c_id
        integer(kind=hid_t) :: memspace, filespace
        ! Chunk sizes
        integer(kind=hsize_t) :: c_size(ndims)
        ! Local hyper slab info
        integer(kind=hsize_t) :: d_size(ndims), s_size(ndims), h_size(ndims), &
                                 stride(ndims), block(ndims)
        ! Global hyper slab info
        integer(kind=hsize_t) :: g_size(ndims), g_start(ndims)

        ! Local data array
        real(kind=r_dp), allocatable :: ld(:,:)

        argc = 0
        ierr = 0
        m_dims = (/ 0, 0/)
        is_periodic = .false.      ! Non-periodic
        reorder     = .false.      ! Not allowed to reorder

        call mpi_init(ierr)

        ! Set up the MPI cartesian topology
        call mpi_comm_size(MPI_COMM_WORLD, np, ierr)
        call mpi_dims_create(np, ndims, m_dims, ierr)

        call mpi_cart_create(MPI_COMM_WORLD, ndims, m_dims, is_periodic, &
                             reorder, MPI_COMM_2D, ierr)
        call mpi_comm_rank(MPI_COMM_2D, id, ierr)
        call mpi_cart_coords(MPI_COMM_2D, id, ndims, coords, ierr)

        if (id .eq. 0) then
                if (mod(g_N,np) .ne. 0) then
                        write(0,*) 'Must use divisiable number of procs.'
                        call mpi_abort(MPI_COMM_WORLD, 1, ierr)
                endif

                ! get the filename
                argc = command_argument_count()
                if (argc .lt. 1 ) then
                        write(0, *) 'Must supply a filename'
                        call exit(1)
                endif
                call get_command_argument(1, filename)
        endif

        ! Broadcast the filename
        call mpi_bcast(filename, len(filename), MPI_CHAR, 0, &
                       MPI_COMM_WORLD, ierr)

        ! Init the HDF5 library
        call h5open_f(ierr)

        ! Set a stripe count of 4 and a stripe size of 4MB
        lcount = 4
        lsize  = 4 * 1024 * 1024
        write(clcount, '(I4)') lcount
        write(clsize, '(I8)') lsize

        call mpi_info_create(info, ierr)
        call mpi_info_set(info, "striping_factor", trim(clcount), ierr)
        call mpi_info_set(info, "striping_unit", trim(clsize), ierr)

        ! Set up the access properties
        call h5pcreate_f(H5P_FILE_ACCESS_F, p_id, ierr)
        call h5pset_fapl_mpio_f(p_id, MPI_COMM_2D, info, ierr)

        ! Open the file
        call h5fcreate_f(filename, H5F_ACC_TRUNC_F, f_id, ierr, &
                         access_prp = p_id)
        if (ierr .ne. 0) then
                write(0,*) 'Unable to open: ', trim(filename), ': ', ierr
                call mpi_abort(MPI_COMM_WORLD, 1, ierr)
        endif

        ! Generate our local matrix
        do i = 1, ndims
                n(i) = g_N / m_dims(i)
                total(i) = n(i) + (2 * halo)
        end do
        if (halo .ne. 0) then
                allocate(ld(0:total(1)-1, 0:total(2)-1), stat=ierr)
        else
                allocate(ld(total(1),total(2)), stat=ierr)
        end if
        if (ierr .ne. 0) then
                write(0,*) 'Unable to allocate local data array: ', ierr
                call mpi_abort(MPI_COMM_WORLD, 1, ierr)
        end if

        ld = -99.99
        ! init the local data
        do j = 1, n(2)
                do i = 1, n(1)
                        ld(i,j) = id
                enddo
        enddo

        ! Create the local memory space and hyperslab
        do i = 1, ndims
                d_size(i) = total(i)
                s_size(i) = n(i)
                h_size(i) = halo
                stride(i) = 1
                block(i)  = 1
        enddo

        call h5screate_simple_f(ndims, d_size, memspace, ierr)
        call h5sselect_hyperslab_f(memspace, H5S_SELECT_SET_F, &
                                   h_size, s_size, ierr,       &
                                   stride, block)

        ! Create the global file space and hyperslab
        g_size  = g_N
        do i = 1, ndims
                g_start(i) = n(i) * coords(i)
        enddo

        call h5screate_simple_f(ndims, g_size, filespace, ierr)
        call h5sselect_hyperslab_f(filespace, H5S_SELECT_SET_F, &
                                   g_start, s_size, ierr,       &
                                   stride, block)

        ! Create a data chunking property
        c_size = 2
        call h5pcreate_f(H5P_DATASET_CREATE_F, c_id, ierr)
        call h5pset_chunk_f(c_id, ndims, c_size, ierr)
        ! Create the dataset id
        call h5dcreate_f(f_id, "/data", H5T_IEEE_F64LE, filespace, d_id, &
                         ierr, dcpl_id=c_id)


        ! Create a data transfer property
        call h5pcreate_f(H5P_DATASET_XFER_F, x_id, ierr)
        call h5pset_dxpl_mpio_f(x_id, H5FD_MPIO_COLLECTIVE_F, ierr)
        ! Write the data
        call h5dwrite_f(d_id, H5T_IEEE_F64LE, ld, s_size, ierr,         &
                        file_space_id=filespace, mem_space_id=memspace, &
                        xfer_prp=x_id)

        if (allocated(ld)) then
                deallocate(ld)
        endif

        ! Close everything and exit
        call h5dclose_f(d_id, ierr)
        call h5sclose_f(filespace, ierr)
        call h5sclose_f(memspace, ierr)
        call h5pclose_f(c_id, ierr)
        call h5pclose_f(x_id, ierr)
        call h5pclose_f(p_id, ierr)
        call h5fclose_f(f_id, ierr)
        call h5close_f(ierr)

        call mpi_finalize(ierr)
end program hdf_pwrite

为了完整起见,这里是种类的定义。

module kinds

        use, intrinsic                               :: iso_fortran_env

        implicit none

        private
        public  :: i_sp, i_dp,      &
                   r_sp, r_dp, r_qp

        integer, parameter                           :: i_sp = INT32
        integer, parameter                           :: i_dp = INT64

        integer, parameter                           :: r_sp = REAL32
        integer, parameter                           :: r_dp = REAL64
        integer, parameter                           :: r_qp = REAL128

end module kinds

然后编译、运行并查看输出文件:

$ make 
rm -f kinds.o kinds.mod
h5pfc -c -O3  -o kinds.o kinds.f90
rm -f hdf_pwrite.o hdf_pwrite.mod
h5pfc -c -O3  -o hdf_pwrite.o hdf_pwrite.f90
h5pfc -O3  -o hdf_pwrite kinds.o hdf_pwrite.o

$ mpiexec -np 4 ./hdf_pwrite test.h5

$ h5dump test.h5
HDF5 "test.h5" {
GROUP "/" {
   DATASET "data" {
      DATATYPE  H5T_IEEE_F64LE
      DATASPACE  SIMPLE { ( 20, 20 ) / ( 20, 20 ) }
      DATA {
      (0,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (1,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (2,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (3,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (4,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (5,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (6,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (7,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (8,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (9,0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
      (10,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (11,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (12,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (13,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (14,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (15,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (16,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (17,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (18,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
      (19,0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3
      }
   }
}
}

希望这有帮助。

编辑: 当然,您应该使用更好的算法来进行域分解,例如 MPE_Decomp1d .

关于parallel-processing - 如何使用并行 HDF5 按多个等级分块写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31283594/

相关文章:

c - 使用 MPI_Isend 和 MPI_Irecv 而不是 MPI_Bcast 为所有处理器定时广播消息

python - 在 bash、R、python 或 NCL 中将 hdf5 转换为 netcdf4?

python - 如何扩展 h5py 以便我可以访问 hdf5 文件中的数据?

python - 在 TensorFlow 中导入巨大的非图像数据集

c# - TPL 数据流资源未发布

java - 2个线程同时访问同步函数

scala - 使用 ScalaMeter 测试并行操作的性能

c++ - 使用 MPI 的 C++ 代码中每个进程的相同随机数

java - OpenMPI Java 绑定(bind)行为差异(锁定、累积、获取)

c - 如何使用 MPI_Isend 发送多维、动态和不断增长的数组