c - 为什么Linux C串口只读可以在高CPU下工作?

标签 c linux serial-port

我正在使用以下代码从串行端口读取(在阻塞模式下)



    int fd;

    int res;
    res = open_port(O_RDONLY | O_NOCTTY, &fd);


    while (num_bytes_read < C_BUFFER_ENTRY_SIZE)
    {
        num_buf_byte_remaining = C_BUFFER_ENTRY_SIZE - num_bytes_read;      
        rc = read(fd, &buf[num_bytes_read], num_buf_byte_remaining);

        if (rc > 0)
        {
            num_bytes_read += rc;
        }

        if (rc == -1)
        {
            printf("Read error %s\n",strerror(errno));
        }
    }

我正在使用的端口是/dev/ttyUSB0(串行端口),它是使用以下配置进行配置的。

    // BAUDRATE B3000000
int open_port(int flags, int *fd)
{
    struct termios options;
    *fd = open("/dev/ttyUSB0", flags | O_NOCTTY);

    if(*fd < 0){
        printf("Error opening port\n");
        return 1;
    } else{
        printf("Port handle is %d\n", *fd);
    }

    options.c_cflag = BAUDRATE | CS8 | CLOCAL | CREAD;
    options.c_iflag = 0;
    options.c_oflag = 0;
    options.c_lflag = 0;
    options.c_cc[VTIME] = 0;
    options.c_cc[VMIN] = 200;
    tcflush(*fd, TCIFLUSH);
    tcsetattr(*fd, TCSANOW, &options);

    return 0;
}

通过该串行链路传输的数据量非常高(大约 1-2Mbps),因此读取速度需要很快。

我看到的行为是,当系统正常运行时,读取循环经常丢失数据。但是,当我使用 nice -n 20 cat/dev/zero >/dev/null 增加加载时,所有错误都会突然停止(我有一个 header + count + CRCpacket 结构,因此可以检查损坏和连续性)。

  1. 为什么增加 CPU 负载实际上会提高该循环的性能?
  2. 我需要更改终端设置以提高性能吗? (我尝试过将 VMIN 设置为 0)

编辑

根据下面的评论,我将扩展上述内容。 data的buf变量是放置从串口读取的数据的地方。这个 while 循环一直持续到 buf 中有 C_BUFFER_ENTRY_SIZE (4096) 字节为止。

在 while 循环中断后(因为字节数等于 4906),buf 被添加到队列中,以便从第二个线程读取。队列受互斥体和条件变量的保护,请参见下文。

/// function is after above while 1 loop
add_to_queue(buf,num_bytes_read);

// ring buff struct
typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t fill;
    pthread_cond_t empty;
    int s_icm_idx; // InComing Message index
    int s_ogm_idx; // OutGoing Message index
    int s_entry_count;
    int s_entry_count_hi;
    int used_bytes[10000];
    ring_buffer_data_t buffer[10000];
}ring_buffer_t;

typedef struct
{
    char data[4096];
}ring_buffer_data_t;


///
int add_to_queue(void* a_data, int a_bytes)
{
    int ret_val = 0;

    if (a_bytes > C_BUFFER_ENTRY_SIZE)
    {
        return 2;
    }

    pthread_mutex_lock(&(circ_buf.mutex));

    while (circ_buf.s_entry_count == 10000) 
    {
        ret_val = 1;
        pthread_cond_wait(&(circ_buf.empty), &(circ_buf.mutex));
    }

    /* Put the data on the queue, set how many bytes of the queue */
    /* are used and then incoming message index and the queue     */
    /* entry counter.                                             */
    memcpy(&(circ_buf.buffer[circ_buf.s_icm_idx]), a_data, a_bytes);
    circ_buf.used_bytes[circ_buf.s_icm_idx] = a_bytes;

    circ_buf.s_icm_idx = (circ_buf.s_icm_idx + 1) % 10000;
    crc_buf.s_entry_count++;

    pthread_cond_signal(&(circ_buf.fill));
    pthread_mutex_unlock(&(circ_buf.mutex));

    return ret_val;
}

第二个线程从此队列中读取数据,然后使用简单的状态机解析每个 4906 字节段。 IE。循环遍历字节,直到找到 header 字节,然后检查 idx + 5 以获得 CRC,如果 crc 有效,则对整个 6 个字节执行 CRC,然后读取消息。

我看到从串行读取的数据已损坏,这是由 CRC 失败指示的。当我加载 CPU 时,CRC 检查不会失败。而且因为 header 和 CRC 之间的 4 个字节是一个计数,所以我可以看到没有消息被丢弃(连续计数)。

