我正在使用 influxDB 来存储我的时间序列数据。
我编写了一个简单的 golang 应用程序来从名为 time.log
的文件中读取行。
文档位于 https://github.com/influxdata/influxdb/blob/master/client/README.md#inserting-data说:
Inserting Data
Time series data aka points are written to the database using batch inserts. The mechanism is to create one or more points and then create a batch aka batch points and write these to a given database and series. A series is a combination of a measurement (time/values) and a set of tags.
In this sample we will create a batch of a 1,000 points. Each point has a time and a single value as well as 2 tags indicating a shape and color. We write these points to a database called square_holes using a measurement named shapes.
NOTE: You can specify a RetentionPolicy as part of the batch points. If not provided InfluxDB will use the database default retention policy.
func writePoints(clnt client.Client) { sampleSize := 1000 rand.Seed(42) bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ Database: "systemstats", Precision: "us", }) for i := 0; i < sampleSize; i++ { regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"} tags := map[string]string{ "cpu": "cpu-total", "host": fmt.Sprintf("host%d", rand.Intn(1000)), "region": regions[rand.Intn(len(regions))], } idle := rand.Float64() * 100.0 fields := map[string]interface{}{ "idle": idle, "busy": 100.0 - idle, } bp.AddPoint(client.NewPoint( "cpu_usage", tags, fields, time.Now(), )) } err := clnt.Write(bp) if err != nil { log.Fatal(err) } }
但是因为我一直在从日志中读取数据。我从来没有读完日志。那我把积分写到influx服务器的最佳方式是什么?
这是我当前的代码:
cmdBP := client.NewBatchPoints(...)
for line := range logFile.Lines {
pt := parseLine(line.Text)
cmdBP.AddPoint(pt)
}
influxClient.Write(cmdBP)
基本上范围 logFile.Lines 永远不会终止,因为它基于 channel 。
最佳答案
使用批处理点和超时的组合(作为 goroutine 运行):
func (h *InfluxDBHook) loop() {
var coll []*client.Point
tick := time.NewTicker(h._batchInterval)
for {
timeout := false
select {
case pt := <-h._points:
coll = append(coll, pt)
case <-tick.C:
timeout = true
}
if (timeout || len(coll) >= h._batchSize) && len(coll) > 0 {
bp, err := client.NewBatchPoints(h._batchPointsConfig)
if err != nil {
//TODO:
}
bp.AddPoints(coll)
err = h._client.Write(bp)
if err != nil {
//TODO:
} else {
coll = nil
}
}
}
}
顺便说一句,您可以使用带有 logrus 日志包的钩子(Hook),将日志发送到 InfluxDB(示例代码来自 logrus InfluxDB hook)。
关于logging - 如何使用golang客户端写入连续写入influxdb,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38067435/