# Source code for voeventparse.convenience

"""Convenience routines for common actions on VOEvent objects"""

from __future__ import absolute_import

from collections import OrderedDict
from copy import deepcopy

import astropy.time
import iso8601
import lxml
import pytz
from voeventparse.misc import (Position2D)
from orderedmultidict import omdict as OMDict


def get_event_time_as_utc(voevent, index=0):
    """
    Extracts the event time from a given `WhereWhen.ObsDataLocation`.

    Returns a datetime (timezone-aware, UTC).

    Accesses a `WhereWhen.ObsDataLocation.ObservationLocation`
    element and returns the AstroCoords.Time.TimeInstant.ISOTime element,
    converted to a (UTC-timezoned) datetime.

    Note that a packet may include multiple 'ObsDataLocation' entries
    under the 'WhereWhen' section, for example giving locations of an object
    moving over time. Most packets will have only one, however, so the
    default is to access the first.

    The time system is taken from the leading, dash-separated component of
    the AstroCoords ``coord_system_id`` attribute.

    This function implements conversion from the TDB (Barycentric Dynamical
    Time) time scale in ISOTime format, since this is the format used by
    GAIA VOEvents.
    (See also http://docs.astropy.org/en/stable/time/#time-scale )

    Other timescales (i.e. TT, GPS) will presumably be formatted as a
    TimeOffset; parsing this format is not yet implemented.

    Args:
        voevent (:class:`voeventparse.voevent.Voevent`): Root node of the
            VOevent etree.
        index (int): Index of the ObsDataLocation to extract an ISOtime from.

    Returns:
        :class:`datetime.datetime`: Datetime representing the
        event-timestamp, converted to UTC (timezone aware). ``None`` is
        returned if an attribute lookup along the way raises
        ``AttributeError`` (e.g. an expected element is absent).

    Raises:
        NotImplementedError: If the time system is 'TT' or 'GPS'.
        ValueError: If the time system is none of 'UTC', 'TDB', 'TT', 'GPS'.
    """
    try:
        od = voevent.WhereWhen.ObsDataLocation[index]
        ol = od.ObservationLocation
        coord_sys = ol.AstroCoords.attrib['coord_system_id']
        timesys_identifier = coord_sys.split('-')[0]

        if timesys_identifier == 'UTC':
            isotime_str = str(ol.AstroCoords.Time.TimeInstant.ISOTime)
            return iso8601.parse_date(isotime_str)
        elif timesys_identifier == 'TDB':
            isotime_str = str(ol.AstroCoords.Time.TimeInstant.ISOTime)
            isotime_dtime = iso8601.parse_date(isotime_str)
            tdb_time = astropy.time.Time(isotime_dtime, scale='tdb')
            # astropy hands back a naive datetime; tag it explicitly as UTC.
            return tdb_time.utc.to_datetime().replace(tzinfo=pytz.UTC)
        elif timesys_identifier in ('TT', 'GPS'):
            # The placeholder is now actually filled in; previously the
            # message was emitted with a literal, unformatted '{}'.
            raise NotImplementedError(
                "Conversion from time-system '{}' to UTC not yet "
                "implemented".format(timesys_identifier)
            )
        else:
            raise ValueError(
                'Unrecognised time-system: {} (badly formatted '
                'VOEvent?)'.format(timesys_identifier)
            )
    except AttributeError:
        # Best-effort contract: a packet lacking the expected elements
        # yields None rather than an exception.
        return None
