对于一个项目,我想为 CSV 的大约 5000 万行中的每一行手动创建结构。为此,我逐行迭代文件并将每个结构附加到一个 slice 。这是简化的方法:
func readCSV(filePath string) DataFrame {
file, _ := os.Open(filePath)
defer file.Close()
var rows []Row
scanner := bufio.NewScanner(file)
scanner.Scan()
for scanner.Scan() {
parts := strings.Split(scanner.Text(), ",")
if len(parts) < 7 {
continue
}
column1, _ := strconv.Atoi(parts[0])
column2, _ := strconv.ParseFloat(parts[1], 32)
column3, _ := strconv.ParseFloat(parts[2], 32)
column4 := parts[3]
column5, _ := strconv.ParseFloat(parts[4], 32)
column6 := parts[5]
column7 := parts[6]
row := Row{
Column1: column1,
Column2: column2,
Column3: column3,
Column4: column4,
Column5: column5,
Column6: column6,
Column7: column7,
}
rows = append(rows, row)
}
return DataFrame{
Rows: rows,
}
}
生成的 DataFrame 大约有 3 GB 内存。问题是在方法执行期间 RAM 消耗急剧增加,并且 Go 进程使用 15GB 以上的内存,使得该函数无法用于我的目的。一旦 slice 返回,进程的 RAM 消耗就会下降到预期的 3GB。
堆配置文件如下所示:
3.26GB 5.81GB (flat, cum) 100% of Total
. . 62: scanner := bufio.NewScanner(file)
. . 63: scanner.Scan()
. . 64: for scanner.Scan() {
. 2.55GB 65: parts := strings.Split(scanner.Text(), ",")
. . 66: if len(parts) < 7 {
. . 67: continue
. . 68: }
. . 69: column1, _ := strconv.Atoi(parts[0])
. . 70: column2, _ := strconv.ParseFloat(parts[1], 32)
. . 71: column3, _ := strconv.ParseFloat(parts[2], 32)
. . 72: column4 := parts[3]
. . 73: column5, _ := strconv.ParseFloat(parts[4], 32)
. . 74: column6 := parts[5]
. . 75: column7 := parts[6]
. . 76: row := Row{
. . 77: Column1: column1,
. . 78: Column2: column2,
. . 79: Column3: column3,
. . 80: Column4: column4,
. . 81: Column5: column5,
. . 82: Column6: column6,
. . 83: Column7: column7,
. . 84: }
3.26GB 3.26GB 85: rows = append(rows, row)
. . 86: }
. . 87:
. . 88: return DataFrame{
. . 89: Rows: rows,
我不知道高内存消耗从何而来。我尝试手动调用垃圾收集器但没有成功。谁能给我提示吗?
最佳答案
rows
是 Row 结构体的数组,而不是指针。每行 float 和整数占用 32 个字节,加上字符串的长度。 5000 万行可能会变得相当大。更糟糕的是,append
将使 rows
增长约 1.5 倍,因此最终会分配大量额外内存,同时还会丢弃许多需要的较小版本。被垃圾收集。然后 append(rows, row)
是一个副本,意味着更多的分配和释放。而且它必须等待被垃圾收集,从而导致内存使用量膨胀。
可以通过存储引用来避免这种情况。这应该意味着更少的分配并使行
显着更小。
var rows []*Row
...
rows = append(rows, &row)
然而,真正的问题是一次性吞掉所有东西。这就是走吧!我们可以使用channels和 goroutines在我们的处理过程中一次同时读取一行。
CSV 看似棘手。 Go 已经有一个 CSV 库,encoding/csv ,所以我们将使用它。
# A handy function to make ignoring errors a bit less laborious.
func IgnoreError(value interface{}, err error) interface{} {
return value
}
# Its more flexible to take an io.Reader.
# It returns a channel of individual rows.
func readCSV(input io.Reader) chan Row {
rows := make(chan Row)
go func() {
defer close(rows)
# Use encoding/csv.
# Let it reuse its backing array for each row.
# Ignore rows with the wrong number of columns.
reader := csv.NewReader(input)
reader.FieldsPerRecord = 7
reader.ReuseRecord = true
for {
parts, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
continue
}
# Send each row down the channel.
rows <- Row{
Column1: IgnoreError(strconv.Atoi(parts[0])).(int),
Column2: IgnoreError(strconv.ParseFloat(parts[1], 32)).(float64),
Column3: IgnoreError(strconv.ParseFloat(parts[2], 32)).(float64),
Column4: parts[3],
Column5: IgnoreError(strconv.ParseFloat(parts[4], 32)).(float64),
Column6: parts[5],
Column7: parts[6],
}
}
}();
return rows;
}
func main() {
file, err := os.Open("test.csv")
if err != nil {
log.Fatal(err)
}
rows := readCSV(file)
for row := range rows {
fmt.Println(row)
}
}
现在一次仅加载一行。内存使用量应该是恒定的。
关于go - 方法执行期间内存消耗高,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64882033/