Source code for xpipeline.waveform.xinjectsignal

from xpipeline.core.xdetector import Detector
from .xwaveform import xmakewaveform
from gwpy.timeseries import TimeSeries
from xpipeline.core.xtimeseries import XTimeSeries
from gwpy.table import Table
from astropy.table import hstack

import numpy
import math

[docs]def xinjectsignal(family, start_time, block_time, detectors, sample_rate, phi, theta, psi=0, **kwargs): """Create simulated signals using a file defining the parameters Parameters: family (str): waveform family name to be generated start_time (int): Start time of the data being analysed. block_time (int): Duration (s) of the data being analysed. channel (str) channel[j] holds the name of the jth detector channel being analysed. The detector name is inferred from the first character of the channel name, and must be one of the names recognized by LoadDetectorData. sample_rate (int): Sample rates corresponding to channels. phi (float): Earth fixed coordinate theta (float): Earth fixed coordinate **kwargs: catalog_directory (optional, str): Path to directory containing pregenerated waveforms Returns: injection_data: `XTimeSeries` Notes: The coordinate system is explained in ComputeAntennaResponse. It is assumed that the sky positions in the injection file are supplied in this coordinate system. For glitch injections the reported times and angles are those for the injection into the first detector. Any portion of any signal which lies outside the interval [start_time, start_time+block_time] will be clipped. Otherwise, XINJECTSIGNAL will inject any signal of duration up to block_time without clipping. The edges of long-duration signals will be tapered with a hann window to avoid turn-on/off transients. """ # ---- Check for optional arguments. catalogdirectory = kwargs.pop('catalogdirectory', '') parameters = kwargs.pop('parameters', []) # ----- Speed of light (m/s). c = 299792458 # ----- Parse channel list and load info on corresponding detectors. injection_data = XTimeSeries() for det in detectors: #----- Make timeseries data for given detector. injection_data[det] = TimeSeries(numpy.zeros(int(block_time * sample_rate)), dx= 1./sample_rate, name=det) # create detector instance detector = Detector(det) #----- Time delay for incoming signal wrt center of Earth delay = detector.time_delay_from_earth_center_phi_theta( [phi], [theta]) #----- Compute antenna response, including polarization angle. [Fp, Fc, Fb, F1, F2, FL] = detector.compute_antenna_response( [phi], [theta], [psi]) snippet_pad = block_time snippet_duration = 2*snippet_pad + 1 #----- Peak Time of signal relative to start_time. peak_time = start_time + delay peak_time_for_waveform = snippet_pad + peak_time - math.floor(peak_time) #----- Waveform plus and cross polarizations, in wave frame. [t,hp,hc,hb] = xmakewaveform(family=family, parameters=parameters, T=snippet_duration, T0=peak_time_for_waveform[0], fs=sample_rate, catalogdirectory=catalogdirectory) # ---- Apply taper to start and end of snippet to avoid # discontinuous turn-on/off of very long signals. # Only taper if there is an signal to taper if numpy.where(hp != 0)[0].size: hp = hp.taper('right') if numpy.where(hc != 0)[0].size: hc = hc.taper('right') if numpy.where(hb != 0)[0].size: hb = hb.taper('right') #----- Reset t to GPS time. t = t + math.floor(peak_time) - snippet_pad #----- Sample indices. injection_samples = numpy.arange(0, t.size) #----- keep only samples which fall inside the desired time interval. k = numpy.where( (t>=start_time) & (t<start_time + block_time) )[0] injection_samples = injection_samples[k] t = t[k] hp = hp[k] hc = hc[k] hb = hb[k] #----- Combine with antenna response to give signal. injection_samples = (injection_samples + numpy.round( (math.floor(peak_time) - snippet_pad - start_time) *sample_rate) ).astype(int) injection_data[det][injection_samples] = Fp*hp + Fc*hc + Fb*hb injection_data[det].t0 = start_time return injection_data
[docs]def xinjectsignal_fromfile(start_time, block_time, channels, sample_rate, injection_file_name, rescale_by_antenna_response=False, injection_number=0, **kwargs): """Create simulated signals using a file defining the parameters Parameters: start_time (int): Start time of the data being analysed. block_time (int): Duration (s) of the data being analysed. channel (str) channel[j] holds the name of the jth detector channel being analysed. The detector name is inferred from the first character of the channel name, and must be one of the names recognized by LoadDetectorData. sample_rate (int): Sample rates corresponding to channels. injection_file_name (str): Name of file specifying simulated signals to be injected. injection_number Array (positive integer). Optional. If given then only the injections specified on rows "injection_number" of the injection file are injected. This over-rides the default behaviour, which is to inject all injections in the file (including those outside the interval [start_time,start_time+block_time]!). **kwargs: catalog_directory (optional, str): Path to directory containing pregenerated waveforms Returns: injection_data: `XTimeSeries` gps_s: Vector of peak time (GPS seconds, integer part) of each injection at the center of the Earth. gps_ns: Vector of peak time (GPS seconds, nanosecond part) of each injection at the center of the Earth. phi: Vector of azimuthal sky coordinate of each injection (radians, Earth-fixed coordinates). theta: Vector of polar sky coordinate of each injection psi: Vector of polarization angle of each injection (radians Earth-fixed coordinates). Notes: The coordinate system is explained in ComputeAntennaResponse. It is assumed that the sky positions in the injection file are supplied in this coordinate system. For glitch injections the reported times and angles are those for the injection into the first detector. Any portion of any signal which lies outside the interval [start_time, start_time+block_time] will be clipped. Otherwise, XINJECTSIGNAL will inject any signal of duration up to block_time without clipping. The edges of long-duration signals will be tapered with a hann window to avoid turn-on/off transients. """ ifos = [ifo.split(':')[0] for ifo in channels] injection_number = int(injection_number) # ---- Check for optional arguments. do_not_inject = kwargs.pop('do_not_inject', 0) catalogdirectory = kwargs.pop('catalogdirectory', '') # ----- Speed of light (m/s). c = 299792458 # ----- Parse channel list and load info on corresponding detectors. detector_dict = {} for det in ifos: detector_dict[det] = Detector(det) # We must check if there are detector specific injection # parameters injection_file_parameters = Table.read(injection_file_name, format='ascii',) _injection_file_columns = ['gps_s','gps_ns', 'phi', 'theta', 'psi', 'gwb_type', 'gwb_params'] if len(list(injection_file_parameters.keys())) > 7: # ----- Create appropriate columns for reading the injection # file based on detectors supplied above injection_file_columns = [] for det in ifos: for columns in _injection_file_columns: injection_file_columns.append(columns + '_' + det) #----- Read injection file and extract parameters of injections injection_file_parameters = Table.read(injection_file_name, format='ascii', names=injection_file_columns) else: # There is a single set of parameters for ever detector injection_file_parameters = Table() for det in ifos: injection_file_columns = [] for columns in _injection_file_columns: injection_file_columns.append(columns + '_' + det) injection_file_parameters = hstack((injection_file_parameters, Table.read(injection_file_name, format='ascii',names=injection_file_columns))) #----- If specific injections are specified then keep only those injections. injection_file_parameters = injection_file_parameters[injection_number] #----- Compute sum over detectors of Fp^2 for each injection, if desired. """ if (rescale_by_antenna_response) totalInjectedPower = zeros(nInjection,1) #----- Loop over simulated signals and record peak time, sky angles. for jInjection=1:nInjection #----- Use angles for first detector. phi = str2num(currentParameters{3}{jInjection}) theta = str2num(currentParameters{4}{jInjection}) psi = str2num(currentParameters{5}{jInjection}) #----- Compute Fp for each detector. for jDetector=1:nDetector Fp = ComputeAntennaResponse(phi,theta,psi,detector{jDetector}.d) totalInjectedPower(jInjection) = ... totalInjectedPower(jInjection) + Fp^2 end end end """ #----- Loop over detectors and construct simulated signals. injection_data = {} for channel, det in zip(channels, ifos): #----- Make timeseries data for given detector. injection_data[channel] = TimeSeries(numpy.zeros(int(block_time * sample_rate)), dx= 1./sample_rate, name=det) #---------- Extract parameters for current injection. gps_s = injection_file_parameters['gps_s_' + det] gps_ns = injection_file_parameters['gps_ns_' + det] phi = injection_file_parameters['phi_' + det] theta = injection_file_parameters['theta_' + det] psi = injection_file_parameters['psi_' + det] gwb_type = injection_file_parameters['gwb_type_' + det] gwb_params = injection_file_parameters['gwb_params_' + det] #----- Time delay for incoming signal wrt center of Earth delay = detector_dict[det].time_delay_from_earth_center_phi_theta( [phi], [theta]) #----- Compute antenna response, including polarization angle. [Fp, Fc, Fb, F1, F2, FL] = detector_dict[det].compute_antenna_response( [phi], [theta], [psi]) """ #----- Rescale amplitude by sum_over_detectors of Fp^2, if desired: if (rescale_by_antenna_response): Fp = Fp / sqrt(totalInjectedPower(jInjection)) Fc = Fc / sqrt(totalInjectedPower(jInjection)) end """ #----- Make injection data in a several-second snippet. This will # work for any signal with duration less than snippet_pad for # longer signals some clipping may occur. Clipping will # definitely occur for injections longer than snippet_duration. snippet_pad = block_time snippet_duration = 2*snippet_pad + 1 #----- Peak Time of signal relative to start_time. peak_time = gps_s + 1e-9 * gps_ns + delay peak_time_for_waveform = snippet_pad + peak_time - math.floor(peak_time) #----- Waveform plus and cross polarizations, in wave frame. [t,hp,hc,hb] = xmakewaveform(family=gwb_type, parameters=gwb_params, T=snippet_duration, T0=peak_time_for_waveform[0], fs=sample_rate, catalogdirectory=catalogdirectory) # ---- Apply taper to start and end of snippet to avoid # discontinuous turn-on/off of very long signals. # Only taper if there is an signal to taper #if numpy.where(hp != 0)[0].size: # hp.taper() #if numpy.where(hc != 0)[0].size: # hc.taper() #if numpy.where(hb != 0)[0].size: # hb.taper() #----- Reset t to GPS time. t = t + math.floor(peak_time) - snippet_pad #----- Sample indices. injection_samples = numpy.arange(0, t.size) #----- keep only samples which fall inside the desired time interval. k = numpy.where( (t>=start_time) & (t<start_time + block_time) )[0] injection_samples = injection_samples[k] t = t[k] hp = hp[k] hc = hc[k] hb = hb[k] #----- Combine with antenna response to give signal. injection_samples = (injection_samples + numpy.round( (math.floor(peak_time) - snippet_pad - start_time) *sample_rate) ).astype(int) injection_data[channel][injection_samples] = Fp*hp + Fc*hc + Fb*hb injection_data[channel].t0 = start_time #----- Record for output the peak time and sky angles for each of the # injections. For glitches record only the parameters for the first # detector. #----- Loop over simulated signals and record peak time, sky angles. #---------- Extract parameters for current injection. gps_s = injection_file_parameters['gps_s_' + ifos[0]] gps_ns = injection_file_parameters['gps_ns_' + ifos[0]] phi = injection_file_parameters['phi_' + ifos[0]] theta = injection_file_parameters['theta_' + ifos[0]] psi = injection_file_parameters['psi_' + ifos[0]] #----- Done return XTimeSeries(injection_data), gps_s, gps_ns, phi, theta, psi