database - 使用 Golang 及时提交到数据库

标签 database go

我成功地“批处理”了 500-1000 行中的许多语句,一次插入。然而,这是使用简单的 for 循环并手动将其设置为 500-1000 循环。像这样的东西:

for i:=0;i<500;i++ {
   // Create a string of 500 values to be inserted at once
}
// Insert the 500 values

有没有一种方法可以及时 commit(),例如:“每秒提交一次”?

从概念上讲,我想要类似的东西;

// Create connection to DB
// Begin a transaction
// Prepare a statement

go timelyCommits(tx)  // spawn a commit ticker
for {
   // Constantly create string of values to be inserted like:
   // Values (1, "one"),(2,"two"),(3,"three")...(1000,"thousand")...
   // Insert without commit
}

func timelyCommits(tx){
   for {
      time.Sleep(1 * time.Second)
      tx.Commit()
   }
}

最佳答案

优化不是一项微不足道的任务,还可能涉及数据库调优等。如果不了解您要实现的系统的详细信息,就很难给出适当的建议。除了答案中已经建议的内容之外,您可能还需要实现一种缓冲,例如具有固定容量的 channel 。然后当 bufferFULL 或计时器 EXPIRED 时,构建查询然后在事务中执行 BULK INSERT。在 The Go Playground 试试.

package main

import (
    "fmt"
    "time"
)

type DataBuffer struct {
    capacity int
    duration time.Duration

    incomingData chan interface{}
    full chan bool
    mustExit chan bool
    done chan bool

    query string
    args []interface{}
}

func NewDataBuffer(capacity int, dur time.Duration) *DataBuffer {
    buf := &DataBuffer {
        incomingData: make(chan interface{}, capacity),
        full: make(chan bool),
        args: make([]interface{}, capacity),
        duration: dur,
        mustExit: make(chan bool, 1),
        done: make(chan bool, 1),
    }
    return buf
}

func (b *DataBuffer) Append(d interface{}) {
    if !b.put(d) {
        //Notify that buffer is full
        //<- will wait until space available
        b.full <- true
        b.incomingData <- d
    }
}

func (b *DataBuffer) put(d interface{}) bool {
    //Try to append the data
    //If channel is full, do nothing, then return false
    select {
    case b.incomingData <- d:
        return true
    default:
        //channel is full
        return false
    }
}

func (b *DataBuffer) execTransaction() error {
    /*
        Begin transaction
        Insert Data Group 
        Commit/rollback
    */

    fmt.Print(time.Now())
    fmt.Println(b.query)
    fmt.Println(b.args)

    return nil
}

func (b *DataBuffer) clear() {
    //clear args
    nOldArg := len(b.args)
    for k := 0; k < nOldArg; k++ {
        b.args[k] = nil
    }
    b.args = b.args[:0]
    b.query = ""
}

func (b *DataBuffer) buildQuery() bool {
    ndata := len(b.incomingData)
    if ndata == 0 {
        return false
    }

    k := 0
    b.clear()

    //Build the query, adjust as needed
    b.query = "QUERY:"
    for data := range b.incomingData {
        b.query += fmt.Sprintf(" q%d", k) //build the query
        b.args = append(b.args, data)

        k++
        if k >= ndata {
            break
        }

    }
    return true
}

func (b *DataBuffer) doInsert() {
    if b.buildQuery() {
        b.execTransaction()
    }
}

func (b *DataBuffer) runAsync() {
    defer func() {
        b.doInsert()
        fmt.Println("Last insert")
        close(b.done)
    }()

    timer := time.NewTimer(b.duration)
    for {
        select {
        case <- timer.C:
            b.doInsert()
            fmt.Println("Timer Expired")
            timer.Reset(b.duration)
        case <- b.full:
            if !timer.Stop() {
                <-timer.C
            }
            b.doInsert()
            fmt.Println("Full")
            timer.Reset(b.duration)
        case <- b.mustExit:
            if !timer.Stop() {
                <-timer.C
            }
            return  
        }
    }
}

func (b *DataBuffer) Run() {
    go b.runAsync()
}
func (b *DataBuffer) Stop() {
    b.mustExit <- true
}

func (b *DataBuffer) WaitDone() {
    <- b.done
}

func main() {
    buf := NewDataBuffer(5, 1*time.Second)
    buf.Run()

    //simulate incoming data
    for k := 0; k < 30; k++ {
        buf.Append(k)
        time.Sleep(time.Duration(10*k)*time.Millisecond)
    }
    buf.Stop()
    buf.WaitDone()  
}

注意:

  • 您需要实现正确的错误处理。
  • incomingData 的类型可以根据您的需要进行调整

关于database - 使用 Golang 及时提交到数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48402047/

相关文章:

database - 创建 Informix 函数时出错

java - 无法在mysql中的java程序中设置法语字符

php - 用 php 分隔逗号分隔的 mySql 数据库字段值

methods - 是否可以在运行时绑定(bind)方法?

go - 在 Go 中的文件顶部创建 channel

php - 我应该使用 txt 文件还是存储在模块 "who is online"的数据库中?

使用在线数据库的 iPhone 应用程序

map - 如何使用存储在 Go 变量中的字符串键访问映射条目?

go - 我是否应该始终将结构属性定义为可测试性的接口(interface)?

mysql - GORM : Always return empty results,即使记录存在