javascript - StreamSets 将 Text 转换为 Json

标签 javascript pipeline streamsets

我正在尝试将文本数据从本地目录提取到 HDFS,在提取之前我需要将文本转换为有效的 json。为此,我正在使用 JavaScript Evaluator 处理器。

在 javascript 评估器中我无法读取任何记录。

这是我的示例代码:

for(var i = 0; i < records.length; i++) {
 try {  
   output.write(records[i]);
 } catch (e) {
   error.write(records[i], e);
 }
}

除了 JavaScript 求值器还有其他更好的选择吗?

这是我的示例输入数据:

{
    1046=
    1047=
    1048=5324800
    1049=20180508194648
    1095=2297093400,
    1111=up_default
    1118=01414011002101251
    1139=1
}
{
    1140=1
    1176=mdlhggsn01_1.mpt.com;3734773893;2472;58907
    1183=4
    1211=07486390
    1214=0
    1227=51200
    1228=111
    1229=0
    1250=614400,
}

更新:

根据@metadaddy 的回答,我尝试使用 Groovy insted JavaScript。对于@metadaddy 在他的回答中显示的相同数据,我得到以下异常。

这是我的错误截图。 enter image description here

最佳答案

您的 JavaScript 需要通读输入,构建输出记录。

使用 Text 格式,Directory origin 将为每一行输入创建一个包含 /text 字段的记录。

此 JavaScript 将构建您需要的记录结构:

for(var i = 0; i < records.length; i++) {
  try {
    // Start of new input record
    if (records[i].value.text.trim() === '{') {
      // Use starting input record as output record
      // Save in state so it persists across batches
      state.outRecord = records[i];
      // Clean out the value
      state.outRecord.value = {};
      // Move to next line
      i++;
      // Read values to end of input record
      while (i < records.length && records[i].value.text.trim() !== '}') {
        // Split the input line on '='
        var kv = records[i].value.text.trim().split('=');
        // Check that there is something after the '='
        if (kv[1].length > 0) {
          state.outRecord.value[kv[0]] = kv[1];   
        } else if (kv[0].length > 0) {
          state.outRecord.value[kv[0]] = NULL_STRING;
        }
        // Move to next line of input
        i++;
      }

      // Did we hit the '}' before the end of the batch?
      if (i < records.length) {
        // Write record to processor output
        output.write(state.outRecord);
        log.debug('Wrote a record with {} fields', 
            Object.keys(state.outRecord.value).length);
        state.outRecord = null;        
      }
    }
  } catch (e) {
    // Send record to error
    log.error('Error in script: {}', e);
    error.write(records[i], e);
  }
}

下面是样本输入数据转换的预览:

enter image description here

现在,要将整个记录作为 JSON 写入 HDFS,只需将 Hadoop FS 目标中的数据格式设置为 JSON。

关于javascript - StreamSets 将 Text 转换为 Json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50348197/

相关文章:

javascript - 使用javascript查找未标记的元素

azure - 如何在包含许多 Dockerfile 的单一存储库中构建一个特定的 Docker 镜像

c - C Minishell添加管道

python - 在 Jython StreamSets 中导入 python 模块 - ImportError : No module named

mysql - 流集中 jdbc 生产者的问题

javascript - crypto-js 如何隐藏类方法?

javascript - jquery 无法识别 html 元素的 id

javascript - ES6 类中的 Promises

r - 我看到了,但我不相信。 R 中的合法名称、管道操作和点

kubernetes - 在streamset-ns命名空间中找不到资源