并发模式帮助 - 扇入并返回结果?

标签 concurrency go channel

我正在编写一个快速并发集成测试包。我已经编写了 POC,现在我正在尝试为它想出一个新模式。我希望遵守以下规则:

  • 一个测试套件可能有很多测试
  • 一个测试套件必须有 n 个 worker 来运行 TestPreppers
  • 一个测试套件必须有 n 个 worker 来运行 TestValidators
  • 测试必须通过准备才能运行验证
  • 一个测试可能有很多 child
  • 在运行子测试之前,测试必须通过验证

结构如下:

package conctest

func New() *TestSuite {
    return &TestSuite{nil, 1, 1, make(chan TestPrepper), make(chan TestValidator)}
}

type TestSuite struct {
    Tests                []*Test
    ConcurrentPreppers   int
    ConcurrentValidators int

    prepperChan   chan TestPrepper
    validatorChan chan TestValidator
}

type TestPrepper func() error
type TestValidator func() ValidatorResult

type ValidatorResult struct {
    Pass  bool
    Error error
}

type Test struct {
    Convey    string
    Details   string
    Prepper   TestPrepper
    Validator TestValidator
    MaxRuns   int
    Children  []*Test

    runs   int
    errors []error
}

我无法提出满足要求的并发设计。我需要从 TestSuite 公开一个可供测试使用的方法,该方法将允许它将其工作发送给 TestSuites 工作人员并将结果返回给测试。

最佳答案

这是我想出的解决方案。我欢迎任何批评或更好的方法,并将接受该答案。我创建了一个私有(private)传输结构,其中包含我的函数和一个返回结果的 channel :

package conctest

import (
    "sync"
    "time"
)

func New() *ConcTest {
    return &ConcTest{nil, 1, 1, make(chan *prepperTransport), make(chan *validatorTransport), nil}
}

type ConcTest struct {
    Tests                []*Test
    ConcurrentPreppers   int
    ConcurrentValidators int

    prepperChan   chan *prepperTransport
    validatorChan chan *validatorTransport

    testSync *sync.WaitGroup
}

func (ct *ConcTest) Run() {
    // start up prepper workers
    for i := 0; i < ct.ConcurrentPreppers; i++ {
        go func() {
            for p := range ct.prepperChan {
                time.Sleep(time.Second)
                p.Result <- p.Prepper()
            }
        }()
    }

    // start up validator workers
    for i := 0; i < ct.ConcurrentValidators; i++ {
        go func() {
            for v := range ct.validatorChan {
                time.Sleep(time.Second)
                v.Result <- v.Validator()
            }
        }()
    }

    // start parent tests, child tests will be called recursively
    ct.testSync = &sync.WaitGroup{}
    for _, t := range ct.Tests {
        ct.testSync.Add(1)
        go ct.runTest(t)
    }

    // wait for all tests to complete
    ct.testSync.Wait()
}

func (ct *ConcTest) runTest(t *Test) {
    // test is a pass until failure encountered
    t.Pass = true

    // run and wait for prep to finish
    pt := &prepperTransport{t.Prepper, make(chan PrepperResult)}
    ct.prepperChan <- pt
    pr := <-pt.Result

    // return on prep failure
    if pr != nil {
        t.Pass = false
        t.Errors = append(t.Errors, pr)
        ct.testSync.Done()
        return
    }

    // run the validator until pass or max runs reached
    for {
        // sleep for given frequency
        time.Sleep(t.Frequency)

        // send the validator to the queue
        t.Runs++
        vt := &validatorTransport{t.Validator, make(chan ValidatorResult)}
        ct.validatorChan <- vt

        // wait for validator response
        vr := <-vt.Result

        // append error to the test
        if vr.Error != nil {
            t.Errors = append(t.Errors, vr.Error)
        }

        // break on pass
        if vr.Pass {
            break
        }

        // break on max attempts
        if t.MaxRuns == t.Runs {
            t.Pass = false
            break
        }
    }

    // break on validator failure
    if !t.Pass {
        ct.testSync.Done()
        return
    }

    // run all children tests
    for _, c := range t.Children {
        ct.testSync.Add(1)
        go ct.runTest(c)
    }

    ct.testSync.Done()
    return
}

type Prepper func() PrepperResult
type PrepperResult error
type prepperTransport struct {
    Prepper Prepper
    Result  chan PrepperResult
}

type Validator func() ValidatorResult
type ValidatorResult struct {
    Pass  bool
    Error error
}
type validatorTransport struct {
    Validator Validator
    Result    chan ValidatorResult
}

type Test struct {
    Convey    string
    Details   string
    Frequency time.Duration
    MaxRuns   int

    Prepper   Prepper
    Validator Validator
    Children  []*Test

    Runs   int
    Errors []error
    Pass   bool
}

关于并发模式帮助 - 扇入并返回结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29101909/

相关文章:

java - 以线程安全方式创建对象

Java 非重入锁实现

unit-testing - testify/assert.Contains 如何与 map 一起使用?

Golang 重映射接口(interface) go-cache

elixir - 如何从控制台或 Phoenix 中的任何模块广播消息?

go - Go channel 是如何实现的?

Java - KeyListener 的线程导致并发修改。解决方案?

java - 我如何用 Java 编写一个信号量来优先考虑以前成功的申请人?

go - 如何高效地查找所有目录中与特定后缀匹配的所有文件?

java - Backendless - 我应该从服务订阅 channel 吗?