Source code for anacal.simulation

"""Simulation utilities for generating synthetic galaxy images.

Provides COSMOS-based galaxy generation, isolated and blended image
simulations, and noise generation routines used for shear calibration
studies.
"""
# FPFS shear estimator
# Copyright 20210905 Xiangchong Li.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# python lib

import gc
import os

import fitsio
import galsim
import numpy as np

_data_dir = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "data",
)


def _galsim_round_sersic(n, sersic_prec):
    """Round a Sersic index to the nearest multiple of *sersic_prec*."""
    return float(int(n / sersic_prec + 0.5)) * sersic_prec


[docs] def coord_distort_1(x, y, xref, yref, gamma1, gamma2, kappa=0.0, inverse=False): """Apply shear and convergence distortion to coordinates. Args: x (np.ndarray): x-coordinates in pixels. y (np.ndarray): y-coordinates in pixels. xref (float): Reference x-coordinate in pixels. yref (float): Reference y-coordinate in pixels. gamma1 (float): First component of shear (dimensionless). gamma2 (float): Second component of shear (dimensionless). kappa (float, optional): Convergence distortion. Defaults to ``0.0``. inverse (bool, optional): If ``True``, transform from source plane to lens plane; otherwise transform from lens plane to source plane. Returns: Tuple[np.ndarray, np.ndarray]: Distorted ``x`` and ``y`` coordinates in pixels. Notes: This function returns new arrays and does not modify ``x`` or ``y``. References: Bartelmann, M., & Schneider, P. (2001). *Weak gravitational lensing*. Physics Reports, 340(4-5), 291–472. """ if inverse: xu = x - xref yu = y - yref x2 = (1 - kappa - gamma1) * xu - gamma2 * yu + xref y2 = -gamma2 * xu + (1 - kappa + gamma1) * yu + yref else: u_mag = 1.0 / (1 - kappa) ** 2.0 - gamma1**2.0 - gamma2**2.0 xu = x - xref yu = y - yref x2 = ((1 - kappa + gamma1) * xu + gamma2 * yu + xref) * u_mag y2 = (gamma2 * xu + (1 - kappa - gamma1) * yu + yref) * u_mag return x2, y2
[docs] def coord_rotate(x, y, xref, yref, theta): """Rotate coordinates around a reference point. Args: x (np.ndarray): x-coordinates in pixels. y (np.ndarray): y-coordinates in pixels. xref (float): Reference x-coordinate in pixels. yref (float): Reference y-coordinate in pixels. theta (float): Rotation angle in radians, positive anticlockwise. Returns: Tuple[np.ndarray, np.ndarray]: Rotated ``x`` and ``y`` coordinates in pixels. Notes: This function returns new arrays and does not modify ``x`` or ``y``. References: Rotation matrix: https://en.wikipedia.org/wiki/Rotation_matrix """ xu = x - xref yu = y - yref x2 = np.cos(theta) * xu - np.sin(theta) * yu + xref y2 = np.sin(theta) * xu + np.cos(theta) * yu + yref return x2, y2
[docs] def generate_cosmos_gal_simple(record, gsparams=None, random_shape=False): """Generate a simplified COSMOS galaxy (Gaussian profiles only). Uses Gaussian approximations for both bulge+disk and single-Sersic fits, which is faster than :func:`generate_cosmos_gal` and suitable for use with :class:`galsim.RandomKnots`. Args: record (ndarray): One row of the COSMOS galaxy catalogue. gsparams (galsim.GSParams | None): GalSim rendering parameters. random_shape (bool): Whether to apply intrinsic ellipticity from the catalogue fit parameters. Returns: galsim.GSObject: GalSim galaxy object. """ bdparams = record["bulgefit"] sparams = record["sersicfit"] use_bulgefit = record["use_bulgefit"] if use_bulgefit: # Bulge parameters: # Minor-to-major axis ratio: bulge_hlr = record["hlr"][1] bulge_flux = record["flux"][1] disk_hlr = record["hlr"][2] disk_flux = record["flux"][2] bulge = galsim.Gaussian( flux=bulge_flux, half_light_radius=bulge_hlr, gsparams=gsparams ) disk = galsim.Exponential( flux=disk_flux, half_light_radius=disk_hlr, gsparams=gsparams ) if random_shape: # Apply shears for intrinsic shape. bulge_q = bdparams[11] bulge_beta = bdparams[15] * galsim.radians if bulge_q < 1.0: # pragma: no branch bulge = bulge.shear(q=bulge_q, beta=bulge_beta) # disk_q = bdparams[3] disk_beta = bdparams[7] * galsim.radians if disk_q < 1.0: # pragma: no branch disk = disk.shear(q=disk_q, beta=disk_beta) # Then combine the two components of the galaxy. # No center offset is included gal = bulge + disk else: gal_hlr = record["hlr"][0] gal_flux = record["flux"][0] gal = galsim.Gaussian( flux=gal_flux, half_light_radius=gal_hlr, ) if random_shape: # Apply shears for intrinsic shape. gal_q = sparams[3] gal_beta = sparams[7] * galsim.radians if gal_q < 1.0: gal = gal.shear(q=gal_q, beta=gal_beta) return gal
[docs] def generate_cosmos_gal(record, trunc_ratio=5.0, gsparams=None): """Generate a realistic COSMOS galaxy with Sersic/DeVaucouleurs profiles. Builds either a bulge+disk model (DeVaucouleurs bulge + Exponential disk) or a single Sersic profile, applying intrinsic ellipticity from the catalogue fit parameters. Profiles are optionally truncated at ``trunc_ratio * half_light_radius``. Based on the GalSim COSMOS scene implementation. Args: record (ndarray): One row of the COSMOS galaxy catalogue, containing ``bulgefit``, ``sersicfit``, ``use_bulgefit``, ``hlr``, and ``flux`` fields. trunc_ratio (float): Truncation radius in units of the half-light radius. Set to < 1 to disable truncation. gsparams (galsim.GSParams | None): GalSim rendering parameters. Returns: galsim.GSObject: GalSim galaxy object with intrinsic shape applied. """ # record columns: # For 'sersicfit', the result is an array of 8 numbers for each: # SERSICFIT[0]: intensity of light profile at the half-light radius. # SERSICFIT[1]: half-light radius measured along the major axis, in # units of pixels in the COSMOS lensing data reductions # (0.03 arcsec). # SERSICFIT[2]: Sersic n. # SERSICFIT[3]: q, the ratio of minor axis to major axis length. # SERSICFIT[4]: boxiness, currently fixed to 0, meaning isophotes are # all elliptical. # SERSICFIT[5]: x0, the central x position in pixels. # SERSICFIT[6]: y0, the central y position in pixels. # SERSICFIT[7]: phi, the position angle in radians. If phi=0, the major # axis is lined up with the x axis of the image. # For 'bulgefit', the result is an array of 16 parameters that comes from # doing a 2-component sersic fit. The first 8 are the parameters for the # disk, with n=1, and the last 8 are for the bulge, with n=4. bdparams = record["bulgefit"] sparams = record["sersicfit"] use_bulgefit = record["use_bulgefit"] if use_bulgefit: # Bulge parameters: # Minor-to-major axis ratio: bulge_hlr = record["hlr"][1] bulge_flux = record["flux"][1] disk_hlr = record["hlr"][2] disk_flux = record["flux"][2] if trunc_ratio <= 0.99: btrunc = None bulge = galsim.DeVaucouleurs( flux=bulge_flux, half_light_radius=bulge_hlr, gsparams=gsparams ) disk = galsim.Exponential( flux=disk_flux, half_light_radius=disk_hlr, gsparams=gsparams ) else: btrunc = bulge_hlr * trunc_ratio bulge = galsim.DeVaucouleurs( flux=bulge_flux, half_light_radius=bulge_hlr, trunc=btrunc, gsparams=gsparams, ) dtrunc = disk_hlr * trunc_ratio disk = galsim.Sersic( 1.0, flux=disk_flux, half_light_radius=disk_hlr, trunc=dtrunc, gsparams=gsparams, ) # Apply shears for intrinsic shape. bulge_q = bdparams[11] bulge_beta = bdparams[15] * galsim.radians if bulge_q < 1.0: # pragma: no branch bulge = bulge.shear(q=bulge_q, beta=bulge_beta) # disk_q = bdparams[3] disk_beta = bdparams[7] * galsim.radians if disk_q < 1.0: # pragma: no branch disk = disk.shear(q=disk_q, beta=disk_beta) # Then combine the two components of the galaxy. # No center offset is included gal = bulge + disk else: # Do a similar manipulation to the stored quantities for the single # Sersic profiles. gal_n = sparams[2] # Fudge this if it is at the edge of the allowed n values. Since # GalSim (as of #325 and #449) allow Sersic n in the range 0.3<=n<=6, # the only problem is that the fits occasionally go as low as n=0.2. # The fits in this file only go to n=6, so there is no issue with # too-high values, but we also put a guard on that side in case other # samples are swapped in that go to higher value of sersic n. if gal_n < 0.3: gal_n = 0.3 if gal_n > 6.0: gal_n = 6.0 # GalSim is much more efficient if only a finite number of Sersic n # values are used. This (optionally given constructor args) rounds n to # the nearest 0.05. change to 0.1 to speed up gal_n = _galsim_round_sersic(gal_n, 0.1) gal_hlr = record["hlr"][0] gal_flux = record["flux"][0] if trunc_ratio <= 0.99: strunc = None gal = galsim.Sersic( gal_n, flux=gal_flux, half_light_radius=gal_hlr, gsparams=gsparams, ) else: strunc = gal_hlr * trunc_ratio gal = galsim.Sersic( gal_n, flux=gal_flux, half_light_radius=gal_hlr, trunc=strunc, gsparams=gsparams, ) # Apply shears for intrinsic shape. gal_q = sparams[3] gal_beta = sparams[7] * galsim.radians if gal_q < 1.0: gal = gal.shear(q=gal_q, beta=gal_beta) return gal
def _generate_gal_fft(record, mag_zero, rng, gsparams): """Create a COSMOS galaxy with random rescaling and rotation (FFT path).""" gal0 = generate_cosmos_gal(record, gsparams=gsparams) # E.g., HSC's i-band coadds zero point is 27 flux = 10 ** ((mag_zero - record["mag_auto"]) / 2.5) gal0 = gal0.withFlux(flux) # rescale the radius by 'rescale' and keep surface brightness the # same rescale = rng.np.uniform(0.95, 1.05) gal0 = gal0.expand(rescale) # rotate by a random angle ang = (rng.np.uniform(0.0, np.pi * 2.0)) * galsim.radians gal0 = gal0.rotate(ang) return gal0 def _generate_gal_mc(record, mag_zero, rng, gsparams, npoints): """Create a COSMOS galaxy as RandomKnots (Monte-Carlo path).""" # need to truncate the profile since we do not want # Knots locate at infinity galp = generate_cosmos_gal_simple(record) # accounting for zeropoint difference between COSMOS HST and HSC # HSC's i-band coadds zero point is 27 flux = 10 ** ((mag_zero - record["mag_auto"]) / 2.5) galp = galp.withFlux(flux) # rescale the radius by 'rescale' and keep surface brightness the # same rescale = rng.np.uniform(0.95, 1.05) galp = galp.expand(rescale) # rotate by a random angle ang = (rng.np.uniform(0.0, np.pi * 2.0)) * galsim.radians galp = galp.rotate(ang) gal0 = galsim.RandomKnots( npoints=npoints, profile=galp, rng=rng, gsparams=gsparams, ) return gal0
[docs] def make_exposure_stamp( sim_method, rng, mag_zero, psf_obj, scale, cat_input, ngalx, ngaly, ngrid, rot_field, g1, g2, nrot_per_gal, do_shift, buff=0, draw_method="auto", ): """Render galaxies on a grid to build exposure stamps. Places galaxies on a regular grid, applies shear, convolves with the PSF, and draws images for each field rotation angle. Args: sim_method (str): Galaxy rendering method (``"fft"`` or ``"mc"``). rng (galsim.BaseDeviate): GalSim random number generator. mag_zero (float): Magnitude zero-point. psf_obj (galsim.GSObject): PSF object for convolution. scale (float): Pixel scale in arcseconds. cat_input (ndarray): COSMOS galaxy catalogue rows. ngalx (int): Number of galaxies along the x-axis. ngaly (int): Number of galaxies along the y-axis. ngrid (int): Stamp size in pixels per galaxy. rot_field (list[float]): Field rotation angles in radians. g1 (float): First shear component. g2 (float): Second shear component. nrot_per_gal (int): Number of shape-noise-cancelling rotations per galaxy. do_shift (bool): Whether to apply random sub-pixel shifts. buff (int): Zero-padding buffer around the image boundary. draw_method (str): GalSim drawing method. Returns: list[NDArray]: List of exposure arrays, one per field rotation. """ ngal = ngalx * ngaly gsparams = galsim.GSParams(maximum_fft_size=10240) gal0 = None gal_image_list = [] gal_input_list = [] npix_x = ngalx * ngrid + buff * 2.0 npix_y = ngaly * ngrid + buff * 2.0 for _ in range(len(rot_field)): gal_image = galsim.ImageF(npix_x, npix_y, scale=scale) gal_image.setOrigin(0, 0) gal_image_list.append(gal_image) gal_input_list.append([]) for i in range(ngal): # boundary ix = i % ngalx iy = i // ngalx # each galaxy irot = i % nrot_per_gal igal = i // nrot_per_gal if irot == 0: # prepare the base galaxy del gal0 if sim_method == "fft": gal0 = _generate_gal_fft( cat_input[igal], mag_zero, rng, gsparams, ) elif sim_method == "mc": gal0 = _generate_gal_mc( cat_input[igal], mag_zero, rng, gsparams, npoints=30, ) else: raise ValueError("Does not support sim_method=%s" % sim_method) else: # rotate the base galaxy assert gal0 is not None ang = np.pi / nrot_per_gal * galsim.radians # update gal0 gal0 = gal0.rotate(ang) s01 = rng.np.uniform(low=-0.5, high=0.5) * scale s02 = rng.np.uniform(low=-0.5, high=0.5) * scale for ii, rr in enumerate(rot_field): # base rotation and shear distortion gal = gal0.rotate(rr * galsim.radians).shear(g1=g1, g2=g2) gal = galsim.Convolve([psf_obj, gal], gsparams=gsparams) b = galsim.BoundsI( ix * ngrid + buff, (ix + 1) * ngrid - 1 + buff, iy * ngrid + buff, (iy + 1) * ngrid - 1 + buff, ) sub_img = gal_image_list[ii][b] # shift with a random offset s1, s2 = coord_rotate(s01, s02, xref=0, yref=0, theta=rr) if do_shift: gal = gal.shift(s1, s2) # shift to (ngrid//2, ngrid//2) # since I set it as the default center of grids in the simulation gal = gal.shift(0.5 * scale, 0.5 * scale) gal.drawImage(sub_img, add_to_image=True, method=draw_method) gc.collect() exposures = [img.array for img in gal_image_list] return exposures
[docs] def read_cosmos_catalog(filename): """Read a COSMOS galaxy catalogue from a FITS file. Args: filename (str): Path to the FITS catalogue file. Returns: ndarray: Structured array of catalogue rows. """ cat_input = fitsio.read(filename) return cat_input
[docs] class CosmosCatalog(object): """Filtered COSMOS galaxy catalogue for simulation input. Reads a COSMOS FITS catalogue and applies magnitude, half-light radius, and morphology-type filters. Args: filename (str): Path to the COSMOS FITS catalogue. max_mag (float | None): Upper magnitude cut (exclusive). min_mag (float | None): Lower magnitude cut (inclusive). max_hlr (float | None): Maximum half-light radius in arcseconds. min_hlr (float | None): Minimum half-light radius in arcseconds. gal_type (str): Galaxy morphology filter — ``"mixed"`` (all), ``"sersic"`` (single-component), ``"bulgedisk"`` (two-component), or ``"debug"`` (forced exponential). Attributes: cat_input (ndarray): Filtered catalogue array. ntrain (int): Number of galaxies after filtering. """ def __init__( self, filename=None, max_mag=None, min_mag=None, max_hlr=None, min_hlr=None, gal_type="mixed", ): src = read_cosmos_catalog(filename) # Initializing selector mask with all Trues sel = np.ones(len(src), dtype=bool) # Filtering conditions if max_mag is not None: sel &= src["mag_auto"] < max_mag if min_mag is not None: sel &= src["mag_auto"] >= min_mag if max_hlr is not None: if gal_type == "mixed": sel &= (src["hlr"][:, 0:3] < max_hlr).all(axis=1) elif gal_type == "sersic": sel &= src["hlr"][:, 0] < max_hlr elif gal_type == "bulgedisk": sel &= (src["hlr"][:, 1:3] < max_hlr).all(axis=1) if min_hlr is not None: if gal_type == "mixed": sel &= (src["hlr"][:, 0:3] >= max_hlr).all(axis=1) elif gal_type == "sersic": sel &= src["hlr"][:, 0] >= min_hlr elif gal_type == "bulgedisk": sel &= (src["hlr"][:, 1:3] >= max_hlr).all(axis=1) # Applying selector mask src = src[sel] if gal_type == "mixed": pass elif gal_type == "sersic": src = src[src["use_bulgefit"] == 0] elif gal_type == "bulgedisk": src = src[src["use_bulgefit"] != 0] elif gal_type == "debug": # This is used for debug src = src[src["use_bulgefit"] == 0] # src["sersicfit"][:, 3] = 1.0 # round galaxies # src["sersicfit"][:, 2] = 0.5 # only Gaussian src["sersicfit"][:, 2] = 1.0 # only Exponential else: raise ValueError("Do not support gal_type=%s" % gal_type) self.cat_input = src self.ntrain = len(src) return
[docs] def make_catalog(self, rng, n): """Draw *n* galaxies at random from the filtered catalogue. Args: rng (galsim.BaseDeviate): GalSim random number generator. n (int): Number of galaxies to sample. Returns: ndarray: Randomly sampled catalogue rows. Raises: ValueError: If the catalogue has fewer than *n* entries. """ if self.ntrain < n: raise ValueError( "There is not enough number of galaxies", ) # nrot_per_gal is the number of rotated galaxies in each subfield inds = rng.np.integers(low=0, high=self.ntrain, size=n) src = self.cat_input[inds] return src
[docs] def make_isolated_sim( ny, nx, psf_obj, gname, seed, cat_name=None, scale=0.168, mag_zero=27.0, rot_field=None, shear_value=0.02, ngrid=64, nrot_per_gal=4, max_mag=None, min_mag=None, max_hlr=None, min_hlr=None, gal_type="mixed", do_shift=False, npoints=30, sim_method="fft", buff=0, draw_method="auto", return_catalog=False, verbose=False, ): """Make an **isolated** galaxy image simulation on a regular grid. Galaxies are drawn from a COSMOS catalogue, placed on an ``(ny // ngrid) x (nx // ngrid)`` grid, sheared, convolved with a PSF, and rendered into pixel arrays. Args: ny (int): Number of pixels in the y-direction. nx (int): Number of pixels in the x-direction. psf_obj (galsim.GSObject): PSF object for convolution. gname (str): Shear specification string, e.g. ``"g1-0"`` or ``"g2-1"``. Format is ``"<component>-<index>"`` where *index* selects from ``[-shear_value, shear_value, 0]``. seed (int): Random seed for the simulation. cat_name (str | None): Path to a COSMOS FITS catalogue. Defaults to the bundled ``src_cosmos.fits``. scale (float): Pixel scale in arcseconds. mag_zero (float): Magnitude zero-point (27 for HSC). rot_field (list[float] | None): Field rotation angles in radians. Defaults to ``[0.0]``. shear_value (float): Amplitude of the applied shear. ngrid (int): Stamp size in pixels per galaxy. nrot_per_gal (int): Number of shape-noise-cancelling rotations. max_mag (float | None): Upper magnitude cut. min_mag (float | None): Lower magnitude cut. max_hlr (float | None): Maximum half-light radius in arcseconds. min_hlr (float | None): Minimum half-light radius in arcseconds. gal_type (str): Galaxy morphology filter (``"mixed"``, ``"sersic"``, ``"bulgedisk"``, or ``"debug"``). do_shift (bool): Whether to apply random sub-pixel offsets. npoints (int): Number of random knots (``"mc"`` method only). sim_method (str): Galaxy rendering method (``"fft"`` or ``"mc"``). buff (int): Zero-padding buffer in pixels. draw_method (str): GalSim drawing method. return_catalog (bool): If ``True``, also return the input catalogue. verbose (bool): Whether to show log info. Returns: list[NDArray] | tuple[list[NDArray], ndarray]: Exposure arrays, one per field rotation. If *return_catalog* is ``True``, returns ``(exposures, cat_input)``. """ if nx % ngrid != 0: raise ValueError("nx is not divisible by ngrid") if ny % ngrid != 0: raise ValueError("ny is not divisible by ngrid") # Basic parameters ngalx = int(nx // ngrid) ngaly = int(ny // ngrid) ngal = ngalx * ngaly rng = galsim.BaseDeviate(seed) ngeff = max(ngal // nrot_per_gal, 1) if cat_name is None: cat_name = os.path.join(_data_dir, "src_cosmos.fits") cosmos_cat = CosmosCatalog( filename=cat_name, max_mag=max_mag, min_mag=min_mag, max_hlr=max_hlr, min_hlr=min_hlr, gal_type=gal_type, ) cat_input = cosmos_cat.make_catalog(rng=rng, n=ngeff) # Get the shear information shear_list = np.array([-shear_value, shear_value, 0.0]) dis_version = int(eval(gname.split("-")[-1])) assert dis_version < 3 and dis_version >= 0, "gname is not supported" shear_const = shear_list[dis_version] g_version = gname.split("-")[0] if g_version == "g1": g1 = shear_const g2 = 0.0 elif g_version == "g2": g1 = 0.0 g2 = shear_const elif g_version == "g1_g2": g1 = shear_const g2 = 0.02 elif g_version == "g2_g1": g1 = 0.02 g2 = shear_const else: raise ValueError("cannot decide g1 or g2") if rot_field is None: rot_field = [0.0] exposures = make_exposure_stamp( sim_method=sim_method, rng=rng, mag_zero=mag_zero, psf_obj=psf_obj, scale=scale, cat_input=cat_input, ngalx=ngalx, ngaly=ngaly, ngrid=ngrid, rot_field=rot_field, g1=g1, g2=g2, nrot_per_gal=nrot_per_gal, do_shift=do_shift, buff=buff, draw_method=draw_method, ) if not return_catalog: return exposures else: return exposures, cat_input
[docs] def make_noise_sim( out_dir, infname, ind0, ny=6400, nx=6400, scale=0.168, verbose=False, ): """Generate a pure COSMOS-correlated noise image. Args: out_dir (str): Output directory (currently unused). infname (str): Path to the COSMOS noise correlation file. ind0 (int): Seed index for the random number generator. ny (int): Number of pixels in the y-direction. nx (int): Number of pixels in the x-direction. scale (float): Pixel scale in arcseconds. verbose (bool): Whether to show log info. Returns: NDArray: Noise image array of shape ``(ny, nx)``. """ variance = 0.01 ud = galsim.UniformDeviate(ind0 * 10000 + 1) # setup the galaxy image and the noise image noi_image = galsim.ImageF(nx, ny, scale=scale) noi_image.setOrigin(0, 0) noise_obj = galsim.getCOSMOSNoise( file_name=infname, rng=ud, cosmos_scale=scale, variance=variance ) noise_obj.applyTo(noi_image) return noi_image.array
[docs] def make_blended_sim( out_dir, psf_obj, gname, ind0, cat_name=None, ny=5000, nx=5000, rfrac=0.46, scale=0.168, mag_zero=27.0, rot_field=0.0, shear_value=0.02, nrot=4, rescale_min_max=None, verbose=False, ): """Make a COSMOS-like **blended** galaxy image simulation. Galaxies are randomly distributed within a circular aperture and sheared according to their photometric redshift bin. Args: out_dir (str): Output directory (used to infer galaxy density). psf_obj (galsim.GSObject): PSF object for convolution. gname (str): Shear specification string. ind0 (int): Random seed index. cat_name (str | None): Path to a COSMOS FITS catalogue. ny (int): Image height in pixels. nx (int): Image width in pixels. rfrac (float): Fraction of ``min(nx, ny)`` used as the circular aperture radius. scale (float): Pixel scale in arcseconds. mag_zero (float): Magnitude zero-point. rot_field (float): Additional field rotation in radians. shear_value (float): Amplitude of the applied shear. nrot (int): Number of shape-noise-cancelling rotations. rescale_min_max (list | NDArray | None): Lower and upper bounds for random galaxy size rescaling. verbose (bool): Whether to show log info. Returns: NDArray: Blended galaxy image array of shape ``(ny, nx)``. """ np.random.seed(ind0) bigfft = galsim.GSParams(maximum_fft_size=10240) # galsim setup # Get the shear information # Three choice on g(-shear_value,0,shear_value) shear_list = np.array([-shear_value, 0.0, shear_value]) shear_list = shear_list[[eval(i) for i in gname.split("-")[-1]]] # number of galaxy # we only have `ngeff' galsim galaxies but with `nrot' rotation r2 = (min(nx, ny) * rfrac) ** 2.0 density = int(out_dir.split("_psf")[0].split("_cosmo")[-1]) ngal = max(int(r2 * np.pi * scale**2.0 / 3600.0 * density), nrot) ngal = int(ngal // (nrot * 2) * (nrot * 2)) ngeff = ngal // (nrot * 2) if cat_name is None: cat_name = os.path.join(_data_dir, "src_cosmos.fits") # get the cosmos catalog cat_input = fitsio.read(cat_name) ntrain = len(cat_input) inds = np.random.randint(0, ntrain, ngeff) cat_input = cat_input[inds] # evenly distributed within a radius, min(nx,ny)*rfrac rarray = np.sqrt(r2 * np.random.rand(ngeff)) # radius tarray = np.random.uniform(0.0, np.pi / nrot, ngeff) # theta (0,pi/nrot) tarray = tarray + rot_field xarray = rarray * np.cos(tarray) + nx // 2 # x yarray = rarray * np.sin(tarray) + ny // 2 # y if rescale_min_max is None: rescale_min_max = [0.95, 1.05] rsarray = np.random.uniform(rescale_min_max[0], rescale_min_max[1], ngeff) del rarray, tarray zbound = np.array([1e-5, 0.5477, 0.8874, 1.3119, 12.0]) # sim 3 gal_image = galsim.ImageF(nx, ny, scale=scale) gal_image.setOrigin(0, 0) for ii in range(ngal): ig = ii // (nrot * 2) irot = ii % (nrot * 2) ss = cat_input[ig] # x,y xi = xarray[ig] yi = yarray[ig] # randomly rotate by an angle; we have 180/nrot deg pairs to remove # shape noise in additive bias estimation rot_ang = np.pi / nrot * irot ang = rot_ang * galsim.radians xi, yi = coord_rotate(xi, yi, nx // 2, ny // 2, rot_ang) # determine redshift shear_inds = np.where( (ss["zphot"] > zbound[:-1]) & (ss["zphot"] <= zbound[1:]) )[0] if len(shear_inds) == 1: if gname.split("-")[0] == "g1": g1 = shear_list[shear_inds][0] g2 = 0.0 elif gname.split("-")[0] == "g2": g1 = 0.0 g2 = shear_list[shear_inds][0] else: raise ValueError("g1 or g2 must be in gname") else: g1 = 0.0 g2 = 0.0 gal = generate_cosmos_gal(ss, gsparams=bigfft) # rescale the radius while keeping the surface brightness the same gal = gal.expand(rsarray[ig]) # determine and assign flux # (HSC's i-band coadds zero point is 27) # (LSST's coadds zero point is 30) flux = 10 ** ((mag_zero - ss["mag_auto"]) / 2.5) gal = gal.withFlux(flux) # rotate by 'ang' gal = gal.rotate(ang) # lensing shear gal = gal.shear(g1=g1, g2=g2) # position and subpixel offset xi, yi = coord_distort_1(xi, yi, nx // 2, ny // 2, g1, g2) xu = int(xi) yu = int(yi) dx = (0.5 + xi - xu) * scale dy = (0.5 + yi - yu) * scale gal = gal.shift(dx, dy) # PSF gal = galsim.Convolve([psf_obj, gal], gsparams=bigfft) # Bounary r_grid = max(gal.getGoodImageSize(scale), 32) rx1 = np.min([r_grid, xu]) rx2 = np.min([r_grid, nx - xu - 1]) rx = int(min(rx1, rx2)) del rx1, rx2 ry1 = np.min([r_grid, yu]) ry2 = np.min([r_grid, ny - yu - 1]) ry = int(min(ry1, ry2)) del ry1, ry2 # draw galaxy b = galsim.BoundsI(xu - rx, xu + rx - 1, yu - ry, yu + ry - 1) sub_img = gal_image[b] gal.drawImage(sub_img, add_to_image=True) del gal, b, sub_img, xu, yu, xi, yi, r_grid gc.collect() del cat_input, psf_obj return gal_image.array