def get_event_position(voevent, index=0):
    """Extracts the `AstroCoords` from a given `WhereWhen.ObsDataLocation`.

    Note that a packet may include multiple 'ObsDataLocation' entries
    under the 'WhereWhen' section, for example giving locations of an object
    moving over time. Most packets will have only one, however, so the
    default is to just return co-ords extracted from the first.

    Both the coordinates and the coordinate-system id are read from the
    ObsDataLocation selected by ``index``.

    Args:
        voevent (:class:`voeventparse.voevent.Voevent`): Root node of the
            VOEvent etree.
        index (int): Index of the ObsDataLocation to extract AstroCoords from.

    Returns:
        Position (:py:class:`.Position2D`): The sky position defined in the
        ObsDataLocation.

    Raises:
        AssertionError: If the Position2D element carries a ``Name1`` child
            and the axis names are not 'RA' / 'Dec'.
    """
    od = voevent.WhereWhen.ObsDataLocation[index]
    obs_location = od.ObservationLocation
    ac = obs_location.AstroCoords
    # Take the coordinate system from the *indexed* entry. Previously this
    # always read the first ObsDataLocation, so for index != 0 the position
    # and the system id could come from different entries.
    coord_system_id = obs_location.AstroCoordSystem.attrib['id']

    pos2d = ac.Position2D
    if hasattr(pos2d, "Name1"):
        # Only RA/Dec axis naming is supported when names are given.
        assert pos2d.Name1 == 'RA' and pos2d.Name2 == 'Dec'

    return Position2D(ra=float(pos2d.Value2.C1),
                      dec=float(pos2d.Value2.C2),
                      err=float(pos2d.Error2Radius),
                      units=pos2d.attrib['unit'],
                      system=coord_system_id)
def _get_param_children_as_omdict(subtree_element):
    """Collect the direct `Param` children of an element into an omdict.

    Each Param is added under the value of its ``name`` attribute (``None``
    when the attribute is absent), mapped to the Param's attribute mapping.
    An empty omdict is returned when the element has no `Param` child.
    """
    params_by_name = OMDict()
    if subtree_element.find('Param') is None:
        return params_by_name
    for param in subtree_element.Param:
        params_by_name.add(param.attrib.get('name'), param.attrib)
    return params_by_name
def get_grouped_params(voevent):
    """
    Fetch grouped Params from the `What` section of a voevent as an omdict.

    This fetches 'grouped' Params, i.e. those enclosed in a Group element,
    and returns them as a nested dict-like structure, keyed by
    GroupName->ParamName->AttribName.

    Note that since multiple Groups (and multiple Params within a Group) may
    share the same name, both levels of the returned data-structure are
    `orderedmultidict.omdict <https://github.com/gruns/orderedmultidict>`_
    instances, which have extra methods such as 'getlist' to allow retrieval
    of all values.

    Groups with no ``name`` attribute are entered under the key ``None``.

    Args:
        voevent (:class:`voeventparse.voevent.Voevent`): Root node of the
            VOevent etree.

    Returns (orderedmultidict.omdict):
        Mapping of ``GroupName->ParamName->Attribs``.
        Typical access like so::

            foo_val = grouped_params['grp']['foo']['value']
            # If there are multiple Group entries named 'grp':
            all_grps = grouped_params.getlist('grp')
    """
    # Work on a de-annotated copy so the caller's tree is left untouched.
    what = deepcopy(voevent.What)
    lxml.objectify.deannotate(what)

    grouped = OMDict()
    if what.find('Group') is None:
        return grouped
    for group in what.Group:
        group_name = group.attrib.get('name')
        grouped.add(group_name, _get_param_children_as_omdict(group))
    return grouped
def get_toplevel_params(voevent):
    """
    Fetch ungrouped Params from the `What` section of a voevent as an omdict.

    This fetches 'toplevel' Params, i.e. those not enclosed in a Group
    element, and returns them as a nested dict-like structure, keyed like
    ParamName->AttribName.

    Note that since multiple Params may share the same ParamName, the
    returned data-structure is actually an
    `orderedmultidict.omdict <https://github.com/gruns/orderedmultidict>`_
    and has extra methods such as 'getlist' to allow retrieval of all values.

    Any Params with no defined name (technically off-spec, but not
    invalidated by the XML schema) are returned under the dict-key ``None``.

    Args:
        voevent (:class:`voeventparse.voevent.Voevent`): Root node of the
            VOevent etree.

    Returns (orderedmultidict.omdict):
        Mapping of ``ParamName->Attribs``.
        Typical access like so::

            foo_val = top_params['foo']['value']
            # If there are multiple Param entries named 'foo':
            all_foo_vals = [atts['value'] for atts in top_params.getlist('foo')]
    """
    # Work on a de-annotated copy so the caller's tree is left untouched.
    w = deepcopy(voevent.What)
    lxml.objectify.deannotate(w)
    return _get_param_children_as_omdict(w)
