我在 Linux 上运行的库中有一个状态机实现。程序的主循环只是简单地等待,直到经过足够的时间要求下一次执行状态机。
此时我有一个类似于以下伪代码的循环:
while( 1 )
{
while( StateTicks() > 0 )
StateMachine();
Pause( 10ms );
}
其中 StateTicks 可能每 50 毫秒左右返回一个滴答声。我使 Pause() 越短,我在程序中使用的 CPU 时间就越多。
有没有更好的方法来测试一段时间的流逝,也许是基于信号?我宁愿停止执行直到 StateTicks() > 0 而不是调用 Pause()。
在状态机实现的底层,StateTicks 使用 clock_gettime(PFT_CLOCK ...)效果很好。我很想保持这种计时,因为如果 StateMachine() 调用花费的时间比状态机滴答的时间长,则此实现将 catch 。
暂停使用nanosleep实现合理准确的暂停时间。
也许这已经是最好的方式了,只是显得不是特别优雅。
最佳答案
使用 timer_create()
创建周期性计时器, 并让它调用 sem_post()
在 “计时器滴答信号量” 上。
为避免丢失报价,我建议使用 real-time signal ,可能是 SIGRTMIN+0
或 SIGRTMAX-0
。 sem_post()
是async-signal-safe ,因此您可以在信号处理程序中安全地使用它。
您的状态机只是等待信号量;不需要其他计时。如果您处理报价的时间太长,则以下 sem_wait()
不会阻塞,而是立即返回。本质上,信号量计算“丢失”的滴答声。
示例代码(未经测试!):
#define _POSIX_C_SOURCE 200809L
#include <semaphore.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#define TICK_SIGNAL (SIGRTMIN+0)
static timer_t tick_timer;
static sem_t tick_semaphore;
static void tick_handler(int signum, siginfo_t *info, void *context)
{
if (info && info->si_code == SI_TIMER) {
const int saved_errno = errno;
sem_post((sem_t *)info->si_value.sival_ptr);
errno = saved_errno;
}
}
static int tick_setup(const struct timespec interval)
{
struct sigaction act;
struct sigevent evt;
struct itimerspec spec;
if (sem_init(&tick_semaphore, 0, 0))
return errno;
sigemptyset(&act.sa_mask);
act.sa_handler = tick_handler;
act.sa_flags = 0;
if (sigaction(TICK_SIGNAL, &act, NULL))
return errno;
evt.sigev_notify = SIGEV_SIGNAL;
evt.sigev_signo = TICK_SIGNAL;
evt.sigev_value.sival_ptr = &tick_semaphore;
if (timer_create(CLOCK_MONOTONIC, &evt, &tick_timer))
return errno;
spec.it_interval = interval;
spec.it_value = interval;
if (timer_settime(tick_timer, 0, &spec, NULL))
return errno;
return 0;
}
滴答循环很简单
if (tick_setup(some_interval))
/* failed, see errno; abort */
while (!sem_wait(&tick_semaphore)) {
/* process tick */
}
如果您支持多个并发状态,一个信号处理程序就足够了。您所在的州通常包括
timer_t timer;
sem_t semaphore;
struct timespec interval;
唯一棘手的事情是在破坏信号将访问的状态时确保没有挂起的计时器信号。
因为信号传递会中断用于信号传递的线程中的任何阻塞 I/O,您可能希望在您的库中设置一个特殊线程来处理计时器滴答实时信号,实时信号在所有其他线程中阻塞线程。你可以标记你的库初始化函数__attribute__((constructor))
,以便它在 main()
之前自动执行。
最佳情况下,您应该使用执行信号传递的报价处理的同一线程。否则,如果信号是使用与运行滴答处理的内核不同的 CPU 内核传送的,滴答处理中会有一些小的抖动或延迟。
Basile Starynkevitch的回答唤起了我对等待和信号传递所涉及的延迟的内存:如果你使用 nanosleep()
和 clock_gettime(CLOCK_MONOTONIC,)
,您可以调整 sleep 时间以解决典型的延迟。
这是一个使用 clock_gettime(CLOCK_MONOTONIC,)
和 nanosleep()
的快速测试程序:
#define _POSIX_C_SOURCE 200809L
#include <sys/select.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
static const long tick_latency = 75000L; /* 0.75 ms */
static const long tick_adjust = 75000L; /* 0.75 ms */
typedef struct {
struct timespec next;
struct timespec tick;
} state;
void state_init(state *const s, const double ticks_per_sec)
{
if (ticks_per_sec > 0.0) {
const double interval = 1.0 / ticks_per_sec;
s->tick.tv_sec = (time_t)interval;
s->tick.tv_nsec = (long)(1000000000.0 * (interval - (double)s->tick.tv_sec));
if (s->tick.tv_nsec < 0L)
s->tick.tv_nsec = 0L;
else
if (s->tick.tv_nsec > 999999999L)
s->tick.tv_nsec = 999999999L;
} else {
s->tick.tv_sec = 0;
s->tick.tv_nsec = 0L;
}
clock_gettime(CLOCK_MONOTONIC, &s->next);
}
static unsigned long count;
double state_tick(state *const s)
{
struct timespec now, left;
/* Next tick. */
s->next.tv_sec += s->tick.tv_sec;
s->next.tv_nsec += s->tick.tv_nsec;
if (s->next.tv_nsec >= 1000000000L) {
s->next.tv_nsec -= 1000000000L;
s->next.tv_sec++;
}
count = 0UL;
while (1) {
/* Get current time. */
clock_gettime(CLOCK_MONOTONIC, &now);
/* Past tick time? */
if (now.tv_sec > s->next.tv_sec ||
(now.tv_sec == s->next.tv_sec &&
now.tv_nsec >= s->next.tv_nsec - tick_latency))
return (double)(now.tv_sec - s->next.tv_sec)
+ (double)(now.tv_nsec - s->next.tv_nsec) / 1000000000.0;
/* Calculate duration to wait */
left.tv_sec = s->next.tv_sec - now.tv_sec;
left.tv_nsec = s->next.tv_nsec - now.tv_nsec - tick_adjust;
if (left.tv_nsec >= 1000000000L) {
left.tv_nsec -= 1000000000L;
left.tv_sec++;
} else
if (left.tv_nsec < -1000000000L) {
left.tv_nsec += 2000000000L;
left.tv_sec += 2;
} else
if (left.tv_nsec < 0L) {
left.tv_nsec += 1000000000L;
left.tv_sec--;
}
count++;
nanosleep(&left, NULL);
}
}
int main(int argc, char *argv[])
{
double rate, jitter;
long ticks, i;
state s;
char dummy;
if (argc != 3 || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
fprintf(stderr, "\n");
fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
fprintf(stderr, " %s TICKS_PER_SEC TICKS\n", argv[0]);
fprintf(stderr, "\n");
return 1;
}
if (sscanf(argv[1], " %lf %c", &rate, &dummy) != 1 || rate <= 0.0) {
fprintf(stderr, "%s: Invalid tick rate.\n", argv[1]);
return 1;
}
if (sscanf(argv[2], " %ld %c", &ticks, &dummy) != 1 || ticks < 1L) {
fprintf(stderr, "%s: Invalid tick count.\n", argv[2]);
return 1;
}
state_init(&s, rate);
for (i = 0L; i < ticks; i++) {
jitter = state_tick(&s);
if (jitter > 0.0)
printf("Tick %9ld: Delayed %9.6f ms, %lu sleeps\n", i+1L, +1000.0 * jitter, count);
else
if (jitter < 0.0)
printf("Tick %9ld: Premature %9.6f ms, %lu sleeps\n", i+1L, -1000.0 * jitter, count);
else
printf("Tick %9ld: Exactly on time, %lu sleeps\n", i+1L, count);
fflush(stdout);
}
return 0;
}
上面,tick_latency
是你愿意提前接受一个“tick”的纳秒数,tick_adjust
是你从每次 sleep 中减去的纳秒数持续时间。
这些的最佳值是高度特定于配置的,而且我还没有一个可靠的方法来估计它们。对它们进行硬编码(如上所述为 0.75 毫秒)对我来说也不太好;也许使用命令行选项或环境值让用户控制它,默认为零会更好。
无论如何,将上面的编译为
gcc -O2 test.c -lrt -o test
并以 50Hz 的滴答率运行 500 滴答的测试,
./test 50 500 | sort -k 4
显示在我的机器上,滴答声在所需时刻的 0.051 毫秒(51 微秒)内被接受。即使降低优先级似乎也没有太大影响。以 5kHz 速率使用 5000 个滴答(每个滴答 0.2 毫秒)的测试,
nice -n 19 ./test 5000 5000 | sort -k 4
产生类似的结果——尽管我没有费心去检查如果机器负载在运行期间发生变化会发生什么。
换句话说,在一台机器上的初步测试表明它可能是一个可行的选择,因此您可能希望在不同的机器和不同的负载下测试该方案。它比我在自己的机器上预期的要精确得多(x86_64 上的 Ubuntu 3.11.0-24-generic,在 AMD Athlon II X4 640 CPU 上运行)。
这种方法有一个有趣的特性,即您可以轻松地使用单个线程来维护多个状态,即使它们使用不同的节拍率。您只需要检查哪个状态有下一个滴答声(最早的 ->next
时间),nanosleep()
如果将来发生,并处理滴答声,推进那个状态到下一个报价单。
有问题吗?
关于c - 在 c 中有效暂停执行的最佳方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24629598/