后台运行Python多进程音频

问题描述

我正在尝试用基本的UI编写一个简单的数字合成器。目前,它会创建具有所需频率和幅度的正弦波发生器,然后在发生器上无限循环,将块发送到pyaudio流。这样可以产生稳定的音调,听起来不错。在我的简单UI上,是音符频率的滑块,而声音循环无限进行,很显然,由于控件无限循环,我无法与UI交互以更改音符。如果我尝试在UI_update循环中而不是在其自己的无限循环中运行音频流,则它会变得很混乱,因为它花费很短的时间进行音符更改和UI渲染的计算,听起来很糟糕。

我的解决方案是简单地将音频渲染循环发送到多进程进程,并在后台运行无限循环。然后,在移动滑块时,取消该过程并进行新的过程以弹奏新音符。我已经完成了此代码功能,可以看到它创建了播放声音的过程,进行了调试,可以看到它执行了声音创建功能,可以更新UI并看着它使过程循环,但是没有声音产生。我知道声音创建功能可以正常工作,因为我可以将其从流程中删除并将其放置在主循环中,并且会发出良好的声音,由于无限循环,我无法与UI交互。

我在使用python 3.8.5的Mac Catalina 10.15.6上,pySimpleGUI是我正在使用的GUI库,而pyaudio是我正在玩的GUI库。有没有办法为我的计算机提供多进程处理音频控制?我是否可以使用更好的设计来获得平稳的声音,同时仍然能够连续改变声音?

这是我代码的相关部分:

def sin_wave(frequency=440.0,samplerate=44100,amplitude=0.5):
  period = int(samplerate / frequency)
  amplitude = clip_amplitude(amplitude)
  lookup = [float(amplitude) * math.sin(2.0*math.pi*float(frequency)*(float(i%period)/float(samplerate))) for i in range(period)]
  return (lookup[i%period] for i in count(0))

def grouper(n,iterable,fillvalue=None):
  # "grouper(3,'ABCDEFG','x') --> zip_longest((A,B,C),(D,E,F),(G,x,x))"
  args = [iter(iterable)] * n
  return zip_longest(fillvalue=fillvalue,*args)

def compute_samples(channels,nsamples=None):
  return islice(zip(*(map(sum,zip(*channel)) for channel in channels)),nsamples)

def update_samples(values):
  channel = ()
  for i in range(50):
    if f'Osc{i}Freq' in values and f'Osc{i}Vol' in values:
      # construct sound,then convert tuple to list,append our sound,back to tuple
      sound = sin_wave(frequency=values[f'Osc{i}Freq'],amplitude=values[f'Osc{i}Vol'])
      # operations cannot be combined or NoneType error
      channel = list(channel)
      channel.append(sound)
      channel = tuple(channel)

  # mono for now 
  channels = (
    channel,channel
  )
  return compute_samples(channels)

def write_audiostream(stream,samples,nchannels=2,sampwidth=2,framerate=44100,bufsize=2048):
  max_amplitude = float(int((2 ** (sampwidth * 8)) / 2) - 1) / 100

  logging.info('test')
  for chunk in grouper(bufsize,samples):
    frames = b''.join(b''.join(struct.pack('h',int(max_amplitude * sample)) for sample in channels) for channels in chunk if channels is not None)
    stream.write(frames)

def main():
  num_channels = 2
  sampwidth = 2
  framerate = 44100

  pya = pyaudio.PyAudio()
  stream = pya.open(format = pya.get_format_from_width(width=sampwidth),channels=num_channels,rate=framerate,output=True)
  ### GUI
  layout = [
    [
      sg.Text('Oscilator 1'),sg.Slider(
        key="Osc1Freq",range=(20.0,500.0),default_value=440.0,orientation='h'),sg.Slider(
        key="Osc1Vol",range=(0.0,1.0),default_value=0.5,resolution=0.01,orientation='h')
    ],[
      sg.Text('Oscilator 2'),sg.Slider(
        key="Osc2Freq",sg.Slider(
        key="Osc2Vol",[sg.Output(size=(250,50))],[sg.Submit(),sg.Cancel()]
  ]

  window = sg.Window('Shit',layout)

  event_old,values_old = None,None
  while True:
    event,values = window.read(timeout = 50)
    if event in (None,'Exit','Cancel'):
      break

    if values_old != values:
      print(event,values)
      event_old,values_old = event,values

      samples = update_samples(values)
      # sound functions here,it just takes control away while in infinite loop
      # write_audiostream(stream,samples)

      # no sound plays if executed in a Process
      if not 'audio_proc' in locals():
        audio_proc = multiprocess.Process(target=write_audiostream,args=(stream,samples),daemon=True)
        audio_proc.start()
      else:
        audio_proc.terminate()
        audio_proc = multiprocess.Process(target=write_audiostream,daemon=True)
        audio_proc.start()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...