hive - 使用 Apache Hive session 化我的日志数据的更好方法?

标签 hive

是否有更好的方法使用 Apache Hive 来 session 化我的日志数据?我不确定我是否以最佳方式在下面这样做:

日志数据存储在序列文件中;单个日志条目是一个 JSON 字符串;例如:

{"source": {"api_key": "app_key_1", "user_id": "user0"}, "events": [{"timestamp": 1330988326, "event_type": "high_score", "event_params": {"score": "1123", "level": "9"}}, {"timestamp": 1330987183, "event_type": "some_event_0", "event_params": {"some_param_00": "val", "some_param_01": 100}}, {"timestamp": 1330987775, "event_type": "some_event_1", "event_params": {"some_param_11": 100, "some_param_10": "val"}}]}

格式化后,如下所示:

{'source': {'api_key': 'app_key_1', 'user_id': 'user0'},
 'events': [{'event_params': {'level': '9', 'score': '1123'},
             'event_type': 'high_score',
             'timestamp': 1330988326},
            {'event_params': {'some_param_00': 'val', 'some_param_01': 100},
             'event_type': 'some_event_0',
             'timestamp': 1330987183},
            {'event_params': {'some_param_10': 'val', 'some_param_11': 100},
             'event_type': 'some_event_1',
             'timestamp': 1330987775}]
}

“source”包含一些关于“events”中包含的事件源的信息(user_id 和 api_key); “事件”包含源生成的事件列表;每个事件都有“event_params”、“event_type”和“timestamp”(timestamp 是 GMT 中的 Unix 时间戳)。请注意,单个日志条目中的时间戳以及跨日志条目的时间戳可能是乱序的。

请注意,我受到限制,无法更改日志格式,无法最初将数据记录到分区的单独文件中(尽管我可以在记录数据后使用 Hive 执行此操作),等等。

最后,我想要一个 session 表,其中 session 与应用程序 ( api_k ) 和用户相关联,并且具有开始时间和 session 长度(或结束时间); session 被拆分,对于给定的应用程序和用户,事件之间有 30 分钟或更长时间的间隔。

我的解决方案执行以下操作(Hive 脚本和 python 转换脚本在下面;显示 SerDe 源似乎没有用,但如果有用请告诉我):

[1] 以非规范化格式将数据加载到 log_entry_tmp

[2] 将数据分解为 log_entry,这样,例如,上面的单个条目现在将有多个条目:

{"source_api_key":"app_key_1","source_user_id":"user0","event_type":"high_score","event_params":{"score":"1123","level":"9"},"event_timestamp":1330988326}
{"source_api_key":"app_key_1","source_user_id":"user0","event_type":"some_event_0","event_params":{"some_param_00":"val","some_param_01":"100"},"event_timestamp":1330987183}
{"source_api_key":"app_key_1","source_user_id":"user0","event_type":"some_event_1","event_params":{"some_param_11":"100","some_param_10":"val"},"event_timestamp":1330987775}

[3] 将数据转换并写入session_info_0,其中每个条目包含事件的app_id、user_id和时间戳

[4] 将数据转换并写入session_info_1,其中条目按app_id、user_id、event_timestamp排序;每个条目都包含一个 session_id ; python 转换脚本找到拆分,并将数据分组到 session 中

[5] 将最终 session 数据转换并写入 session_info_2 ; session 的应用程序 + 用户、开始时间和以秒为单位的长度


[ hive 脚本]

drop table if exists app_info;
create external table app_info ( app_id int, app_name string, api_k string )
location '${WORK}/hive_tables/app_info';

add jar ../build/our-serdes.jar;

-- [1] load the data into log_entry_tmp, in a denormalized format

drop table if exists log_entry_tmp;
create external table log_entry_tmp
row format serde 'com.company.TestLogSerde'
location '${WORK}/hive_tables/test_logs';

drop table if exists log_entry;
create table log_entry (
    entry struct<source_api_key:string,
                 source_user_id:string,
                 event_type:string,
                 event_params:map<string,string>,
                 event_timestamp:bigint>);

-- [2] explode the data into log_entry

insert overwrite table log_entry
select explode (trans0_list) t
from log_entry_tmp;

drop table if exists session_info_0;
create table session_info_0 (
    app_id string,
    user_id string,
    event_timestamp bigint
);

-- [3] transform and write data into session_info_0, where each entry contains events' app_id, user_id, and timestamp