我确信当负载较低时,CRC 检查不是错误的。

编辑2:

完整的可编译代码

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <termio.h>
#include <linux/serial.h>
#include <assert.h>
#include <string.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>

#include <time.h>

//The width and depth of the queue, ie the max size message that can be placed on it.
#define C_BUFFER_ENTRY_SIZE 4096
#define C_FIFO_ENTRIES_MAX 10000

typedef struct
{
    char data[C_BUFFER_ENTRY_SIZE];
}ring_buffer_data_t;


typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t fill;
    pthread_cond_t empty;

    int s_icm_idx; // InComing Message index
    int s_ogm_idx; // OutGoing Message index
    int s_entry_count;
    int s_entry_count_hi;
    int used_bytes[C_FIFO_ENTRIES_MAX];
    ring_buffer_data_t buffer[C_FIFO_ENTRIES_MAX];
} ring_buffer_t;


static ring_buffer_t s_rb = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0, 0, {0}, {{{0}}}};
const uint8_t CRC_LUT[256] = {0, 94, 188, 226, 97, 63, 221, 131, 194, 156, 126, 32, 163, 253, 31, 65, 157, 195, 33, 127, 252, 162, 64, 30, 95, 1, 227, 189, 62, 96, 130, 220, 35, 125, 159, 193, 66, 28, 254, 160, 225, 191, 93, 3, 128, 222, 60, 98, 190, 224, 2, 92, 223, 129, 99, 61, 124, 34, 192, 158, 29, 67, 161, 255, 70, 24, 250, 164, 39, 121, 155, 197, 132, 218, 56, 102, 229, 187, 89, 7, 219, 133, 103, 57, 186, 228, 6, 88, 25, 71, 165, 251, 120, 38, 196, 154, 101, 59, 217, 135, 4, 90, 184, 230, 167, 249, 27, 69, 198, 152, 122, 36, 248, 166, 68, 26, 153, 199, 37, 123, 58, 100, 134, 216, 91, 5, 231, 185, 140, 210, 48, 110, 237, 179, 81, 15, 78, 16, 242, 172, 47, 113, 147, 205, 17, 79, 173, 243, 112, 46, 204, 146, 211, 141, 111, 49, 178, 236, 14, 80, 175, 241, 19, 77, 206, 144, 114, 44, 109, 51, 209, 143, 12, 82, 176, 238, 50, 108, 142, 208, 83, 13, 239, 177, 240, 174, 76, 18, 145, 207, 45, 115, 202, 148, 118, 40, 171, 245, 23, 73, 8, 86, 180, 234, 105, 55, 213, 139, 87, 9, 235, 181, 54, 104, 138, 212, 149, 203, 41, 119, 244, 170, 72, 22, 233, 183, 85, 11, 136, 214, 52, 106, 43, 117, 151, 201, 74, 20, 246, 168, 116, 42, 200, 150, 21, 75, 169, 247, 182, 232, 10, 84, 215, 137, 107, 53};
int id;

FILE *log_fh;

void *dequeue_and_log(void* fd)
{
    #define HDR_START 'X'

    // FILE *fd;

    //fd = fopen("log.log", "wb");
    uint8_t parse_buffer[2 * C_BUFFER_ENTRY_SIZE] = {0};
    int bytes_to_write = 0;
    while (1)
    {
        pthread_mutex_lock(&(s_rb.mutex));

        while (s_rb.s_entry_count == 0)
        {
            pthread_cond_wait(&(s_rb.fill), &(s_rb.mutex));
        }
        // Get the incoming data from the buffer
        bytes_to_write = s_rb.used_bytes[s_rb.s_ogm_idx];
        memcpy(parse_buffer, &(s_rb.buffer[s_rb.s_ogm_idx]),bytes_to_write);

        s_rb.s_ogm_idx = (s_rb.s_ogm_idx + 1) % C_FIFO_ENTRIES_MAX;
        s_rb.s_entry_count--;

        pthread_cond_signal(&(s_rb.empty));
        pthread_mutex_unlock(&(s_rb.mutex));
        // binary write
        fwrite(parse_buffer,bytes_to_write,1,(FILE*)fd);
    }
}

