python - 存储和检索大量小型非结构化消息的最快方法

标签 python c++ json optimization msgpack

我正在开发一个IOT应用程序,它需要我处理许多小的非结构化消息(这意味着它们的字段会随着时间而改变-有些字段可能会出现,而另一些字段可能会消失)。这些消息通常具有2到15个字段,其值属于基本数据类型(int/long,字符串, bool 值)。这些消息非常适合JSON数据格式(或msgpack)。

消息按到达顺序进行处理至关重要(请理解:它们需要由单个线程处理-无法并行化此部分)。我有自己的逻辑来实时处理这些消息(吞吐量相对较小,最多每秒几十万条消息),但是引擎越来越需要能够通过重播消息历史。尽管起初并不是为了达到这个目的而编写的,但是如果我能够在一个事件处理引擎中向其提供历史数据,那么我的事件处理引擎(用Go编写)可以很好地每秒处理数十条消息(也许是几百条)。足够的速度。

这正是问题所在。我已经在很长一段时间(数年)内存储了许多(数千亿)这些消息,目前以分隔的msgpack格式(https://github.com/msgpack/msgpack-python#streaming-unpacking)存储。在此设置和其他设置下(参见下文),我能够基准化每秒200万条消息的峰值解析速度(在2019年的Macbook Pro上,仅解析),这远没有使磁盘IO饱和。

即使不谈论IO,也请执行以下操作:

import json
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
}
json_message = json.dumps(message)

%%timeit
json.loads(json_message)

给我一个3微秒/消息的解析时间,略高于30万消息/秒。与ujson,rapidjson和orjson相比,而不是标准库的json模块,我能够获得1微秒/消息(使用ujson)的峰值速度,大约为1M消息/秒。

Msgpack稍微好一点:

import msgpack
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
}
msgpack_message = msgpack.packb(message)

%%timeit
msgpack.unpackb(msgpack_message)

给我大约750ns/条消息(大约100ns/场)的处理时间,大约是130万条消息/秒。我最初以为C++可能会更快。这是一个使用nlohmann/json的示例,尽管它不能直接与msgpack相提并论:

#include <iostream>
#include "json.hpp"

using json = nlohmann::json;

const std::string message = "{\"value\": \"hello\"}";

int main() {
  auto jsonMessage = json::parse(message);
  for(size_t i=0; i<1000000; ++i) {
    jsonMessage = json::parse(message);
  }
  std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away. 
};

使用clang 11.0.3(std = c++ 17,-O3)进行编译,这在同一Macbook上以〜1.4s的速度运行,也就是说,解析速度为〜700k消息/秒,甚至比Python示例还要小。 。我知道nlohmann/json可能非常慢,并且使用simdjson的DOM API能够获得大约2M消息/秒的解析速度。

对于我的用例来说,这仍然太慢了。我愿意接受所有建议,以提高潜在的Python,C++,Java(或任何JVM语言)或Go应用程序的消息解析速度。

