Source code for brainspy.processors.hardware.drivers.ni.tasks

"""
This file contains drivers to handle nidaqmx.Tasks on different environments, that include regular
National Instrument racks or National Instrument real-time racks.

Both drivers work seamlessly, they declare the following nidaqmx.Task instances:
1. activation_task: It handles sending signals to the (DNPU) device through electrodes
declared as activation electrodes.

2. readout_task: It handles reading signlas comming out from the (DNPU) device from
electrodes declared as readout electrodes.

Both nidaqmx.Task instances will declare a channel per electrode, and in the case of the cdaq
to nidaq connection, they will also declare an extra synchronization channel.
It can alo be used to set the shape variables according to the requiremnets.
"""
import sys
import nidaqmx
import nidaqmx.constants as constants
import nidaqmx.system.device as device

import numpy as np
from datetime import datetime
from brainspy.processors.hardware.drivers.ni.channels import init_channel_data

RANGE_MARGIN = 0.01


[docs] class IOTasksManager: """ Class to initialise and handle the "nidaqmx.Task"s required for brains-py drivers. More information about NI tasks can be found at: https://nidaqmx-python.readthedocs.io/en/latest/task.html """ def __init__(self, configs): """ It declares the following nidaqmx.Task instances: 1. activation_task: It handles sending signals to the (DNPU) device through electrodes declared as activation electrodes. 2. readout_task: It handles reading signlas comming out from the (DNPU) device from electrodes declared as readout electrodes. Both nidaqmx.Task instances will declare a channel per electrode, and in the case of the cdaq to nidaq connection, they will also declare an extra synchronization channel. It can alo be used to set the shape variables according to the requiremnets. These tasks will be updated as the method calls are made to do different types of tasks. It also initializes the device to Acquire or generate a finite number of samples """ assert type(configs) == dict, "The configs should be of type - dict" self.acquisition_type = constants.AcquisitionType.FINITE self.activation_task = None self.readout_task = None try: self.init_tasks(configs) except Exception as e: if not 'devices' in dir(self): self.devices = [] self.close_tasks() raise e
[docs] def init_activation_channels(self, channel_names, voltage_ranges=None): """ Initialises the activation channels connected to the activation electrodes of the device. These are being sent as list of voltage values and channel names which are the start values for device. Parameters ---------- channel_names : list List of the names of the activation channels voltage_ranges : Optional[list] or numpy.ndarray List/numpy.ndarray of maximum and minimum voltage ranges that will be allowed to be sent through each channel. The dimension of the list should be (channel_no,2) where the second dimension stands for min and max values of the range, respectively. When set to None, there will be no specific limitations on what can be sent through the device. This could potentially cause damages to the device. By default is None. """ assert type(channel_names ) == list, "The channel_names should be of type - list" if voltage_ranges is not None: assert type(voltage_ranges) == list or type( voltage_ranges ) == np.ndarray, "The voltage_ranges should be of type - list or numpy array" assert len(channel_names) == len( voltage_ranges ), "The length of channel_names should be equal to the length of voltage ranges" for voltage_range in voltage_ranges: assert type(voltage_range) == list or type( voltage_range ) == np.ndarray, "Each voltage range should be a list of 2 values" assert len( voltage_range ) == 2, "Voltage range should contain 2 values : max and min" assert isinstance( voltage_range[0], (np.floating, float, int) ), "Volatge range can contain only int or float type values" assert isinstance( voltage_range[1], (np.floating, float, int) ), "Volatge range can contain only int or float type values" self.activation_task = nidaqmx.Task( "activation_task_" + datetime.utcnow().strftime("%Y_%m_%d_%H%M%S_%f")) for i in range(len(channel_names)): channel_name = str(channel_names[i]) if voltage_ranges is not None: assert ( voltage_ranges[i][0] > -2 and voltage_ranges[i][0] < 2 ), "Minimum voltage ranges configuration outside of the allowed values -2 and 2" assert ( voltage_ranges[i][1] > -2 and voltage_ranges[i][1] < 2 ), "Maximum voltage ranges configuration outside of the allowed values -2 and 2" self.activation_task.ao_channels.add_ao_voltage_chan( channel_name, min_val=voltage_ranges[i][0].item() - RANGE_MARGIN, max_val=voltage_ranges[i][1].item() + RANGE_MARGIN, ) else: print( "WARNING! READ CAREFULLY THIS MESSAGE. Activation channels have been" + "initialised without a security voltage range, they will be automatically set" + "up to a range between -2 and 2 V. This may result in damaging the device." + "Press ENTER only if you are sure that you want to proceed, otherwise STOP " + "the program. Voltage ranges can be defined in the instruments setup" + "configurations.") input() self.activation_task.ao_channels.add_ao_voltage_chan( channel_name, min_val=-2, max_val=2, )
[docs] def init_readout_channels(self, readout_channels): """ Initializes the readout channels corresponding to the readout electrodes of the device. The range of the readout channels depends on setup​ and on the feedback resistance produced. Parameters ---------- readout_channels : list[str] List containing all the readout channels of the device. """ assert type(readout_channels ) == list, "The readout_channels should be of type - list" for readout_channel in readout_channels: assert type( readout_channel ) == str, "Each readout_channel should be of type - str" self.readout_task = nidaqmx.Task( "readout_task_" + datetime.utcnow().strftime("%Y_%m_%d_%H%M%S_%f")) for i in range(len(readout_channels)): channel = readout_channels[i] self.readout_task.ai_channels.add_ai_voltage_chan(channel)
[docs] def set_sampling_frequencies(self, activation_sampling_frequency: int, readout_sampling_frequency: int, points_to_write: int, points_to_read: int): """ Sets the sampling frequency for the activation and readout tasks. The activation task is in charge to send signals from the computer to the NI module, and the readout task is in charge of reading signals from the NI module. Parameters ---------- activation_sampling_frequency : int The number of samples that the activation task will obtain in one second. readout_sampling_frequency : int The number of samples that the readout task will obtain in one second. points_to_write : int Number of points that will be written. points_to_read: int Number of points that are expected to be read given the number of points to be written, and the activation and readout frequencies. """ assert type( activation_sampling_frequency ) == int, "The activation_sampling_frequency value should be of type - int" assert type( readout_sampling_frequency ) == int, "The readout_sampling_frequency value should be of type - int" assert type( points_to_write ) == int, "The points_to_write value should be of type - int" assert type( points_to_read ) == int, "The points_to_read value should be of type - int" self.activation_task.timing.cfg_samp_clk_timing( activation_sampling_frequency, sample_mode=self.acquisition_type, samps_per_chan=points_to_write, ) self.readout_task.timing.cfg_samp_clk_timing( readout_sampling_frequency, sample_mode=self.acquisition_type, samps_per_chan=points_to_read, )
[docs] def add_synchronisation_channels( self, readout_instrument, activation_instrument, activation_channel_no=7, readout_channel_no=7, ): """ The method is used to add a synchronized activation and readout channel to the device when using cdaq to nidaq devices. Activation channels send signals to the activation electrodes while an additional synchronisation channel is created to communicate with the nidaq readout module. A spike is sent through this synchronisation channel to make the nidaq and the cdaq write and read syncrhonously. Parameters ---------- readout_instrument: str Name of the instrument from which the read will be performed on the readout electrodes. activation_instrument : str Name of the instrument writing signals to the activation electrodes. activation_channel_no : int, optional Channel through which voltages will be sent for activating the device (with both data inputs and control voltages), by default 7. readout_channel_no : int, optional Channel for reading the output current values, by default 7. """ assert type(readout_instrument ) == str, "The readout_instrument should be of type - str" assert type( activation_instrument ) == str, "The actication_instrument should be of type - str" assert type( activation_channel_no ) == int, "The activation_channel_no should be of type - int" assert type(readout_channel_no ) == int, "The readout_channel_no should be of type - int" # Define ao7 as sync signal for the NI 6216 ai0 self.activation_task.ao_channels.add_ao_voltage_chan( activation_instrument + "/ao" + str(activation_channel_no), name_to_assign_to_channel="activation_synchronisation_channel", min_val=-5, max_val=5, ) self.readout_task.ai_channels.add_ai_voltage_chan( readout_instrument + "/ai" + str(readout_channel_no), name_to_assign_to_channel="readout_synchronisation_channel", min_val=-5, max_val=5, )
[docs] def read(self, number_of_samples_per_channel, timeout=None): """ Reads samples from the task or virtual channels you specify. This read method is dynamic, and is capable of inferring an appropriate return type based on these factors: - The channel type of the task. - The number of channels to read. - The number of samples per channel. The data type of the samples returned is independently determined by the channel type of the task. For digital input measurements, the data type of the samples returned is determined by the line grouping format of the digital lines. If the line grouping format is set to “one channel for all lines”, the data type of the samples returned is int. If the line grouping format is set to “one channel per line”, the data type of the samples returned is boolean. If you do not set the number of samples per channel, this method assumes one sample was requested. This method then returns either a scalar (1 channel to read) or a list (N channels to read). If you set the number of samples per channel to ANY value (even 1), this method assumes multiple samples were requested. This method then returns either a list (1 channel to read) or a list of lists (N channels to read). Original documentation from: https://nidaqmx-python.readthedocs.io/en/latest/task.html Parameters ---------- number_of_samples_per_channel : int Specifies the number of samples to read. If this input is not set, assumes samples to read is 1. Conversely, if this input is set, assumes there are multiple samples to read. If you set this input to nidaqmx.constants. READ_ALL_AVAILABLE, NI-DAQmx determines how many samples to read based on if the task acquires samples continuously or acquires a finite number of samples. If the task acquires samples continuously and you set this input to nidaqmx.constants.READ_ALL_AVAILABLE, this method reads all the samples currently available in the buffer. If the task acquires a finite number of samples and you set this input to nidaqmx.constants.READ_ALL_AVAILABLE, the method waits for the task to acquire all requested samples, then reads those samples. If you set the “read_all_avail_samp” property to True, the method reads the samples currently available in the buffer and does not wait for the task to acquire all requested samples. timeout : Optional[float] Specifies the amount of time in seconds to wait for samples to become available. If the time elapses, the method returns an error and any samples read before the timeout elapsed. The default timeout is 10 seconds. If you set timeout to nidaqmx.constants.WAIT_INFINITELY, the method waits indefinitely. If you set timeout to 0, the method tries once to read the requested samples and returns an error if it is unable to. Returns ------- list The samples requested in the form of a scalar, a list, or a list of lists. See method docstring for more info. NI-DAQmx scales the data to the units of the measurement, including any custom scaling you apply to the channels. Use a DAQmx Create Channel method to specify these units. """ if number_of_samples_per_channel is not None: assert ( type(number_of_samples_per_channel) ) == int, "number_of_samples_per_channel should be of type - int" assert (number_of_samples_per_channel > 0 ), "number_of_samples_per_channel value cannot be negative" if timeout is not None: assert type(timeout) == float or type( timeout) == int, "timeout should be of type float or int" assert timeout >= 0, "timeout value cannot be negative" return self.readout_task.read( number_of_samples_per_channel=number_of_samples_per_channel, timeout=timeout)
[docs] def start_trigger(self, trigger_source): """ To synchronise cdaq to cdaq modules a start trigger can be set, in such a way that when a write operation is done, a read operation is triggered. More information can be found in: https://nidaqmx-python.readthedocs.io/en/latest/start_trigger.html Parameters ---------- trigger_source: str Source trigger name. It can be found on the NI max program. Click on the device rack inside devices and interfaces, and then click on the Device Routes tab. """ assert type( trigger_source ) == str, "The name of the trigger source should be of type - str" self.activation_task.triggers.start_trigger.cfg_dig_edge_start_trig( "/" + trigger_source + "/ai/StartTrigger")
[docs] def write(self, y, auto_start): """ This method writes samples to the task or virtual channels specified in the tasks of the tasks driver. It also catches DaqError exceptions. It supports automatic start of tasks. This write method is dynamic, and is capable of accepting the samples to write in the various forms for most operations: Scalar: Single sample for 1 channel. List/1D numpy.ndarray: Multiple samples for 1 channel or 1 sample for multiple channels. List of lists/2D numpy.ndarray: Multiple samples for multiple channels. The data type of the samples passed in must be appropriate for the channel type of the task. For counter output pulse operations, this write method only accepts samples in these forms: Scalar CtrFreq, CtrTime, CtrTick (from nidaqmx.types): Single sample for 1 channel. List of CtrFreq, CtrTime, CtrTick (from nidaqmx.types): Multiple samples for 1 channel or 1 sample for multiple channels. If the task uses on-demand timing, this method returns only after the device generates all samples. On-demand is the default timing type if you do not use the timing property on the task to configure a sample timing type. If the task uses any timing type other than on- demand, this method returns immediately and does not wait for the device to generate all samples. Your application must determine if the task is done to ensure that the device generated all samples. Both of these tasks occur simulataneously and are synchronized. The tasks will start automatically if the "auto_start" option is set to True in the configuration dictionary used to intialize this device. Parameters ---------- y : np.array Contains the samples to be written to the activation task. auto_start : Bool True to enable auto-start from nidaqmx drivers. False to start the tasks immediately after writing. """ assert type( y ) == np.ndarray, "The sample data: y should be of type - numpy array" assert type( auto_start) == bool, "auto_start param should be of type - bool" y = np.require(y, dtype=y.dtype, requirements=["C", "W"]) try: self.activation_task.write(y, auto_start=auto_start) if not auto_start: self.activation_task.start() self.readout_task.start() except nidaqmx.errors.DaqError as error: print("There was an error writing to the activation task: " + self.activation_task.name + "\n" + str(error)) self.close_tasks() sys.exit(1)
[docs] def stop_tasks(self): """ To stop the all tasks on this device namely - the activation tasks and the readout tasks to and from the device. """ self.readout_task.stop() self.activation_task.stop()
[docs] def init_tasks(self, configs): """ To Initialize the tasks on the device based on the configurations dictionary provided. the method initializes the activation and readout tasks from the device by setting the voltage ranges and choosing the instruments that have been specified. Parameters ---------- configs : dict Configs dictionary for the device model The configs should have the following keys: 1. sampling_frequency: int The average number of samples to be obtained in one second, when transforming the signal from analogue to digital. 2. amplification: float The output current (nA) of the device is converted by the readout hardware to voltage (V), because it is easier to do the readout of the device in voltages. This output signal in nA is amplified by the hardware when doing this current to voltage conversion, as larger signals are easier to detect. In order to obtain the real current (nA) output of the device, the conversion is automatically corrected in software by multiplying by the amplification value again. The amplification value depends on the feedback resistance of each of the setups. Below, there is a guide of the amplification value needed for each of the setups: Setup 1 - Darwin: Variable amplification levels: A: 1000 Amplification Feedback resistance: 1 MOhm B: 100 Amplification Feedback resistance 10 MOhms C: 10 Amplification Feedback resistance: 100 MOhms D: 1 Amplification Feedback resistance 1 GOhm Setup 2 - Pinky: - PCB 1 (6 converters with): A. Amplification 10 Feedback resistance 100 MOhm B. PCB 2 (6 converters with): Amplification 100 tims 10 mOhm Feedback resistance Setup 3 - Brains: Amplfication 28.5 Feedback resistance, 33.3 MOhm Setup 4 - Switch: (Information to be completed) If no correction is desired, the amplification can be set to 1. 3. instruments_setup: 3.1 multiple_devices: boolean False will initialise the drivers to read from a single hardware DNPU. True, will enable to read from more than one DNPU device at the same time. 3.2 activation_instrument: str Name of the activation instrument as observed in the NI Max software. E.g., cDAQ1Mod3 3.3 activation_channels: list Channels through which voltages will be sent for activating the device (both data inputs and control voltage electrodes). The channels can be checked in the schematic of the DNPU device. E.g., [8,10,13,11,7,12,14] 3.4 activation_voltage_ranges: list Minimum and maximum voltage for the activation electrodes. E.g., [[-1.2, 0.6], [-1.2, 0.6], [-1.2, 0.6], [-1.2, 0.6], [-1.2, 0.6], [-0.7, 0.3], [-0.7, 0.3]] 3.5 readout_instrument: str Name of the readout instrument as observed in the NI Max software. E.g., cDAQ1Mod4 3.6 readout_channels: [2] list Channels for reading the output current values. The channels can be checked in the schematic of the DNPU device. 3.7 trigger_source: str For synchronisation purposes, sending data for the activation voltages on one NI Task can trigger the readout device of another NI Task. In these cases,the trigger source name should be specified in the configs. This is only applicable for CDAQ to CDAQ setups (with or without real-time rack). E.g., cDAQ1/segment1 More information at https://nidaqmx-python.readthedocs.io/en/latest/start_trigger.html 4. plateau_length: float - Length of the plateau that is being sent through the forward call of the HardwareProcessor slope_length : float - Length of the slopes in the waveforms sent to the device through the drivers Returns ------- list list of voltage ranges """ assert type(configs) == dict, "The configs should be of type - dict" self.configs = configs ( self.activation_channel_names, self.readout_channel_names, instruments, self.voltage_ranges, ) = init_channel_data(configs) devices = [] for instrument in instruments: devices.append(device.Device(name=instrument)) self.devices = devices # TODO: add a maximum and a minimum to the activation channels self.init_activation_channels(self.activation_channel_names, self.voltage_ranges) self.init_readout_channels(self.readout_channel_names) return self.voltage_ranges.tolist()
[docs] def close_tasks(self): """ Close all the task on this device - both activation and readout tasks by deleting them. Note - This method is different from the stop_tasks() method which only stops the current tasks temporarily. """ if self.readout_task is not None: self.readout_task.close() del self.readout_task self.readout_task = None if self.activation_task is not None: self.activation_task.close() del self.activation_task self.activation_task = None for dev in self.devices: dev.reset_device()