c++ - 基于 cuda 推力的方法对 TCP 流中的数据包进行分组

标签 c++ cuda gpu thrust

我有包含数百万个数据包的捕获数据包的 tcpdumps (.pcap) 文件。我需要将这些网络数据包分组到 TCP 流中。

例子: 让我们考虑以下数据包 no => source_ip, destination_ip,source_port,destination_port

1 => ip1, ip2, s1, s2

2 => ip1, ip3, s3, s4

3 => ip2,ip1,s2,s1

4 => ip3,ip1,s4,s3

现在在上面的四个数据包的例子中,数据包 1,3 和 2,4 是同一个流的数据包。即我需要将以下数据包解析为 [[1,3],[2,4]]。

我的方法:

由于 (ip1, ip2, s1, s2) 和 (ip2, ip1, s2, s1) 表示相同的流,所以我决定对它们进行哈希处理并将其命名为 forward_hash 和 reverse hash,因为它们表示流过相同流的数据包在相反的方向。

我使用索引数组在替换和排序期间跟踪数据包。最终排序后,提取相同哈希值的开始和结束,并将其用于索引数组以获取表示该流的数据包索引

keys is the forward_hash of each packets, 
count is number of packets, 
packet_ids is the id of each packet corresponding to each of the hash

    thrust::device_vector<unsigned long long> d_keys(keys,(keys+count));
            thrust::device_vector<unsigned long long> d_ids(packet_ids,(packet_ids+count));
            // now sort the ids according to the keys
            thrust::sort_by_key(d_keys.begin(), d_keys.end(), d_ids.begin());
// after sorting, now we need to find the index of each hash
thrust::device_vector<unsigned long long> u_keys(count);
        thrust::device_vector<unsigned long long> output(count);

        thrust::pair<thrust::device_vector<unsigned long long>::iterator, thrust::device_vector<unsigned long long>::iterator> new_end;
        new_end = thrust::reduce_by_key(d_keys.begin(), d_keys.end(),thrust::make_constant_iterator(1),u_keys.begin(),output.begin());
// now we need to find starting index to each hash
....

我已经尝试为唯一的正向和反向哈希实现哈希表查找,但是在排序之前用正向哈希替换每个反向哈希......但是性能很慢。我 有帮助吗?

谢谢

最佳答案

我提出另一种方法,首先对每个数据包进行排序,然后对数据包进行排序。

示例代码执行以下步骤:

  1. 为了识别同一TCP流的数据包,我们需要对数据包进行排序。 在此之前,我们需要确保中每个发送的数据包源和目标都已排序。 示例:20:1 -> 10:4 变为 10:4 -> 20:1

  2. 现在我们可以对数据包进行排序,以便将同一流的数据包分组。 此代码假定输入数据包按时间排序。我们应用稳定排序,以便在每个流中保持排序。

  3. 我们需要找出每个 TCP 流的起始位置。此步骤的结果是指向已排序数据包列表中 TCP 流开头的索引。

  4. 根据您需要结果的方式,我们可以生成有关流的其他信息,例如每个流的数据包数。

可能的改进:

如果您知道 IP 地址仅在某个有限范围内,则可能仅使用 16 位来表示它们。 然后,您可以将发送方地址、发送方端口、接收方地址、接收方端口压缩为一个 64 位整数,这将提高排序性能。


编译运行

nvcc -std=c++11 sort_packets.cu -o sort_packets && ./sort_packets

输出

input data
d_src_addr: 20  10  20  20  30  30  10  20  30  20  
d_src_port: 1   2   3   1   2   2   6   1   1   1   
d_dst_addr: 10  20  30  10  20  20  30  10  10  10  
d_dst_port: 4   2   3   4   5   5   1   4   6   4   

packets after sort_within_packet
d_src_addr: 10  10  20  10  20  20  10  10  10  10  
d_src_port: 4   2   3   4   5   5   6   4   6   4   
d_dst_addr: 20  20  30  20  30  30  30  20  30  20  
d_dst_port: 1   2   3   1   2   2   1   1   1   1   

after stable_sort
d_orig_ind: 1   0   3   7   9   6   8   2   4   5   

packets after stable_sort
d_src_addr: 10  10  10  10  10  10  10  20  20  20  
d_src_port: 2   4   4   4   4   6   6   3   5   5   
d_dst_addr: 20  20  20  20  20  30  30  30  30  30  
d_dst_port: 2   1   1   1   1   1   1   3   2   2   

after copy_if
d_start_indices:    0   1   5   7   8   
d_stream_lengths:   1   4   2   1   2   

group of streams referencing the original indices
[1] [0,3,7,9]   [6,8]   [2] [4,5]

sort_packets.cu

#include <stdint.h>
#include <iostream>
#include <thrust/device_vector.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/sort.h>
#include <thrust/sequence.h>
#include <thrust/copy.h>
#include <thrust/functional.h>
#include <thrust/adjacent_difference.h>
#include <thrust/scatter.h>


#define PRINTER(name) print(#name, (name))
template <template <typename...> class V, typename T, typename ...Args>
void print(const char* name, const V<T,Args...> & v)
{
    std::cout << name << ":\t";
    thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
    std::cout << std::endl;
}

typedef thrust::tuple<uint32_t, uint16_t, uint32_t, uint16_t> Packet;

struct sort_within_packet : public thrust::unary_function<Packet, Packet>
{
    __host__ __device__
    Packet operator()(Packet p) const
    {
        if (thrust::get<0>(p) > thrust::get<2>(p))
        {
            Packet copy(p);
            thrust::get<0>(p) = thrust::get<2>(copy);
            thrust::get<1>(p) = thrust::get<3>(copy);
            thrust::get<2>(p) = thrust::get<0>(copy);
            thrust::get<3>(p) = thrust::get<1>(copy);
        }
        return p;
    }
};

