from typing import Callable, Literal, Optional, Sequence, Type, Union, overload
import numpy as np
from astropy.io import fits
from .. import structures as struct
from ..io import io
from . import header as hdr
__all__ = [
"apply_element_wise",
"astype",
"change_axes_order",
"clip",
"copy",
"cube_from",
"cube_from_maps",
"cube_from_profiles",
"flatten",
"from_fits",
"from_numpy",
"is_logical",
"isfinite",
"isnan",
"map_from",
"nan_to_num",
"ones",
"ones_like",
"profile_from",
"stack_numpy",
"to_numpy",
"where",
"x_axis",
"y_axis",
"z_axis",
"zeros",
"zeros_like",
]
# Axes handler
@overload
def change_axes_order(
input: "struct.Cube", order: Literal["xyz", "yxz", "zxy", "zyx"]
) -> "struct.Cube":
...
@overload
def change_axes_order(input: "struct.Map", order: Literal["xy", "yx"]) -> "struct.Map":
...
[docs]
def change_axes_order(
input: Union["struct.Cube", "struct.Map"], order: str
) -> Union["struct.Cube", "struct.Map"]:
"""TODO"""
header = input.header
data = input.data
new_header = hdr.move_header_axes(header, order)
axes_sources = {
2: (0, 1),
3: (0, 1, 2),
}
axes_destinations = {
"xyz": (0, 1, 2),
"yxz": (0, 2, 1),
"zxy": (1, 2, 0),
"zyx": (2, 1, 0),
"xy": (0, 1),
"yx": (1, 0),
}
new_data = np.moveaxis(
data, axes_sources[len(order)], axes_destinations[order.strip().lower()]
)
return type(input)(new_data, new_header)
# Element wise application
@overload
def apply_element_wise(
input: "struct.Cube", fun: Callable[[np.ndarray], np.ndarray]
) -> "struct.Cube":
...
@overload
def apply_element_wise(
input: "struct.Map", fun: Callable[[np.ndarray], np.ndarray]
) -> "struct.Map":
...
@overload
def apply_element_wise(
input: "struct.Profile", fun: Callable[[np.ndarray], np.ndarray]
) -> "struct.Profile":
...
[docs]
def apply_element_wise(
input: "struct.Struct", fun: Callable[[np.ndarray], np.ndarray]
) -> "struct.Struct":
"""
Apply the element-wise operator `fun` on the `input` structure.
Parameters
----------
input : `Cube | Map | Profile`
Input structure.
fun : `Callable[[np.ndarray], ndarray]`
Element-wise operator.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
new_data = fun(input.data)
return type(input)(new_data, input.header)
[docs]
def is_logical(input: "struct.Struct") -> bool:
"""TODO"""
return ((input.data == 0) | (input.data == 1)).all()
# Floating type conversion
@overload
def astype(input: "struct.Cube", dtype: Literal["float", "double"]) -> "struct.Cube":
...
@overload
def astype(input: "struct.Map", dtype: Literal["float", "double"]) -> "struct.Map":
...
@overload
def astype(
input: "struct.Profile", dtype: Literal["float", "double"]
) -> "struct.Profile":
...
[docs]
def astype(
input: "struct.Struct", dtype: Literal["float", "double"]
) -> "struct.Struct":
"""
Return the input structure with floating type 'float' or 'double'.
Note that there is no function as_int or as_float because structures are always of type float.
Parameters
----------
input : `Cube | Map | Profile`
Input structure.
dtype : str
Floating type of output data. Must be 'float' or 'double'.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
if dtype.lower() == "float":
return type(input)(input.data.astype(np.single), input.header)
if dtype.lower() == "double":
return type(input)(input.data.astype(np.double), input.header)
raise ValueError("dtype must be 'float' or 'double', not {dtype}")
# Element wise test for nan values
@overload
def isnan(input: "struct.Cube") -> "struct.Cube":
...
@overload
def isnan(input: "struct.Map") -> "struct.Map":
...
@overload
def isnan(input: "struct.Profile") -> "struct.Profile":
...
[docs]
def isnan(input: "struct.Struct") -> "struct.Struct":
"""
Return a structure similar to `input` where a sample is 1 if input same sample is `nan` and 0 if is not.
Parameters
----------
input : `Cube | Map | Profile`
Input structure.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
return type(input)(np.isnan(input.data), input.header)
@overload
def isfinite(input: "struct.Cube") -> "struct.Cube":
...
@overload
def isfinite(input: "struct.Map") -> "struct.Map":
...
@overload
def isfinite(input: "struct.Profile") -> "struct.Profile":
...
[docs]
def isfinite(input: "struct.Struct") -> "struct.Profile":
"""
Return a structure similar to `input` where a sample is 1 if input same sample is finite and 0 if is not.
A finite element is a value different of `nan`, `inf` or `neginf`.
This function is the opposite to isnan because `inf` and `neginf` are automatically casted to `nan`
in structures constructors. So in practice : isnan(input) == ~isfinite(input).
Parameters
----------
input : `Cube | Map | Profile`
Input structure.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
return type(input)(np.isfinite(input.data), input.header)
# Nan functions
@overload
def nan_to_num(input: "struct.Cube", value: float = 0.0) -> "struct.Cube":
...
@overload
def nan_to_num(input: "struct.Map", value: float = 0.0) -> "struct.Map":
...
@overload
def nan_to_num(input: "struct.Profile", value: float = 0.0) -> "struct.Profile":
...
[docs]
def nan_to_num(input: "struct.Struct", value: float = 0.0) -> "struct.Struct":
"""
Returns a copy of `input` where the nans have been replaced by `value`.
Parameters
----------
input : `Cube | Map | Profile`
Input structure.
value : float, optional
Value to replace `nans` elements. Default : 0.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
return type(input)(np.nan_to_num(input.data, nan=value), input.header)
# Initializers
@overload
def copy(input: "struct.Cube") -> "struct.Cube":
...
@overload
def copy(input: "struct.Map") -> "struct.Map":
...
@overload
def copy(input: "struct.Profile") -> "struct.Profile":
...
[docs]
def copy(input: "struct.Struct") -> "struct.Struct":
"""
Returns a copy of `input`.
Parameters
----------
input : `Cube | Map | Profile`
Input structure.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
return type(input)(input.data.copy(), input.header.copy())
@overload
def from_numpy(
cls: Type["struct.Cube"],
array: np.ndarray,
header: fits.Header,
axes: Optional[Literal["xyz", "yxz", "zxy", "zyx"]] = None,
) -> "struct.Cube":
...
@overload
def from_numpy(
cls: Type["struct.Map"],
array: np.ndarray,
header: fits.Header,
axes: Optional[Literal["xy", "yx"]] = None,
) -> "struct.Map":
...
@overload
def from_numpy(
cls: Type["struct.Profile"],
array: np.ndarray,
header: fits.Header,
axes: None = None,
) -> "struct.Profile":
...
[docs]
def from_numpy(
cls: Type["struct.Struct"],
array: np.ndarray,
header: fits.Header,
axes: Optional[str] = None,
) -> "struct.Struct":
"""
Create an inputect of type `cls` from numpy array `array` and astropy header `header`.
Parameters
----------
cls : `Type[Cube] | Type[Map] | Type[Profile]`
Type of structure to create.
array : np.ndarray
Data of structure to create. Must be 1D if cls is `Profile`, 2D if cls is `Map` and
3D if cls is `Cube`.
header : fits.Header
Astropy fits header. Can be taken from another cube or created by hand.
axes : `str`, optional
Order of axes. Must be 'xyz', 'yxz', 'zxy', 'zyx', 'xy', 'yx' or None.
Returns
-------
out : `Cube | Map | Profile`
Created structure.
"""
if axes is not None:
header = hdr.move_header_axes(header, axes)
return cls(array, header)
@overload
def from_fits(
cls: Type["struct.Cube"],
filename: str,
path: str = None,
axes=Optional[Literal["xyz", "yxz", "zxy", "zyx"]],
) -> "struct.Cube":
...
@overload
def from_fits(
cls: Type["struct.Map"],
filename: str,
path: str = None,
axes: Optional[Literal["xy", "yx"]] = None,
) -> "struct.Map":
...
@overload
def from_fits(
cls: Type["struct.Profile"], filename: str, path: str = None, axes: None = None
) -> "struct.Profile":
...
[docs]
def from_fits(
cls: Type["struct.Struct"],
filename: str,
path: str = None,
axes: Optional[str] = None,
) -> "struct.Struct":
"""
Load an inputect of type `cls` from file `filename/path`.
Parameters
----------
cls : `Type[Cube] | Type[Map] | Type[Profile]`
Type of structure to create.
filename : `str`
Filename of FITS file to load. Extension can be ommited. Handle both .fits or .fits.gz files,
but notice that .gz files take longer to load.
path : `str`, optional
Path to the FITS file.
axes : `str`, optional
Order of axes. Must be 'xyz', 'yxz', 'zxy', 'zyx', 'xy', 'yx' or None.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
data, header = io._load_fits(filename, path)
if axes is not None:
header = hdr.move_header_axes(header, axes)
return cls(data.astype(np.float32), header)
@overload
def zeros(cls: Type["struct.Cube"], header: fits.Header) -> "struct.Cube":
...
@overload
def zeros(cls: Type["struct.Map"], header: fits.Header) -> "struct.Map":
...
@overload
def zeros(cls: Type["struct.Profile"], header: fits.Header) -> "struct.Profile":
...
[docs]
def zeros(cls: Type["struct.Struct"], header: fits.Header) -> "struct.Struct":
"""
Create an inputect of type `cls` fill with zeros from astropy header `header`.
Parameters
----------
cls : `Type[Cube] | Type[Map] | Type[Profile]`
Type of structure to create.
header : fits.Header
Astropy fits header. Can be taken from another cube or created by hand.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
shape = [header[f"NAXIS{k}"] for k in range(header["NAXIS"], 0, -1)]
return from_numpy(cls, np.zeros(shape), header)
@overload
def ones(cls: Type["struct.Cube"], header: fits.Header) -> "struct.Cube":
...
@overload
def ones(cls: Type["struct.Map"], header: fits.Header) -> "struct.Map":
...
@overload
def ones(cls: Type["struct.Profile"], header: fits.Header) -> "struct.Profile":
...
[docs]
def ones(cls: Type["struct.Struct"], header: fits.Header) -> "struct.Struct":
"""
Create an inputect of type `cls` fill with ones from astropy header `header`.
Parameters
----------
cls : `Type[Cube] | Type[Map] | Type[Profile]`
Type of structure to create.
header : fits.Header
Astropy fits header. Can be taken from another cube or created by hand.
Returns
-------
out : `Cube | Map | Profile`
Output structure.
"""
shape = [header[f"NAXIS{k}"] for k in range(header["NAXIS"], 0, -1)]
print(shape)
return from_numpy(cls, np.ones(shape), header)
@overload
def zeros_like(input: "struct.Cube") -> "struct.Cube":
...
@overload
def zeros_like(input: "struct.Map") -> "struct.Map":
...
@overload
def zeros_like(input: "struct.Profile") -> "struct.Profile":
...
[docs]
def zeros_like(input: "struct.Struct"):
"""
Returns a structure with the same shape than input filled with zeros.
Parameters
----------
input : Cube | Map | Profile
Input structure
Returns
-------
out : Cube | Map | Profile
Output structure
"""
return type(input)(np.zeros_like(input.data), input.header)
@overload
def ones_like(input: "struct.Cube") -> "struct.Cube":
...
@overload
def ones_like(input: "struct.Map") -> "struct.Map":
...
@overload
def ones_like(input: "struct.Profile") -> "struct.Profile":
...
[docs]
def ones_like(input: "struct.Struct") -> "struct.Struct":
"""
Returns a structure with the same shape than input filled with ones.
Parameters
----------
input : Cube | Map | Profile
Input structure
Returns
-------
out : Cube | Map | Profile
Output structure
"""
return type(input)(np.ones_like(input.data), input.header)
# Derived initializers
[docs]
def map_from(cube: "struct.Cube", value: float = 0.0):
"""
Returns a map with the same x and y axis than `cube` filled with `value`.
Parameters
----------
cube : Cube
Input cube.
Returns
-------
out : Map
Output map.
"""
new_header = hdr.remove_header_axis(cube.header, axis="spectral")
new_data = np.zeros((cube.ny, cube.nx)) + value
return struct.Map(new_data, new_header)
[docs]
def profile_from(cube: "struct.Cube", value: float = 0.0):
"""
Returns a profile with the same z axis than `cube` filled with `value`.
Parameters
----------
cube : Cube
Input cube.
Returns
-------
out : Profile
Output profile.
"""
new_header = hdr.remove_header_axis(cube.header, axis="spatial")
new_data = np.zeros(cube.nz) + value
return struct.Profile(new_data, new_header)
[docs]
def cube_from(map: "struct.Map", profile: "struct.Profile", value: float = 0.0):
"""
Returns a cube with the same x and y axis than `map` and the same z axis
than profile filled with `value`.
Parameters
----------
map : Map
Input map.
profile : Profile
Input profile.
Returns
-------
cube : Cube
Output cube.
"""
new_header = hdr.merge_headers(map.header, profile.header)
new_data = np.zeros(profile.shape + map.shape) + value
return struct.Cube(new_data, new_header)
[docs]
def cube_from_maps(maps: Sequence["struct.Map"]) -> "struct.Cube":
"""
Returns a cube builded by concatening maps in `maps` sequences.
Parameters
----------
maps : Sequence[Map]
Sequence of maps of same shape. Must not be empty.
Returns
-------
cube : Cube
Output cube with same spatial shape than elements of maps.
"""
if len(maps) == 0:
raise ValueError("maps must not be empty")
nz, ny, nx = len(maps), maps[0].ny, maps[0].nx
new_data = np.zeros((nz, ny, nx))
new_header = hdr.create_header("cube", maps[0].header, nz=nz)
for k in range(nz):
new_data[k, :, :] = maps[k].data
return struct.Cube(new_data, new_header)
[docs]
def cube_from_profiles(profiles: Sequence[Sequence["struct.Profile"]]) -> "struct.Cube":
"""
Returns a cube builded by concatening maps in `maps` sequences.
Parameters
----------
profiles : Sequence[Sequence[Profile]]
Sequence of sequence of profiles of same shape.
The sequence must not be empty and each sub-sequence must also not be empty.
profiles[i][j] is the pixel of the i-th row and the j-th column.
Returns
-------
cube : Cube
Output cube with same spectral shape than elements of profiles.
"""
if len(profiles) == 0:
raise ValueError("profiles must not be empty")
for _, e in enumerate(profiles):
if len(e) == 0:
raise ValueError("Each element of profiles must be a non-empty sequence")
ny, nx, nz = len(profiles), len(profiles[0]), profiles[0][0].nz
new_data = np.zeros((nz, ny, nx))
new_header = hdr.create_header("cube", profiles[0][0].header, nx=nx, ny=ny)
for i in range(ny):
for j in range(nx):
new_data[:, j, i] = profiles[j][i].data
return struct.Cube(new_data, new_header)
# Other functions
@overload
def where(
bool_input: "struct.Cube",
input_1: Union["struct.Cube", float],
input_2: Union["struct.Cube", float],
) -> "struct.Cube":
...
@overload
def where(
bool_input: "struct.Map",
input_1: Union["struct.Map", float],
input_2: Union["struct.Map", float],
) -> "struct.Map":
...
@overload
def where(
bool_input: "struct.Profile",
input_1: Union["struct.Profile", float],
input_2: Union["struct.Profile", float],
) -> "struct.Profile":
...
[docs]
def where(
logical_input: "struct.Struct",
input_1: Union["struct.Struct", float],
input_2: Union["struct.Struct", float],
) -> "struct.Struct":
"""TODO"""
if not isinstance(logical_input, struct.Struct):
raise TypeError(
f"logical_input must be an instance of Struct (Cube, Map or Profile), not {type(logical_input)}"
)
if not is_logical(logical_input):
raise TypeError(
f"logical_input must be logical i.e. contains only 0 and 1 samples"
)
a = input_1.data if isinstance(input_1, struct.Struct) else input_1
b = input_2.data if isinstance(input_2, struct.Struct) else input_2
new_data = np.where(logical_input.data == 1, a, b)
return type(logical_input)(new_data, logical_input.header)
@overload
def clip(
input: "struct.Cube", vmin: Optional[float], vmax: Optional[float]
) -> "struct.Cube":
...
@overload
def clip(
input: "struct.Map", vmin: Optional[float], vmax: Optional[float]
) -> "struct.Map":
...
@overload
def clip(
input: "struct.Profile", vmin: Optional[float], vmax: Optional[float]
) -> "struct.Profile":
...
[docs]
def clip(
input: "struct.Struct", vmin: Optional[float], vmax: Optional[float]
) -> "struct.Struct":
"""TODO"""
return type(input)(np.clip(input.data, vmin, vmax), input.header)
# To numpy
[docs]
def flatten(input: "struct.Struct"):
"""
Shortcut for input.data.flatten().
Parameters
----------
input : Cube | Map | Profile
Structure.
Returns
-------
np.ndarray
1D numpy array extracted from structure data.
"""
return input.data.flatten()
[docs]
def to_numpy(input: "struct.Struct", item: str) -> np.ndarray:
"""TODO"""
if item.strip().lower() not in ["pixel", "map"]:
raise ValueError(f"item must be 'pixel' or 'map', not {item}")
if isinstance(input, struct.Cube):
if item == "pixel":
return input.data.reshape(-1, input.data.shape[1] * input.data.shape[2]).T
else:
return input.data
elif isinstance(input, struct.Map):
if item == "pixel":
return input.data.flatten()
else:
raise ValueError("item = 'map' is not compatible with a Map inputect")
elif isinstance(input, struct.Profile):
if item == "pixel":
raise ValueError(
"item = 'profile' is not compatible with a Profile inputect"
)
else:
return input.data
else:
raise TypeError(
f"input must be an instance of Cube, Map or Profile, not {type(input)}"
)
[docs]
def stack_numpy(arrays: Sequence[np.ndarray]) -> np.ndarray:
"""TODO"""
return np.concatenate(arrays, axis=0)
# Axes
[docs]
def x_axis(
input: Union["struct.Map", "struct.Cube"], unit: Literal["index", "angle"] = "index"
) -> np.ndarray:
"""TODO"""
if not isinstance(input, (struct.Map, struct.Cube)):
raise TypeError(f"input must be an instance of Map or Cube, not {type(input)}")
unit = unit.strip().lower()
if unit not in ["index", "angle"]:
raise ValueError(f"unit must be 'index' or 'angle', not '{unit}'")
axis = np.arange(input.nx)
if unit == "angle":
axis = hdr.indices_to_coordinates(input.header, axis, 0)[0]
return axis
[docs]
def y_axis(
input: Union["struct.Map", "struct.Cube"], unit: Literal["index", "angle"] = "index"
) -> np.ndarray:
"""TODO"""
if not isinstance(input, (struct.Map, struct.Cube)):
raise TypeError(f"input must be an instance of Map or Cube, not {type(input)}")
unit = unit.strip().lower()
if unit not in ["index", "angle"]:
raise ValueError(f"unit must be 'index' or 'angle', not '{unit}'")
axis = np.arange(input.ny)
if unit == "angle":
axis = hdr.indices_to_coordinates(input.header, axis, 0)[1]
return axis
[docs]
def z_axis(
input: Union["struct.Profile", "struct.Cube"],
unit: Literal["index", "velocity", "frequency"] = "index",
) -> np.ndarray:
"""TODO"""
if not isinstance(input, (struct.Profile, struct.Cube)):
raise TypeError(
f"input must be an instance of Profile or Cube, not {type(input)}"
)
unit = unit.strip().lower()
if unit not in ["index", "velocity", "frequency"]:
raise ValueError(
f"unit must be 'index', 'velocity' or 'frequency', not '{unit}'"
)
axis = np.arange(input.nz)
if unit == "velocity":
axis = hdr.index_to_velocity(input.header, axis)
elif unit == "frequency":
axis = hdr.index_to_frequency(input.header, axis)
return axis