multithreading - 在 Julia 中并行粒子群优化

标签 multithreading parallel-processing thread-safety julia

我在 Julia 中编写了粒子群优化算法,并尝试使用 Threads 来加速每个种群的计算。 这是代码

function uniform(dim::Int, lb::Array{Float64, 1}, ub::Array{Float64, 1})
    arr = rand(Float64, dim)
    @inbounds for i in 1:dim; arr[i] = arr[i] * (ub[i] - lb[i]) + lb[i] end
    return arr
end

function initialize_particles(problem, population)
    dim = problem.dim
    lb = problem.lb
    ub = problem.ub
    cost_func = problem.cost_func

    gbest_position = uniform(dim, lb, ub)
    gbest = Gbest(gbest_position, cost_func(gbest_position))

    particles = []
    for i in 1:population
        position = uniform(dim, lb, ub)
        velocity = zeros(dim)
        cost = cost_func(position)
        best_position = copy(position)
        best_cost = copy(cost)
        push!(particles, Particle(position, velocity, cost, best_position, best_cost))

        if best_cost < gbest.cost
            gbest.position = copy(best_position)
            gbest.cost = copy(best_cost)
        end
    end
    return gbest, particles
end

function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
    dim = problem.dim
    lb = problem.lb
    ub = problem.ub
    cost_func = problem.cost_func

    gbest, particles = initialize_particles(problem, population)

    # main loop
    for iter in 1:max_iter
        @threads for i in 1:population
            particles[i].velocity .= w .* particles[i].velocity .+
                c1 .* rand(dim) .* (particles[i].best_position .- particles[i].position) .+
                c2 .* rand(dim) .* (gbest.position .- particles[i].position)

            particles[i].position .= particles[i].position .+ particles[i].velocity
            particles[i].position .= max.(particles[i].position, lb)
            particles[i].position .= min.(particles[i].position, ub)

            particles[i].cost = cost_func(particles[i].position)

            if particles[i].cost < particles[i].best_cost
                particles[i].best_position = copy(particles[i].position)
                particles[i].best_cost = copy(particles[i].cost)
        
                if particles[i].best_cost < gbest.cost
                    gbest.position = copy(particles[i].best_position)
                    gbest.cost = copy(particles[i].best_cost)
                end
            end
        end
        w = w * wdamp
        if iter % 50 == 1
            println("Iteration " * string(iter) * ": Best Cost = " * string(gbest.cost))
            println("Best Position = " * string(gbest.position))
            println()
        end
    end
    gbest, particles
end

显然,它在 PSO 的主循环中不是线程安全的。这是因为计算时需要修改gbest的位置。 我尝试使用 Atomic,但每个粒子的位置是一个数组。所以这种方法不适用。 Lock 需要初始化,所以它也不会工作。 我也试过用Floop,但是出错了。

那么在并行计算的时候有没有办法保证数据是锁住的呢?

感谢您的宝贵时间!

最佳答案

不需要锁定变量。当您进行并行计算时,您总是可以考虑 Split-Combine 策略的术语(抱歉,只是发明了这个术语来类比 DataFrames 计算,您也可以将其称为 Map-Reduce 方法)。这个想法是,您围绕不同的线程拆分计算并独立执行它们,并在最后阶段将结果组合在一个线程上。所以你的代码看起来像这样(可能会有语法错误,因为没有 Particle 和其他定义我无法运行代码,但我希望它足以给出这个想法)

function minby(itr; by=identity, init=nothing)
     init = isnothing(init) ? pop!(itr) : init
     mapreduce(x->(by(x)=>x), (x,y)->(first(x)>first(y)) ? y : x, itr; init=by(init)=>init).second
end

function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
    dim = problem.dim
    lb = problem.lb
    ub = problem.ub
    cost_func = problem.cost_func

    gbest, particles = initialize_particles(problem, population)

    # main loop
    for iter in 1:max_iter
        gbests = fill((gbest.cost, 0), Threads.nthreads())
        @threads for i in 1:population
            particles[i].velocity .= w .* particles[i].velocity .+
                c1 .* rand(dim) .* (particles[i].best_position .- particles[i].position) .+
                c2 .* rand(dim) .* (gbest.position .- particles[i].position)

            particles[i].position .= particles[i].position .+ particles[i].velocity
            particles[i].position .= max.(particles[i].position, lb)
            particles[i].position .= min.(particles[i].position, ub)

            particles[i].cost = cost_func(particles[i].position)

            if particles[i].cost < particles[i].best_cost
                particles[i].best_position = copy(particles[i].position)
                particles[i].best_cost = copy(particles[i].cost)
        
                if particles[i].best_cost < gbests[Threads.threadid()][1]
                    gbests[Threads.threadid()] = (particles[i].best_cost, i)
                end
            end
        end
        gbest1 = minby(gbests, by = x -> x[1])
        if gbest1[2] != 0
            idx = gbest1[2]
            gbest.position = copy(particles[idx].best_position)
            gbest.cost = copy(particles[idx].best_cost)
        end
        w = w * wdamp
        if iter % 50 == 1
            println("Iteration " * string(iter) * ": Best Cost = " * string(gbest.cost))
            println("Best Position = " * string(gbest.position))
            println()
        end
    end
    gbest, particles
end

顺便说一句,你可能会找到那个包 UnPack.jl还是比较方便的。除了手动分配,您还可以这样做

using UnPack
function PSO(problem; max_iter=100,population=100,c1=1.4962,c2=1.4962,w=0.7298,wdamp=1.0)
    @unpack dim, lb, ub, cost_func = problem
    ....

关于multithreading - 在 Julia 中并行粒子群优化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64767091/

相关文章:

c++ - 查找并行化 C/C++ 的第 n 个排列

parallel-processing - C# 中的并行 A* 搜索 - 不同的路径

c - 运行时加载的共享库中的 Libcurl 和 curl_global_init

c# - 启动同一个线程两次 - 第一个完成后的第二次

c++ - QTimer->remainingTime() 返回 -1371648957

c++ - 多线程应用程序中的独立日志记录

具有动态延迟的 C# 任务

.net - 如何同步锁定共享整数

c# - 从后台工作人员登录MyLogger时发生跨线程InvalidOperationException

java - 平滑 Android 游戏循环