Source code for satproc.chips

import logging
import os

import fiona
import numpy as np
import rasterio
from rasterio.crs import CRS
from shapely.geometry import box, shape
from shapely.ops import unary_union
from skimage import exposure
from skimage.io import imsave
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

from satproc.masks import (
    all_masks_empty,
    multiband_chip_mask_by_classes,
    prepare_label_shapes,
    write_window_masks,
)
from satproc.utils import rescale_intensity, sliding_windows, write_chips_geojson

__author__ = "Damián Silvani"
__copyright__ = "Dymaxion Labs"
__license__ = "Apache-2.0"

_logger = logging.getLogger(__name__)


[docs]def extract_chips( rasters, aoi=None, labels=None, label_property="class", masks={"extent"}, mask_type="class", extent_no_border=False, rescale_mode=None, rescale_range=None, bands=None, chip_type="tif", write_footprints=False, classes=None, crs=None, skip_existing=True, within=False, windows_mode="whole_overlap", skip_low_contrast=False, skip_with_empty_mask=True, *, size, step_size, output_dir, ): if mask_type not in ("single", "class"): raise RuntimeError(f"mask type '{mask_type}' not implemented") if aoi: _logger.info("Prepare AOI shape") aoi_poly = prepare_aoi_shape(aoi) else: aoi_poly = None if labels: _logger.info("Prepare label shapes") polys_dict = prepare_label_shapes( labels, mask_type=mask_type, label_property=label_property, classes=classes ) else: polys_dict = None if step_size is None: step_size = size with logging_redirect_tqdm(): for raster in tqdm(rasters, desc="Rasters", ascii=True): extract_chips_from_raster( raster, size=size, step_size=step_size, rescale_mode=rescale_mode, rescale_range=rescale_range, bands=bands, output_dir=output_dir, chip_type=chip_type, within=within, write_footprints=write_footprints, crs=crs, labels=labels, label_property=label_property, classes=classes, masks=masks, mask_type=mask_type, extent_no_border=extent_no_border, aoi_poly=aoi_poly, polys_dict=polys_dict, windows_mode=windows_mode, skip_existing=skip_existing, skip_low_contrast=skip_low_contrast, skip_with_empty_mask=skip_with_empty_mask, )
[docs]def extract_chips_from_raster( raster, rescale_mode=None, rescale_range=None, bands=None, chip_type="tif", write_footprints=False, labels=None, label_property="class", mask_type="class", masks={"extent"}, classes=None, crs=None, skip_existing=True, within=False, aoi_poly=None, polys_dict=None, windows_mode="whole_overlap", skip_low_contrast=False, skip_with_empty_mask=True, extent_no_border=False, *, size, step_size, output_dir, ): if mask_type not in ("single", "class"): raise RuntimeError(f"mask type '{mask_type}' not implemented") # If mask type is single, there is a single class if mask_type == "single": classes = None if extent_no_border and "extent" not in masks: _logger.warn( ( "You specified `no_border` option but will be ignored " "(no `extent` mask specified)" ) ) if skip_existing: _logger.info("Will skip existing files") basename, _ = os.path.splitext(os.path.basename(raster)) with rasterio.open(raster) as ds: _logger.info("Raster size: %s", (ds.width, ds.height)) if any(b > ds.count for b in bands): raise RuntimeError( f"Raster has {ds.count} bands, " f"but you asked to use {bands} band indexes" ) if bands is None: bands = list(range(1, min(ds.count, 3) + 1)) _logger.info(( f"Building windows of size {size}, step size {step_size}" f"{' (no overlap)' if step_size >= size else ''}" )) win_size = (size, size) win_step_size = (step_size, step_size) windows = list( sliding_windows( win_size, win_step_size, ds.width, ds.height, mode=windows_mode ) ) _logger.info("Total windows: %d", len(windows)) _logger.info("Building window shapes") window_shapes = [ box(*rasterio.windows.bounds(w, ds.transform)) for w, _ in windows ] window_and_shapes = zip(windows, window_shapes) # Filter windows by AOI shape if aoi_poly: _logger.info("Filtering windows by AOI") _logger.info('Using "%s" function', "within" if within else "intersects") def filter_fn(w, aoi): if within: return w.within(aoi) else: return w.intersects(aoi) window_and_shapes = [ (w, s) for w, s in window_and_shapes if filter_fn(s, aoi_poly) ] _logger.info("Total windows after filtering: %d", len(window_and_shapes)) meta = ds.meta.copy() if crs: meta["crs"] = CRS.from_string(crs) if rescale_mode: # If rescaling, set nodata=0 (will rescale to uint8 1-255) meta["nodata"] = 0 chips = [] for c, ((window, (i, j)), win_shape) in tqdm( list(enumerate(window_and_shapes)), desc=f"{os.path.basename(raster)} windows", ascii=True, ): _logger.debug("%s %s", window, (i, j)) name = f"{basename}_{i}_{j}.{chip_type}" img_path = os.path.join(output_dir, "images", name) mask_paths = {kind: os.path.join(output_dir, kind, name) for kind in masks} # Gather list of required files required_files = {img_path} if labels: required_files = required_files | set(mask_paths.values()) # If all files already exist and we are skipping existing files, continue if skip_existing and all(os.path.exists(p) for p in required_files): continue img = ds.read(window=window) img = np.nan_to_num(img) img = np.array([img[b - 1, :, :] for b in bands]) if rescale_mode: img = rescale_intensity(img, rescale_mode, rescale_range) if skip_low_contrast and exposure.is_low_contrast(img): continue # Write mask file if labels: keys = classes if classes is not None else polys_dict.keys() mask_imgs = multiband_chip_mask_by_classes( classes=keys, transform=ds.transform, window=window, polys_dict=polys_dict, extent_mask_path=mask_paths.get("extent"), boundary_mask_path=mask_paths.get("boundary"), distance_mask_path=mask_paths.get("distance"), label_property=label_property, extent_no_border=extent_no_border, ) # If all masks are empty, skip this window if skip_with_empty_mask and all_masks_empty(mask_imgs): continue write_window_masks( mask_imgs, window=window, metadata=meta, transform=ds.transform ) chip = (win_shape, (c, i, j)) chips.append(chip) # Write image file if chip_type == "tif": write_tif( img, img_path, window=window, meta=meta.copy(), transform=ds.transform, bands=bands, skip_low_contrast=skip_low_contrast, ) else: write_image( img, img_path, skip_low_contrast=skip_low_contrast, ) if write_footprints: geojson_path = os.path.join(output_dir, "{}.geojson".format(basename)) _logger.info("Write chips footprints GeoJSON at %s", geojson_path) write_chips_geojson( geojson_path, chips, chip_type=chip_type, crs=str(meta["crs"]), basename=basename, )
[docs]def write_image(img, path, *, percentiles=None, skip_low_contrast=False): rgb = np.dstack(img[:3, :, :]).astype(np.uint8) if skip_low_contrast and exposure.is_low_contrast(rgb): return False os.makedirs(os.path.dirname(path), exist_ok=True) imsave(path, rgb) return True
[docs]def write_tif(img, path, *, skip_low_contrast=False, window, meta, transform, bands): os.makedirs(os.path.dirname(path), exist_ok=True) meta.update( { "driver": "GTiff", "dtype": img.dtype, "height": window.height, "width": window.width, "transform": rasterio.windows.transform(window, transform), "count": len(bands), } ) img = np.array([img[b - 1, :, :] for b in bands]) with rasterio.open(path, "w", **meta) as dst: dst.write(img) return True
[docs]def get_shape(feature): """Get shape geometry from feature Parameters ---------- feature : dict Feature as read from Fiona Returns ------- shapely.geometry.BaseGeometry """ geom = feature["geometry"] try: return shape(geom) except Exception as err: _logger.warn("Failed to get shape from feature %s: %s", feature, err) return None
[docs]def prepare_aoi_shape(aoi): with fiona.open(aoi) as src: aoi_polys = [get_shape(f) for f in src] aoi_polys = [shp for shp in aoi_polys if shp and shp.is_valid] aoi_poly = unary_union(aoi_polys) return aoi_poly