Python scipy.signal 模块,welch() 实例源码
我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用scipy.signal.welch()。
def getPSD( df , dw = 0.05, roverlap = 0.5, window='hanning', detrend='constant') :
"""
Compute the power spectral density
"""
if type(df) == pd.Series : df = pd.DataFrame(df)
nfft = int ( (2*pi / dw) / dx(df) )
nperseg = 2**int(log(nfft)/log(2))
noverlap = nperseg * roverlap
""" Return the PSD of a time signal """
try :
from scipy.signal import welch
except :
raise Exception("Welch function not found,please install scipy > 0.12")
data = []
for iSig in range(df.shape[1]) :
test = welch( df.values[:,iSig] , fs = 1. / dx(df) , window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, scaling='density')
data.append( test[1] / (2*pi) )
xAxis = test[0][:] * 2*pi
return pd.DataFrame( data = np.transpose(data), index = xAxis , columns = [ "psd("+ str(x) +")" for x in df.columns ] )
def compute_power_spectral_density(self, windowed_signal):
# Windowed signal of shape [channel][sample] [16 x 12000]
ret = []
# Welch parameters
sliding_window = 512
overlap = 0.25
n_overlap = int(sliding_window * overlap)
# compute psd using Welch method
freqs, power = signal.welch(windowed_signal, fs=SAMPLING_FREQUENCY,
nperseg=sliding_window, noverlap=n_overlap)
for psd_freq in PSD_FREQ:
tmp = (freqs >= psd_freq[0]) & (freqs < psd_freq[1])
ret.append(power[:,tmp].mean(1))
return(np.log(np.array(ret) / np.sum(ret, axis=0)))
def get(self, x, **kwargs):
"""
"""
# Check size of x:
if len(x.shape) == 1:
x = x.reshape(1, len(x), 1)
elif len(x.shape) == 2:
x = x[np.newaxis, ...]
self._nelec, self._npts, self._ntrials = x.shape
# Split x in window:
if self._window is not None:
x = np.transpose(
np.array([x[:, k[0]:k[1], :] for k in self._window]), (
1, 2, 0, 3))
# Compute PSD:
return welch(x, fs=self._sf, axis=1, **kwargs)
def getPSD(self, ens):
"""
Calculate and return average power spectral density for ensemble
`ens` using Welch's method
"""
print('in getpsd,ens=', ens, self)
if not self.segyfile:
raise Exception("File not opened")
enstrcs = [t for t in self.segyfile.traces if t.header.original_field_record_number == ens]
psds = [sig.welch(t.data, fs=1/self.dt, scaling='spectrum') for t in enstrcs]
# psds is [(f,psd),(f,psd)...]
freqs = psds[0][0]
psdonly = [p for (f, p) in psds]
psdonly = np.array(psdonly).transpose()
psdavg = np.mean(psdonly, 1)
print(freqs.shape, psdavg.shape)
return(freqs, psdavg)
def plot_psd(X, label, Fs, nfft, color=None):
freqs, psd = welch(X, fs=Fs, nperseg=nfft,
noverlap=int(nfft * 0.8))
freqs = freqs[freqs > 0]
psd = psd[freqs > 0]
plt.plot(np.log10(freqs), 10 * np.log10(psd.ravel()), label=label,
color=color)
###############################################################################
# Now we read in the data
#
# Then we plot the power spectrum of the MEG and reference channels,
# apply the reference correction and add the resulting cleaned MEG channels
# to our comparison.
def cwplot(fb_est,rx,t,fs:int,fn) -> None:
#%% time
fg,axs = subplots(1,2,figsize=(12,6))
ax = axs[0]
ax.plot(t, rx.T.real)
ax.set_xlabel('time [sec]')
ax.set_ylabel('amplitude')
ax.set_title('Noisy,jammed receive signal')
#%% periodogram
if DTPG >= (t[-1]-t[0]):
dt = (t[-1]-t[0])/4
else:
dt = DTPG
dtw = 2*dt # seconds to window
tstep = ceil(dt*fs)
wind = ceil(dtw*fs);
nfft = zeropadfactor*wind
f,Sraw = signal.welch(rx.ravel(), fs,
nperseg=wind,noverlap=tstep,nfft=nfft,
return_onesided=False)
if np.iscomplex(rx).any():
f = np.fft.fftshift(f); Sraw = np.fft.fftshift(Sraw)
ax=axs[1]
ax.plot(f,Sraw,'r',label='raw signal')
fc_est = f[Sraw.argmax()]
#ax.set_yscale('log')
ax.set_xlim([fc_est-200,fc_est+200])
ax.set_xlabel('frequency [Hz]')
ax.set_ylabel('amplitude')
ax.legend()
esttxt=''
if fn is None: # simulation
ax.axvline(ft+fb0,color='red',linestyle='--',label='true freq.')
esttxt += f'true: {ft+fb0} Hz '
for e in fb_est:
ax.axvline(e,color='blue',label='est. freq.')
esttxt += ' est: ' + str(fb_est) +' Hz'
ax.set_title(esttxt)
def _psd_welch(x, sfreq, fmin=0, fmax=np.inf, n_fft=256, n_overlap=0,
n_jobs=1):
"""Compute power spectral density (PSD) using Welch's method.
x : array,shape=(...,n_times)
The data to compute PSD from.
sfreq : float
The sampling frequency.
fmin : float
The lower frequency of interest.
fmax : float
The upper frequency of interest.
n_fft : int
The length of the tapers ie. the windows. The smaller
it is the smoother are the PSDs. The default value is 256.
If ``n_fft > len(inst.times)``,it will be adjusted down to
``len(inst.times)``.
n_overlap : int
The number of points of overlap between blocks. Will be adjusted
to be <= n_fft. The default value is 0.
n_jobs : int
Number of cpus to use in the computation.
Returns
-------
psds : ndarray,shape (...,n_freqs) or
The power spectral densities. All dimensions up to the last will
be the same as input.
freqs : ndarray,shape (n_freqs,)
The frequencies.
"""
from scipy.signal import welch
dshape = x.shape[:-1]
n_times = x.shape[-1]
x = x.reshape(-1, n_times)
# Prep the PSD
n_fft, n_overlap = _check_nfft(n_times, n_fft, n_overlap)
win_size = n_fft / float(sfreq)
logger.info("Effective window size : %0.3f (s)" % win_size)
freqs = np.arange(n_fft // 2 + 1, dtype=float) * (sfreq / n_fft)
freq_mask = (freqs >= fmin) & (freqs <= fmax)
freqs = freqs[freq_mask]
# Parallelize across first N-1 dimensions
parallel, my_pwelch, n_jobs = parallel_func(_pwelch, n_jobs=n_jobs)
x_splits = np.array_split(x, n_jobs)
f_psd = parallel(my_pwelch(d, noverlap=n_overlap, nfft=n_fft,
fs=sfreq, freq_mask=freq_mask,
welch_fun=welch)
for d in x_splits)
# Combining/reshaping to original data shape
psds = np.concatenate(f_psd, axis=0)
psds = psds.reshape(np.hstack([dshape, -1]))
return psds, freqs