python - 对音频流使用多处理

标签 python audio stream multiprocessing python-sounddevice

我想连续录音和录音。这很容易使用 python 的 sounddevice 模块完成。但是,当音频超过 20 帧时,我还想开始将 block 发送到在后台工作的线程。当我这样做时,声音设备的输入溢出,你能帮我解决这个问题,还是找到其他解决方案?

def callbackAmbient(indata, frames, time, status):
    if (status):
        print(status)
    VadFrames.append(indata)        

    if (len(VadFrames) > 19):
        Process(target = start_vad, args = (numpy.array(numpy.multiply(VadFrames, 0.5)), numpy.array(VadFrames), RATE, RATE)).start()
        VadFrames.pop(0)

print("System Recording...")

try:
    with sd.InputStream(samplerate=192000, blocksize=6144, channels=1, device=sd.query_devices(kind='input')['name'], callback=callbackAmbient):
        while True: 
            pass
except KeyboardInterrupt:
    print("System stopped...")
    sys.exit()

最佳答案

您可以创建 Stream 的实例,并将其作为参数传递给 python 线程,该线程将负责数据管理、抛出错误或任何您想要的。

audio_stream = AudioStream() #create an audio stream w. my soundcard
websocket = Websocket(audio_stream) #create a websocket connection passing actual stream
websocket.start()

audio_stream.run()

Websocket 类在这里负责管理音频流和前端之间的错误(请注意,在这种情况下我使用了 websocket,但您可以简单地选择另一种通信协议(protocol)):
class Websocket(threading.Thread):
    def __init__(self, stream):
        super().__init__()
        self.audio_stream = stream

    def run(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        start_server = websockets.serve(self.handler, "localhost", 8083)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()

    #implementation.....

您在这篇文章中提出的逻辑应该在 AudioStream 类 中实现初始化 为了工作的方法

关于python - 对音频流使用多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56517696/

相关文章:

java - 在 Scala/Java 中连接 MP3

java - 能否提供有关如何使用 Java 中的 FMJ 或 JMF 流式传输音频文件的示例

python - Python 之禅 'Explicit is better than implicit'

javascript - Click 事件不会在 Chrome 中的音频元素上触发

java - 如何将音频文件播放到麦克风输入

c++ - 使用 ios_base::register_callback() 和 ios_base::event 检测流关闭

java.io.IOException : Stream closed using BouncyCaSTLe

python从两个列表中创建一个新列表,第二个列表表示重复第一个列表中的元素多少次

Python单元测试: respond to failure within the test method

python - TensorFlow - 无法保存检查点 : Python exit code 139