I have a Nextflow pipeline with four processes. The last process, INTERSECT, behaves strangely: sometimes it simply does nothing, yet no error is reported. For example:

nextflow run 
N E X T F L O W  ~  version 22.10.6
Launching `` [backstabbing_lumiere] DSL2 - revision: bb41e3dd9a
executor >  local (11)
[61/6edec1] process > GIMME_SCAN (3)    [100%] 5 of 5 ✔
[48/bfb62c] process > EXTRACT_GIMME (5) [100%] 5 of 5 ✔
[23/b911e5] process > EXTRACT_NR_MOTIFS [100%] 1 of 1 ✔
[-        ] process > INTERSECT    

If I rerun with -resume, it executes normally and produces the expected output.

nextflow run  -resume
N E X T F L O W  ~  version 22.10.6
Launching `` [festering_bardeen] DSL2 - revision: bb41e3dd9a
executor >  local (5)
[52/17a139] process > GIMME_SCAN (2)    [100%] 5 of 5, cached: 5 ✔
[46/61caed] process > EXTRACT_GIMME (1) [100%] 5 of 5, cached: 5 ✔
[23/b911e5] process > EXTRACT_NR_MOTIFS [100%] 1 of 1, cached: 1 ✔
[d8/42ae7a] process > INTERSECT (Ss4)   [100%] 5 of 5 ✔

I'm new to Nextflow and don't know how to set up dependencies between processes. Can anyone help me solve this?

My Nextflow pipeline is as follows:

#!/usr/bin/env nextflow

params.input = "beds/*.bed"
params.pfm = file("nonredundant.motifs.pfm")
params.nr_count = file("")

params.gimme_scan_bed_files = './results/gimme_extract/*/*.bed'
params.coord_bed_files = './results/motifs_nr_coord/*.bed'
params.outdir = './results/intersect'

process GIMME_SCAN {
    publishDir 'results/gimme_scan', mode: 'copy', overwrite: false

    input:
    path BED
    path PFM

    output:
    file ("${BED}_gimme.scan.bed")

    script:
    """
    gimme scan $BED -g Ssal_v3.1 -p $PFM -f 0.05 -b > ${BED}_gimme.scan.bed
    """
}

process EXTRACT_GIMME {
    publishDir 'results/gimme_extract', mode: 'copy', overwrite: false

    input:
    path gimme_scan_file

    output:
    path '*'

    script:
    """
    Rscript '$baseDir/gimmeScan_extract_motifs.R' $gimme_scan_file gimme_extract
    """
}

process EXTRACT_NR_MOTIFS {
    publishDir 'results/', mode: 'copy', overwrite: false

    input:
    path nr_count

    output:
    path '*'

    script:
    """
    Rscript '$baseDir/extract_motifs_maelstrom.R' $nr_count
    """
}

process INTERSECT {

    tag { scan_id }

    publishDir "${params.outdir}", mode: 'copy'

    input:
    tuple val(scan_id), path('bed_dir/*')
    path 'coord_dir/*'

    output:
    tuple val(scan_id), path("${scan_id}/*")

    shell:
    '''
    mkdir "!{scan_id}"
    for bed in bed_dir/*; do

        bedtools intersect \\
            -a "${bed}" \\
            -b "coord_dir/$(basename "${bed}")" \\
            -wa |
        sort \\
            -u \\
            > "!{scan_id}/$(basename "${bed}" '.bed')_intersected.bed"
    done
    '''
}

workflow {
    input_ch = Channel.fromPath(params.input)
    pfm_file = params.pfm
    nr_count_file = params.nr_count

    GIMME_SCAN(input_ch, pfm_file)
    EXTRACT_GIMME(GIMME_SCAN.out)
    EXTRACT_NR_MOTIFS(nr_count_file)

    Channel
        .fromFilePairs( params.gimme_scan_bed_files, size: -1) {
  '_') + 1)
        }
        .set { gimme_scan_bed_files }

    Channel
        .fromPath( params.coord_bed_files )
        .set { coord_bed_files }

    INTERSECT(gimme_scan_bed_files, coord_bed_files)
}



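The problem is that you are trying to access some of the output files in the publishDir, but they don't exist there until the (first) run has completed. The solution is to make sure output files are accessed only through one or more channels. As the Nextflow documentation for publishDir puts it: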

Files are copied into the specified directory in an asynchronous manner, so they may not be immediately available in the published directory at the end of the process execution. For this reason, downstream processes should not try to access output files through the publish directory, but through channels.
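
To make the difference concrete, here is a minimal, self-contained sketch of a toy DSL2 pipeline. The process names MAKE_BED and COUNT_LINES, the sample IDs and the results/make_bed directory are hypothetical and are not part of the pipeline in the question; the sketch only illustrates passing a process's output channel to the next process instead of globbing the publish directory.

```nextflow
// Toy DSL2 pipeline; all names here are illustrative assumptions.
nextflow.enable.dsl = 2

process MAKE_BED {
    publishDir 'results/make_bed', mode: 'copy'

    input:
    val sample

    output:
    tuple val(sample), path("${sample}.bed")

    script:
    """
    printf 'chr1\\t0\\t100\\n' > "${sample}.bed"
    """
}

process COUNT_LINES {
    tag { sample }

    input:
    tuple val(sample), path(bed)

    output:
    tuple val(sample), path("${sample}.count")

    script:
    """
    wc -l < "${bed}" > "${sample}.count"
    """
}

workflow {
    samples = Channel.of('Ss1', 'Ss2')

    MAKE_BED(samples)

    // Fragile: a glob over the publish directory is resolved when the channel
    // is created. On a fresh run the directory is still empty, so a process
    // fed by this channel would receive no tasks at all.
    // Channel.fromPath('results/make_bed/*.bed')

    // Robust: consume the output channel of MAKE_BED directly, so COUNT_LINES
    // only starts once each upstream task has produced its file.
    COUNT_LINES(MAKE_BED.out)

    COUNT_LINES.out.view()
}
```

With the commented-out glob, a downstream process would typically show up as "[-        ]" on the first run and only execute on a later run, once the published files exist, which matches the behaviour described in the question.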

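It's not clear to me what your Rscripts output (using output: path('*') doesn't tell us much), but if I've understood correctly, the following might work for you. It should at least get you started: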

params.input_beds = "./beds/*.bed"
params.pfm = "nonredundant.motifs.pfm"
params.nr_count = ""

params.outdir = './results'

process GIMME_SCAN {

    tag { sample }

    publishDir "${params.outdir}/gimme_scan", mode: 'copy'

    input:
    tuple val(sample), path(bed)
    path pfm

    output:
    tuple val(sample), path("${bed.baseName}_gimme.scan.bed")

    script:
    """
    gimme scan \\
        -g Ssal_v3.1 \\
        -p "${pfm}" \\
        -f 0.05 \\
        -b \\
        "${bed}" \\
        > "${bed.baseName}_gimme.scan.bed"
    """
}

process EXTRACT_GIMME {

    tag { sample }

    publishDir "${params.outdir}/gimme_extract", mode: 'copy'

    input:
    tuple val(sample), path(gimme_scan_file)

    output:
    tuple val(sample), path("${sample}/*.bed")

    script:
    """
    gimmeScan_extract_motifs.R \\
        "${gimme_scan_file}" \\
    """
}

process EXTRACT_NR_MOTIFS {

    publishDir "${params.outdir}/extract_nr_motifs", mode: 'copy'

    input:
    path nr_count

    output:
    path '*.bed'

    script:
    """
    extract_motifs_maelstrom.R "${nr_count}"
    """
}

process INTERSECT {

    tag { sample }

    publishDir "${params.outdir}/intersect", mode: 'copy'

    input:
    tuple val(sample), path('bed_dir/*')
    path 'coord_dir/*'

    output:
    tuple val(sample), path("${sample}/*")

    shell:
    '''
    mkdir "!{sample}"
    for bed in bed_dir/*; do

        bedtools intersect \\
            -a "${bed}" \\
            -b "coord_dir/$(basename "${bed}")" \\
            -wa |
        sort \\
            -u \\
            > "!{sample}/$(basename "${bed}" '.bed')_intersected.bed"
    done
    '''
}

workflow {

    input_beds = Channel.fromFilePairs( params.input_beds, size: 1 )
    pfm_file = file( params.pfm )
    nr_count_file = file( params.nr_count )

    EXTRACT_NR_MOTIFS( nr_count_file )

    GIMME_SCAN( input_beds, pfm_file )
}
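
The workflow block above stops after GIMME_SCAN, so EXTRACT_GIMME and INTERSECT are never called in it. Below is a sketch of how the remaining calls might be wired through output channels. It is not standalone: it assumes the params and the four processes defined above, and the two added calls and their .out arguments are assumptions inferred from the process input declarations, not lines taken from the original answer.

```nextflow
// Sketch only: relies on the params and the GIMME_SCAN, EXTRACT_GIMME,
// EXTRACT_NR_MOTIFS and INTERSECT processes defined above.
workflow {

    input_beds = Channel.fromFilePairs( params.input_beds, size: 1 )
    pfm_file = file( params.pfm )
    nr_count_file = file( params.nr_count )

    EXTRACT_NR_MOTIFS( nr_count_file )

    GIMME_SCAN( input_beds, pfm_file )

    // Assumed wiring: hand each process's output channel straight to the
    // next process instead of globbing the publishDir.
    EXTRACT_GIMME( GIMME_SCAN.out )

    // EXTRACT_NR_MOTIFS is invoked with a plain file value, so its output is
    // a value channel and can be reused for every sample from EXTRACT_GIMME.
    INTERSECT( EXTRACT_GIMME.out, EXTRACT_NR_MOTIFS.out )
}
```

Wired this way, INTERSECT cannot start for a sample until EXTRACT_GIMME has emitted that sample's files, so the first run no longer depends on files left behind by an earlier run.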

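Note that you can move your Rscripts into a folder called bin in the root directory of your project repository. Nextflow automatically adds this folder to the PATH in the execution environment. If the scripts are not already executable, you can make them so with chmod +x your_script.R.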