笔记:
  • 我不一定要关心磁盘上消息的大小(如果您建议的存储方法对内存有效,请考虑加号)。
  • 我需要的只是基本数据类型的键值模型-我不需要嵌套的字典或列表。
  • 转换现有数据根本不是问题。我只是在寻找经过优化的内容。
  • 我不一定需要将整个事物解析为一个结构或自定义对象,而仅在需要时访问某些字段(我通常需要每条消息的一小部分字段)-如果这样做的话,这很好只要不破坏整个应用程序的吞吐量,就会带来损失。
  • 我愿意接受自定义/稍微不安全的解决方案。
  • 从消息将被串行写入文件的意义上来说,我选择使用的任何格式都需要自然定界(我目前每天使用一个文件,这足以满足我的用例)。过去,我遇到了不正确分隔消息的问题(请参阅Java Protobuf API中的writeDelimitedTo-丢失单个字节,整个文件都被破坏了)。

  • 我已经探索过的东西:
  • JSON:尝试了Rapidjson,simdjson,nlohmann/json等...)
  • 带有分隔msgpack的
  • 平面文件(请参阅此API:https://github.com/msgpack/msgpack-python#streaming-unpacking):我当前用于存储消息的内容。
  • Protocol Buffer :速度稍快,但实际上不适合数据的非结构化性质。

  • 谢谢!!

    最佳答案

    我假设消息仅包含一些基本类型的命名属性(在运行时定义),并且这些基本类型例如是字符串,整数和浮点数。

    为了使实现更快,最好:

  • 避免文本解析(速度慢,因为顺序且充满条件);
  • 避免检查消息是否格式错误(这里不需要,因为它们都应该格式正确);
  • 尽量避免分配;
  • 处理消息块。

  • 因此,我们首先需要设计一个简单而快速的二进制消息协议(protocol):

    二进制消息包含其属性数量(以1个字节编码),后跟属性列表。每个属性都包含一个字符串,该字符串的前缀是其大小(以1个字节编码),然后是属性的类型(std::variant中的类型的索引,以1个字节编码)以及属性值(大小为-前缀字符串,64位整数或64位浮点数)。

    每个编码的消息都是一个字节流,可以容纳一个较大的缓冲区(分配一次,然后再用于多个传入消息)。

    这是用于解码来自原始二进制缓冲区的消息的代码:

    #include <unordered_map>
    #include <variant>
    #include <climits>
    
    // Define the possible types here
    using AttrType = std::variant<std::string_view, int64_t, double>;
    
    // Decode the `msgData` buffer and write the decoded message into `result`.
    // Assume the message is not ill-formed!
    // msgData must not be freed or modified while the resulting map is being used.
    void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
    {
        static_assert(CHAR_BIT == 8);
    
        const size_t attrCount = msgData[0];
        size_t cur = 1;
    
        result.clear();
    
        for(size_t i=0 ; i<attrCount ; ++i)
        {
            const size_t keyLen = msgData[cur];
            std::string_view key(msgData+cur+1, keyLen);
            cur += 1 + keyLen;
            const size_t attrType = msgData[cur];
            cur++;
    
            // A switch could be better if there is more types
            if(attrType == 0) // std::string_view
            {
                const size_t valueLen = msgData[cur];
                std::string_view value(msgData+cur+1, valueLen);
                cur += 1 + valueLen;
    
                result[key] = std::move(AttrType(value));
            }
            else if(attrType == 1) // Native-endian 64-bit integer
            {
                int64_t value;
    
                // Required to not break the strict aliasing rule
                std::memcpy(&value, msgData+cur, sizeof(int64_t));
                cur += sizeof(int64_t);
    
                result[key] = std::move(AttrType(value));
            }
            else // IEEE-754 double
            {
                double value;
    
                // Required to not break the strict aliasing rule
                std::memcpy(&value, msgData+cur, sizeof(double));
                cur += sizeof(double);
    
                result[key] = std::move(AttrType(value));
            }
        }
    }
    

    您可能还需要编写编码功能(基于相同的想法)。

    这是用法示例(基于与json相关的代码):

    const char* message = "\x01\x05value\x00\x05hello";
    
    void bench()
    {
        std::unordered_map<std::string_view, AttrType> decodedMsg;
        decodedMsg.reserve(16);
    
        decode(message, decodedMsg);
    
        for(size_t i=0; i<1000*1000; ++i)
        {
            decode(message, decodedMsg);
        }
    
        visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);
    }
    

    在我的机器(带有Intel i7-9700KF处理器)上,并根据您的基准,使用nlohmann json库的代码获得了2.7M消息/秒,使用新代码获得了35.4M消息/秒。

    请注意,此代码可以更快。确实,大多数时间都花在了有效的哈希和分配上。您可以通过使用更快的哈希映射实现(例如boost::container::flat_map或ska::bytell_hash_map)和/或使用自定义分配器来缓解此问题。一种替代方法是构建自己的精心调整的哈希图实现。另一种选择是使用键值对的 vector 并使用线性搜索来执行查找(这应该很快,因为您的消息应该没有很多属性,并且因为您说每条消息只需要一小部分属性)。
    但是,消息越大,解码速度越慢。因此,您可能需要利用并行性来更快地解码消息块。
    综上所述,这有可能达到超过100 M消息/秒。

    关于python - 存储和检索大量小型非结构化消息的最快方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61609733/

    相关文章:

    python - 如何将执行环境传递给 SGE

    python - Python 中的 for 循环中的 continue 和 pass 之间有区别吗?

    python - 用 beautifulsoup/python 解析公共(public) facebook 帖子

    python - Azure 应用服务上的 FastAPI-Docker 镜像引发 [CRITICAL] 工作超时

    c++ - OpenCV calibrateCamera() 断言失败

    c++ - 在原子上存储并释放内存顺序后调用notify_all方法

    c++ - Halide Jit 编译

    android测试访问json文件

    ios - 从 JSON 中提取值并传递给 Swift 中的另一个 ViewController

    ruby-on-rails - 如何在 ActionController::TestCase 请求中设置内容类型