问题 - 我有 150GB 的 csv 文件,其中包含标题并且每行的列数相同。我只需要第一列,减去标题,只需要唯一的项目。 csv 不能在我的本地机器上,因为我没有空间。它在苹果机场。我会尝试使用 USB 数据线连接。
我已经在互联网上搜索了大约 3 天的解决方案。我听说过几个解决方案,但不确定哪个是最好的方法。哪个最好,为什么?
Shell:我听说我可以用 shell 来做到这一点,但我没有这方面的 shell 编写经验
Python 脚本:我确实创建了一个脚本,但在运行 4 小时后放弃了。这可能是因为我是通过 wifi 访问它的。
Elixir:我目前正在学习 elixir,有人告诉我 Flow 是一个很好的选择,可以在我阅读新信息时将我的工作分配给 cpu。比较 Stream 和 Flow。在 100 万个相似数据列表中,使用 Stream 花费了 8 秒,使用 Flow 花费了 2 秒来获取文件中的所有唯一项目。
def stream_parse(file_path, chunk_size) 做 文件路径 |> 文件流! |> Stream.drop(1) |> Stream.map(&String.split(&1, ",") |> List.first) |> Stream.chunk(chunk_size, chunk_size, []) |> Stream.map(&MapSet.new(&1)) |> 枚举.to_list 结束
def flow_parse(file_path, chunk_size) 做 文件路径 |> File.stream!(read_ahead: chunk_size) |> Stream.drop(1) |> Flow.from_enumerable |> Flow.map(&String.split(&1, ",") |> List.first) |> Flow.partition |> Flow.uniq |> 枚举.to_list 结束
虽然我对流解决方案没有特别的问题
它具有高内存使用率,使用 1 个线程,并在一个内核上运行。
流解决方案是多线程的,使用多个内核,但存在将所有内容最终创建到一个 Enum.to_list
中的问题,这可能最终会成为谁知道多长时间
- SQL Server:最后我被告知只创建一个 linux 服务器并使用 SQL 并将数据加载到 sql 中并运行查询以获取数据。
什么是最好的方法,如果是,是否有更好的解决方案。除了写 C.
编辑 1 12/6/2017 。日/月/年
我能够用 elixir 完成流和流示例。我还获得了一个可以完成所需结果的 shell 脚本。至此,shell脚本与流运行速度一致,以流取胜。但是,由于它不是我的机器流本地的,所以不会有任何区别,因为我是 IO 绑定(bind)的。
def stream_parse(file_path, chunk_size, output_file) do
file_path
|> File.stream!(read_ahead: chunk_size)
|> Stream.drop(1)
|> Stream.map(&String.split(&1, ",") |> List.first)
|> Stream.uniq
|> Stream.map(&"#{&1}\n")
|> Stream.into(File.stream!(output_file, [:write, :utf8]))
|> Stream.run
end
然而,这缺乏为每个 block 写入结果文件的能力,并将整个 150g 的唯一项目存储在内存中(不是一个选项)。
shell 脚本(也在内存中存储所有唯一项)
tail -n +2 my.csv | cut -d , -f 1 | sort -u > OUTPUT.csv
最佳答案
终于搜索了很多论坛,elixir slack channel。我们终于想出了一个解决方案。首先是拆分文件,因为已经有一个 shell 命令用于此,没有必要使 Elixir 代码过于复杂。我将所有内容分解为方法,以更好地解释正在发生的事情。
将文件分成 1000 万行子文件
$ mkdir split-files
$ split -a 8 -l 10000000 big_file.csv ./split-files
$ cd split-files
$ for f in *; do mv "$f" "$f.csv"; done
接下来我们需要从每个文件中获取唯一的项目,并为输出编写一个唯一的文件。我能够实际使用 Flow.uniq,因为 chunk_size 将为 1000 万,可以放入内存。
def flow_parse_dir(path, chunk_size) do
Path.wildcard(path <> "/*.csv")
|> Flow.from_enumerable
|> Flow.map(fn filename ->
[dir, file] = String.split(filename,"/")
flow_parse(filename, chunk_size, dir <> "/unique_"<> file)
end)
|> Flow.run
end
def flow_parse(file_path, chunk_size, output_file) do
file_path
|> File.stream!(read_ahead: chunk_size)
|> Stream.drop(1)
|> Flow.from_enumerable
|> Flow.map(&String.split(&1, ",") |> List.first)
|> Flow.partition
|> Flow.uniq
|> Flow.map(&"#{&1}\n")
|> Stream.into(File.stream!(output_file, [:write, :utf8]))
|> Stream.run
end
创建所有唯一文件后,我们需要创建一个完整的唯一文件。
def concat_files(path, totol_unique_file_name) do
sum_file = File.open!(path <> "/" <> totol_unique_file_name, [:read, :utf8, :write])
Path.wildcard(path <> "/*.csv")
|> Stream.map(fn filename ->
[_, file] = String.split(filename, "/")
if String.contains?(file, "unique") do
write_concat_of_unique_files(file, path, sum_file)
end
end)
|> Stream.run
File.close(sum_file)
end
def write_concat_of_unique_files(file, path, totol_unique_file_name) do
# read in file contents line by line
path <> "/" <> file
|> File.stream!()
|> Stream.map(&String.trim(&1,"\n"))
|> Stream.map(fn line ->
IO.puts(totol_unique_file_name, line)
end)
|> Stream.run
end
最后一个应该完成工作的方法。
def unique_column(file_path, chunk_size, output) do
total_file = File.open!(output, [:read, :utf8, :write])
file_path
|> File.stream!(read_ahead: chunk_size)
|> Stream.map(&String.trim(&1,"\n"))
|> Stream.chunk(chunk_size, chunk_size, [])
|> Flow.from_enumerable
|> Flow.map(fn chunk ->
chunk
|> MapSet.new
|> MapSet.to_list
|> List.flatten
end)
|> Flow.partition
|> Flow.map(fn line ->
Enum.map(line, fn item ->
IO.puts(total_file, item)
end)
end)
|> Flow.run
File.close(total_file)
end
检查最终文件是否完全唯一。以前文件中的唯一项目的数量被认为不会太大并且完全适合内存。如果内容都是唯一的,那么您将获得列表作为返回。如果你得到一个错误,它不是唯一的。
def check_unique(file_path) do
original_length = file_path
|> File.stream!
|> Enum.to_list
unique_length = file_path
|> File.stream!
|> Stream.uniq
|> Enum.to_list
^unique_length = original_length
end
关于python - 你如何使用 Elixir 解析 150g csv 文件? shell ? SQL数据库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44488622/