Source code for phise.classes.context

import numpy as np
import astropy.units as u
import astropy.constants as const
import numba as nb
from copy import deepcopy as copy
import matplotlib.pyplot as plt

from . import telescope
from .camera import Camera
try:
    import matplotlib.pyplot as plt
    try:
        plt.rcParams['image.origin'] = 'lower'
    except Exception:
        pass
except Exception:
    plt = None
from io import BytesIO
from LRFutils import color
from scipy.optimize import curve_fit

# Internal libs
from .interferometer import Interferometer
from .target import Target
from .companion import Companion
from . import kernel_nuller
from .kernel_nuller import KernelNuller

from ..modules import coordinates
from ..modules import signals
from ..modules import phase

[docs] class Context: __slots__ = ('_initialized', '_interferometer', '_target', '_h', '_Δh', '_Γ', '_name', '_p', '_ph', '_monochromatic')
[docs] def __init__( self, interferometer:Interferometer, target:Target, h:u.Quantity, Δh: u.Quantity, Γ: u.Quantity, monochromatic = False, name:str = "Unnamed Context", ): """Créer un contexte d'observation. Paramètres ---------- interferometer : Interferometer Objet décrivant l'instrument et sa géométrie. target : Target Objet décrivant la cible (coordonnées, flux, compagnons). h : astropy.units.Quantity Heure locale/angle horaire central de l'observation. Δh : astropy.units.Quantity Durée / intervalle en angle horaire observé. Γ : astropy.units.Quantity Erreur de cophasage RMS (quantité en longueur). monochromatic : bool, optionnel Si True, utiliser l'approximation monochromatique. name : str, optionnel Nom du contexte. """ self._initialized = False self.interferometer = copy(interferometer) self.interferometer._parent_ctx = self self.target = copy(target) self.target._parent_ctx = self self.h = h self.Δh = Δh self.Γ = Γ self.monochromatic = monochromatic self.name = name self.project_telescopes_position() # define self.p self.update_photon_flux() # define self.pf self._initialized = True
# To string --------------------------------------------------------------- def __str__(self) -> str: res = f'Context "{self.name}"\n' res += " " + "\n ".join(str(self.interferometer).split("\n")) + "\n" res += " " + "\n ".join(str(self.target).split("\n")) + "\n" res += f' h: {self.h:.2f}\n' res += f' Δh: {self.Δh:.2f}\n' res += f' Γ: {self.Γ:.2f}' return res def __repr__(self) -> str: return self.__str__() # Interferometer property ------------------------------------------------- @property def interferometer(self) -> Interferometer: return self._interferometer @interferometer.setter def interferometer(self, interferometer:Interferometer): if not isinstance(interferometer, Interferometer): raise TypeError("interferometer must be an Interferometer object") self._interferometer = copy(interferometer) self.interferometer._parent_ctx = self if self._initialized: self.project_telescopes_position() # Target property --------------------------------------------------------- @property def target(self) -> Target: return self._target @target.setter def target(self, target: Target): if not isinstance(target, Target): raise TypeError("target must be a Target object") self._target = copy(target) self.target._parent_ctx = self if self._initialized: self.project_telescopes_position() # h property -------------------------------------------------------------- @property def h(self) -> u.Quantity: return self._h @h.setter def h(self, h: u.Quantity): if type(h) != u.Quantity: raise TypeError("h must be a Quantity") try: h = h.to(u.hourangle) except u.UnitConversionError: raise ValueError("h must be in a hourangle unit") self._h = h if self._initialized: self.project_telescopes_position() # Δh property ------------------------------------------------------------- @property def Δh(self) -> u.Quantity: return self._Δh @Δh.setter def Δh(self, Δh: u.Quantity): if type(Δh) != u.Quantity: raise TypeError("Δh must be a Quantity") try: Δh = Δh.to(u.hourangle) except u.UnitConversionError: raise ValueError("Δh must be in a hourangle unit") if Δh < (e := self.interferometer.camera.e.to(u.hour).value * u.hourangle): Δh = e self._Δh = Δh # Γ property -------------------------------------------------------------- @property def Γ(self) -> u.Quantity: return self. .setter def Γ(self, Γ: u.Quantity): if type(Γ) != u.Quantity: raise TypeError("Γ must be a Quantity") try: Γ = Γ.to(u.m) except u.UnitConversionError: raise ValueError("Γ must be in a distance unit") self. = Γ # p property -------------------------------------------------------------- @property def p(self) -> u.Quantity: return self._p @p.setter def p(self, p: u.Quantity): raise ValueError("p is a read-only property. Use project_telescopes_position() to set it accordingly to the other parameters in this context.") # monochromatic property -------------------------------------------------- @property def monochromatic(self) -> bool: return self._monochromatic @monochromatic.setter def monochromatic(self, monochromatic: bool): if not isinstance(monochromatic, bool): raise TypeError("monochromatic must be a boolean") self._monochromatic = monochromatic # Name property ----------------------------------------------------------- @property def name(self) -> str: return self._name @name.setter def name(self, name: str): if not isinstance(name, str): raise TypeError("name must be a string") self._name = name # Photon flux ------------------------------------------------------------- @property def pf(self) -> u.Quantity: """ Photon flux in the context """ if not hasattr(self, "_ph"): raise AttributeError("pf is not defined. Call update_photon_flux() first.") return self._ph @pf.setter def pf(self, pf: u.Quantity): """ Set the photon flux in the context """ raise ValueError("pf is a read-only property. Use update_photon_flux() to set it accordingly to the other parameters in this context.")
[docs] def update_photon_flux(self): """Calculer et stocker le flux photonique reçu par chaque télescope. Hypothèses ---------- - Le flux spectral de la cible est supposé constant sur la bande Δλ. - Si Δλ == 0 (cas monochromatique), la valeur est normalisée par 1 nm. """ f = self.target.f.to(u.W / u.m**2 / u.nm) λ = self.interferometer.λ.to(u.m) η = self.interferometer.η Δλ = self.interferometer.Δλ.to(u.nm) a = np.array([i.a.to(u.m**2).value for i in self.interferometer.telescopes]) * u.m**2 h = const.h c = const.c # Monochromatic case if Δλ == 0: Δλ = 1 * u.nm p = η * f * a * Δλ # Optical power [W] self._ph = p * λ / (h*c) # Photon flux [photons/s] (array of (n_telescopes,))
# Projected position ------------------------------------------------------ @property def p(self) -> u.Quantity: """ Projected position of the telescopes in a plane perpendicular to the line of sight. """ if not hasattr(self, "_p"): raise AttributeError("p is not defined. Call project_telescopes_position() first.") return self._p @p.setter def p(self, p: u.Quantity): """ Set the projected position of the telescopes in a plane perpendicular to the line of sight. """ raise ValueError("p is a read-only property. Use project_telescopes_position() to set it accordingly to the other parameters in this context.")
[docs] def project_telescopes_position(self): """Projeter les positions des télescopes dans un plan perpendiculaire. Met en place la propriété `p` (positions projetées en mètres) à partir des positions locales des télescopes et de l'angle horaire `h`. """ h = self.h.to(u.rad).value l = self.interferometer.l.to(u.rad).value δ = self.target.δ.to(u.rad).value r = np.array([i.r.to(u.m).value for i in self.interferometer.telescopes]) self._p = project_position_njit(r, h, l, δ) * u.m
# Plot projected positions over the time ----------------------------------
[docs] def plot_projected_positions( self, N:int = 11, return_image = False, ): """ Plot the telescope positions over the time. Parameters ---------- - N: Number of positions to plot - return_image: Return the image buffer instead of displaying it Returns ------- - None | Image buffer if return_image is True """ _, ax = plt.subplots() h_range = np.linspace(self.h - self.Δh/2, self.h + self.Δh/2, N, endpoint=True) # Plot UT trajectory for i, h in enumerate(h_range): ctx = copy(self) ctx.h = h for j, (x, y) in enumerate(ctx.p): ax.scatter(x, y, label=f"Telescope {j+1}" if i==len(h_range)-1 else None, color=f"C{j}", s=1+14*i/len(h_range)) print(self.interferometer.l) for (x, y) in self.p: ax.scatter(x, y, color="black", marker="+") ax.set_aspect("equal") ax.set_xlabel(f"x [{self.p.unit}]") ax.set_ylabel(f"y [{self.p.unit}]") ax.set_title(f"Projected telescope positions over the time ({ctx.Δh.to(u.hourangle).value * u.h} long)") plt.legend() if return_image: buffer = BytesIO() plt.savefig(buffer,format='png') plt.close() return buffer.getvalue() plt.show()
# Transmission maps -------------------------------------------------------
[docs] def get_transmission_maps(self, N:int) -> np.ndarray[float]: """ Generate all the kernel-nuller transmission maps for a given resolution Parameters ---------- - N: Resolution of the map Returns ------- - Null outputs map (3 x resolution x resolution) - Dark outputs map (6 x resolution x resolution) - Kernel outputs map (3 x resolution x resolution) """ N=N φ=self.interferometer.kn.φ.to(u.m).value σ=self.interferometer.kn.σ.to(u.m).value p=self.p.value λ=self.interferometer.λ.to(u.m).value λ0=self.interferometer.kn.λ0.to(u.m).value fov=self.interferometer.fov output_order=self.interferometer.kn.output_order return get_transmission_map_njit(N=N, φ=φ, σ=σ, p=p, λ=λ, λ0=λ0, fov=fov, output_order=output_order)
def plot_transmission_maps(self, N:int, return_plot:bool = False) -> None: # Get transmission maps n_maps, d_maps, k_maps = self.get_transmission_maps(N=N) # Get companions position to plot them companions_pos = [] for c in self.target.companions: x, y = coordinates.αθ_to_xy(α=c.α, θ=c.θ, fov=self.interferometer.fov) companions_pos.append((x*self.interferometer.fov/2, y*self.interferometer.fov/2)) _, axs = plt.subplots(2, 6, figsize=(35, 10)) fov = self.interferometer.fov extent = (-fov.value/2, fov.value/2, -fov.value/2, fov.value/2) for i in range(3): im = axs[0, i].imshow(n_maps[i], aspect="equal", cmap="hot", extent=extent) axs[0, i].set_title(f"Nuller output {i+1}") plt.colorbar(im, ax=axs[0, i]) for i in range(6): im = axs[1, i].imshow(d_maps[i], aspect="equal", cmap="hot", extent=extent) axs[1, i].set_title(f"Dark output {i+1}") axs[1, i].set_aspect("equal") plt.colorbar(im, ax=axs[1, i]) for i in range(3): im = axs[0, i + 3].imshow(k_maps[i], aspect="equal", cmap="bwr", extent=extent) axs[0, i + 3].set_title(f"Kernel {i+1}") plt.colorbar(im, ax=axs[0, i + 3]) for ax in axs.flatten(): ax.set_xlabel(r"$\theta_x$" + f" ({fov.unit})") ax.set_ylabel(r"$\theta_y$" + f" ({fov.unit})") ax.scatter(0, 0, color="yellow", marker="*", edgecolors="black") for x, y in companions_pos: ax.scatter(x, y, color="blue", edgecolors="black") # Display companions positions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ for ax in axs.flatten(): ax.set_xlabel(r"$\theta_x$" + f" ({fov.unit})") ax.set_ylabel(r"$\theta_y$" + f" ({fov.unit})") ax.scatter(0, 0, color="yellow", marker="*", edgecolors="black", s=100) for x, y in companions_pos: ax.scatter(x, y, color="blue", edgecolors="black") transmissions = "" companions = [Companion(name=self.target.name + " Star", c=1, α=0*u.deg, θ=0*u.mas)] + self.target.companions for i, c in enumerate(companions): α = c.α.to(u.rad) θ = c.θ.to(u.rad) p = self.p.to(u.m) λ = self.interferometer.λ.to(u.m) ψ = get_unique_source_input_fields_njit(a=1, θ=θ.value, α=α.value, λ=λ.value, p=p.value) n, d, b = self.interferometer.kn.propagate_fields(ψ=ψ, λ=self.interferometer.λ) k = np.array([np.abs(d[2*i])**2 - np.abs(d[2*i+1])**2 for i in range(3)]) linebreak = '<br>' if return_plot else '\n ' transmissions += '<h2>' if return_plot else '' transmissions += f"\n{c.name} throughputs:" transmissions += '</h1>' if return_plot else '\n----------' + linebreak transmissions += f"B: {np.abs(b)**2*100:.2f}%" + linebreak transmissions += f"N: {' | '.join([f'{np.abs(i)**2*100:.2f}%' for i in n])}" + f" Depth: {np.sum(np.abs(d)**2) / np.abs(b)**2:.2e}" + linebreak transmissions += f"D: {' | '.join([f'{np.abs(i)**2*100:.2f}%' for i in d])}" + f" Depth: {np.sum(np.abs(d)**2) / np.abs(b)**2:.2e}" + linebreak transmissions += f"K: {' | '.join([f'{i*100:.2f}%' for i in k])}" + f" Depth: {np.sum(np.abs(k)) / np.abs(b)**2:.2e}" + linebreak if return_plot: plot = BytesIO() plt.savefig(plot, format='png') plt.close() return plot.getvalue(), transmissions plt.show() print(transmissions) # Get transmission map gradiant norm --------------------------------------
[docs] def get_transmission_map_gradient_norm(self, N:int) -> np.ndarray[float]: """ Get the norm of the gradient of the transmission maps. Parameters ---------- - N: Resolution of the map Returns ------- - Gradient norm of the null outputs map (3 x resolution x resolution) - Gradient norm of the dark outputs map (6 x resolution x resolution) - Gradient norm of the kernel outputs map (3 x resolution x resolution) """ n_maps, d_maps, k_maps = self.get_transmission_maps(N=N) n_grad = np.empty_like(n_maps) d_grad = np.empty_like(d_maps) k_grad = np.empty_like(k_maps) for i in range(3): dnx, dny = np.gradient(n_maps[i]) n_grad[i] = np.sqrt(dnx**2 + dny**2) dkx, dky = np.gradient(k_maps[i]) k_grad[i] = np.sqrt(dkx**2 + dky**2) for i in range(6): ddx, ddy = np.gradient(d_maps[i]) d_grad[i] = np.sqrt(ddx**2 + ddy**2) return n_grad, d_grad, k_grad
# Plot transmission map gradient norm -------------------------------------
[docs] def plot_transmission_map_gradient_norm(self, N:int, return_plot:bool = False) -> None: """ Plot the norm of the gradient of the transmission maps. Parameters ---------- - N: Resolution of the map - return_plot: Return the image buffer instead of displaying it Returns ------- - None | Image buffer if return_plot is True """ n_grad, d_grad, k_grad = self.get_transmission_map_gradient_norm(N=N) _, axs = plt.subplots(2, 6, figsize=(35, 10)) fov = self.interferometer.fov extent = (-fov.value/2, fov.value/2, -fov.value/2, fov.value/2) companions_pos = [] for c in self.target.companions: x, y = coordinates.αθ_to_xy(α=c.α, θ=c.θ, fov=self.interferometer.fov) companions_pos.append((x*self.interferometer.fov/2, y*self.interferometer.fov/2)) for i in range(3): im = axs[0, i].imshow(n_grad[i], aspect="equal", cmap="gray", extent=extent) axs[0, i].set_title(f"Nuller output {i+1} gradient norm") plt.colorbar(im, ax=axs[0, i]) for i in range(6): im = axs[1, i].imshow(d_grad[i], aspect="equal", cmap="gray", extent=extent) axs[1, i].set_title(f"Dark output {i+1} gradient norm") axs[1, i].set_aspect("equal") plt.colorbar(im, ax=axs[1, i]) for i in range(3): im = axs[0, i + 3].imshow(k_grad[i], aspect="equal", cmap="gray", extent=extent) axs[0, i + 3].set_title(f"Kernel {i+1} gradient norm") plt.colorbar(im, ax=axs[0, i + 3]) for ax in axs.flatten(): ax.set_xlabel(r"$\theta_x$" + f" ({fov.unit})") ax.set_ylabel(r"$\theta_y$" + f" ({fov.unit})") ax.scatter(0, 0, color="yellow", marker="*", edgecolors="black", s=100) for x, y in companions_pos: ax.scatter(x, y, color="blue", edgecolors="black") if return_plot: plot = BytesIO() plt.savefig(plot, format='png') plt.close() return plot.getvalue() plt.show()
# Input fields ------------------------------------------------------------
[docs] def get_input_fields(self) -> np.ndarray[complex]: """ Get the complexe amplitude of the signals acquired by the telescopes. Returns ------- - (nb_companions + 1, nb_telescope) array of acquired signals (complex amplitudes) """ input_fields = [] λ = self.interferometer.λ.to(u.m).value p = self.p.to(u.m).value # Projected telescope positions pf = self.pf.to(1/u.s).value # Photon flux from the star for each telescope # Star input fields input_fields.append(get_unique_source_input_fields_njit(a=pf, θ=0, α=0, λ=λ, p=p)) # Companion input fields for c in self.target.companions: pf_c = pf * c.c # Photon flux from the companion for each telescope input_fields.append(get_unique_source_input_fields_njit(a=pf_c, θ=c.θ.to(u.rad).value, α=c.α.to(u.rad).value, λ=λ, p=p)) # Error OPD γ = np.random.normal(0, self.Γ.to(u.m).value, size=len(self.interferometer.telescopes)) # OPD to phase difference phase = 2 * np.pi * γ / λ # Add the OPD error to the input fields for i in range(len(input_fields)): input_fields[i] = input_fields[i] * np.exp(1j * phase) return np.array(input_fields, dtype=np.complex128)
# H range -----------------------------------------------------------------
[docs] def get_h_range(self) -> np.ndarray[float]: """ Get the hour angle range of the observation. Returns ------- - Hour angle range (in radian) """ nb_obs_per_night = int(self.Δh.to(u.hourangle).value // self.interferometer.camera.e.to(u.hour).value) if nb_obs_per_night < 1: nb_obs_per_night = 1 h_range = np.linspace(self.h - self.Δh/2, self.h + self.Δh/2, nb_obs_per_night) return h_range
# Observation -------------------------------------------------------------
[docs] def observe_monochromatic(self): """ Observe the target in this context with monochromatic approximations. Returns ------- - Dark data (6,) - # of photons events - Kernel data (3,) - # of photons events - Bright data (1,) - # of photons events """ ctx = copy(self) ctx.interferometer.Δλ = 1 * u.nm nb_objects = len(ctx.target.companions) + 1 ds = np.empty((nb_objects, 6), dtype=np.complex128) bs = np.empty(nb_objects, dtype=np.complex128) for ψ_i, ψ in enumerate(ctx.get_input_fields()): _, d, b = ctx.interferometer.kn.propagate_fields(ψ=ψ, λ=ctx.interferometer.λ) ds[ψ_i] = d bs[ψ_i] = b bright = ctx.interferometer.camera.acquire_pixel(bs) darks = np.empty(6) for i in range(6): darks[i] = ctx.interferometer.camera.acquire_pixel(ds[:, i]) kernels = np.empty(3) for i in range(3): kernels[i] = darks[2*i] - darks[2*i+1] # Multiply by the bandwidth darks *= self.interferometer.Δλ.to(u.nm).value kernels *= self.interferometer.Δλ.to(u.nm).value bright *= self.interferometer.Δλ.to(u.nm).value return darks, kernels, bright
[docs] def observe(self, spectral_samples=5): """ Observe the target in this context. Parameters ---------- - spectral_samples: Number of spectral samples to acquire (default: 10) Returns ------- - Dark data (6,) - # of photons events - Kernel data (3,) - # of photons events - Bright data (1,) - # of photons events """ if self.monochromatic: return self.observe_monochromatic() # Sampling bandwidth λ_range = np.linspace(self.interferometer.λ - self.interferometer.Δλ/2, self.interferometer.λ + self.interferometer.Δλ/2, spectral_samples) darks = np.empty((spectral_samples, 6)) kernels = np.empty((spectral_samples, 3)) brights = np.empty(spectral_samples) # Monochromatic approximation for i, λ in enumerate(λ_range): ctx_mono = copy(self) ctx_mono.interferometer.λ = λ ctx_mono.interferometer.Δλ = 1 * u.nm d, k, b = ctx_mono.observe_monochromatic() # Store the results for each wavelength darks[i] = d kernels[i] = k brights[i] = b # Integrate over the bandwidth dark = np.trapz(darks, λ_range.value, axis=0) kernel = np.trapz(kernels, λ_range.value, axis=0) bright = np.trapz(brights, λ_range.value, axis=0) return dark, kernel, bright
[docs] def observation_serie( self, n:int = 1, ) -> np.ndarray[int]: """ Generate a series of observations in this context. Parameters ---------- - n: Number of nights (= number of observations for a given hour angle) Returns ------- - Dark data (n, n_h, 6) - # of photons events - Kernel data (n, n_h, 3) - # of photons events - Bright data (n, n_h) - # of photons events """ h_range = self.get_h_range() brights = np.empty((n, len(h_range))) darks = np.empty((n, len(h_range), 6)) kernels = np.empty((n, len(h_range), 3)) for h_i, h in enumerate(h_range): ctx = copy(self) ctx.h = h for n_i in range(n): d, k, b = ctx.observe() brights[n_i, h_i] = b darks[n_i, h_i] = d kernels[n_i, h_i] = k return darks, kernels, brights
# Genetic calibration -----------------------------------------------------
[docs] def calibrate_gen( self, β: float, verbose: bool = False, plot:bool = False, figsize:tuple = (10, 10), ) -> dict: """ Optimize the phase shifters offsets to maximize the nulling performance. Parameters ---------- - β: Decay factor for the step size (0.5 <= β < 1) - verbose: Boolean, if True, print the optimization process - plot: Boolean, if True, plot the optimization process - figsize: Figure size for the plot Returns ------- - Dict: Dictionary with the optimization history (optional) """ self.Δh = self.interferometer.camera.e.to(u.hour).value * u.hourangle ψ = np.sqrt(self.pf.to(1/self.interferometer.camera.e.unit).value) * (1 + 0j) # Perfectly cophased inputs total_execpted_photons = np.sum(np.abs(ψ)**2) ε = 1e-6 * self.interferometer.λ.unit # Minimum shift step size # Shifters that contribute to redirecting light to the bright output φb = [1, 2, 3, 4, 5, 7] # Shifters that contribute to the symmetry of the dark outputs φk = [6, 8, 9, 10, 11, 12, 13, 14] # History of the optimization depth_history = [] shifters_history = [] Δφ = self.interferometer.λ / 4 while Δφ > ε: if verbose: print(color.black(color.on_red(f"--- New iteration ---")), f"Δφ={Δφ:.2e}") for i in φb + φk: log = "" # Getting observation with different phase shifts self.interferometer.kn.φ[i-1] += Δφ _, k_pos, b_pos = self.observe() self.interferometer.kn.φ[i-1] -= 2*Δφ _, k_neg, b_neg = self.observe() self.interferometer.kn.φ[i-1] += Δφ _, k_old, b_old = self.observe() # Computing throughputs b_pos = b_pos / total_execpted_photons b_neg = b_neg / total_execpted_photons b_old = b_old / total_execpted_photons k_pos = np.sum(np.abs(k_pos)) / total_execpted_photons k_neg = np.sum(np.abs(k_neg)) / total_execpted_photons k_old = np.sum(np.abs(k_old)) / total_execpted_photons # Save the history depth_history.append(np.sum(k_old) / np.sum(b_old)) shifters_history.append(np.copy(self.interferometer.kn.φ.value)) # Maximize the bright metric for group 1 shifters if i in φb: log += "Shift " + color.black(color.on_lightgrey(f"{i}")) + " Bright: " + color.black(color.on_green(f"{b_neg:.2e} | {b_old:.2e} | {b_pos:.2e}")) + " -> " if b_pos > b_old and b_pos > b_neg: log += color.black(color.on_green(" + ")) self.interferometer.kn.φ[i-1] += Δφ elif b_neg > b_old and b_neg > b_pos: log += color.black(color.on_green(" - ")) self.interferometer.kn.φ[i-1] -= Δφ else: log += color.black(color.on_green(" = ")) # Minimize the kernel metric for group 2 shifters else: log += "Shift " + color.black(color.on_lightgrey(f"{i}")) + " Kernel: " + color.black(color.on_blue(f"{k_neg:.2e} | {k_old:.2e} | {k_pos:.2e}")) + " -> " if k_pos < k_old and k_pos < k_neg: self.interferometer.kn.φ[i-1] += Δφ log += color.black(color.on_blue(" + ")) elif k_neg < k_old and k_neg < k_pos: self.interferometer.kn.φ[i-1] -= Δφ log += color.black(color.on_blue(" - ")) else: log += color.black(color.on_blue(" = ")) if verbose: print(log) Δφ *= β self.interferometer.kn.φ = phase.bound(self.interferometer.kn.φ, self.interferometer.λ) if plot: shifters_history = np.array(shifters_history) _, axs = plt.subplots(2,1, figsize=figsize, constrained_layout=True) axs[0].plot(depth_history) axs[0].set_xlabel("Iterations") axs[0].set_ylabel("Kernel-Null depth") axs[0].set_yscale("log") axs[0].set_title("Performance of the Kernel-Nuller") for i in range(shifters_history.shape[1]): axs[1].plot(shifters_history[:,i], label=f"Shifter {i+1}") axs[1].set_xlabel("Iterations") axs[1].set_ylabel("Phase shift") axs[1].set_yscale("linear") axs[1].set_title("Convergence of the phase shifters") # axs[1].legend(loc='upper right') plt.show() return { "depth": np.array(depth_history), "shifters": np.array(shifters_history), }
# Obstruction calibration -------------------------------------------------
[docs] def calibrate_obs( self, n: int = 1_000, plot: bool = False, figsize:tuple[int] = (30,20), ): """ Optimize the phase shifters offsets to maximize the nulling performance. Parameters ---------- - n: Number of points for the least squares optimization - plot: Boolean, if True, plot the optimization process - figsize: Figure size for the plot Returns ------- - Context: New context with the optimized kernel nuller """ kn = self.interferometer.kn input_attenuation_backup = kn.input_attenuation.copy() λ = self.interferometer.λ e = self.interferometer.camera.e total_photons = np.sum(self.pf.to(1/e.unit).value) * e.value if plot: _, axs = plt.subplots(6, 3, figsize=figsize, constrained_layout=True) for i in range(7): axs.flatten()[i].set_xlabel("Phase shift") axs.flatten()[i].set_ylabel("Throughput") def maximize_bright(p, plt_coords=None): x = np.linspace(0, λ.value,n) y = np.empty(n) if isinstance(p,list): Δp = kn.φ[p[1]-1] - kn.φ[p[0]-1] for i in range(n): if isinstance(p,list): kn.φ[p[0]-1] = i * λ / n kn.φ[p[1]-1] = (kn.φ[p[0]-1] + Δp) % λ else: kn.φ[p-1] = i * λ / n _, _, b = self.observe() y[i] = b / total_photons def sin(x, x0): return (np.sin((x-x0)/λ.value*2*np.pi)+1)/2 * (np.max(y)-np.min(y)) + np.min(y) popt, _ = curve_fit(sin, x, y, p0=[0], maxfev = 100_000) if isinstance(p,list): kn.φ[p[0]-1] = (np.mod(popt[0]+λ.value/4, λ.value) * λ.unit).to(kn.φ.unit) kn.φ[p[1]-1] = (kn.φ[p[0]-1] + Δp) % λ else: kn.φ[p-1] = (np.mod(popt[0]+λ.value/4, λ.value) * λ.unit).to(kn.φ.unit) if plot: axs[plt_coords].set_title(f"$|B(\phi{p})|$") axs[plt_coords].scatter(x, y, label='Data', color='tab:blue') axs[plt_coords].plot(x, sin(x, *popt), label='Fit', color='tab:orange') axs[plt_coords].axvline(x=np.mod(popt[0]+λ.value/4, λ.value), color='k', linestyle='--', label='Optimal phase shift') axs[plt_coords].set_xlabel(f"Phase shift ({λ.unit})") axs[plt_coords].set_ylabel("Bright throughput") axs[plt_coords].legend() def minimize_kernel(p, m, plt_coords=None): x = np.linspace(0,λ.value,n) y = np.empty(n) for i in range(n): kn.φ[p-1] = i * λ / n _, k, b = self.observe() y[i] = k[m-1] / b def sin(x, x0): return (np.sin((x-x0)/λ.value*2*np.pi)+1)/2 * (np.max(y)-np.min(y)) + np.min(y) popt, _ = curve_fit(sin, x, y, p0=[0], maxfev = 100_000) kn.φ[p-1] = (np.mod(popt[0], λ.value) * λ.unit).to(kn.φ.unit) if plot: axs[plt_coords].set_title(f"$K_{m}(\phi{p})$") axs[plt_coords].scatter(x, y, label='Data', color='tab:blue') axs[plt_coords].plot(x, sin(x, *popt), label='Fit', color='tab:orange') axs[plt_coords].axvline(x=np.mod(popt[0], λ.value), color='k', linestyle='--', label='Optimal phase shift') axs[plt_coords].set_xlabel(f"Phase shift ({λ.unit})") axs[plt_coords].set_ylabel("Kernel throughput") axs[plt_coords].legend() def maximize_darks(p, ds, plt_coords=None): x = np.linspace(0, λ.value, n) y = np.empty(n) for i in range(n): kn.φ[p-1] = i * λ / n d, _, b = self.observe() y[i] = np.sum(np.abs(d[np.array(ds)-1])) / b def sin(x, x0): return (np.sin((x-x0)/λ.value*2*np.pi)+1)/2 * (np.max(y)-np.min(y)) + np.min(y) popt, _ = curve_fit(sin, x, y, p0=[0], maxfev = 100_000) kn.φ[p-1] = (np.mod(popt[0]+λ.value/4, λ.value) * λ.unit).to(kn.φ.unit) if plot: axs[plt_coords].set_title(f"$|D_{ds[0]}(\phi{p})| + |D_{ds[1]}(\phi{p})|$") axs[plt_coords].scatter(x, y, label='Data', color='tab:blue') axs[plt_coords].plot(x, sin(x, *popt), label='Fit', color='tab:orange') axs[plt_coords].axvline(x=np.mod(popt[0]+λ.value/4, λ.value), color='k', linestyle='--', label='Optimal phase shift') axs[plt_coords].set_xlabel(f"Phase shift ({λ.unit})") axs[plt_coords].set_ylabel(f"Dark pair {ds} throughput") axs[plt_coords].legend() # Bright maximization self.interferometer.kn.input_attenuation = [1, 1, 0, 0] maximize_bright(2, plt_coords=(0,0)) maximize_bright([1,2], plt_coords=(1,0)) if plot: axs[1,1].axis('off') axs[1,2].axis('off') plt.show() return self.interferometer.kn.input_attenuation = [0, 0, 1, 1] maximize_bright(4, plt_coords=(0,1)) maximize_bright([3,4], plt_coords=(1,1)) self.interferometer.kn.input_attenuation = [1, 0, 1, 0] maximize_bright(7, plt_coords=(0,2)) maximize_bright([5,7], plt_coords=(1,2)) # Darks maximization self.interferometer.kn.input_attenuation = [1, 0, 0, -1] maximize_darks(8, [1,2], plt_coords=(1,0)) # Kernel minimization self.interferometer.kn.input_attenuation = [1, 0, 0, 0] minimize_kernel(11, 1, plt_coords=(2,0)) minimize_kernel(13, 2, plt_coords=(2,1)) minimize_kernel(14, 3, plt_coords=(2,2)) kn.φ = phase.bound(kn.φ, λ) kn.input_attenuation = input_attenuation_backup if plot: axs[1,1].axis('off') axs[1,2].axis('off') plt.show()
#============================================================================== # VLTI Context #==============================================================================
[docs] def get_VLTI() -> 'Context': """ Get the default context for the analysis. This context uses: - The VLTI with 4 UTs. - The first generation of active kernel nuller. - Vega as target star and an hypothetical companion at 2 mas with a contrast of 1e-6. """ λ = 1.55 * u.um # Central wavelength ctx = Context( h = 0 * u.hourangle, # Central hour angle Δh = 8 * u.hourangle, # Hour angle range Γ = 100 * u.nm, # Input cophasing error (RMS) monochromatic=False, name="Default Context", # Context name interferometer = Interferometer( l = -24.6275 * u.deg, # Latitude λ = λ, # Central wavelength Δλ = 1 * u.nm, # Bandwidth fov = 10 * u.mas, # Field of view η = 0.02, # Optical efficiency telescopes = telescope.get_VLTI_UTs(), name = "VLTI", # Interferometer name kn = KernelNuller( φ = np.zeros(14) * u.um, # Injected phase shifts σ = np.abs(np.random.normal(0, 1, 14)) * u.um, # Manufacturing OPD errors λ0 = λ, name = "First Generation Kernel-Nuller", # Kernel nuller name ), camera = Camera( e = 5 * u.min, # Exposure time name = "Default Camera", # Camera name ), ), target=Target( f = (1050 * u.Jy * 2 * np.pi * const.c / λ**2).to(u.W / u.m**2 / u.nm), # Target flux δ = -64.71 * u.deg, # Target declination name = "Vega", # Target name companions = [ Companion( c = 1e-6, # Companion contrast θ = 4 * u.mas, # Companion angular separation α = 0 * u.deg, # Companion position angle name = "Hypothetical Companion", # Companion name ), ], ), ) return ctx
#============================================================================== # LIFE Context #==============================================================================
[docs] def get_LIFE() -> 'Context': """ Get the default context for the analysis. This context uses: - The 4 telescopes of LIFE - The first generation of active kernel nuller. - Vega as target star and an hypothetical companion at 2 mas with a contrast of 1e-6. """ λ = 4 * u.um # Central wavelength ctx = Context( interferometer = Interferometer( l = -90 * u.deg, # Latitude λ = λ, # Central wavelength Δλ = 1 * u.nm, # Bandwidth fov = 10 * u.mas, # Field of view η = 0.02, # Optical efficiency telescopes = telescope.get_VLTI_UTs(), name = "LIFE", # Interferometer name kn = KernelNuller( φ = np.zeros(14) * u.um, # Injected phase shifts σ = np.abs(np.random.normal(0, 1, 14)) * u.um, # Manufacturing OPD errors λ0 = λ, name = "First Generation Kernel-Nuller", # Kernel nuller name ), camera = Camera( e = 5 * u.min, # Exposure time name = "Default Camera", # Camera name ), ), target=Target( f = (1050 * u.Jy * 2 * np.pi * const.c / λ**2).to(u.W / u.m**2 / u.nm), # Target flux δ = -90 * u.deg, # Target declination name = "Vega", # Target name companions = [ Companion( c = 1e-6, # Companion contrast θ = 4 * u.mas, # Companion angular separation α = 0 * u.deg, # Companion position angle name = "Hypothetical Companion", # Companion name ), ], ), h = 0 * u.hourangle, # Central hour angle Δh = 24 * u.hourangle, # Hour angle range Γ = 1 * u.nm, # Input cophasing error (RMS) name="Default Context", # Context name ) return ctx
#============================================================================== # Number functions #============================================================================== # Projected position ---------------------------------------------------------- @nb.njit() def project_position_njit( r: np.ndarray[float], h: float, l: float, δ: float, ) -> np.ndarray[float]: """ Project the telescope position in a plane perpendicular to the line of sight. Parameters ---------- - r: Array of telescope positions (in meters) - h: Hour angle (in radian) - l: Latitude (in radian) - δ: Declination (in radian) Returns ------- - Array of projected telescope positions (same shape and unit as p) """ M = np.array([ [ -np.sin(l)*np.sin(h), np.cos(h) ], [ np.sin(l)*np.cos(h)*np.sin(δ) + np.cos(l)*np.cos(δ), np.sin(h)*np.sin(δ)], ]) p = np.empty_like(r) for i, (x,y) in enumerate(r): p[i] = M @ np.array([y, x]) return p # Transmission maps ----------------------------------------------------------- @nb.njit() def get_transmission_map_njit( N: int, φ: np.ndarray[float], σ: np.ndarray[float], p: np.ndarray[float], λ: float, λ0: float, fov: float, output_order: np.ndarray[int] ) -> tuple[np.ndarray[complex], np.ndarray[complex], np.ndarray[float]]: """ Generate the transmission maps of this context with a given resolution Parameters ---------- - N: Resolution of the map - φ: Array of 14 injected OPD (in meter) - σ: Array of 14 intrasic OPD (in meter) - p: Projected telescope positions (in meter) - λ: Wavelength (in meter) - λ0: Reference wavelength (in meter) - fov: Field of view in mas - output_order: Order of the outputs Returns ------- - Null outputs map (3 x resolution x resolution) - Dark outputs map (6 x resolution x resolution) - Kernel outputs map (3 x resolution x resolution) """ # Get the coordinates of the map _, _, α_map, θ_map = coordinates.get_maps_njit(N=N, fov=fov) # mas to radian θ_map = θ_map / 1000 / 3600 / 180 * np.pi n_maps = np.zeros((3, N, N), dtype=np.complex128) d_maps = np.zeros((6, N, N), dtype=np.complex128) k_maps = np.zeros((3, N, N), dtype=float) for x in range(N): for y in range(N): α = α_map[x, y] θ = θ_map[x, y] ψ = get_unique_source_input_fields_njit(a=1, θ=θ, α=α, λ=λ, p=p) n, d, _ = kernel_nuller.propagate_fields_njit(ψ, φ, σ, λ, λ0, output_order) k = np.array([np.abs(d[2*i])**2 - np.abs(d[2*i+1])**2 for i in range(3)]) for i in range(3): n_maps[i, x, y] = n[i] for i in range(6): d_maps[i, x, y] = d[i] for i in range(3): k_maps[i, x, y] = k[i] return np.abs(n_maps) ** 2, np.abs(d_maps) ** 2, k_maps # Input fields ---------------------------------------------------------------- @nb.njit() def get_unique_source_input_fields_njit( a: float, θ: float, α: float, λ: float, p: np.ndarray[float], ) -> np.ndarray[complex]: """ Get the complexe amplitude of the input signals according to the object and telescopes positions. Parameters ---------- - a: Intensity of the signal (prop. to #photons/s) - θ: Angular separation (in radian) - α: Parallactic angle (in radian) - λ: Wavelength (in meter) - p: Projected telescope positions (in meter) Returns ------- - Array of acquired signals (complex amplitudes). """ # Array of complex signals s = np.empty(p.shape[0], dtype=np.complex128) for i, t in enumerate(p): # Rotate the projected telescope positions by the parallactic angle p_rot = t[0] * np.cos(-α) - t[1] * np.sin(-α) # Compute the phase delay according to the object position Φ = 2 * np.pi * p_rot * np.sin(θ) / λ # Build the complex amplitude of the signal s[i] = np.exp(1j * Φ) return s * np.sqrt(a / p.shape[0])