使用多处理程序在后台运行功能,直到其他功能停止运行

问题描述

我有一些代码(我也在StackOverflow上找到了,并做了一些修改以满足我的需要),这些代码连续不断地从模数转换器读取数据流。我希望能够

  • 通过调用函数轻松开始测量和绘图
  • 连续记录所有传入值
  • 通过调用另一个函数轻松停止测量

今天,我将以下代码重写为一个类,因为在主函数中更易于阅读,该函数还控制其他测量设备,然后尝试使用多处理使测量在后台运行,只是注意到多处理并没有不能使用实例方法。

当前的实现使用线程,但是我想使用多处理,因为代码的不同部分并行运行非常重要,因为代码的其他部分(例如泵)也将必须在其中执行代码。在代码持续运行的同时不断地后台运行。 我要实现的示例实现如下所示:

initialize_measurement_device1(params)
global recorded_data
start_measurement_device1(params) # continuously plot & record incoming data
# start,control,stop other devices such as
pump1 = Pump(params)
valve1 = Valve(params)
pump1.infuse(5,"mL")
valve1.open()
sleep(5)
valve1.close()
# finally stop the measurement
stop_measurement_device1()

我找到的代码(有效)如下:

# Analog data acquisition via National Instruments' cDAQ unit
# The following assumes:

# TODO:
# Implement median filter

# Imports
import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
# from nidaqmx import stream_readers  # not needed in this script
# from nidaqmx import stream_writers  # not needed in this script

import threading
import multiprocessing
import pickle
from datetime import datetime
import scipy.io

# Parameters
sampling_freq_in = 1000  # in Hz
buffer_in_size = 100
bufsize_callback = buffer_in_size
buffer_in_size_cfg = round(buffer_in_size * 1)  # clock configuration
chans_in = 1  # set to number of active OPMs (x2 if By and Bz are used,but that is not recommended)
refresh_rate_plot = 10  # in Hz
crop = 10  # number of seconds to drop at acquisition start before saving
my_filename = 'test_3_opms'  # with full path if target folder different from current folder (do not leave trailing /)

# Initialize data placeholders
buffer_in = np.zeros((chans_in,buffer_in_size))
data = np.zeros((chans_in,1))  # will contain a first column with zeros but that's fine

# Definitions of basic functions
def ask_user():
    global running
    input("Press ENTER/RETURN to stop acquisition and coil drivers.")
    running = False

def stop():
    global running
    running = False

def cfg_read_task(acquisition):  # uses above parameters
    acquisition.ai_channels.add_ai_voltage_chan("Dev1/ai0")  # has to match with chans_in
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,sample_mode=constants.AcquisitionType.CONTINUOUS,samps_per_chan=buffer_in_size_cfg)

def reading_task_callback(task_idx,event_type,num_samples,callback_data):  # bufsize_callback is passed to num_samples
    global data
    global buffer_in

    if running:
        # It may be wiser to read slightly more than num_samples here,to make sure one does not miss any sample,# see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
        buffer_in = np.zeros((chans_in,num_samples))  # double definition ???
        stream_in.read_many_sample(buffer_in,timeout=constants.WAIT_INFINITELY)

        data = np.append(data,buffer_in,axis=1)  # appends buffered data to total variable data

    return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

if __name__ == '__main__':
    # Configure and setup the tasks
    task_in = nidaqmx.Task()
    cfg_read_task(task_in)
    stream_in = AnalogMultiChannelReader(task_in.in_stream)
    task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback,reading_task_callback)

    # Start threading to prompt user to stop
    thread_user = threading.Thread(target=ask_user)
    thread_user.start()

    # Main loop
    running = True
    time_start = datetime.now()
    task_in.start()

    # Plot a visual feedback for the user's mental health
    f,ax1 = plt.subplots(1,1,sharex='all',sharey='none')
    while running:  # make this adapt to number of channels automatically
        ax1.clear()
        ax1.plot(data[0,-sampling_freq_in * 5:].T)  # 5 seconds rolling window
        # Label and axis formatting
        ax1.set_xlabel('time [s]')
        ax1.set_ylabel('voltage [V]')
        xticks = np.arange(0,data[0,-sampling_freq_in * 5:].size,sampling_freq_in)
        xticklabels = np.arange(0,xticks.size,1)
        ax1.set_xticks(xticks)
        ax1.set_xticklabels(xticklabels)

        plt.pause(1/refresh_rate_plot)  # required for dynamic plot to work (if too low,nulling performance bad)


    # Close task to clear connection once done
    task_in.close()
    duration = datetime.now() - time_start

    # Some messages at the end
    num_samples_acquired = data[0,:].size
    print("\n")
    print("Acquisition ended.\n")
    print("Acquisition duration: {}.".format(duration))
    print("Acquired samples: {}.".format(num_samples_acquired - 1))


    # Final plot of whole time course the acquisition
    plt.close('all')
    f_tot,sharey='none')
    ...

我将如何以可以在后台开始运行测量并保持运行直到决定终止的方式实施多处理?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...