struct find_start_indices : public thrust::unary_function<thrust::tuple<Packet, Packet>, bool>
{
    __host__ __device__
    bool operator()(thrust::tuple<Packet, Packet> p)
    {
       return (thrust::get<0>(p) != thrust::get<1>(p));
    }
};

template<typename... Iterators>
__host__ __device__
thrust::zip_iterator<thrust::tuple<Iterators...>> zip(Iterators... its)
{
    return thrust::make_zip_iterator(thrust::make_tuple(its...));
}


int main()
{
    // in this example we just have 10 packets
    const int N = 10;

    // demo data
    // this example uses very simple "IP addresses"
    uint32_t srcAddrArray[N] = {20, 10, 20, 20, 30, 30, 10, 20, 30, 20};
    uint16_t srcPortArray[N] = {1 , 2 , 3 , 1 , 2 , 2 , 6 , 1 , 1 , 1 };

    uint32_t dstAddrArray[N] = {10, 20, 30, 10, 20, 20, 30, 10, 10, 10};
    uint16_t dstPortArray[N] = {4 , 2 , 3 , 4 , 5 , 5 , 1 , 4 , 6 , 4 };

    // upload data to GPU
    thrust::device_vector<uint32_t> d_src_addr(srcAddrArray, srcAddrArray+N);
    thrust::device_vector<uint16_t> d_src_port(srcPortArray, srcPortArray+N);

    thrust::device_vector<uint32_t> d_dst_addr(dstAddrArray, dstAddrArray+N);
    thrust::device_vector<uint16_t> d_dst_port(dstPortArray, dstPortArray+N);

    thrust::device_vector<uint32_t> d_orig_ind(N);
    thrust::sequence(d_orig_ind.begin(), d_orig_ind.end());

    std::cout << "input data" << std::endl;
    PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl;

    // 1. sort within packet
    auto zip_begin = zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin());
    auto zip_end   = zip(d_src_addr.end(),   d_src_port.end(),   d_dst_addr.end(),   d_dst_port.end());
    thrust::transform(zip_begin, zip_end, zip_begin, sort_within_packet());

    std::cout << "packets after sort_within_packet" << std::endl;
    PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl;

    // 2. sort packets
    thrust::stable_sort(zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin(), d_orig_ind.begin()),
                        zip(d_src_addr.end(),   d_src_port.end(),   d_dst_addr.end(),   d_dst_port.end(),   d_orig_ind.end()));

    std::cout << "after stable_sort" << std::endl;
    PRINTER(d_orig_ind); std::cout << std::endl;

    std::cout << "packets after stable_sort" << std::endl;
    PRINTER(d_src_addr); PRINTER(d_src_port); PRINTER(d_dst_addr); PRINTER(d_dst_port); std::cout << std::endl;

    // 3. find stard indices of each stream
    thrust::device_vector<uint32_t> d_start_indices(N);

    using namespace thrust::placeholders;
    thrust::device_vector<uint32_t>::iterator copyEnd = thrust::copy_if(thrust::make_counting_iterator(1), thrust::make_counting_iterator(N),
                                                                            thrust::make_transform_iterator(
                                                                               zip(
                                                                                   zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin()),
                                                                                   zip(d_src_addr.begin()+1, d_src_port.begin()+1, d_dst_addr.begin()+1, d_dst_port.begin()+1)  
                                                                                ),
                                                                                find_start_indices()
                                                                            ),
                                                                            d_start_indices.begin()+1, _1);

    uint32_t streamCount = copyEnd-d_start_indices.begin();
    d_start_indices.resize(streamCount);

    std::cout << "after copy_if" << std::endl;
    PRINTER(d_start_indices);

    // 4. generate some additional information about the result and print result formatted
    thrust::device_vector<uint32_t> d_stream_lengths(streamCount+1);
    thrust::adjacent_difference(d_start_indices.begin(), d_start_indices.end(), d_stream_lengths.begin());
    d_stream_lengths.erase(d_stream_lengths.begin());
    d_stream_lengths.back() = N-d_start_indices.back();
    PRINTER(d_stream_lengths);

    thrust::host_vector<uint32_t> h_start_indices = d_start_indices;
    thrust::host_vector<uint32_t> h_orig_ind = d_orig_ind;

    auto index = h_start_indices.begin();
    index++;

    std::cout << std::endl << "group of streams referencing the original indices"<< std::endl <<  "[" <<  h_orig_ind[0];
    for(int i=1; i<N;++i)
    {
      if (i == *index)
      {
         index++;
         std::cout << "]\t[";
      }
      else
      {
         std::cout << ",";
      }
      std::cout << h_orig_ind[i];
    }
    std::cout << "]" << std::endl;

    return 0;
}

关于c++ - 基于 cuda 推力的方法对 TCP 流中的数据包进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31305929/

相关文章:

c++ - 命名空间作为 CUDA 中的模板参数

cuda - 使用多个 CUDA GPU

tensorflow - Keras CNN 如何减少大图像尺寸的 GPU 内存使用?

c++ - Simpson 的 Thrust 集成代码在两台使用 NVC++ 的机器上输出不同的结果

c++ - 制作 C 项目而不是 C++

c++ - 从未知类型的 cv::Mat 获取 cv::Scalar

c++ - C++20 范围的切片 View

c++ - 使用可重定位设备代码时的 CUDA 8.0 Visual Studio 2012 链接器选项

android - Opencl 与 Android 的集成

c++ - 在 autoconf 中检查仅 header 库