python - 你如何使用 Elixir 解析 150g csv 文件? shell ? SQL数据库?

标签 python sql-server shell csv elixir

问题 - 我有 150GB 的 csv 文件,其中包含标题并且每行的列数相同。我只需要第一列,减去标题,只需要唯一的项目。 csv 不能在我的本地机器上,因为我没有空间。它在苹果机场。我会尝试使用 USB 数据线连接。

我已经在互联网上搜索了大约 3 天的解决方案。我听说过几个解决方案,但不确定哪个是最好的方法。哪个最好,为什么?

  • Shell:我听说我可以用 shell 来做到这一点,但我没有这方面的 shell 编写经验

  • Python 脚本:我确实创建了一个脚本,但在运行 4 小时后放弃了。这可能是因为我是通过 wifi 访问它的。

  • Elixir:我目前正在学习 elixir,有人告诉我 Flow 是一个很好的选择,可以在我阅读新信息时将我的工作分配给 cpu。比较 Stream 和 Flow。在 100 万个相似数据列表中,使用 Stream 花费了 8 秒,使用 Flow 花费了 2 秒来获取文件中的所有唯一项目。

    def stream_parse(file_path, chunk_size) 做 文件路径 |> 文件流! |> Stream.drop(1) |> Stream.map(&String.split(&1, ",") |> List.first) |> Stream.chunk(chunk_size, chunk_size, []) |> Stream.map(&MapSet.new(&1)) |> 枚举.to_list 结束

    def flow_parse(file_path, chunk_size) 做 文件路径 |> File.stream!(read_ahead: chunk_size) |> Stream.drop(1) |> Flow.from_enumerable |> Flow.map(&String.split(&1, ",") |> List.first) |> Flow.partition |> Flow.uniq |> 枚举.to_list 结束

虽然我对流解决方案没有特别的问题 它具有高内存使用率,使用 1 个线程,并在一个内核上运行。 流解决方案是多线程的,使用多个内核,但存在将所有内容最终创建到一个 Enum.to_list 中的问题,这可能最终会成为谁知道多长时间

  • SQL Server:最后我被告知只创建一个 linux 服务器并使用 SQL 并将数据加载到 sql 中并运行查询以获取数据。

什么是最好的方法,如果是,是否有更好的解决方案。除了写 C.

编辑 1 12/6/2017 。日/月/年

我能够用 elixir 完成流和流示例。我还获得了一个可以完成所需结果的 shell 脚本。至此,shell脚本与流运行速度一致,以流取胜。但是,由于它不是我的机器流本地的,所以不会有任何区别,因为我是 IO 绑定(bind)的。

def stream_parse(file_path, chunk_size, output_file) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Stream.map(&String.split(&1, ",") |> List.first)
    |> Stream.uniq
    |> Stream.map(&"#{&1}\n")
    |> Stream.into(File.stream!(output_file, [:write, :utf8]))
    |> Stream.run
end

然而,这缺乏为每个 block 写入结果文件的能力,并将整个 150g 的唯一项目存储在内存中(不是一个选项)。

shell 脚本(也在内存中存储所有唯一项)

tail -n +2 my.csv | cut -d , -f 1 | sort -u > OUTPUT.csv

最佳答案

终于搜索了很多论坛,elixir slack channel。我们终于想出了一个解决方案。首先是拆分文件,因为已经有一个 shell 命令用于此,没有必要使 Elixir 代码过于复杂。我将所有内容分解为方法,以更好地解释正在发生的事情。

将文件分成 1000 万行子文件

$ mkdir split-files 
$ split -a 8 -l 10000000 big_file.csv ./split-files
$ cd split-files 
$ for f in *; do mv "$f" "$f.csv"; done

接下来我们需要从每个文件中获取唯一的项目,并为输出编写一个唯一的文件。我能够实际使用 Flow.uniq,因为 chunk_size 将为 1000 万,可以放入内存。

def flow_parse_dir(path, chunk_size) do
  Path.wildcard(path <> "/*.csv")
    |> Flow.from_enumerable
    |> Flow.map(fn filename ->
        [dir, file] = String.split(filename,"/")
        flow_parse(filename, chunk_size, dir <> "/unique_"<> file)
      end)
    |> Flow.run
end
def flow_parse(file_path, chunk_size, output_file) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Flow.from_enumerable
    |> Flow.map(&String.split(&1, ",") |> List.first)
    |> Flow.partition
    |> Flow.uniq
    |> Flow.map(&"#{&1}\n")
    |> Stream.into(File.stream!(output_file, [:write, :utf8]))
    |> Stream.run
end

创建所有唯一文件后,我们需要创建一个完整的唯一文件。

def concat_files(path, totol_unique_file_name) do
  sum_file =  File.open!(path <> "/" <> totol_unique_file_name, [:read, :utf8, :write])

  Path.wildcard(path <> "/*.csv")
    |> Stream.map(fn filename ->
        [_, file] = String.split(filename, "/")
        if String.contains?(file, "unique") do
          write_concat_of_unique_files(file, path, sum_file)
        end
      end)
    |> Stream.run

  File.close(sum_file)
end
def write_concat_of_unique_files(file, path, totol_unique_file_name) do
  # read in file contents line by line
  path <> "/" <> file
    |> File.stream!()
    |> Stream.map(&String.trim(&1,"\n"))
    |> Stream.map(fn line ->
        IO.puts(totol_unique_file_name, line)
      end)
    |> Stream.run
end

最后一个应该完成工作的方法。

def unique_column(file_path, chunk_size, output) do
  total_file = File.open!(output, [:read, :utf8, :write])

  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.map(&String.trim(&1,"\n"))
    |> Stream.chunk(chunk_size, chunk_size, [])
    |> Flow.from_enumerable
    |> Flow.map(fn chunk ->
        chunk
          |> MapSet.new
          |> MapSet.to_list
          |> List.flatten
      end)
    |> Flow.partition
    |> Flow.map(fn line ->
        Enum.map(line, fn item ->
            IO.puts(total_file, item)
          end)
        end)
    |> Flow.run

  File.close(total_file)
end

检查最终文件是否完全唯一。以前文件中的唯一项目的数量被认为不会太大并且完全适合内存。如果内容都是唯一的,那么您将获得列表作为返回。如果你得到一个错误,它不是唯一的。

def check_unique(file_path) do
  original_length = file_path
    |> File.stream!
    |> Enum.to_list

  unique_length = file_path
    |> File.stream!
    |> Stream.uniq
    |> Enum.to_list

  ^unique_length = original_length
end

关于python - 你如何使用 Elixir 解析 150g csv 文件? shell ? SQL数据库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44488622/

相关文章:

linux - 如何用文件中的新行替换每 6 次出现的空格?

c# - 是否可以在 MSSQL 过程本身中格式化日期,以便它可以显示为非正式日期?

sql - 在字母索引中查找候选人的上一个和下一个条目

python - 在没有 shell=True 的情况下使用 python 运行外部程序?

python - 有什么方法可以远程发出 telnet 命令和双 telnet session 吗?

sql - 存储过程中动态形成的 SQL 是否否定了存储过程的真正目的?

shell - .Sh脚本在特定位置安装java

python - OpenCV - 在 Python 中读取 16 位 TIFF 图像(sentinel-1 数据)

python - Scikit 学习(Python 3.5): Do I need to import a library to make this work?

python - 为什么命令字符串没有完全从 Python 传递到系统?