sql - Format Postgres JSON output for a nested Elasticsearch structure

Tags: sql json postgresql elasticsearch

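I have realized that using a SQL database (Postgres) is one of the most efficient ways to port my relational data (40+ GB across 24 CSV files) into an Elasticsearch nested structure. However, I still have a few issues with the formatting of the JSON output from Postgres: 1) unwanted line feeds (\n), 2) an unwanted header row, and 3) an unwanted date format. Here is a basic example to demonstrate: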

file1
id,age,gender,wave
1,49,M,1
2,72,F,0

file2
id,time,event1
1,2095-04-20 12:28:55,V39
1,2095-04-21 2:27:45,T21
2,2094-05-17 18:17:25,V39

file3
id,time,event2
1,2095-04-22 3:48:53,P90
2,2094-05-18 1:28:23,RT4
2,2094-05-18 4:23:53,W3
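
To make the example reproducible, the three sample files could be loaded roughly as follows. This is a minimal sketch: the column types are assumptions inferred from the sample rows (in particular, "time" is assumed to be a timestamp, which is consistent with the T separator in the JSON output further down), and the hours are zero-padded here. For the real CSV files, COPY ... FROM with the CSV and HEADER options would take the place of the INSERT statements.

```sql
-- Assumed schema; the types are guesses based on the sample data.
CREATE SCHEMA IF NOT EXISTS forum;

CREATE TABLE forum.file1 (id int, age int, gender text, wave int);
CREATE TABLE forum.file2 (id int, "time" timestamp, event1 text);
CREATE TABLE forum.file3 (id int, "time" timestamp, event2 text);

-- Sample rows from file1, file2 and file3.
INSERT INTO forum.file1 VALUES
    (1, 49, 'M', 1),
    (2, 72, 'F', 0);

INSERT INTO forum.file2 VALUES
    (1, '2095-04-20 12:28:55', 'V39'),
    (1, '2095-04-21 02:27:45', 'T21'),
    (2, '2094-05-17 18:17:25', 'V39');

INSERT INTO forum.file3 VALUES
    (1, '2095-04-22 03:48:53', 'P90'),
    (2, '2094-05-18 01:28:23', 'RT4'),
    (2, '2094-05-18 04:23:53', 'W3');
```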

After adding these CSVs to a schema named forum and running this SQL code:

with f_1 as(
   SELECT id, json_agg(file1.*) AS tag
   FROM forum.file1
   GROUP BY id
), f_2 as (
   SELECT id, json_agg(file2.*) AS tag
   FROM forum.file2
   GROUP BY id
), f_3 as (
   SELECT id, json_agg(file3.*) AS tag
   FROM forum.file3
   GROUP BY id
)
SELECT ('{"id":' || a.id), ('"file1":' || a.tag), ('"file2":' || b.tag), ('"file3":' || c.tag ||'}') 
FROM f_1 AS a, f_2 AS b, f_3 AS c
WHERE b.id = a.id AND c.id = a.id;

I get this output (pgAdminIII - Export to file - no quoting):

?column?,?column?,?column?,?column?
{"id":1,"file1":[{"id":1,"age":49,"gender":"M","wave":1}],"file2":[{"id":1,"time":"2095-04-20T12:28:55","event1":"V39"}, 
 {"id":1,"time":"2095-04-21T02:27:45","event1":"T21"}],"file3":[{"id":1,"time":"2095-04-22T03:48:53","event2":"P90"}]}
{"id":2,"file1":[{"id":2,"age":72,"gender":"F","wave":0}],"file2":[{"id":2,"time":"2094-05-17T18:17:25","event1":"V39"}],"file3":[{"id":2,"time":"2094-05-18T01:28:23","event2":"RT4"}, 
 {"id":2,"time":"2094-05-18T04:23:53","event2":"W3"}]}

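You can see that for a given id the data spans multiple lines. I need all the data for a given id on a single line (i.e. no \n). There are a couple of other minor issues that I have not spent much time on but would like to change: the first row is not needed, and I would like to get rid of ?column?,?column?,?column?,?column? without having to open the file after processing finishes. Ideally I would also prefer no T in the date output. I should be able to accommodate the T in Elasticsearch, but so far I have not gotten it to accept it. This is the output I want from Postgres, which works as input to Elasticsearch (using stream2es and a nested mapping structure):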

{"id":1,"file1":[{"id":1,"age":49,"gender":"M","wave":1}],"file2":[{"id":1,"time":"2095-04-20 12:28:55","event1":"V39"},{"id":1,"time":"2095-04-21 02:27:45","event1":"T21"}],"file3":[{"id":1,"time":"2095-04-22 03:48:53","event2":"P90"}]}
{"id":2,"file1":[{"id":2,"age":72,"gender":"F","wave":0}],"file2":[{"id":2,"time":"2094-05-17 18:17:25","event1":"V39"}],"file3":[{"id":2,"time":"2094-05-18 01:28:23","event2":"RT4"},{"id":2,"time":"2094-05-18 04:23:53","event2":"W3"}]}

Adding to_json does fix the unwanted line feeds, but it adds \" in place of ", which the stream2es parser does not like:

SELECT to_json('{"id":' || a.id), to_json('"file1":' || a.tag::json), to_json('"file2":' || b.tag::json), to_json('"file3":' || c.tag::json ||'}')

"{\"id\":1","\"file1\":[{\"id\":1,\"age\":49,\"gender\":\"M\",\"wave\":1}]"...

stream2es exception: Exception in thread "stream dispatcher" java.lang.ClassCastException: java.lang.String cannot be cast to clojure.lang.IPersistentMap
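
The escaping seen above is what to_json does when it is handed a text value: it returns a JSON string, so every inner double quote is backslash-escaped. A minimal sketch on a literal (the sample value is made up for illustration) contrasts this with a plain cast to json, which keeps the object as it is:

```sql
-- to_json() on text produces a JSON *string*, so inner quotes are escaped.
SELECT to_json('{"id":1}'::text) AS as_json_string;   -- "{\"id\":1}"

-- Casting the same text to json keeps it as a JSON object.
SELECT '{"id":1}'::json AS as_json_object;            -- {"id":1}
```

This would also explain the ClassCastException: the parser receives a string where it expects a map.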

Best answer

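Select everything in a single column (instead of four). The format() function will help you write it more clearly. Use

regexp_replace (str, '(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g')

to correct the date format and

replace (str, e' \n ', '')

to strip the line feeds.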
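
As a quick check, the two calls can be tried on a literal before running them against the real tables. The input string below is a made-up sample that imitates the json_agg output shown in the question, where array elements are separated by a comma, a space, a line feed and another space:

```sql
-- Sample input imitating json_agg output (", " + line feed + " " between elements).
SELECT replace(
           regexp_replace(
               E'[{"time":"2095-04-20T12:28:55"}, \n {"time":"2095-04-21T02:27:45"}]',
               '(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g'),  -- swap the T after a date for a space
           E' \n ', '')                                -- drop space + line feed + space
       AS cleaned;
-- cleaned: [{"time":"2095-04-20 12:28:55"},{"time":"2095-04-21 02:27:45"}]
```

Note that the regular expression works on the whole text, so it would also rewrite any other string value that happens to contain a date followed by T.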

Use the COPY command to simplify the export (its default text format writes no header row):

COPY (
    with f_1 as(
       SELECT id, json_agg(file1.*) AS tag
       FROM forum.file1
       GROUP BY id
    ), f_2 as (
       SELECT id, json_agg(file2.*) AS tag
       FROM forum.file2
       GROUP BY id
    ), f_3 as (
       SELECT id, json_agg(file3.*) AS tag
       FROM forum.file3
       GROUP BY id
    )
    SELECT
        replace(
            regexp_replace(
                format('{"id":%s,"file1":%s,"file2":%s,"file3":%s}', 
                    a.id, a.tag, b.tag, c.tag),
                '(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g'),
            e' \n ', '')
    FROM f_1 AS a, f_2 AS b, f_3 AS c
    WHERE b.id = a.id AND c.id = a.id
) TO '/full/path/to/your/file';

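To prepend a command line to each row of data, you can use a trick with a function that returns two rows. Part of the formatting can be moved into the function at the same time.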

create or replace function format_data_line(command text, data_str text)
returns setof text language plpgsql as $$
begin
    return next command;
    return next             
        replace(
            regexp_replace(data_str,
                '(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g'),
            e' \n ', '');
end $$;

COPY (
    with f_1 as(
       SELECT id, json_agg(file1.*) AS tag
       FROM forum.file1
       GROUP BY id
    ), f_2 as (
       SELECT id, json_agg(file2.*) AS tag
       FROM forum.file2
       GROUP BY id
    ), f_3 as (
       SELECT id, json_agg(file3.*) AS tag
       FROM forum.file3
       GROUP BY id
    )
    SELECT 
        format_data_line(
            'my command', 
            format('{"id":%s,"file1":%s,"file2":%s,"file3":%s}', 
                a.id, a.tag, b.tag, c.tag))
    FROM f_1 AS a, f_2 AS b, f_3 AS c
    WHERE b.id = a.id AND c.id = a.id
) TO '/full/path/to/your/file';
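
Assuming the format_data_line function above has been created, its behavior can be checked on a literal before wiring it into COPY. The data string here is a made-up sample, and 'my command' is the same placeholder used above:

```sql
-- Each call yields two rows: the command first, then the cleaned data line.
SELECT format_data_line(
           'my command',
           E'{"id":1,"file2":[{"time":"2095-04-20T12:28:55"}]}') AS line;
-- row 1: my command
-- row 2: {"id":1,"file2":[{"time":"2095-04-20 12:28:55"}]}
```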

Regarding sql - Format Postgres JSON output for a nested Elasticsearch structure, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/33662850/
