我一直在Golang中创建一个模拟器类型的系统,我想使用InfluxDB 2.0的Golang客户端存储该系统中的数据。但是,想看看我是否可以通过goroutine提高数据库写入的速度。
由于为每个数据点设置Goroutine将导致InfluxDB2不堪重负,因此我决定实现一个工作池系统,该系统将限制所使用Goroutine的数量。但是,尽管数据是通过工作程序池写入数据库的,但它总是会被破坏,并且会有以前不存在的奇怪的峰值和值的变化(而不是直线,而是不稳定的)。
我通过一个名为Simulate的函数来执行此操作,该函数接收一个时间值(用于时间序列数据库),一个实体struct指针(包含要模拟的所有数据)和两个单独的客户端,每个客户端编写一个不同的集合数据的。
maxNumGoroutines := flag.Int("maxNumGoroutines", 10, "Max num of goroutines")
flag.Parse()
concurrentGoroutines := make(chan struct{}, *maxNumGoroutines) // Semaphore
var wg sync.WaitGroup // Wait for goroutines to finish
timeLength := setTimeLength(inputVar) // Example of setting length of time
simObjects:= &entities.Objects // objects are propagated as *Object, meaning no return value
// Additional entities also exist inside the entities struct
for timeIterator := 0; timeIterator <= timeLength; timeIterator++ {
for _, objectID:= range entities.Objects.GetObjectIDs() { // all objects within the simulation
wg.Add(1)
go func(entityState *entities.EntityHolder, chosenObj string, timeGo time.Time) {
defer wg.Done()
concurrentGoroutines <- struct{}{} // Set goroutine as busy
calc.Propagate(entityState.Objects.GetObject(chosenObj), timeGo) // Edit the value at pointer addr
calc.Metrics(entityState, chosenObj, timeGo, metricDB) // seperate further calcs and write
PassInflux(entityState.Objects.GetObject(chosenObj), clientDB, timeGo) // send propagated data
<-concurrentGoroutines // Free up goroutine
}(entities, objectID, timeIterator) // pass in as variables, otherwise operating on changing pointers
}
}
wg.Wait()
log.Println("Simulation complete.")
Metrics()写入是非阻塞的,这意味着它可以异步写入。但是,我会传播Propagate()数据以确保在传播所有对象之后发送数据;如果不这样做,则试图一次写入太多对象(即使数据库使用5000点的批处理大小)。我在这里想念什么吗?是否有名义上的方法来建立带有指针的工作池?
最佳答案
事实证明,我使用InfluxDB 2客户端的方式不正确-我应该一直在向传播函数而不是整个客户端传递写API。这样做意味着写入速度大大加快。因此,不需要goroutine。
关于database - Golang worker 池导致数据库损坏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62508844/