# -*- coding: utf-8 -*-
# Copyright (C) Scott Coughlin (2017-)
# This file is part of the XPypeline python package.
# hveto is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# hveto is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with hveto. If not, see <http://www.gnu.org/licenses/>.
# ---- Import standard modules to the python path.
import numpy
from collections import OrderedDict
from gwpy.frequencyseries import FrequencySeries
import copy
__author__ = 'Scott Coughlin <scott.coughlin@ligo.org>'
__all__ = ['XFrequencySeriesDict',
SQRT_OF_NEG_1 = numpy.sqrt(numpy.array([-1],dtype=complex))
[docs]class XFrequencySeriesDict(OrderedDict):
[docs] def project_onto_antenna_patterns(self, antenna_responses,
"""Shift timeseries by assosciated time delay
antenna_responses : `dict`
key-wise pair of
OrderedDict([('H1', array([-0.02424373])),
('L1', array([0.3089992]))])),
OrderedDict([('H1', array([-0.5677237])),
('L1', array([0.52872644]))])),
OrderedDict([('H1', array([0.12427263])),
('L1', array([-0.30016348]))]))])
to_dominant_polarization_frame " `bool`
This boolean determines whether or not to calculate the
relevant angle parameter that would project the data into
the orthogonal cross plus polarization frame.
antenna_response_asds = XAntennaProjectedFrequencySeriesDict()
for pattern, responses in list(antenna_responses.items()):
antenna_weighted_asds = XFrequencySeriesDict()
for ifo, asd in list(self.items()):
abbr_ifo = ifo.split(':')[0]
antenna_weighted_asds[ifo] = responses[abbr_ifo] / asd
antenna_response_asds[pattern] = antenna_weighted_asds
if to_dominant_polarization_frame:
wfp = antenna_response_asds['f_plus'].to_array()
wfc = antenna_response_asds['f_cross'].to_array()
FpDP, FcDP, psi = convert_to_dominant_polarization_frame(wfp, wfc)
tmp = copy.deepcopy(antenna_response_asds)
for detidx, idet in enumerate(tmp['f_plus']):
antenna_response_asds['f_plus'][idet] = (
numpy.cos(2*psi[:,detidx]) * tmp['f_plus'][idet] +
numpy.sin(2*psi[:,detidx]) * tmp['f_cross'][idet]
antenna_response_asds['f_cross'][idet] = (
-numpy.sin(2*psi[:,detidx]) * tmp['f_plus'][idet] +
numpy.cos(2*psi[:,detidx]) * tmp['f_cross'][idet]
if circular:
return antenna_response_asds
[docs] def to_array(self):
"""Convert to number of freq bins by number of detectors array
number_of_frequencies = list(self.values())[0].size
number_of_detectors = len(self)
array = numpy.zeros([number_of_frequencies, number_of_detectors])
for idx, asd in enumerate(self.values()):
array[:, idx] = asd
return array
[docs] def calculate_magnitude(self):
"""Matrix M_AB components.
This is the dot product of the projected_asds, with
itself, summed accross detectors.
Units Hz
return numpy.sqrt(sum([v**2 for v in list(self.values())]))
[docs] def slice_frequencies(self, indices):
"""select a subset of frequencies from XFrequencySeriesDict
indices (array):
an array of indexs to select from all elements
of `XFrequencySeriesDict`
asd_subset = XFrequencySeriesDict()
for det, asd in list(self.items()):
asd_subset[det] = self[det][indices]
return asd_subset
[docs]class XAntennaProjectedFrequencySeriesDict(OrderedDict):
"""Controls a dictionaries of projected asds
[docs] def calculate_magnitude(self):
"""Find unit vector of a series of asds
asds_magnitude = XFrequencySeriesDict()
for k, v in list(self.items()):
asds_magnitude[k] = v.calculate_magnitude()
return asds_magnitude
[docs] def to_unit(self):
"""Find unit vector of a series of asds
for k, v in list(self.items()):
unit_dpf_asds = XFrequencySeriesDict()
for k1, v1 in list(v.items()):
unit_dpf_asds[k1] = v1 / v.calculate_magnitude()
self[k] = unit_dpf_asds
return self
[docs] def to_circular(self):
Find unit vector of a series of asds
# Initialize projected circular keys
self['f_left'] = XFrequencySeriesDict()
self['f_right'] = XFrequencySeriesDict()
self['f_left_null'] = XFrequencySeriesDict()
self['f_right_null'] = XFrequencySeriesDict()
# calculate the magntiude squared of the plus and cross projected asds
f_plus_magnitude_squared = numpy.square(self['f_plus'].calculate_magnitude())
f_cross_magnitude_squared = numpy.square(self['f_cross'].calculate_magnitude())
for f_plus, f_cross in zip(list(self['f_plus'].values()), list(self['f_cross'].values())):
self['f_left'][f_plus.name] = f_plus + SQRT_OF_NEG_1 * f_cross
self['f_right'][f_plus.name] = f_plus - SQRT_OF_NEG_1 * f_cross
self['f_left_null'][f_plus.name] = f_plus / f_plus_magnitude_squared - SQRT_OF_NEG_1 * f_cross / f_cross_magnitude_squared
self['f_right_null'][f_plus.name] = f_plus / f_plus_magnitude_squared + SQRT_OF_NEG_1 * f_cross / f_cross_magnitude_squared
return self
[docs]def convert_to_dominant_polarization_frame(Fp, Fc):
"""Take in stream of fplus and ≈f_cross and convert to DPF
DPF is the Dominant Polarization Frame
Fp : `float`
Fc : `float`
# ---- Compute rotation needed to reach DP frame.
psi = numpy.zeros([len(Fp), 1])
psi[:, 0] = 1/4*numpy.arctan(2*(numpy.sum(Fp*Fc, 1))/(
numpy.sum(Fp*Fp, 1)-numpy.sum(Fc*Fc, 1))
psi = psi.repeat(Fp.shape[1], 1)
# ---- Rotate to DP frame.
FpDP = numpy.cos(2*psi)*Fp + numpy.sin(2*psi)*Fc
FcDP = -numpy.sin(2*psi)*Fp + numpy.cos(2*psi)*Fc
# ---- Further rotate polarization by pi/4 if |Fp|<|Fc|.
swapindex = (FpDP**2).sum(1) < (FcDP**2).sum(1)
FpDP[swapindex, :] = FcDP[swapindex, :]
FcDP[swapindex, :] = -FpDP[swapindex, :]
psi[swapindex, :] = psi[swapindex, :] + numpy.pi/4
return FpDP, FcDP, psi