python - 在 Python 中使用 LombScargle 对 HRV 数据进行频谱分析

标签 python signal-processing frequency-analysis astropy

我正在处理 RR 峰值,并希望导出 HRV 的频域测量值,以重新创建 Physionet(WFDB 工具)的 native C 软件包的结果。信号处理和频谱分析对我来说都是新领域,但经过漫长的一周尝试错误后,我已经根据 Astropy 编写了一些代码。尝试了其他几种解决方案后。

from astropy.stats import LombScargle
import random
dy = 0.1 * random.randint(1,100)
t = drive01["time"].values
y = drive01["intervals"].values
frequency, power = LombScargle(t, y,dy).autopower(minimum_frequency=0.0,maximum_frequency=4)

plt.plot(frequency, power)  

这将创建一个看起来与 Physionets 包中的图非常相似的图。 Matplot lib

Physionets HRV 工具使用代码 get_hrv 绘制了此图 enter image description here

然后通过计算常见的频域测量,我得到了截然不同的结果。

Pxx = np.nan_to_num(power)
Fxx = np.nan_to_num(frequency)

ulf = 0.003
vlf = 0.04
lf = 0.15
hf = 0.4
Fs = 15.5 # the sampling rate of the drive file
# find the indexes corresponding to the VLF, LF, and HF bands
vlf_freq_band = (Fxx >= ulf) & (Fxx <= vlf)
lf_freq_band = (Fxx >= vlf) & (Fxx <= lf)
hf_freq_band = (Fxx >= lf) & (Fxx <= hf)
tp_freq_band = (Fxx >= 0) & (Fxx <= hf)
# Calculate the area under the given frequency band
dy = 1.0 / Fs
VLF = np.trapz(y=abs(Pxx[vlf_freq_band]), x=None, dx=dy)
LF = np.trapz(y=abs(Pxx[lf_freq_band]), x=None, dx=dy)
HF = np.trapz(y=abs(Pxx[hf_freq_band]), x=None, dx=dy)
TP = np.trapz(y=abs(Pxx[tp_freq_band]), x=None, dx=dy)
LF_HF = float(LF) / HF

Python

 'HF': 0.10918703853414605,
 'LF': 0.050074418080717789,
 'LF/HF': 0.45861137689028925,
 'TP': 0.20150514290250854,
 'VLF': 0.025953350304821571

来自 Physionet 包:

TOT PWR  = 0.0185973
VLF PWR  = 0.00372733
LF PWR   = 0.00472635
HF PWR   = 0.0101436
LF/HF    = 0.465944

比较结果时,结果如下所示:

    Python  Physionet   
TP  0.201505143 0.0185973   Quite similar  + decimal dif
HF  0.109187039 0.0101436   Quite similar  + decimal dif
LF  0.050074418 0.00472635  Quite similar  + decimal dif
VLF 0.02595335  0.00372733  Not similar
LF/HF   0.458611377 0.465944    Quite similar

Python中的计算基于另一个Stackoverflow post的代码但他从受访者那里得到的修复是基于一个 python 模块,我无法工作,而且他没有使用 Lomb periodgram。我也非常愿意尝试其他东西,只要它能处理不均匀的样本。 我正在使用的数据是 drivedb from Physionet我使用 Physionet 包制作了一个包含 RR 峰值和时间的文本文件,该文件被读入 Pandas DataFrame。 The textfile can be found here

最佳答案

LombScargle 基于 Astropy 计算功率,与 Physionet 的 C 软件包(WFDB 工具)不同。我再次用 python 编写 lombscargle,结果与 Physionet(WFDB 工具)的 C 包相同。