insert overwrite table session_info_0
select ai.app_id, le.entry.source_user_id, le.entry.event_timestamp
from log_entry le
join app_info ai on (le.entry.source_api_key = ai.api_k);

add file ./TestLogTrans.py;

drop table if exists session_info_1;
create table session_info_1 (
    session_id string,
    app_id string,
    user_id string,
    event_timestamp bigint,
    session_start_datetime string,
    session_start_timestamp bigint,
    gap_secs int
);

-- [4] tranform and write data into session_info_1, where entries are ordered by app_id, user_id, event_timestamp ; and each entry contains a session_id ; the python tranform script finds the splits, and groups the data into sessions

insert overwrite table session_info_1
select
    transform (t.app_id, t.user_id, t.event_timestamp)
        using './TestLogTrans.py'
        as (session_id, app_id, user_id, event_timestamp, session_start_datetime, session_start_timestamp, gap_secs)
from
    (select app_id as app_id, user_id as user_id, event_timestamp as event_timestamp from session_info_0 order by app_id, user_id, event_timestamp ) t;

drop table if exists session_info_2;
create table session_info_2 (
    session_id string,
    app_id string,
    user_id string,
    session_start_datetime string,
    session_start_timestamp bigint,
    len_secs int
);

-- [5] transform and write final session data to session_info_2 ; the sessions' app + user, start time, and length in seconds

insert overwrite table session_info_2
select session_id, app_id, user_id, session_start_datetime, session_start_timestamp, sum(gap_secs)
from session_info_1
group by session_id, app_id, user_id, session_start_datetime, session_start_timestamp;

[测试日志传输.py]

#!/usr/bin/python

import sys, time

def buildDateTime(ts):

    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(ts))

curGroup = None
prevGroup = None
curSessionStartTimestamp = None
curSessionStartDatetime = None
prevTimestamp = None

for line in sys.stdin.readlines():

    fields = line.split('\t')
    if len(fields) != 3:
        raise Exception('fields = %s', fields)

    app_id = fields[0]
    user_id = fields[1]
    event_timestamp = int(fields[2].strip())

    curGroup = '%s-%s' % (app_id, user_id)
    curTimestamp = event_timestamp

    if prevGroup == None:
        prevGroup = curGroup
        curSessionStartTimestamp = curTimestamp
        curSessionStartDatetime = buildDateTime(curSessionStartTimestamp)
        prevTimestamp = curTimestamp

    isNewGroup = (curGroup != prevGroup)

    gapSecs = 0 if isNewGroup else (curTimestamp - prevTimestamp)

    isSessionSplit = (gapSecs >= 1800)

    if isNewGroup or isSessionSplit:
        curSessionStartTimestamp = curTimestamp
        curSessionStartDatetime = buildDateTime(curSessionStartTimestamp)

    session_id = '%s-%s-%d' % (app_id, user_id, curSessionStartTimestamp)

    print '%s\t%s\t%s\t%d\t%s\t%d\t%d' % (session_id, app_id, user_id, curTimestamp, curSessionStartDatetime, curSessionStartTimestamp, gapSecs)

    prevGroup = curGroup
    prevTimestamp = curTimestamp

最佳答案

我认为您可以轻松地放弃第 3 步,并将您在那里使用的查询作为子查询放入第 4 步中的 from 子句。物理化该转换似乎没有给您任何东西。

否则,我认为对于您要在这里实现的目标,这似乎是一种合理的方法。

您可以使用自定义映射器实现第 2 步,将输出作为客户化简器传递到第 4 步(将第 3 步作为子查询内置)。这将使您的 mapreduce 作业减少 1 个,因此可以为您节省大量时间。

关于hive - 使用 Apache Hive session 化我的日志数据的更好方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9610637/

相关文章:

python - 如何获得在 Spark 1.5.2 中使用 HiveContext 制作的 PySpark DataFrame?

scala - Spark JDBC仅返回带有列名的数据帧

apache-spark - 如何在 Spark-Shell 中运行时添加 Hive 属性

python - 如何使用python客户端访问远程服务器上的Hive

windows-7 - Hive 和 PIG/Grunt shell 卡在 cygwin 上

sql - 使用 unixODBC 找不到 hive odbc 连接器错误消息

hadoop - 有人可以向我解释 orcfiledump 的输出吗?

sql - hive 查询 : Ambiguous column reference acct_nbr in stage

Hadoop:级联 FlowException

hive - 使用 hive 连接到远程服务器