#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Documentation:
This is a script containing general helper functions.
"""
import numpy as np
import os
import stat
import shutil
import copy
import h5py
from mpi4py import MPI
###################################
# Initialization of MPI stuff #
###################################
COMM = MPI.COMM_WORLD
SIZE = COMM.Get_size()
RANK = COMM.Get_rank()
#######################################
### DATA I/O ###
#######################################
[docs]def read_gdf(fname, skiprows=0):
"""
Fast line-by-line gdf-file reader.
Parameters
----------
fname : str
Path to gdf-file.
skiprows : int
number of skipped rows
Returns
-------
numpy.ndarray
([gid, val0, val1, **]), dtype=object) mixed datatype array
"""
gdf_file = open(fname, 'r')
gdf = []
for i, l in enumerate(gdf_file):
if i < skiprows:
continue
data = l.split()
gdf += [data]
gdf = np.array(gdf, dtype=object)
if gdf.size > 0:
gdf[:, 0] = gdf[:, 0].astype(int)
gdf[:, 1:] = gdf[:, 1:].astype(float)
return np.array(gdf)
[docs]def write_gdf(gdf, fname):
"""
Fast line-by-line gdf-file write function
Parameters
----------
gdf : numpy.ndarray
Column 0 is gids, columns 1: are values.
fname : str
Path to gdf-file.
Returns
-------
None
"""
gdf_file = open(fname, 'w')
for line in gdf:
for i in np.arange(len(line)):
gdf_file.write(str(line[i]) + '\t')
gdf_file.write('\n')
return None
[docs]def load_h5_data(path='', data_type='LFP', y=None, electrode=None,
warmup=0., scaling=1.):
"""
Function loading results from hdf5 file
Parameters
----------
path : str
Path to hdf5-file
data_type : str
Signal types in ['CSD' , 'LFP', 'CSDsum', 'LFPsum'].
y : None or str
Name of population.
electrode : None or int
TODO: update, electrode is NOT USED
warmup : float
Lower cutoff of time series to remove possible transients
scaling : float,
Scaling factor for population size that determines the amount of loaded
single-cell signals
Returns
----------
numpy.ndarray
[electrode id, compound signal] if `y` is None
numpy.ndarray
[cell id, electrode, single-cell signal] otherwise
"""
assert y is not None or electrode is not None
if y is not None:
f = h5py.File(os.path.join(path, '%s_%ss.h5' % (y, data_type)))
data = f['data'][()][:, :, warmup:]
if scaling != 1.:
np.random.shuffle(data)
num_cells = int(len(data) * scaling)
data = data[:num_cells, :, warmup:]
else:
f = h5py.File(os.path.join(path, '%ssum.h5' % data_type))
data = f['data'][()][:, warmup:]
return data
[docs]def dump_dict_of_nested_lists_to_h5(fname, data):
"""
Take nested list structure and dump it in hdf5 file.
Parameters
----------
fname : str
Filename
data : dict(list(numpy.ndarray))
Dict of nested lists with variable len arrays.
Returns
-------
None
"""
# Open file
print('writing to file: %s' % fname)
f = h5py.File(fname)
# Iterate over values
for i, ivalue in list(data.items()):
igrp = f.create_group(str(i))
for j, jvalue in enumerate(ivalue):
jgrp = igrp.create_group(str(j))
for k, kvalue in enumerate(jvalue):
if kvalue.size > 0:
dset = jgrp.create_dataset(str(k), data=kvalue,
compression='gzip')
else:
dset = jgrp.create_dataset(str(k), data=kvalue,
maxshape=(None, ),
compression='gzip')
# Close file
f.close()
[docs]def load_dict_of_nested_lists_from_h5(fname, toplevelkeys=None):
"""
Load nested list structure from hdf5 file
Parameters
----------
fname : str
Filename
toplevelkeys : None or iterable,
Load a two(default) or three-layered structure.
Returns
-------
dict(list(numpy.ndarray))
dictionary of nested lists with variable length array data.
"""
# Container:
data = {}
# Open file object
f = h5py.File(fname, 'r')
# Iterate over partial dataset
if toplevelkeys is not None:
for i in toplevelkeys:
ivalue = f[str(i)]
data[i] = []
for j, jvalue in enumerate(ivalue.values()):
data[int(i)].append([])
for k, kvalue in enumerate(jvalue.values()):
data[i][j].append(kvalue.value)
else:
for i, ivalue in list(f.items()):
i = int(i)
data[i] = []
for j, jvalue in enumerate(ivalue.values()):
data[i].append([])
for k, kvalue in enumerate(jvalue.values()):
data[i][j].append(kvalue.value)
# Close dataset
f.close()
return data
[docs]def setup_file_dest(params, clearDestination=True):
"""
Function to set up the file catalog structure for simulation output
Parameters
----------
params : object
e.g., `cellsim16popsParams.multicompartment_params()`
clear_dest : bool
Savefolder will be cleared if already existing.
Returns
-------
None
"""
if RANK == 0:
if not os.path.isdir(params.savefolder):
os.mkdir(params.savefolder)
assert(os.path.isdir(params.savefolder))
else:
if clearDestination:
print('removing folder tree %s' % params.savefolder)
while os.path.isdir(params.savefolder):
try:
os.system('find %s -delete' % params.savefolder)
except BaseException:
shutil.rmtree(params.savefolder)
os.mkdir(params.savefolder)
assert(os.path.isdir(params.savefolder))
if not os.path.isdir(params.sim_scripts_path):
print('creating %s' % params.sim_scripts_path)
os.mkdir(params.sim_scripts_path)
if not os.path.isdir(params.cells_path):
print('creating %s' % params.cells_path)
os.mkdir(params.cells_path)
if not os.path.isdir(params.figures_path):
print('creating %s' % params.figures_path)
os.mkdir(params.figures_path)
if not os.path.isdir(params.populations_path):
print('creating %s' % params.populations_path)
os.mkdir(params.populations_path)
try:
if not os.path.isdir(params.raw_nest_output_path):
print('creating %s' % params.raw_nest_output_path)
os.mkdir(params.raw_nest_output_path)
except BaseException:
pass
if not os.path.isdir(params.spike_output_path):
print('creating %s' % params.spike_output_path)
os.mkdir(params.spike_output_path)
for f in ['cellsim16popsParams.py',
'cellsim16pops.py',
'example_brunel.py',
'brunel_alpha_nest.py',
'mesocircuit.sli',
'mesocircuit_LFP_model.py',
'binzegger_connectivity_table.json',
'nest_simulation.py',
'microcircuit.sli']:
if os.path.isfile(f):
if not os.path.exists(
os.path.join(
params.sim_scripts_path,
f)):
print(
'copying {} as {}'.format(
f, os.path.join(
params.sim_scripts_path, f)))
shutil.copy(f, os.path.join(params.sim_scripts_path, f))
os.chmod(os.path.join(params.sim_scripts_path, f),
stat.S_IREAD)
print('done preparing file destinations')
COMM.Barrier()
#######################################
### GENERAL ###
#######################################
[docs]def calculate_fft(data, tbin):
"""
Function to calculate the Fourier transform of data.
Parameters
----------
data : numpy.ndarray
1D or 2D array containing time series.
tbin : float
Bin size of time series (in ms).
Returns
-------
freqs : numpy.ndarray
Frequency axis of signal in Fourier space.
fft : numpy.ndarray
Signal in Fourier space.
"""
if len(np.shape(data)) > 1:
n = len(data[0])
return np.fft.fftfreq(n, tbin * 1e-3), np.fft.fft(data, axis=1)
else:
n = len(data)
return np.fft.fftfreq(n, tbin * 1e-3), np.fft.fft(data)
#######################################
### DATA MANIPULATION ###
#######################################
[docs]def centralize(data, time=False, units=False):
"""
Function to subtract the mean across time and/or across units from data
Parameters
----------
data : numpy.ndarray
1D or 2D array containing time series, 1st index: unit, 2nd index: time
time : bool
True: subtract mean across time.
units : bool
True: subtract mean across units.
Returns
-------
numpy.ndarray
1D or 0D array of centralized signal.
"""
assert(time is not False or units is not False)
res = copy.copy(data)
if time is True:
res = np.array([x - np.mean(x) for x in res])
if units is True:
res = np.array(res - np.mean(res, axis=0))
return res
[docs]def normalize(data):
"""
Function to normalize data to have mean 0 and unity standard deviation
(also called z-transform)
Parameters
----------
data : numpy.ndarray
Returns
-------
numpy.ndarray
z-transform of input array
"""
data = data.astype(float)
data -= data.mean()
return data / data.std()
#######################################
### FILTER ###
#######################################
[docs]def movav(y, Dx, dx):
"""
Moving average rectangular window filter:
calculate average of signal y by using sliding rectangular
window of size Dx using binsize dx
Parameters
----------
y : numpy.ndarray
Signal
Dx : float
Window length of filter.
dx : float
Bin size of signal sampling.
Returns
-------
numpy.ndarray
Filtered signal.
"""
if Dx <= dx:
return y
else:
ly = len(y)
r = np.zeros(ly)
n = np.int(np.round((Dx / dx)))
r[0:np.int(n / 2.)] = 1.0 / n
r[-np.int(n / 2.)::] = 1.0 / n
R = np.fft.fft(r)
Y = np.fft.fft(y)
yf = np.fft.ifft(Y * R)
return yf
#######################################
### CORRELATION ANALYSIS ###
#######################################
[docs]def mean(data, units=False, time=False):
"""
Function to compute mean of data
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time
units : bool
Average over units
time : bool
Average over time
Returns
-------
if units=False and time=False:
error
if units=True:
1 dim numpy.ndarray; time series
if time=True:
1 dim numpy.ndarray; series of unit means across time
if units=True and time=True:
float; unit and time mean
Examples
--------
>>> mean(np.array([[1, 2, 3], [4, 5, 6]]), units=True)
array([ 2.5, 3.5, 4.5])
>>> mean(np.array([[1, 2, 3], [4, 5, 6]]), time=True)
array([ 2., 5.])
>>> mean(np.array([[1, 2, 3], [4, 5, 6]]), units=True,time=True)
3.5
"""
assert(units is not False or time is not False)
if units is True and time is False:
return np.mean(data, axis=0)
elif units is False and time is True:
return np.mean(data, axis=1)
elif units is True and time is True:
return np.mean(data)
[docs]def compound_mean(data):
"""
Compute the mean of the compound/sum signal.
Data is first summed across units and averaged across time.
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time
Returns
-------
float
time-averaged compound/sum signal
Examples
--------
>>> compound_mean(np.array([[1, 2, 3], [4, 5, 6]]))
7.0
"""
return np.mean(np.sum(data, axis=0))
[docs]def variance(data, units=False, time=False):
"""
Compute the variance of data across time, units or both.
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time.
units : bool
Variance across units
time : bool
Average over time
Returns
----------
if units=False and time=False:
Exception
if units=True:
1 dim numpy.ndarray; time series
if time=True:
1 dim numpy.ndarray; series of single unit variances across time
if units=True and time=True:
float; mean of single unit variances across time
Examples
----------
>>> variance(np.array([[1, 2, 3],[4, 5, 6]]), units=True)
array([ 2.25, 2.25, 2.25])
>>> variance(np.array([[1, 2, 3], [4, 5, 6]]), time=True)
array([ 0.66666667, 0.66666667])
>>> variance(np.array([[1, 2, 3], [4, 5, 6]]), units=True, time=True)
0.66666666666666663
"""
assert(units is not False or time is not False)
if units is True and time is False:
return np.var(data, axis=0)
elif units is False and time is True:
return np.var(data, axis=1)
elif units is True and time is True:
return np.mean(np.var(data, axis=1))
[docs]def compound_variance(data):
"""
Compute the variance of the compound/sum signal.
Data is first summed across units, then the variance across time
is calculated.
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time
Returns
-------
float
variance across time of compound/sum signal
Examples
--------
>>> compound_variance(np.array([[1, 2, 3], [4, 5, 6]]))
2.6666666666666665
"""
return np.var(np.sum(data, axis=0))
[docs]def powerspec(data, tbin, Df=None, units=False, pointProcess=False):
"""
Calculate (smoothed) power spectra of all timeseries in data.
If units=True, power spectra are averaged across units.
Note that averaging is done on power spectra rather than data.
If pointProcess is True, power spectra are normalized by the length T
of the time series.
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time.
tbin : float
Binsize in ms.
Df : float/None,
Window width of sliding rectangular filter (smoothing),
None is no smoothing.
units : bool
Average power spectrum.
pointProcess : bool
If set to True, powerspectrum is normalized to signal length T.
Returns
-------
freq : tuple
numpy.ndarray of frequencies.
POW : tuple
if units=False:
2 dim numpy.ndarray; 1st axis unit, 2nd axis frequency
if units=True:
1 dim numpy.ndarray; frequency series
Examples
--------
>>> powerspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df)
Out[1]: (freq,POW)
>>> POW.shape
Out[2]: (2,len(analog_sig1))
>>> powerspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df, units=True)
Out[1]: (freq,POW)
>>> POW.shape
Out[2]: (len(analog_sig1),)
"""
freq, DATA = calculate_fft(data, tbin)
df = freq[1] - freq[0]
T = tbin * len(freq)
POW = np.abs(DATA) ** 2
if Df is not None:
POW = [movav(x, Df, df) for x in POW]
cut = int(Df / df)
freq = freq[cut:]
POW = np.array([x[cut:] for x in POW])
POW = np.abs(POW)
assert(len(freq) == len(POW[0]))
if units is True:
POW = mean(POW, units=units)
assert(len(freq) == len(POW))
if pointProcess:
POW *= 1. / T * 1e3 # Normalization, power independent of T
return freq, POW
[docs]def compound_powerspec(data, tbin, Df=None, pointProcess=False):
"""
Calculate the power spectrum of the compound/sum signal.
data is first summed across units, then the power spectrum is calculated.
If pointProcess=True, power spectra are normalized by the length T of
the time series.
Parameters
----------
data : numpy.ndarray,
1st axis unit, 2nd axis time
tbin : float,
binsize in ms
Df : float/None,
window width of sliding rectangular filter (smoothing),
None -> no smoothing
pointProcess : bool,
if set to True, powerspectrum is normalized to signal length T
Returns
-------
freq : tuple
numpy.ndarray of frequencies
POW : tuple
1 dim numpy.ndarray, frequency series
Examples
--------
>>> compound_powerspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df)
Out[1]: (freq,POW)
>>> POW.shape
Out[2]: (len(analog_sig1),)
"""
return powerspec([np.sum(data, axis=0)], tbin, Df=Df, units=True,
pointProcess=pointProcess)
[docs]def crossspec(data, tbin, Df=None, units=False, pointProcess=False):
"""
Calculate (smoothed) cross spectra of data.
If `units`=True, cross spectra are averaged across units.
Note that averaging is done on cross spectra rather than data.
Cross spectra are normalized by the length T of the time series -> no
scaling with T.
If pointProcess=True, power spectra are normalized by the length T of the
time series.
Parameters
----------
data : numpy.ndarray,
1st axis unit, 2nd axis time
tbin : float,
binsize in ms
Df : float/None,
window width of sliding rectangular filter (smoothing),
None -> no smoothing
units : bool,
average cross spectrum
pointProcess : bool,
if set to True, cross spectrum is normalized to signal length T
Returns
-------
freq : tuple
numpy.ndarray of frequencies
CRO : tuple
if `units`=True: 1 dim numpy.ndarray; frequency series
if `units`=False:3 dim numpy.ndarray; 1st axis first unit,
2nd axis second unit, 3rd axis frequency
Examples
--------
>>> crossspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df)
Out[1]: (freq,CRO)
>>> CRO.shape
Out[2]: (2,2,len(analog_sig1))
>>> crossspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df, units=True)
Out[1]: (freq,CRO)
>>> CRO.shape
Out[2]: (len(analog_sig1),)
"""
N = len(data)
if units is True:
# smoothing and normalization take place in powerspec
# and compound_powerspec
freq, POW = powerspec(data, tbin, Df=Df, units=True)
freq_com, CPOW = compound_powerspec(data, tbin, Df=Df)
assert(len(freq) == len(freq_com))
assert(np.min(freq) == np.min(freq_com))
assert(np.max(freq) == np.max(freq_com))
CRO = 1. / (1. * N * (N - 1.)) * (CPOW - 1. * N * POW)
assert(len(freq) == len(CRO))
else:
freq, DATA = calculate_fft(data, tbin)
T = tbin * len(freq)
df = freq[1] - freq[0]
if Df is not None:
cut = int(Df / df)
freq = freq[cut:]
CRO = np.zeros((N, N, len(freq)), dtype=complex)
for i in range(N):
for j in range(i + 1):
tempij = DATA[i] * DATA[j].conj()
if Df is not None:
tempij = movav(tempij, Df, df)[cut:]
CRO[i, j] = tempij
CRO[j, i] = CRO[i, j].conj()
assert(len(freq) == len(CRO[0, 0]))
if pointProcess:
CRO *= 1. / T * 1e3 # normalization
return freq, CRO
[docs]def compound_crossspec(a_data, tbin, Df=None, pointProcess=False):
"""
Calculate cross spectra of compound signals.
a_data is a list of datasets (a_data = [data1,data2,...]).
For each dataset in a_data, the compound signal is calculated
and the crossspectra between these compound signals is computed.
If pointProcess=True, power spectra are normalized by the length T of the
time series.
Parameters
----------
a_data : list of numpy.ndarrays
Array: 1st axis unit, 2nd axis time.
tbin : float
Binsize in ms.
Df : float/None,
Window width of sliding rectangular filter (smoothing),
None -> no smoothing.
pointProcess : bool
If set to True, crossspectrum is normalized to signal length `T`
Returns
-------
freq : tuple
numpy.ndarray of frequencies.
CRO : tuple
3 dim numpy.ndarray; 1st axis first compound signal, 2nd axis second
compound signal, 3rd axis frequency.
Examples
--------
>>> compound_crossspec([np.array([analog_sig1, analog_sig2]),
np.array([analog_sig3,analog_sig4])], tbin, Df=Df)
Out[1]: (freq,CRO)
>>> CRO.shape
Out[2]: (2,2,len(analog_sig1))
"""
a_mdata = []
for data in a_data:
a_mdata.append(np.sum(data, axis=0)) # calculate compound signals
return crossspec(np.array(a_mdata), tbin, Df, units=False,
pointProcess=pointProcess)
[docs]def autocorrfunc(freq, power):
"""
Calculate autocorrelation function(s) for given power spectrum/spectra.
Parameters
----------
freq : numpy.ndarray
1 dimensional array of frequencies.
power : numpy.ndarray
2 dimensional power spectra, 1st axis units, 2nd axis frequencies.
Returns
-------
time : tuple
1 dim numpy.ndarray of times.
autof : tuple
2 dim numpy.ndarray; autocorrelation functions, 1st axis units,
2nd axis times.
"""
tbin = 1. / (2. * np.max(freq)) * 1e3 # tbin in ms
time = np.arange(-len(freq) / 2. + 1, len(freq) / 2. + 1) * tbin
# T = max(time)
multidata = False
if len(np.shape(power)) > 1:
multidata = True
if multidata:
N = len(power)
autof = np.zeros((N, len(freq)))
for i in range(N):
raw_autof = np.real(np.fft.ifft(power[i]))
mid = int(len(raw_autof) / 2.)
autof[i] = np.hstack([raw_autof[mid + 1:], raw_autof[:mid + 1]])
assert(len(time) == len(autof[0]))
else:
raw_autof = np.real(np.fft.ifft(power))
mid = int(len(raw_autof) / 2.)
autof = np.hstack([raw_autof[mid + 1:], raw_autof[:mid + 1]])
assert(len(time) == len(autof))
# autof *= T*1e-3 # normalization is done in powerspec()
return time, autof
[docs]def crosscorrfunc(freq, cross):
"""
Calculate crosscorrelation function(s) for given cross spectra.
Parameters
----------
freq : numpy.ndarray
1 dimensional array of frequencies.
cross : numpy.ndarray
2 dimensional array of cross spectra, 1st axis units, 2nd axis units,
3rd axis frequencies.
Returns
-------
time : tuple
1 dim numpy.ndarray of times.
crossf : tuple
3 dim numpy.ndarray, crosscorrelation functions,
1st axis first unit, 2nd axis second unit, 3rd axis times.
"""
tbin = 1. / (2. * np.max(freq)) * 1e3 # tbin in ms
time = np.arange(-len(freq) / 2. + 1, len(freq) / 2. + 1) * tbin
# T = max(time)
multidata = False
# check whether cross contains many cross spectra
if len(np.shape(cross)) > 1:
multidata = True
if multidata:
N = len(cross)
crossf = np.zeros((N, N, len(freq)))
for i in range(N):
for j in range(N):
raw_crossf = np.real(np.fft.ifft(cross[i, j]))
mid = int(len(raw_crossf) / 2.)
crossf[i, j] = np.hstack(
[raw_crossf[mid + 1:], raw_crossf[:mid + 1]])
assert(len(time) == len(crossf[0, 0]))
else:
raw_crossf = np.real(np.fft.ifft(cross))
mid = int(len(raw_crossf) / 2.)
crossf = np.hstack([raw_crossf[mid + 1:], raw_crossf[:mid + 1]])
assert(len(time) == len(crossf))
# crossf *= T*1e-3 # normalization happens in cross spectrum
return time, crossf
[docs]def corrcoef(time, crossf, integration_window=0.):
"""
Calculate the correlation coefficient for given auto- and crosscorrelation
functions. Standard settings yield the zero lag correlation coefficient.
Setting integration_window > 0 yields the correlation coefficient of
integrated auto- and crosscorrelation functions. The correlation
coefficient between a zero signal with any other signal is defined as 0.
Parameters
----------
time : numpy.ndarray
1 dim array of times corresponding to signal.
crossf : numpy.ndarray
Crosscorrelation functions, 1st axis first unit, 2nd axis second unit,
3rd axis times.
integration_window: float
Size of the integration window.
Returns
-------
cc : numpy.ndarray
2 dim array of correlation coefficient between two units.
"""
N = len(crossf)
cc = np.zeros(np.shape(crossf)[:-1])
tbin = abs(time[1] - time[0])
lim = int(integration_window / tbin)
if len(time) % 2 == 0:
mid = len(time) / 2 - 1
else:
mid = np.floor(len(time) / 2.)
for i in range(N):
ai = np.sum(crossf[i, i][mid - lim:mid + lim + 1])
offset_autoi = np.mean(crossf[i, i][:mid - 1])
for j in range(N):
cij = np.sum(crossf[i, j][mid - lim:mid + lim + 1])
offset_cross = np.mean(crossf[i, j][:mid - 1])
aj = np.sum(crossf[j, j][mid - lim:mid + lim + 1])
offset_autoj = np.mean(crossf[j, j][:mid - 1])
if ai > 0. and aj > 0.:
cc[i, j] = (cij - offset_cross) / np.sqrt((ai - offset_autoi) *
(aj - offset_autoj))
else:
cc[i, j] = 0.
return cc
[docs]def coherence(freq, power, cross):
"""
Calculate frequency resolved coherence for given power- and crossspectra.
Parameters
----------
freq : numpy.ndarray
Frequencies, 1 dim array.
power : numpy.ndarray
Power spectra, 1st axis units, 2nd axis frequencies.
cross : numpy.ndarray,
Cross spectra, 1st axis units, 2nd axis units, 3rd axis frequencies.
Returns
-------
freq: tuple
1 dim numpy.ndarray of frequencies.
coh: tuple
ndim 3 numpy.ndarray of coherences, 1st axis units, 2nd axis units,
3rd axis frequencies.
"""
N = len(power)
coh = np.zeros(np.shape(cross))
for i in range(N):
for j in range(N):
coh[i, j] = cross[i, j] / np.sqrt(power[i] * power[j])
assert(len(freq) == len(coh[0, 0]))
return freq, coh
[docs]def cv(data, units=False):
"""
Calculate coefficient of variation (cv) of data. Mean and standard
deviation are computed across time.
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time.
units : bool
Average `cv`.
Returns
-------
numpy.ndarray
If units=False, series of unit `cv`s.
float
If units=True, mean `cv` across units.
Examples
--------
>>> cv(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]]))
array([ 0.48795004, 0.63887656])
>>> cv(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]]), units=True)
0.56341330073710316
"""
mu = mean(data, time=True)
var = variance(data, time=True)
cv = np.sqrt(var) / mu
if units is True:
return np.mean(cv)
else:
return cv
[docs]def fano(data, units=False):
"""
Calculate fano factor (FF) of data. Mean and variance are computed across
time.
Parameters
----------
data : numpy.ndarray
1st axis unit, 2nd axis time.
units : bool
Average `FF`.
Returns
-------
numpy.ndarray
If units=False, series of unit FFs.
float
If units=True, mean FF across units.
Examples
--------
>>> fano(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]]))
array([ 0.83333333, 1.9047619 ])
>>> fano(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]]), units=True)
1.3690476190476191
"""
mu = mean(data, time=True)
var = variance(data, time=True)
ff = var / mu
if units is True:
return np.mean(ff)
else:
return ff
if __name__ == "__main__":
import doctest
doctest.testmod()