import numpy as np
    import os
    import math
    import csv
    from itertools import zip_longest
    import time


    DATA_PATH = '/home/quangpc/Desktop/Data/PhysionetData/mitdb/'


    class FreqDomainClass:

        @staticmethod
        def power(freq, mag):
            lo = [0, 0.0033, 0.04, 0.15]
            hi = [0.0033, 0.04, 0.15, 0.4]
            pr = np.zeros(4)
            nbands = 4
            for index in range(0, len(freq)):
                pwr = np.power(mag[index], 2)
                for n in range(0, nbands):
                    if (freq[index] >= lo[n]) and freq[index] <= hi[n]:
                        pr[n] += pwr
                        break
            return pr

        @staticmethod
        def avevar(y):
            var = 0.0
            ep = 0.0
            ave = np.mean(y)
            for i in range(len(y)):
                s = y[i] - ave
                ep += s
                var += s * s
            var = (var - ep * ep / len(y)) / (len(y) - 1)
            return var

        def lomb(self, t, h, ofac, hifac):
            period = max(t) - min(t)
            z = h - np.mean(h)
            f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
            f = f[:int(len(f) / 2) + 1]
            f = np.reshape(f, (len(f), -1))

            w = 2 * np.pi * f
            lenght_t = len(t)
            t = np.reshape(t, (lenght_t, -1))
            t = np.transpose(t)
            tau = np.arctan2(np.sum(np.sin(2 * w * t), axis=1), np.sum(np.cos(2 * w * t), axis=1)) / (2 * w)
            tau = np.diag(tau)
            tau = np.reshape(tau, (len(tau), -1))
            tau = np.tile(tau, (1, lenght_t))
            cos = np.cos(w * (t - tau))
            sin = np.sin(w * (t - tau))
            pc = np.power(np.sum(z * cos, axis=1), 2)
            ps = np.power(np.sum(z * sin, axis=1), 2)
            cs = pc / np.sum(np.power(cos, 2), axis=1)
            ss = ps / np.sum(np.power(sin, 2), axis=1)
            p = cs + ss

            pwr = self.avevar(h)
            nout = len(h)
            p = p / (2 * pwr)
            p = p / (nout / (2.0 * pwr))

            return f, np.sqrt(p)

        def lomb_for(self, t, h, ofac, hifac):
            period = max(t) - min(t)
            f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
            f = f[:int(len(f) / 2) + 1]
            z = h - np.mean(h)
            p = np.zeros(len(f))
            for i in range(len(f)):
                w = 2 * np.pi * f[i]
                if w > 0:
                    twt = 2 * w * t
                    y = sum(np.sin(twt))
                    x = sum(np.cos(twt))
                    tau = math.atan2(y, x) / (2 * w)
                    wtmt = w * (t - tau)
                    cs = np.power(sum(np.multiply(z, np.cos(wtmt))), 2) / sum(np.power((np.cos(wtmt)), 2))
                    ss = np.power(sum(np.multiply(z, np.sin(wtmt))), 2) / sum(np.power((np.sin(wtmt)), 2))
                    p[i] = cs + ss
                else:
                    p[i] = np.power(sum(np.multiply(z, t)), 1) / sum(np.power(t), 1)

            pwr = self.avevar(h)
            nout = len(h)
            p = p / (2 * pwr)
            p = p / (nout / (2.0 * pwr))

            return f, np.sqrt(p)

        def freq_domain(self, time, rr_intervals):
            frequency, mag0 = self.lomb(time, rr_intervals, 4.0, 2.0)
            frequency = np.round(frequency, 8)
            mag0 = mag0 / 2.0
            mag0 = np.round(mag0, 8)
            result = self.power(frequency, mag0)
            return result[0], result[1], result[2], result[3], result[0] + result[1] + result[2] + result[3], \
                   result[2] / result[3]


    def time_domain(time, rr_intervals, ann):
        sum_rr = 0.0
        sum_rr2 = 0.0
        rmssd = 0.0
        totnn = 0
        totnnn = 0
        nrr = 1
        totrr = 1
        nnx = 0
        nnn = 0
        lastann = ann[0]
        lastrr = int(rr_intervals[0])
        lenght = 300
        t = float(time[0])
        end = t + lenght
        i = 0
        ratbuf = np.zeros(2400)
        avbuf = np.zeros(2400)
        sdbuf = np.zeros(2400)
        for x in range(1, len(ann)):
            t = float(time[x])
            while t > (end+lenght):
                i += 1
                end += lenght
            if t >= end:
                if nnn > 1:
                    ratbuf[i] = nnn/nrr
                    sdbuf[i] = np.sqrt(((sdbuf[i] - avbuf[i]*avbuf[i]/nnn) / (nnn-1)))
                    avbuf[i] /= nnn
                i += 1
                nnn = nrr = 0
                end += lenght
            nrr += 1
            totrr += 1
            if ann[x] == 'N' and ann[x-1] == 'N':
                rr_intervals[x] = int(rr_intervals[x])
                nnn += 1
                avbuf[i] += rr_intervals[x]
                sdbuf[i] += (rr_intervals[x] * rr_intervals[x])
                sum_rr += rr_intervals[x]
                sum_rr2 += (rr_intervals[x] * rr_intervals[x])
                totnn += 1
                if lastann == 'N':
                    totnnn += 1
                    rmssd += (rr_intervals[x] - lastrr) * (rr_intervals[x] - lastrr)
                    #  nndif[0] = NNDIF
                    if abs(rr_intervals[x] - lastrr) - 0.05 > (10 ** -10):
                        nnx += 1
            lastann = ann[x-1]
            lastrr = rr_intervals[x]
        if totnn == 0:
            return 0, 0, 0, 0
        sdnn = np.sqrt((sum_rr2 - sum_rr * sum_rr / totnn) / (totnn - 1))
        rmssd = np.sqrt(rmssd/totnnn)
        pnn50 = nnx / totnnn
        if nnn > 1:
            ratbuf[i] = nnn / nrr
            sdbuf[i] = np.sqrt((sdbuf[i] - avbuf[i] * avbuf[i] / nnn) / (nnn - 1))
            avbuf[i] /= nnn
        nb = i + 1
        sum_rr = 0.0
        sum_rr2 = 0.0
        k = 0
        h = 0
        while k < nb:
            if ratbuf[k] != 0:
                h += 1
                sum_rr += avbuf[k]
                sum_rr2 += (avbuf[k] * avbuf[k])
            k += 1
        sdann = np.sqrt((sum_rr2 - sum_rr * sum_rr / h) / (h - 1))
        return sdnn, sdann, rmssd, pnn50


    def get_result_from_get_hrv(filename):
        with open(filename, 'r') as f:
            csv_reader = csv.reader(f, delimiter=',')
            index = 0
            for row in csv_reader:
                if index > 0:
                    output = [s.strip() for s in row[0].split('=') if s]
                    # print('output = ', output)
                    if output[0] == 'SDNN':
                        sdnn = output[1]
                    if output[0] == 'SDANN':
                        sdann = output[1]
                    if output[0] == 'rMSSD':
                        rmssd = output[1]
                    if output[0] == 'pNN50':
                        pnn50 = output[1]
                    if output[0] == 'ULF PWR':
                        ulf = output[1]
                    if output[0] == 'VLF PWR':
                        vlf = output[1]
                    if output[0] == 'LF PWR':
                        lf = output[1]
                    if output[0] == 'HF PWR':
                        hf = output[1]
                    if output[0] == 'TOT PWR':
                        tp = output[1]
                    if output[0] == 'LF/HF':
                        ratio_lf_hf = output[1]

                index += 1

        return float(sdnn), float(sdann), float(rmssd), float(pnn50), float(ulf), float(vlf), \
               float(lf), float(hf), float(tp), float(ratio_lf_hf)


    def save_file():
        extension = "atr"

        result_all = []
        file_process = ['File']
        sdnn_l = ['sdnn']
        sdann_l = ['sdann']
        rmssd_l = ['rmssd']
        pnn50_l = ['pnn50']
        ulf_l = ['ulf']
        vlf_l = ['vlf']
        lf_l = ['lf']
        hf_l = ['hf']
        tp_l = ['tp']
        ratio_lf_hf_l = ['ratio_lf_hf']

        sdnn_l_p = ['sdnn']
        sdann_l_p = ['sdann']
        rmssd_l_p = ['rmssd']
        pnn50_l_p = ['pnn50']
        ulf_l_p = ['ulf']
        vlf_l_p = ['vlf']
        lf_l_p = ['lf']
        hf_l_p = ['hf']
        tp_l_p = ['tp']
        ratio_lf_hf_l_p = ['ratio_lf_hf']

        test_file = ['103',  '113', '117', '121', '123', '200', '202', '210', '212', '213',
                     '219', '221', '213', '228', '231', '233', '234',
                     '101', '106', '108',  '112', '114', '115', '116', '119', '122', '201', '203',
                     '205', '208', '209', '215', '220', '223', '230',
                     '105', '100']
        file_dis = ['109', '111', '118', '124', '207', '214', '232']
        for root, dirs, files in os.walk(DATA_PATH):
            files = np.sort(files)

        for name in files:
            if extension in name:
                if os.path.basename(name[:-4]) not in test_file:
                    continue
                print('Processing file...', os.path.basename(name))

                cur_dir = os.getcwd()
                os.chdir(DATA_PATH)
                os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))

                time_m = []
                rr_intervals = []
                ann = []
                with open(name.split('.')[0] + '.rr', 'r') as rr_file:
                    for line in rr_file:
                        time_m.append(line.split(' ')[0])
                        rr_intervals.append(line.split(' ')[1])
                        ann.append(line.split(' ')[2].split('\n')[0])

                time_m = np.asarray(time_m, dtype=float)
                rr_intervals = np.asarray(rr_intervals, dtype=float)

                sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)

                if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
                    print('No result hrv')
                    file_dis.append(os.path.basename(name[:-4]))
                    continue

                print('sdnn', sdnn)
                print('rmssd', rmssd)
                print('pnn50', pnn50)
                print('sdann', sdann)

                time_m = time_m - time_m[0]
                time_m = np.round(time_m, 3)
                time_nn = []
                nn_intervals = []
                for i in range(1, len(ann)):
                    if ann[i] == 'N' and ann[i - 1] == 'N':
                        nn_intervals.append(rr_intervals[i])
                        time_nn.append(time_m[i])
                time_nn = np.asarray(time_nn, dtype=float)
                nn_intervals = np.asarray(nn_intervals, dtype=float)

                fc = FreqDomainClass()
                ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)

                sdnn_l.append(sdnn)
                sdann_l.append(sdann)
                rmssd_l.append(rmssd)
                pnn50_l.append(pnn50)
                ulf_l.append(ulf)
                vlf_l.append(vlf)
                lf_l.append(lf)
                hf_l.append(hf)
                tp_l.append(tp)
                ratio_lf_hf_l.append(ratio_lf_hf)

                print('ULF PWR: ', ulf)
                print('VLF PWR: ', vlf)
                print('LF PWR: ', lf)
                print('HF PWR: ', hf)
                print('TOT PWR: ', tp)
                print('LF/HF: ', ratio_lf_hf)

                if os.path.exists('physionet_hrv.txt'):
                    os.remove('physionet_hrv.txt')
                os.system('get_hrv -R ' + name.split('.')[0] + '.rr >> ' + 'physionet_hrv.txt')
                sdnn, sdann, rmssd, pnn50, ulf, vlf, lf, hf, tp, ratio_lf_hf = \
                    get_result_from_get_hrv('physionet_hrv.txt')

                file_process.append(os.path.basename(name[:-4]))
                sdnn_l_p.append(sdnn)
                sdann_l_p.append(sdann)
                rmssd_l_p.append(rmssd)
                pnn50_l_p.append(pnn50)
                ulf_l_p.append(ulf)
                vlf_l_p.append(vlf)
                lf_l_p.append(lf)
                hf_l_p.append(hf)
                tp_l_p.append(tp)
                ratio_lf_hf_l_p.append(ratio_lf_hf)

                os.chdir(cur_dir)

        result_all.append(file_process)
        result_all.append(sdnn_l)
        result_all.append(sdnn_l_p)
        result_all.append(sdann_l)
        result_all.append(sdann_l_p)
        result_all.append(rmssd_l)
        result_all.append(rmssd_l_p)
        result_all.append(pnn50_l)
        result_all.append(pnn50_l_p)
        result_all.append(ulf_l)
        result_all.append(ulf_l_p)
        result_all.append(vlf_l)
        result_all.append(vlf_l_p)
        result_all.append(lf_l)
        result_all.append(lf_l_p)
        result_all.append(hf_l)
        result_all.append(hf_l_p)
        result_all.append(tp_l)
        result_all.append(tp_l_p)
        result_all.append(ratio_lf_hf_l)
        result_all.append(ratio_lf_hf_l_p)
        print(file_dis)
        with open('hrv2.csv', 'w+') as f:
            writer = csv.writer(f)
            for values in zip_longest(*result_all):
                writer.writerow(values)


    def main():
        extension = "atr"
        for root, dirs, files in os.walk(DATA_PATH):
            files = np.sort(files)
            for name in files:
                if extension in name:
                    if name not in ['101.atr']:
                        continue
                    cur_dir = os.getcwd()
                    os.chdir(DATA_PATH)
                    os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))
                    time_m = []
                    rr_intervals = []
                    ann = []
                    with open(name.split('.')[0] + '.rr', 'r') as rr_file:
                        for line in rr_file:
                            time_m.append(line.split(' ')[0])
                            rr_intervals.append(line.split(' ')[1])
                            ann.append(line.split(' ')[2].split('\n')[0])
                    time_m = np.asarray(time_m, dtype=float)
                    rr_intervals = np.asarray(rr_intervals, dtype=float)
                    sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)
                    if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
                        print('No result hrv')
                        return 0
                    print('sdnn', sdnn)
                    print('rmssd', rmssd)
                    print('pnn50', pnn50)
                    print('sdann', sdann)

                    time_m = time_m - time_m[0]
                    time_m = np.round(time_m, 3)
                    time_nn = []
                    nn_intervals = []
                    for i in range(1, len(ann)):
                        if ann[i] == 'N' and ann[i - 1] == 'N':
                            nn_intervals.append(rr_intervals[i])
                            time_nn.append(time_m[i])
                    time_nn = np.asarray(time_nn, dtype=float)
                    nn_intervals = np.asarray(nn_intervals, dtype=float)

                    start = time.time()
                    fc = FreqDomainClass()
                    ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)
                    end = time.time()

                    print('ULF PWR: ', ulf)
                    print('VLF PWR: ', vlf)
                    print('LF PWR: ', lf)
                    print('HF PWR: ', hf)
                    print('TOT PWR: ', tp)
                    print('LF/HF: ', ratio_lf_hf)
                    print('finish', end - start)
                    os.chdir(cur_dir)

关于python - 在 Python 中使用 LombScargle 对 HRV 数据进行频谱分析,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44863146/

相关文章:

python - 使用 Google App Engine 如何在 google docs (python) 中上传文件

android - android中的吉他音高检测

python - 用于计算邮件日志文件 python 中的电子邮件地址的字典

python - Django:如何在 settings.py 中退出

python - DSP : audio processing : squart or log to leverage fft?

python - 如何估计噪声层后面的高斯分布?

python - python tensorflow信号处理MFCC功能

matlab - 获取麦克风的频率响应

opencv - 使用离散傅立叶变换 [OpenCV] 查找高频

python - 如何确定字符串是否转义为unicode