logging - How to continuously write to InfluxDB with the Golang client

Tags: logging go influxdb

I am using InfluxDB to store my time-series data.

I wrote a simple Golang application that reads lines from a file named time.log.

The documentation at https://github.com/influxdata/influxdb/blob/master/client/README.md#inserting-data says:

Inserting Data

Time series data aka points are written to the database using batch inserts. The mechanism is to create one or more points and then create a batch aka batch points and write these to a given database and series. A series is a combination of a measurement (time/values) and a set of tags.

In this sample we will create a batch of a 1,000 points. Each point has a time and a single value as well as 2 tags indicating a shape and color. We write these points to a database called square_holes using a measurement named shapes.

NOTE: You can specify a RetentionPolicy as part of the batch points. If not provided InfluxDB will use the database default retention policy.

func writePoints(clnt client.Client) {
    sampleSize := 1000
    rand.Seed(42)

    bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
        Database:  "systemstats",
        Precision: "us",
    })

    for i := 0; i < sampleSize; i++ {
        regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
        tags := map[string]string{
            "cpu":    "cpu-total",
            "host":   fmt.Sprintf("host%d", rand.Intn(1000)),
            "region": regions[rand.Intn(len(regions))],
        }

        idle := rand.Float64() * 100.0
        fields := map[string]interface{}{
            "idle": idle,
            "busy": 100.0 - idle,
        }

        pt, err := client.NewPoint(
            "cpu_usage",
            tags,
            fields,
            time.Now(),
        )
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)
    }

    err := clnt.Write(bp)
    if err != nil {
        log.Fatal(err)
    }
}
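
For context, writePoints expects an already-connected client. A minimal sketch of creating one with the v2 client package and passing it in might look like this (the address and credentials are placeholder assumptions, not values from the README):

package main

import (
    "log"

    client "github.com/influxdata/influxdb/client/v2"
)

func main() {
    // Connect to a local InfluxDB instance over HTTP
    // (address and credentials below are assumptions).
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     "http://localhost:8086",
        Username: "user",
        Password: "pass",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    writePoints(c)
}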

But because I keep reading from the log file, I never finish reading it. So what is the best way to write the points to the InfluxDB server?

Here is my current code:

cmdBP, _ := client.NewBatchPoints(...)
for line := range logFile.Lines {
    pt := parseLine(line.Text)
    cmdBP.AddPoint(pt)
}

// never reached: the range above never terminates
influxClient.Write(cmdBP)

Essentially, ranging over logFile.Lines never terminates, because it is backed by a channel.

Best Answer

Use a combination of point batching and a timeout (run this as a goroutine):

func (h *InfluxDBHook) loop() {
    var coll []*client.Point
    tick := time.NewTicker(h._batchInterval)

    for {
        timeout := false

        // Either collect the next point or wake up when the batch interval elapses.
        select {
        case pt := <-h._points:
            coll = append(coll, pt)
        case <-tick.C:
            timeout = true
        }

        // Flush when the interval has elapsed or the batch is full, but never send an empty batch.
        if (timeout || len(coll) >= h._batchSize) && len(coll) > 0 {
            bp, err := client.NewBatchPoints(h._batchPointsConfig)
            if err != nil {
                //TODO:
            }
            bp.AddPoints(coll)
            err = h._client.Write(bp)
            if err != nil {
                //TODO:
            } else {
                // Only reset the buffer after a successful write, so failed points are retried.
                coll = nil
            }
        }
    }
}
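
The answer only shows the loop method; the hook struct itself and the code that feeds the _points channel are not part of it. Below is a minimal sketch of how the pieces might fit together — the struct definition, channel buffer size, and newInfluxDBHook constructor are assumptions, while the field names mirror the ones used in loop above:

// Assumed shape of the hook; only loop() is shown in the answer.
type InfluxDBHook struct {
    _client            client.Client
    _points            chan *client.Point
    _batchPointsConfig client.BatchPointsConfig
    _batchSize         int
    _batchInterval     time.Duration
}

func newInfluxDBHook(c client.Client) *InfluxDBHook {
    h := &InfluxDBHook{
        _client: c,
        // Buffered so the log reader is not blocked while a batch is being written.
        _points: make(chan *client.Point, 1024),
        _batchPointsConfig: client.BatchPointsConfig{
            Database:  "systemstats",
            Precision: "us",
        },
        _batchSize:     1000,
        _batchInterval: 5 * time.Second,
    }
    go h.loop() // flush batches in the background
    return h
}

With that in place, the tail loop from the question only parses lines and hands the points to the hook; the blocking Write call disappears from the reader:

h := newInfluxDBHook(influxClient)
for line := range logFile.Lines {
    h._points <- parseLine(line.Text)
}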

By the way, you can use a hook with the logrus logging package to send logs to InfluxDB (the sample code above comes from a logrus InfluxDB hook).
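
For illustration, a logrus hook is just a type that implements the package's Hook interface (Levels and Fire). A hedged sketch of how the hook above could queue log entries onto the batching loop — the measurement name and the tag/field layout are assumptions:

import (
    client "github.com/influxdata/influxdb/client/v2"
    "github.com/sirupsen/logrus"
)

// Levels tells logrus which log levels this hook should receive.
func (h *InfluxDBHook) Levels() []logrus.Level {
    return logrus.AllLevels
}

// Fire converts a log entry into a point and queues it for the batching loop.
func (h *InfluxDBHook) Fire(entry *logrus.Entry) error {
    fields := map[string]interface{}{"message": entry.Message}
    for k, v := range entry.Data {
        fields[k] = v
    }

    pt, err := client.NewPoint(
        "log", // measurement name is an assumption
        map[string]string{"level": entry.Level.String()},
        fields,
        entry.Time,
    )
    if err != nil {
        return err
    }

    h._points <- pt
    return nil
}

After registering the hook with logrus.AddHook(h), each log entry is buffered and written to InfluxDB in batches by the background goroutine.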

Regarding "logging - How to continuously write to InfluxDB with the Golang client", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38067435/