int add_to_queue(void* a_data, int a_bytes)
{
    int ret_val = 0;

    if (a_bytes > C_BUFFER_ENTRY_SIZE)
    {
        printf("add_to_queue oversized send %i\n", a_bytes);
        return 2;
    }

    pthread_mutex_lock(&(s_rb.mutex));  

    while (s_rb.s_entry_count == C_FIFO_ENTRIES_MAX) 
    {
        ret_val = 1;
        printf("Queue is FULL\n");
        pthread_cond_wait(&(s_rb.empty), &(s_rb.mutex));
    }

    /* Put the data on the queue, set how many bytes of the queue */
    /* are used and then incoming message index and the queue     */
    /* entry counter.                                             */
    memcpy(&(s_rb.buffer[s_rb.s_icm_idx]), a_data, a_bytes);
    s_rb.used_bytes[s_rb.s_icm_idx] = a_bytes;

    s_rb.s_icm_idx = (s_rb.s_icm_idx + 1) % C_FIFO_ENTRIES_MAX;
    s_rb.s_entry_count++;

    pthread_cond_signal(&(s_rb.fill));
    pthread_mutex_unlock(&(s_rb.mutex));

    return ret_val;
}

int open_port(char *port_name, int flags, int *fd)
{
    struct termios options;

    //*fd = open(port_name, O_RDWR | O_NOCTTY);
    *fd = open(port_name, flags | O_NOCTTY);

    if(*fd < 0){
        printf("Error opening port\n");
        return 1;
    } else{
        printf("Port handle is %d\n", *fd);
    }

    //Assume port has already been setup by receiver
    #if(1)
    options.c_cflag = B3000000 | CS8 | CLOCAL | CREAD;
    options.c_iflag = 0;
    options.c_oflag = 0;
    options.c_lflag = 0;
    options.c_cc[VTIME] = 0;
    options.c_cc[VMIN] = 200;
    tcflush(*fd, TCIFLUSH);
    tcsetattr(*fd, TCSANOW, &options);
    #endif

    return 0;
}


int main(int argc, char **argv)
{
    int fd;
    int res;
    uint8_t buf[C_BUFFER_ENTRY_SIZE];
    int rc = 0;

    pthread_t p_parse;
    setbuf(stdout, NULL);

    int num_bytes_read = 0;
    int num_buf_byte_remaining = 0;

    #define HB_PRINT_INTERVAL 200

    unsigned long int heartbeat_seq_count = 0;
    unsigned long int heartbeat_seq_count_last_print = 0;

    pthread_mutex_init(&(s_rb.mutex), NULL);

    log_fh = fopen(argv[2], "wb");

    res = open_port(argv[1], O_RDONLY, &fd);
    pthread_create(&p_parse, NULL, dequeue_and_log, (void *)log_fh);

    while(1){
        heartbeat_seq_count++;

        if (heartbeat_seq_count - heartbeat_seq_count_last_print >= HB_PRINT_INTERVAL)
        {
            printf("Sequence ID %10ld\n", heartbeat_seq_count);             
            heartbeat_seq_count_last_print = heartbeat_seq_count;
        }

        num_bytes_read = 0;
        while (num_bytes_read < C_BUFFER_ENTRY_SIZE)
        {
            num_buf_byte_remaining = C_BUFFER_ENTRY_SIZE - num_bytes_read;      
            rc = read(fd, &buf[num_bytes_read], num_buf_byte_remaining);


            if (rc > 0)
            {
                num_bytes_read += rc;
            }
            else if (rc == -1)
            {
                printf("Read error %s\n",strerror(errno));
            }
            else 
            {}
        }

        //add data to logging queue
        add_to_queue(buf, num_bytes_read);
    }

    fclose(log_fh);
    return 0;
}

  1. 从串行读取添加到队列
  2. 第二个线程从队列中读取数据并写入磁盘

Python 脚本 post 处理输出文件以检查数据是否损坏。测试完成后,它会在单独的 PC 上运行,因此不会影响负载。我知道这个脚本可以很好地解析数据。

有三件事我已经测试过但无法解释

  1. VMIN = 200 并且无后台加载 = 二进制文件中的数据已损坏
  2. VMIN = 0 并且无后台负载 = 二进制文件中的数据已损坏
  3. VMIN = 200 且后台加载 = NO 二进制文件中的数据已损坏

将 VMIN 设置为零会增加 CPU 负载(因为读取循环运行得更快,因为读取函数非常频繁地返回 0)。因此,我不确定是否实际上是 VMIN 导致了问题,或者更多的是增加 CPU 负载使读取行为正常。

重申一下,这不是解码函数。在上述没有损坏数据的情况下,该函数可以完美地从文件中运行。

最佳答案

由于您的代码无法在无法阻止发生的常见情况下工作,因此“为什么它只能在高 CPU 情况下工作?”的原因没关系。花费大量时间和精力找出“为什么?”可能很有趣,但我认为您必须更改代码,因为当 CPU 负载下降时,任何停止工作的东西都是,IMO,waaaaay任何时候都无法信任工作。

