我的目标是读取一个或多个格式相同的 csv 文件,并根据 csv 数据中的分区列将数据分别写入各自独立的文件。请假定最后一列是分区列;数据未排序,并且同一个分区可能出现在多个文件中。一个文件的示例:
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
如果这种方法听起来像可怕的 XY 问题,我很乐意进行调整。
到目前为止我已经尝试过:
- 读入数据集并迭代每一行
- 如果该行的分区尚未出现过,就启动一个新的 worker goroutine(其中包含一个文件/csv 写入器),然后将该行发送到
`chan []string`
中。
- 由于每个 worker 都是一个文件写入器,它应该只通过自己的输入 channel 接收属于同一个分区的行。
这显然还不能正常工作,因为我不知道如何根据某一行上的分区值,把该行发送给正确的 worker。
我已经给每个 worker 分配了一个 `id string`,其值就是它负责的分区值,但我不知道该如何选择要发送给哪个 worker:是应该为每个 worker 单独创建一个 `chan []string`,然后用 `select` 发送到对应的 channel;还是应该用一个结构体,配合某种池和路由函数来管理所有 worker。
TL;DR:我不知道如何根据某个分类用的 `string` 值,有条件地把数据发送到指定的 goroutine 或 channel;这个值的唯一取值数量可以是任意的,但大概不会超过 24 个。
需要说明的是,我注意到这类问题常常会被投反对票;如果您认为这个问题没有建设性或不够完整、值得投反对票,请留言说明原因,以便我今后避免再犯同样的错误。
感谢您提前提供的任何帮助!
片段:
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
// main reads every record from csvFile1 and routes each one to a worker
// goroutine dedicated to that record's partition, i.e. the value in the
// record's last column. Each partition gets its own channel, stored in a map
// keyed by the partition value, so a given worker only ever receives rows of
// its own partition. After all rows are dispatched every channel is closed,
// and main waits for all workers to finish before returning.
func main() {
	// CSV
	r := csv.NewReader(csvFile1)
	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}

	// One channel per partition value. The map is only touched by this
	// goroutine, so it needs no locking.
	workers := make(map[string]chan []string)
	// Each worker goroutine sends once here after its channel is closed and
	// fully drained.
	done := make(chan struct{})

	for _, line := range lines {
		if len(line) == 0 {
			continue // no partition column to route on
		}
		part := line[len(line)-1]
		ch, ok := workers[part]
		if !ok {
			// First row of a new partition: create its channel and worker.
			ch = make(chan []string)
			workers[part] = ch
			go func(id string, c <-chan []string) {
				worker(id, c)
				done <- struct{}{}
			}(part, ch)
		}
		// Only this partition's worker reads from ch.
		ch <- line
	}

	// Closing a channel ends the corresponding worker's receive loop.
	for _, ch := range workers {
		close(ch)
	}
	// Without this wait, main would return and kill workers mid-job.
	for range workers {
		<-done
	}
}
// worker consumes records from lineChan until it is closed, logging the
// start and end of each one. The one-second sleep is a placeholder for the
// real per-record work (e.g. appending the record to this partition's file).
func worker(id string, lineChan <-chan []string) {
	for {
		job, open := <-lineChan
		if !open {
			return
		}
		fmt.Println("worker", id, "started job", job)
		time.Sleep(1 * time.Second)
		fmt.Println("worker", id, "finished job", job)
	}
}
// stringInSlice reports whether str is equal to any element of list.
// A nil or empty list never contains anything.
func stringInSlice(str string, list []string) bool {
	for i := range list {
		if list[i] == str {
			return true
		}
	}
	return false
}
// DUMMY input standing in for a real CSV file. Rows are intentionally not
// sorted by partition (the last column), as in the real data. The leading
// newline produces an empty first line, which csv.Reader ignores.
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
最佳答案
首先给出一个不使用任何并发技巧的同步版本(并发版本见下文)。
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
)
// main reads csvFile1 one record at a time and groups each record by its
// partition via process. When the reader reports io.EOF, the collected
// partitions are handed to save_partitions. Any other read error is fatal.
func main() {
	// CSV
	r := csv.NewReader(csvFile1)
	partitions := make(map[string][][]string)
	for {
		rec, err := r.Read()
		if err == io.EOF {
			// Normal end of input; nothing to reset.
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		process(rec, partitions)
	}
	save_partitions(partitions)
}
// save_partitions prints every partition key followed by that partition's
// records, one record per line. Despite the name it only prints; a real
// implementation would write each partition to its own file. Partitions
// are visited in Go's unspecified map iteration order.
func save_partitions(partitions map[string][][]string) {
	for key, records := range partitions {
		fmt.Println(key)
		for i := range records {
			fmt.Println(records[i])
		}
	}
}
// process appends rec to the records collected for its partition, which is
// the value of rec's last field. Empty records have no partition column and
// are ignored rather than causing an index-out-of-range panic.
// Instead of accumulating in memory, this could write/append rec directly
// to the partition's file.
func process(rec []string, partitions map[string][][]string) {
	if len(rec) == 0 {
		return
	}
	part := rec[len(rec)-1]
	// A missing key yields a nil slice, and append handles nil, so the first
	// record of a partition needs no special case.
	partitions[part] = append(partitions[part], rec)
}
// DUMMY input standing in for a real CSV file. Here the rows happen to be
// grouped by partition (the last column). The leading newline produces an
// empty first line, which csv.Reader ignores.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/--iqZGzxCF
并发版本:
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
"sync"
)
var (
	// workers maps each partition value to the channel feeding that
	// partition's worker goroutine. It is only read and written from main's
	// goroutine (via process and savePartitions), so no mutex guards it.
	workers = make(map[string]chan []string)
	// wg lets main wait until every worker has finished before exiting.
	wg sync.WaitGroup
	// mu only serializes the workers' printing so partitions do not
	// interleave on stdout; it plays no part in the partitioning logic.
	mu sync.Mutex
)
// main streams records from csvFile1 into process, which routes each one to
// its partition's worker. At io.EOF it closes every worker channel via
// savePartitions, and the deferred wg.Wait then blocks until all workers
// have printed their partitions. Any other read error is fatal. log.Fatal
// calls os.Exit, so deferred calls (including wg.Wait) do not run in that case.
func main() {
	// Registered first so it runs after savePartitions on normal return.
	defer wg.Wait()

	r := csv.NewReader(csvFile1)
	for {
		rec, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err) // exits the process; this is not a panic
		}
		process(rec)
	}
	savePartitions()
}
// process routes rec to the worker goroutine that owns its partition, which
// is the value of rec's last field. The first record of a new partition
// creates that partition's channel, registers it in workers and starts its
// worker. Empty records have no partition column and are skipped.
// process must only be called from one goroutine, because workers is not
// guarded by a mutex.
func process(rec []string) {
	if len(rec) == 0 {
		return
	}
	part := rec[len(rec)-1]
	c, ok := workers[part]
	if !ok {
		c = make(chan []string)
		workers[part] = c
		// Add before starting the goroutine (as sync.WaitGroup requires), so
		// wg.Wait can never see a zero counter while this worker is pending.
		wg.Add(1)
		go worker(c)
	}
	// Unbuffered send: blocks until the worker has received rec.
	c <- rec
}

// worker collects every record received on c until c is closed. It then
// prints the collected partition while holding mu, so that output from
// different partitions is not interleaved. The matching wg.Add for its
// wg.Done is performed by process before this goroutine is started.
func worker(c chan []string) {
	defer wg.Done()

	var part [][]string
	// The loop ends once c is closed (savePartitions does this at EOF).
	for rec := range c {
		// Instead of accumulating in memory, rec could be written to the
		// partition's file directly here.
		part = append(part, rec)
	}

	// The lock only keeps the printout sequential; independent output files
	// would not need it.
	mu.Lock()
	defer mu.Unlock()
	for _, p := range part {
		fmt.Printf("%+v\n", p)
	}
}
// savePartitions closes every worker channel. Closing signals that input is
// exhausted: each worker's receive loop ends, and the worker then prints its
// partition and exits. Despite the name, it saves nothing itself.
func savePartitions() {
	for part := range workers {
		close(workers[part])
	}
}
// DUMMY input standing in for a real CSV file. Here the rows happen to be
// grouped by partition (the last column). The leading newline produces an
// empty first line, which csv.Reader ignores.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/oBTPosy0yT
玩得开心!
关于csv - 同时写入多个 csv 文件,在 Golang 中的分区列上拆分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47295259/