csv - 同时写入多个 csv 文件,在 Golang 中的分区列上拆分

标签 csv go concurrency channel

我的目标是读取一个或多个共享通用格式的 csv 文件,并根据 csv 数据中的分区列写入单独的文件。请允许最后一列是分区,数据未排序,并且可以在多个文件中找到给定分区。一个文件的示例:

fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02

如果这种方法听起来像可怕的 XY 问题,我很乐意进行调整。

到目前为止我已经尝试过:

  • 读入数据集并迭代每一行
  • 如果分区有 已经看到,衍生出一个新的工作例程(这将包含一个文件/csv 作家)。将该行发送到 chan []string 中。
  • 由于每个工作线程都是文件编写器,因此它应该仅通过其输入 channel 接收一个分区的行。

这显然还不起作用,因为我不知道如何根据给定行上看到的分区值将行发送到正确的工作人员。

我已经为每个工作人员提供了每个分区值的id string,但我不知道如何选择要发送到的工作人员(如果我应该创建一个单独的chan []) string 为每个工作人员,并使用 select 发送到该 channel ,或者一个结构是否应该使用某种池和路由功能来容纳每个工作人员。

TLDR;我不知道如何根据某些分类 string 值有条件地将数据发送到给定的 go 例程或 channel ,其中唯一的数量可以是任意的,但可能不超过 24 个唯一分区值.

我要警告的是,我注意到这样的问题确实会被否决,因此,如果您认为这是反建设性的或不完整的,足以否决,请评论原因,以便我可以避免重复进攻。

感谢您提前提供的任何帮助!

Playground

片段:

  package main

    import (
        "encoding/csv"
        "fmt"
        "log"
        "strings"
        "time"
    )

    func main() {

        // CSV
        r := csv.NewReader(csvFile1)
        lines, err := r.ReadAll()
        if err != nil {
            log.Fatalf("error reading all lines: %v", err)
        }

        // CHANNELS
        lineChan := make(chan []string)

        // TRACKER
        var seenPartitions []string

        for _, line := range lines {

            hour := line[6]
            if !stringInSlice(hour, seenPartitions) {
                seenPartitions = append(seenPartitions, hour)
                go worker(hour, lineChan)
            }
            // How to send to the correct worker/channel? 
            lineChan <- line

        }
        close(lineChan)
    }

    func worker(id string, lineChan <-chan []string) {
        for j := range lineChan {
            fmt.Println("worker", id, "started  job", j)
            // Write to a new file here and wait for input over the channel
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", j)
        }
    }

    func stringInSlice(str string, list []string) bool {
        for _, v := range list {
            if v == str {
                return true
            }
        }
        return false
    }

    // DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04 
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)

最佳答案

同步版本首先不进行并发魔法(请参阅下面的并发版本)。

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
)

func main() {

    // CSV
    r := csv.NewReader(csvFile1)
    partitions := make(map[string][][]string)

    for {
        rec, err := r.Read()
        if err != nil {
            if err == io.EOF {
                err = nil

                save_partitions(partitions)

                return
            }
            log.Fatal(err)
        }

        process(rec, partitions)
    }

}

// prints only
func save_partitions(partitions map[string][][]string) {
    for part, recs := range partitions {
        fmt.Println(part)
        for _, rec := range recs {
            fmt.Println(rec)
        }
    }
}

// this can also write/append directly to a file
func process(rec []string, partitions map[string][][]string) {
    l := len(rec)
    part := rec[l-1]
    if p, ok := partitions[part]; ok {
        partitions[part] = append(p, rec)
    } else {
        partitions[part] = [][]string{rec}
    }
}

// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/--iqZGzxCF

并发版本:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
)

var (
    // list of channels to communicate with workers
    // workers accessed synchronousely no mutex required
    workers = make(map[string]chan []string)

    // wg is to make sure all workers done before exiting main
    wg = sync.WaitGroup{}

    // mu used only for sequential printing, not relevant for program logic
    mu = sync.Mutex{}
)

func main() {

    // wait for all workers to finish up before exit
    defer wg.Wait()

    r := csv.NewReader(csvFile1)

    for {
        rec, err := r.Read()
        if err != nil {
            if err == io.EOF {
                savePartitions()
                return
            }
            log.Fatal(err) // sorry for the panic
        }
        process(rec)
    }

}

func process(rec []string) {
    l := len(rec)
    part := rec[l-1]

    if c, ok := workers[part]; ok {
        // send rec to worker
        c <- rec
    } else {
        // if no worker for the partition

        // make a chan
        nc := make(chan []string)
        workers[part] = nc

        // start worker with this chan
        go worker(nc)

        // send rec to worker via chan
        nc <- rec
    }
}

func worker(c chan []string) {

    // wg.Done signals to main worker completion
    wg.Add(1)
    defer wg.Done()

    part := [][]string{}
    for {
        // wait for a rec or close(chan)
        rec, ok := <-c
        if ok {
            // save the rec
            // instead of accumulation in memory
            // this can be saved to file directly
            part = append(part, rec)
        } else {
            // channel closed on EOF

            // dump partition
            // locks ensures sequential printing
            // not a required for independent files
            mu.Lock()
            for _, p := range part {
                fmt.Printf("%+v\n", p)
            }
            mu.Unlock()

            return
        }
    }
}

// simply signals to workers to stop
func savePartitions() {
    for _, c := range workers {
        // signal to all workers to exit
        close(c)
    }
}

// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/oBTPosy0yT

玩得开心!

关于csv - 同时写入多个 csv 文件,在 Golang 中的分区列上拆分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47295259/

相关文章:

jquery - 如何在数据表的csv导出选项中添加公司的自定义页眉或标题和页脚信息?

ruby - 尝试使用 Ruby 1.8/FasterCSV 解析带有变音符号等的 CSV 文件时出现问题

python - 读取 csv 文件时的混合类型。原因、修复和后果

go - 为什么 Go 有一个 "bit clear (AND NOT)"运算符?

linux - 仅替换特定列中的字符 (CSV)

json - 在 Golang 中解析 JSON 不会填充对象

go - 使用WaitGroup从不同的go例程阻塞当前变量写入堆栈变量是否安全?

c# - 使用仅包含静态方法而不包含变量的 C# 类时会出现并发问题吗?

java - 为什么有人会在方法中制作本地最终锁定?

concurrency - cudaDeviceScheduleBlockingSync 和 cudaDeviceScheduleYield 有什么区别?