首先,线程在您的系统上有用吗?如果只有一个CPU一次只能运行一个线程,那么创建多个线程将会适得其反。您是否尝试过简单的单线程解决方案,但实际上发现它不起作用?

如果您尝试过单线程解决方案并且它不起作用,我注意到的第一件事是您当前发布的代码正在做大量不需要做的额外工作,并且它可能会竞争超过一个锁,这根本没有多大帮助。

因此,请消除无关的数据复制以及所有不必要的簿记工作。

您可能还会因单个互斥体和条件变量而产生很多争用。没有必要因为日志记录线程正在做某事而不读取,或者因为读取线程正在做一些簿记而日志记录线程没有处理。您几乎肯定会受益于更精细的锁定粒度。

我会做这样的事情:

#define CHUNK_SIZE ( 4 * 1024 )
#define NUM_BUFFERS 16

struct dataStruct
{
    sem_t full;
    sem_t empty;
    ssize_t count;
    char data[ CHUNK_SIZE ]
};

struct dataStruct dataArray[ NUM_BUFFERS ];

void initDataArray( void )
{
    for ( int ii = 0; ii < NUM_BUFFERS; ii++ )
    {
        // buffers all start empty
        sem_init( &( dataArray[ ii ].full ), 0, 0 );
        sem_init( &( dataArray[ ii ].empty ), 0, 1 );
    }
}

void *readPortThread( void *arg )
{
    unsigned currBuffer = 0;

    // get portFD from arg
    int portFD = ???
    for ( ;; )
    {
        sem_wait( &( dataArray[ currBuffer  ].empty ) );

        // probably should loop and read more, and don't
        // infinite loop on any error (hint...)
        dataArray[ currBuffer  ].count = read( portFD, 
            dataArray[ currBuffer  ].data,
            sizeof( dataArray[ currBuffer  ].data ) );
        sem_post( &( dataArray[ currBuffer  ].full ) );
        currBuffer++;
        currBuffer  %= NUM_BUFFERS;
    }
    return( NULL );
}

void *processData( char *data, ssize_t count )
{
    ...
}

void *logDataThread( void *arg )
{
    for ( ;; )
    {
        sem_wait( &( dataArray[ currBuffer  ].full ) );

        processData( dataArray[ currBuffer  ].data,
            dataArray[ currBuffer  ].count );

        sem_post( &( dataArray[ currBuffer  ].empty ) );
        currBuffer++;
        currBuffer  %= NUM_BUFFERS;
    }
    return( NULL );
}

请注意更精细的锁定粒度,并且完全没有无关的数据复制。正确的 header 、所有错误检查和完整实现都作为练习......

您必须进行测试才能找到 CHUNK_SIZENUM_BUFFERS 的最佳值。动态设置缓冲区的数量也是一个很好的改进。

OT:int open_port(int flags, int *fd) 函数中不需要任何间接寻址。只需返回 fd 值 - 对于 open() 本身来说就足够了:

int open_port(int flags )
{
    struct termios options;
    int fd = open("/dev/ttyUSB0", flags | O_NOCTTY);

    if(fd < 0){
        printf("Error opening port\n");
        return fd;
    }

    // no need for else - the above if statement returns
    printf("Port handle is %d\n", fd);

    // did you set **ALL** the fields???
    memset( options, 0, sizeof( options ) );

    options.c_cflag = BAUDRATE | CS8 | CLOCAL | CREAD;
    options.c_iflag = 0;
    options.c_oflag = 0;
    options.c_lflag = 0;
    options.c_cc[VTIME] = 0;
    options.c_cc[VMIN] = 200;
    tcflush(fd, TCIFLUSH);
    tcsetattr(fd, TCSANOW, &options);

    return fd;
}

关于c - 为什么Linux C串口只读可以在高CPU下工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58956423/

相关文章:

linux - bash脚本的奇怪行为

python - 无法使用 python 和 pyserial 打开/dev/ttyusb0

c - 为什么在递归函数中修剪字符串时会出现段错误?

objective-c - 除了一个源文件之外的所有源文件中的前缀头?

c - 将文本文件中的一行分配给 C 中的变量

linux - 使用图标运行Android Studio - Linux Mint

ruby-on-rails - SSH 和 -bash : fork: Cannot allocate memory Ubuntu , rails、Passenger、redis、sidekiq

c - 警告 : ‘struct tag’ declared inside parameter list will not be visible outside of this definition or declaration void func(struct tag v);

linux - pxa255 中的 RS485 支持

string - PySerial 和 readline() 返回二进制字符串 - 转换为常规字母数字字符串?