Source code for ranch.core.header

""" Header module """

from typing import Optional, Union, Literal

from datetime import datetime

import numpy as np

from astropy.io import fits

__all__ = [
    'C_LIGHT', 'bound_coordinates', 'bound_frequencies', 'bound_velocities', 'change_coordinates', 'check_header', 'coordinates_to_indices', 'create_header', 'frequency_to_index', 'index_to_frequency', 'index_to_velocity', 'indices_to_coordinates', 'merge_headers', 'move_header_axes', 'remove_header_axis', 'update_header', 'velocity_to_index'
]

[docs] def check_header(data, header) : """ Returns True if data is compatible with the header, else raise a `ValueError` """ if header['NAXIS'] != data.ndim : raise ValueError(f"header must have card NAXIS = {data.ndim}, not {header['NAXIS']}") h_shape = tuple((header[f'NAXIS{k}'] for k in range(data.ndim, 0, -1))) if data.shape != h_shape : raise ValueError(f"data dimensions {data.shape} do not match the header cards {h_shape}") return True
[docs] def update_header(data : np.ndarray, header : fits.Header) : """ Updates header cards to match data. Data and header are first verified by `check_header` i.e. their dimensions must match. Missing cards are also added to avoid future errors. """ check_header(data, header) new_header = header.copy() # Update data extrema (if all values are NaNs, min and max are 0 by default) if np.isfinite(data).any() : new_header['DATAMIN'] = np.nanmin(data) new_header['DATAMAX'] = np.nanmax(data) else : new_header['DATAMIN'] = 0. new_header['DATAMAX'] = 0. # Update date new_header['DATE'] = str(datetime.now()) # Update origin if 'ORIGIN' in new_header.keys() : if '- MODIFIED' not in new_header['ORIGIN'].strip().upper() : new_header['ORIGIN'] = new_header['ORIGIN'].strip().upper() + ' - MODIFIED' # Update several cards if 'OBJECT' not in new_header.keys() : new_header['OBJECT'] = 'UNKNOWN' if 'BUNIT' not in new_header.keys() : new_header['BUNIT'] = 'UNKNOWN' if 'RA' not in new_header.keys() : new_header['RA'] = 0. if 'DEC' not in new_header.keys() : new_header['DEC'] = 0. if 'RESTFREQ' not in new_header.keys() : new_header['RESTFREQ'] = 1. # Axis cards if data.ndim not in (1, 2, 3) : raise ValueError(f'data dimension must be 1, 2 or 3, not {data.ndim}') for axis in range(1, data.ndim+1) : if f'CTYPE{axis}' not in new_header.keys() : new_header[f'CTYPE{axis}'] = 'INDICES' if f'CRVAL{axis}' not in new_header.keys() : new_header[f'CRVAL{axis}'] = 1 if f'CDELT{axis}' not in new_header.keys() : new_header[f'CDELT{axis}'] = 1 if f'CRPIX{axis}' not in new_header.keys() : new_header[f'CRPIX{axis}'] = 1 if f'CROTA{axis}' not in new_header.keys() : new_header[f'CROTA{axis}'] = 0 return new_header
[docs] def merge_headers(header_1 : fits.Header, header_2 : fits.Header) : """ Returns a header compatible with `header_1` and `header_2`. The returned header can have a different number of axis than the inputs. """ n1, n2 = header_1['NAXIS'], header_2['NAXIS'] cards = ['NAXIS{}', 'CTYPE{}', 'CRVAL{}', 'CDELT{}', 'CRPIX{}', 'CROTA{}'] if n1 == n2 : # Case 1 : header have the same number of axis h_shape_1 = tuple((header_1[f'NAXIS{k}'] for k in range(header_1['NAXIS'], 0, -1))) h_shape_2 = tuple((header_2[f'NAXIS{k}'] for k in range(header_1['NAXIS'], 0, -1))) if h_shape_1 != h_shape_2 : raise ValueError(f"header_1 and header_2 must have the same axis dimensions ({h_shape_1} and {h_shape_2})") # By default, the new header is a copy of header_1 new_header = header_1.copy() elif (n1 == 2 and n2 == 1) or (n1 == 1 and n2 == 2) : # Case 2 : one has two spatial axis and the other has one spectral axis (map_header, profile_header) = (header_1.copy(), header_2.copy())\ if n1 > n2 else (header_2.copy(), header_1.copy()) map_header['NAXIS'] = 3 for card in cards : map_header[card.format(3)] = profile_header[card.format(1)] new_header = map_header elif (n1 == 3 and n2 == 2) or (n1 == 2 and n2 == 3) : # Case 3 : one has three axis and the other has two spatial axis # By default, the new header is a copy of the cube header new_header = header_1.copy() if n1 > n2 else header_2.copy() elif (n1 == 3 and n2 == 1) or (n1 == 1 and n2 == 3) : # Case 4 : one has three axis and the other has one spectral axis # By default, the new header is a copy of the profile header new_header = header_1.copy() if n1 > n2 else header_2.copy() else : # Else return an error raise ValueError("Headers cannot be merged.") # Check somes cards try : if header_1['LINE'].strip().lower() != header_2['LINE'].strip().lower() : new_header['LINE'] = 'COMBINATION' except : pass # Remove unit if 'BUNIT' in new_header.keys() : new_header['BUNIT'] = 'UNKNOWN' # Update origin if 'ORIGIN' in new_header.keys() : if '- MODIFIED' not in new_header['ORIGIN'].strip().upper() : new_header['ORIGIN'] = new_header['ORIGIN'].strip().upper() + ' - MODIFIED' return new_header
[docs] def create_header(struct : str, ref_header : Optional[fits.Header] = None, nx : Optional[int] = None, ny : Optional[int] = None, nz : Optional[int] = None) -> fits.Header: """ TODO """ struct = struct.lower() if struct not in ['cube', 'map', 'profile'] : raise ValueError(f"struct must be 'cube', 'map' or 'profile', not {struct}") if ref_header is None : new_header = fits.Header() new_header['SIMPLE'] = 'T' new_header['BITPIX'] = 32 #new_header['BSCALE'] = 0.3341172671867E-08 #new_header['BZERO'] = 0.3305371761322E+01 #new_header['BLANK'] = 2147483647 new_header['DATAMIN'] = 0. new_header['DATAMAX'] = 0. new_header['BUNIT'] = 'UNKNOWN' new_header['OBJECT'] = 'UNKNOWN' new_header['RADESYS'] = 'FK5' #new_header['RA'] = 0.8522612499999E+02 #new_header['DEC'] = -0.2466666666667E+01 #new_header['EQUINOX'] = 0.2000000000000E+04 new_header['LINE'] = 'UNKNOWN' #new_header['ALTRPIX'] = 0.1200000000000E+03 #new_header['ALTRVAL'] = 0.1134869952662E+12 #new_header['RESTFREQ'] = 0.1134909702000E+12 #new_header['IMAGFREQ'] = 0.9400874075270E+11 #new_header['VELO-LSR'] = 0.1050000000000E+05 #new_header['VELREF'] = 257 #new_header['SPECSYS'] = 'LSRK' #new_header['BMAJ'] = 0.6346730671426E-02 #new_header['BMIN'] = 0.6346730671426E-02 #new_header['BPA'] = 0.0000000000000E+00 new_header['ORIGIN'] = 'IRAM' new_header['DATE'] = str(datetime.now()) if struct == 'cube' : if nx is None or ny is None or nz is None : raise ValueError('nx, ny and nz cannot be None to create a cube header without reference') new_header['NAXIS'] = 3 new_header['NAXIS1'] = nx new_header['NAXIS2'] = ny new_header['NAXIS3'] = nz new_header['CTYPE1'] = 'INDICES' new_header['CTYPE2'] = 'INDICES' new_header['CTYPE3'] = 'INDICES' new_header['CRVAL1'] = 1 new_header['CDELT1'] = 1 new_header['CRPIX1'] = 1 new_header['CROTA1'] = 0 new_header['CRVAL2'] = 1 new_header['CDELT2'] = 1 new_header['CRPIX2'] = 1 new_header['CROTA2'] = 0 new_header['CRVAL3'] = 1 new_header['CDELT3'] = 1 new_header['CRPIX3'] = 1 new_header['CROTA3'] = 0 return new_header if struct == 'map' : if nx is None or ny is None : raise ValueError('nx and ny cannot be None to create a map header without reference') new_header['NAXIS'] = 2 new_header['NAXIS1'] = nx new_header['NAXIS2'] = ny new_header['CTYPE1'] = 'INDICES' new_header['CTYPE2'] = 'INDICES' new_header['CRVAL1'] = 1 new_header['CDELT1'] = 1 new_header['CRPIX1'] = 1 new_header['CROTA1'] = 0 new_header['CRVAL2'] = 1 new_header['CDELT2'] = 1 new_header['CRPIX2'] = 1 new_header['CROTA2'] = 0 return new_header if struct == 'profile' : if nz is None : raise ValueError('nz cannot be None to create a profile header without reference') new_header['NAXIS'] = 1 new_header['NAXIS1'] = nz new_header['CTYPE1'] = 'INDICES' new_header['CRVAL1'] = 1 new_header['CDELT1'] = 1 new_header['CRPIX1'] = 1 new_header['CROTA1'] = 0 return new_header if ref_header['NAXIS'] == 1 : ref_struct = 'profile' elif ref_header['NAXIS'] == 2 : ref_struct = 'map' elif ref_header['NAXIS'] == 3 : ref_struct = 'cube' else : raise ValueError(f"ref_header['NAXIS'] must be equal to 1, 2 or 3, not {ref_header['NAXIS']}") if struct == 'cube' and ref_struct == 'cube' : return ref_header.copy() if struct == 'cube' and ref_struct == 'map' : return merge_headers(ref_header, create_header('profile', nz = nz)) if struct == 'cube' and ref_struct == 'profile' : return merge_headers(ref_header, create_header('map', nx = nx, ny = ny)) raise ValueError(f'Cannot create {struct} header using a {ref_struct} header as reference')
# Cards
[docs] def move_header_axes(header : fits.Header, order : Literal['xyz', 'yxz', 'zxy', 'zyx', 'xy', 'yx']) -> fits.Header : """ Return a version of header where the order of axes if xyz. Input header axes order is `order`. """ order = order.lower() # If header is a cube header if header['NAXIS'] == 3 : if order == 'xyz' : indices = (1, 2, 3) elif order == 'yxz' : indices = (2, 1, 3) elif order == 'zxy' : indices = (3, 1, 2) elif order == 'zyx' : indices = (3, 2, 1) else : raise ValueError(f"order must be 'xyz', 'yxz', 'zxy' or 'zyx', not '{order}'") elif header['NAXIS'] == 2 : if order == 'xy' : indices = (1, 2) elif order == 'yx' : indices = (2, 1) else : raise ValueError(f"order must be 'xy' or 'yx', not '{order}'") else : raise ValueError('move_header_axes is not defined for a number of axis different than 2 or 3') new_header = header.copy() cards = ['NAXIS{}', 'CTYPE{}', 'CRVAL{}', 'CDELT{}', 'CRPIX{}', 'CROTA{}'] for i, j in enumerate(indices, 1) : for card in cards : new_header[card.format(j)] = header[card.format(i)] return new_header
[docs] def remove_header_axis(header : fits.Header, axis : str) -> fits.Header : """ Axis must be 'spatial' or 'spectral'. Header must be a cube header i.e. have header['NAXIS'] = 3. """ axis = axis.strip().lower() cards = ['NAXIS{}', 'CTYPE{}', 'CRVAL{}', 'CDELT{}', 'CRPIX{}', 'CROTA{}'] new_header = header.copy() if axis == 'spatial' : for card in cards : new_header[card.format(1)] = new_header[card.format(3)] del new_header[card.format(2)] del new_header[card.format(3)] new_header['NAXIS'] = 1 elif axis == 'spectral' : for card in cards : del new_header[card.format(3)] new_header['NAXIS'] = 2 else : raise ValueError(f"axis must be 'spatial' or 'spectral', not {axis}") return new_header
def _replace_none(x, value) : """ Returns x+1 if x is not None, else value """ return x+1 if x is not None else value
[docs] def change_coordinates(header, x = None, y = None, z = None) -> fits.Header : """ Coordinates must begin at 0 """ new_header = header.copy() if header['NAXIS'] == 3 : # Cubes handling if x is None : x = [1, header['NAXIS1']] else : x = [_replace_none(x[0], 1), _replace_none(x[1], header['NAXIS1'])] if y is None : y = [1, header['NAXIS2']] else : y = [_replace_none(y[0], 1), _replace_none(y[1], header['NAXIS2'])] if z is None : z = [1, header['NAXIS3']] else : z = [_replace_none(z[0], 1), _replace_none(z[1], header['NAXIS3'])] new_header['CRPIX1'] = new_header['CRPIX1'] - x[0] new_header['CRPIX2'] = new_header['CRPIX2'] - y[0] new_header['CRPIX3'] = new_header['CRPIX3'] - z[0] new_header['NAXIS1'] = x[1] - x[0] + 1 new_header['NAXIS2'] = y[1] - y[0] + 1 new_header['NAXIS3'] = z[1] - z[0] + 1 elif header['NAXIS'] == 2 : # Maps handling if x is None : x = [1, header['NAXIS1']] else : x = [_replace_none(x[0], 1), _replace_none(x[1], header['NAXIS1'])] if y is None : y = [1, header['NAXIS2']] else : y = [_replace_none(y[0], 1), _replace_none(y[1], header['NAXIS2'])] new_header['CRPIX1'] = new_header['CRPIX1'] - x[0] new_header['CRPIX2'] = new_header['CRPIX2'] - y[0] new_header['NAXIS1'] = x[1] - x[0] + 1 new_header['NAXIS2'] = y[1] - y[0] + 1 elif header['NAXIS'] == 1 : # Profiles handling if z is None : z = [1, header['NAXIS1']] else : z = [_replace_none(z[0], 1), _replace_none(z[1], header['NAXIS1'])] new_header['CRPIX1'] = new_header['CRPIX1'] - z[0] new_header['NAXIS1'] = z[1] - z[0] + 1 else : raise ValueError(f"'NAXIS' card must be 1, 2 or 3 (not {header['NAXIS']})") return new_header
# Coordinates # Unit for index is numpy index (beginning at 0) # Unit for velocity is km/s # Unit for frequency is GHz # Unit for angle is ' (minute of arc) C_LIGHT = 299792458 # In m/s
[docs] def indices_to_coordinates(header : fits.Header, i : Union[int, np.ndarray], j : Union[int, np.ndarray], absolute : bool = False) : """ Returns the absolute coordinates in degrees of the (i,j) point in pixels (beginning at 0) """ if header['NAXIS'] not in [2, 3] : # If header of profile raise ValueError("header must be the header of a cube or a map") if (np.array(i) < 0).any() or (np.array(j) < 0).any() : raise ValueError('i and j must be non-negative indices') if (np.array(i) >= header['NAXIS1']).any() : raise ValueError(f"i must be lower than {header['NAXIS1']}") if (np.array(j) >= header['NAXIS2']).any() : raise ValueError(f"j must be lower than {header['NAXIS2']}") if absolute : x = (i+1-header['CRPIX1']) * header['CDELT1'] + header['CRVAL1'] y = (j+1-header['CRPIX2']) * header['CDELT2'] + header['CRVAL2'] else : x = (i+1-header['CRPIX1']) * header['CDELT1'] y = (j+1-header['CRPIX2']) * header['CDELT2'] x, y = 60*x, 60*y # Conversion from degrees to minutes of arc return x, y
[docs] def coordinates_to_indices(header : fits.Header, x : Union[float, np.ndarray], y : Union[float, np.ndarray], absolute : bool = False) : """ TODO """ if header['NAXIS'] == 1 : raise ValueError("header must be the header of a cube or a map") xmin, ymin = indices_to_coordinates(header, 0, 0, absolute = absolute) xmax, ymax = indices_to_coordinates(header, header['NAXIS1']-1, header['NAXIS2']-1, absolute = absolute) if (np.array(x) < xmin).any() : raise ValueError(f"x must be greater than {xmin}'") if (np.array(y) < ymin).any() : raise ValueError(f"y must be greater than {ymin}'") if (np.array(x) > xmax).any() : raise ValueError(f"x must be lower than {xmax}'") if (np.array(y) > ymax).any() : raise ValueError(f"y must be lower than {ymax}'") if absolute : i = (x - header['CRVAL1']) / header['CDELT1'] + header['CRPIX1'] - 1 j = (y - header['CRVAL2']) / header['CDELT2'] + header['CRPIX2'] - 1 else : i = x / header['CDELT1'] + header['CRPIX1'] - 1 j = y / header['CDELT2'] + header['CRPIX2'] - 1 return round(i), round(j)
[docs] def bound_coordinates(header : fits.Header, absolute : bool = False) : """ Returns the absolute coordinates bounds """ xymin = indices_to_coordinates(header, 0, 0, absolute = absolute) xymax = indices_to_coordinates(header, header['NAXIS1']-1, header['NAXIS2']-1, absolute = absolute) xbounds = xymin[0], xymax[0] ybounds = xymin[1], xymax[1] return xbounds, ybounds
[docs] def index_to_velocity(header : fits.Header, k : Union[int, np.ndarray]) : """ Returns the velocity of the k-th spectral image by Doppler effect (beginning at 0) """ if header['NAXIS'] not in [1, 3] : # If header of map raise ValueError("header must be the header of a cube or a profile") if (np.array(k) < 0).any() : raise ValueError('k must be a non-negative index') if header['NAXIS'] == 1 : # Header of profile if (np.array(k) >= header['NAXIS1']).any() : raise ValueError(f"k must be lower than {header['NAXIS1']}") v = (k+1-header['CRPIX1']) * header['CDELT1'] + header['CRVAL1'] else : # Header of cube if (np.array(k) >= header['NAXIS3']).any() : raise ValueError(f"k must be lower than {header['NAXIS3']}") v = (k+1-header['CRPIX3']) * header['CDELT3'] + header['CRVAL3'] return v / 1e3 # Convert to km/s
[docs] def velocity_to_index(header : fits.Header, v : Union[float, np.ndarray]) : """ Returns the velocity channel index (beginning at 0) corresponding to a velocity v """ if header['NAXIS'] not in [1, 3] : raise ValueError("header must be the header of a cube or a profile") vmin = index_to_velocity(header, 0) if (np.array(v) < vmin).any() : raise ValueError(f'v must be greater than {vmin}') if header['NAXIS'] == 1 : # Header of profile vmax = index_to_velocity(header, header['NAXIS1']-1) if (np.array(v) > vmax).any() : raise ValueError(f'v must be lower than {vmax}') v_ = v * 1e3 # Convert to m/s k = (v_ - header['CRVAL1']) / header['CDELT1'] + header['CRPIX1'] - 1 else : # Header of cube vmax = index_to_velocity(header, header['NAXIS3']-1) if (np.array(v) > vmax).any() : raise ValueError(f'v must be lower than {vmax}') v_ = v * 1e3 # Convert to m/s k = (v_ - header['CRVAL3']) / header['CDELT3'] + header['CRPIX3'] - 1 return round(k)
[docs] def bound_velocities(header : fits.Header) : """ Returns the min and max velocities """ vmin = index_to_velocity(header, 0) vmax = index_to_velocity(header, header[f"NAXIS{header['NAXIS']}"]-1) return vmin, vmax
[docs] def index_to_frequency(header : fits.Header, k : Union[int, np.ndarray]) : """ Returns the frequency of the k-th spectal image (beginning at 0) """ if header['NAXIS'] not in [1, 3] : # If header of map raise ValueError("header must be the header of a cube or a profile") if (np.array(k) < 0).any() : raise ValueError('k must be a non-negative index') if header['NAXIS'] == 1 : if (np.array(k) >= header['NAXIS1']).any() : raise ValueError(f"k must be lower than {header['NAXIS1']}") df = -(header['CDELT1']/C_LIGHT) * header['RESTFREQ'] f = (k+1-header['CRPIX1']) * df + header['RESTFREQ'] else : if (np.array(k) >= header['NAXIS3']).any() : raise ValueError(f"k must be lower than {header['NAXIS3']}") df = -(header['CDELT3']/C_LIGHT) * header['RESTFREQ'] f = (k+1-header['CRPIX3']) * df + header['RESTFREQ'] return f / 1e9 # Convert to GHz
[docs] def frequency_to_index(header : fits.Header, f : Union[float, np.ndarray]) : """ Returns the velocity channel index (beginning at 0) corresponding to a frequency f """ if header['NAXIS'] not in [1, 3] : raise ValueError("header must be the header of a cube or a profile") fmin = index_to_frequency(header, 0) if (np.array(f) < fmin).any() : raise ValueError(f'f must be greater than {fmin}') if header['NAXIS'] == 1 : fmax = index_to_velocity(header, header['NAXIS1']-1) if (np.array(f) > fmax).any() : raise ValueError(f'f must be lower than {fmax}') f_ = f * 1e9 # Convert to Hz df = -(header['CDELT1']/C_LIGHT) * header['RESTFREQ'] k = (f_ - header['RESTFREQ']) / df + header['CRPIX1'] - 1 else : fmax = index_to_velocity(header, header['NAXIS3']-1) if (np.array(f) > fmax).any() : raise ValueError(f'f must be lower than {fmax}') f_ = f * 1e9 # Convert to Hz df = -(header['CDELT3']/C_LIGHT) * header['RESTFREQ'] k = (f_ - header['RESTFREQ']) / df + header['CRPIX3'] - 1 return round(k)
[docs] def bound_frequencies(header : fits.Header) : """ Returns the min and max frequencies """ fmin = index_to_frequency(header, header[f"NAXIS{header['NAXIS']}"]-1) fmax = index_to_frequency(header, 0) return fmin, fmax