def pull_astro_coords(voevent, index=0):
    """
    Deprecated alias of :func:`.get_event_position`

    Emits a ``FutureWarning`` and then forwards both arguments unchanged.
    """
    import warnings

    rename_notice = (
        " The function `pull_astro_coords` has been renamed to "
        "`get_event_position`. This alias is preserved for backwards "
        "compatibility, and may be removed in a future release. "
    )
    warnings.warn(rename_notice, category=FutureWarning)
    return get_event_position(voevent, index)
def pull_isotime(voevent, index=0):
    """
    Deprecated alias of :func:`.get_event_time_as_utc`

    Emits a ``FutureWarning`` and then forwards both arguments unchanged.
    """
    import warnings

    rename_notice = (
        " The function `pull_isotime` has been renamed to "
        "`get_event_time_as_utc`. This alias is preserved for backwards "
        "compatibility, and may be removed in a future release. "
    )
    warnings.warn(rename_notice, category=FutureWarning)
    return get_event_time_as_utc(voevent, index)
def pull_params(voevent):
    """
    Attempts to load the `What` section of a voevent as a nested dictionary.

    .. warning:: Deprecated due to `Missing name attributes` issues.

        `Param` or `Group` entries which are missing the `name` attribute
        will be entered under a dictionary key of ``None``. This means that
        if there are multiple entries missing the `name` attribute then
        earlier entries will be overwritten by later entries, so you will not
        be able to use this convenience routine effectively.
        Use :func:`get_grouped_params` and :func:`get_toplevel_params`
        instead.

    Args:
        voevent (:class:`voeventparse.voevent.Voevent`): Root node of the
            VOevent etree.

    Returns:
        dict: Mapping of ``Group->Param->Attribs``.
        Access like so::

            foo_param_val = what_dict['GroupName']['ParamName']['value']

        An empty mapping is returned when `What` has no children.

        .. note::

            Parameters without a group are indexed under the key ``None`` -
            otherwise, we might get name-clashes between `params` and
            `groups` (unlikely but possible) so for ungrouped Params you'll
            need something like::

                what_dict[None]['ParamName']['value']
    """
    import warnings

    deprecation_notice = (
        " The function `pull_params` has been deprecated in favour of the "
        "split functions `get_toplevel_params` and `get_grouped_params`, "
        "due to possible name-shadowing issues when combining "
        "multilevel-nested-dicts (see docs for details). This alias is "
        "preserved for backwards compatibility, and may be removed in a "
        "future release. "
    )
    warnings.warn(deprecation_notice, category=FutureWarning)

    what_dict = OrderedDict()
    # Work on a de-annotated copy so the caller's tree is left untouched.
    what = deepcopy(voevent.What)
    lxml.objectify.deannotate(what)
    if what.countchildren() == 0:
        return what_dict

    # Ungrouped Params go under the key None; this slot is registered before
    # the groups so that a nameless Group replaces it, as it always has.
    ungrouped = OrderedDict()
    what_dict[None] = ungrouped
    if what.find('Param') is not None:
        for param in what.Param:
            ungrouped[param.attrib.get('name')] = param.attrib

    if what.find('Group') is not None:
        for group in what.Group:
            if hasattr(group, 'Param'):
                members = {
                    param.attrib.get('name'): param.attrib
                    for param in group.Param
                }
            else:
                members = {}
            what_dict[group.attrib.get('name')] = members
    return what_dict
def prettystr(subtree):
    """Print an element tree with nice indentation.

    Prettyprinting a whole VOEvent often doesn't seem to work, probably for
    issues relating to whitespace cf.
    http://lxml.de/FAQ.html#why-doesn-t-the-pretty-print-option-reformat-my-xml-output
    This function is a quick workaround for prettyprinting a subsection
    of a VOEvent, for easier desk-checking.

    The input is deep-copied before being cleaned up, so the caller's tree
    is not modified.

    Args:
        subtree(:class:`lxml.etree.ElementTree`): A node in the VOEvent
            element tree.

    Returns:
        str: Prettyprinted string representation of the raw XML.
    """
    node = deepcopy(subtree)
    lxml.objectify.deannotate(node)
    lxml.etree.cleanup_namespaces(node)
    xml_bytes = lxml.etree.tostring(node, pretty_print=True)
    return xml_bytes.decode(encoding="utf-8")