Source code for xpipeline.core.xtimeseries

# -*- coding: utf-8 -*-
# Copyright (C) Scott Coughlin (2017-)
#
# This file is part of the XPypeline python package.
#
# hveto is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# hveto is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with hveto.  If not, see <http://www.gnu.org/licenses/>.

# ---- Import standard modules to the python path.
import numpy

from gwpy.timeseries import TimeSeriesDict
from gwpy.timeseries import TimeSeries
from gwpy.signal import filter_design
from .xfrequencyseries import XFrequencySeriesDict
from .xtimefrequencymap import XTimeFrequencyMapDict, XTimeFrequencyMap
from collections import OrderedDict

__author__ = 'Scott Coughlin <scott.coughlin@ligo.org>'
__all__ = ['XTimeSeries']

[docs]class XTimeSeries(TimeSeriesDict):
[docs] @classmethod def retrieve_data(cls, event_time, block_time, channel_names, sample_frequency, **kwargs): """Obtain data for either on source, off source, or injections. This uses the gwpy `TimeSeriesDict.get` method Parameters ---------- event_time : (`float) trigger time of event to be processing block_time : (`int`) length of data to be processed channel_names (`list`) : required data channels. sample_frequency (`int`): sample rate of the data desired frame_types : `dict`, optional key channel and value frametype in whcih the channel is stored verbose : `bool`, optional print verbose output about NDS progress. Returns: `TimeSeriesDict` : """ # Default is you do not want a timeslided dictionary # of timeseries time_slides = kwargs.pop('time_slides', {det : 0 for det in channel_names}) if not isinstance(time_slides, dict): raise ValueError("time_slides must be supplied " "in the form of a dict " "with key channel and value integer " "time to slide start and stop time") # If frametype is None then make dictionary # of key channel and value None frame_types = kwargs.pop('frame_types', {det : None for det in channel_names}) if not isinstance(frame_types, dict): raise ValueError("frame_types must be supplied " "in the form of a dict " "with key channel and value frametype " "assosciated with that channel") #----- Start and stop time for this event. start_time = event_time - block_time / 2; stop_time = event_time + block_time / 2; try: data = cls() for det in channel_names: data.append({det : TimeSeries.get(det, start_time + time_slides[det], stop_time + time_slides[det], frametype=frame_types[det], **kwargs ) }) except: # Retrieve data useing the get classmethod data = cls() for det in channel_names: data.append({det : TimeSeries.fetch_open_data( det.split(':')[0], start_time + time_slides[det], stop_time + time_slides[det], sample_rate=sample_frequency, **kwargs ) }) for key, series in list(data.items()): if series.sample_rate.decompose().value != sample_frequency: data[key] = series.resample(sample_frequency) # set one of the detectors to be the reference detecotr # for any future coherent combinations return XTimeSeries(data)
[docs] @classmethod def generate_data(cls, event_time, block_time, channel_names, sample_frequency, verbose=False): """Obtain data for either on source, off source, or injections. This uses the gwpy `TimeSeriesDict.get` method Parameters ---------- event_time : (`float) trigger time of event to be processing block_time : (`int`) length of data to be processed channel_names (`list`) : required data channels. sample_frequency (`int`): sample rate of the data desired verbose : `bool`, optional print verbose output about NDS progress. Returns: `TimeSeriesDict` : """ #----- Start and stop time for this event. start_time = event_time - block_time / 2; stop_time = event_time + block_time / 2; # Retrieve data and then resample and set epoch data = {} for det in channel_names: data[det] = TimeSeries(numpy.random.normal(scale=.1, size=sample_frequency*block_time), sample_rate=sample_frequency) data = XTimeSeries(data) for (idet, iseries) in list(data.items()): # set epoch of timeseries to the event_time iseries.t0 = start_time # set one of the detectors to be the reference detecotr # for any future coherent combinations return data
[docs] @classmethod def generate_noise_from_file(cls, file_name, event_time, block_time, channel_names, sample_frequency, verbose=False): """Obtain data for either on source, off source, or injections. This uses the gwpy `TimeSeriesDict.get` method Parameters ---------- event_time : (`float) trigger time of event to be processing block_time : (`int`) length of data to be processed channel_names (`list`) : required data channels. sample_frequency (`int`): sample rate of the data desired verbose : `bool`, optional print verbose output about NDS progress. Returns: `TimeSeriesDict` : """ #----- Start and stop time for this event. start_time = event_time - block_time / 2; stop_time = event_time + block_time / 2; # Retrieve data and then resample and set epoch data = {} for det in channel_names: data[det] = TimeSeries.read(file_name, path='/noise/aLIGOZeroDetHighPower') data = XTimeSeries(data) for (idet, iseries) in list(data.items()): # set epoch of timeseries to the event_time iseries.t0 = start_time # set one of the detectors to be the reference detecotr # for any future coherent combinations return data
[docs] def asd(self, fftlength, **kwargs): """Obtain the asd of items in this dict. Parameters ---------- fftlength : `dict`, `float` either a `dict` of (channel, `float`) pairs for key-wise asd calc, or a single float/int to compute as of all items. **kwargs other keyword arguments to pass to each item's asd method. """ asds = XFrequencySeriesDict() if not isinstance(fftlength, dict): fftlength = dict((c, fftlength) for c in self) for key, fftlen in list(fftlength.items()): asds[key] = self[key].asd(fftlength=fftlen, overlap=fftlen*0.5, method='lal_median_mean' ) return asds
[docs] def whiten(self, asds): """White this `XTimeSeries` against its own ASD Parameters ---------- asds : `dict` a `dict` of (channel, `XFrequencySeries`) pairs for key-wise whitened of the timeseries """ if not isinstance(asds, dict): raise ValueError("asds must be supplied in the form of a dict") whitened_timeseries = XTimeSeries() for (idet, iseries) in list(self.items()): dt = 1./asds[idet].dx whitened = iseries.whiten(fftlength=dt, overlap=dt*0.5, asd=asds[idet]) whitened_timeseries.append( {idet : whitened} ) return whitened_timeseries
[docs] def highpass(self, frequency, corruption=2): """Design a high-pass filter. Parameters ---------- fhigh : `float` high-pass corner frequency """ for k, v in list(self.items()): ts_hp = v.highpass(frequency) self[k] = ts_hp.crop(*ts_hp.span.contract(corruption)) return self
[docs] def spectrogram(self, fftlength): """Obtain the spectrograms of items in this dict. Parameters ---------- fftlength : `dict`, `float` either a `dict` of (channel, `float`) pairs for key-wise asd calc, or a single float/int to computer as of all items. **kwargs other keyword arguments to pass to each item's asd method. """ tfmaps = XTimeFrequencyMapDict() if not isinstance(fftlength, dict): fftlength = dict((c, fftlength) for c in self) for (idet, iseries) in list(self.items()): tfmaps[idet] = XTimeFrequencyMap(iseries.spectrogram2( fftlength=fftlength[idet], overlap=0.5*fftlength[idet], window='hann')) return tfmaps
[docs] def fftgram(self, fftlength): """Obtain the spectrograms of items in this dict. Parameters ---------- fftlength : `dict`, `float` either a `dict` of (channel, `float`) pairs for key-wise asd calc, or a single float/int to computer as of all items. **kwargs other keyword arguments to pass to each item's asd method. """ tfmaps = XTimeFrequencyMapDict() if not isinstance(fftlength, dict): fftlength = dict((c, fftlength) for c in self) for (idet, iseries) in list(self.items()): tfmaps[idet] = XTimeFrequencyMap(iseries.fftgram( fftlength=fftlength[idet], overlap=0.5*fftlength[idet], window='hann')) return tfmaps
[docs] def inject(self, injection_data, **kwargs): """Take an injection time series and inject into `XTimeSeries` This is essentially a wrapper arounf the very useful `gwpy.timeseries.TimeSeries.inject` method Parameters ---------- injection_data : `XTimeSeries`, A `XtimeSeries` containing the same keys as the `XTimeSeries` you are injecting into i.e. if you are injecting into an `XTimeSeries` with 'H1', 'L1' and 'V1' data then your injeciton data should have the same keys **kwargs other keyword arguments to pass to each item's asd method. """ injection_timeseries = XTimeSeries() for (idet, iseries) in list(self.items()): injection_timeseries.append( {idet : iseries.inject(injection_data[idet])} ) return injection_timeseries