from __future__ import annotations
import sys
from math import floor
import itertools
import geopandas as gpd
import pandas as pd
from osgeo import gdal, ogr, osr
import numpy as np
from shapely.geometry import Polygon
from shapely.validation import make_valid, explain_validity
from . import projections
# from shapely.validation import make_valid, explain_validity
# import xarray as xr
# MultiPolygon
# import matplotlib.pyplot as plt
# from src.meteo.base import MeteoStation
# from src.core import projections
# from src.physiographic.base import Basin
# import rasterio
# from rasterio.features import shapes
# def polygonize_raster(raster_name: str):
# # https://gis.stackexchange.com/questions/187877/how-to-polygonize-raster-to-shapely-polygons
# # Polygonize both rasters and change the path name to the shp
# mask = None
# with rasterio.Env():
# with rasterio.open(raster_name) as src:
# image = src.read(1) # first band
# results = (
# {'properties': {'raster_val': v}, 'geometry': s}
# for i, (s, v) in enumerate(shapes(image, mask=mask, transform=src.transform)))
# geoms = list(results)
# # Convert the list into a gpd
# gdf = gpd.GeoDataFrame.from_features(geoms)
# # Open the tif file to get the epsg
# raster_ref = gdal.Open(raster_name, gdal.GA_ReadOnly)
# epsg = projections.get_proj_code(raster_ref)
# gdf = gdf.set_crs(epsg)
# gdf['validity'] = gdf.apply(
# lambda row: explain_validity(row.geometry), axis=1)
# gdf.geometry = gdf.apply(
# lambda row: make_valid(row.geometry), axis=1)
# print(gdf)
# gdf.geometry.make_valid()
# print(make_valid(gdf["geometry"]))
# gdf.geometry = make_valid(gdf.geometry)
# return gdf
# gdf.to_file(raster_name.replace("tif","shp"))
# print(gpd_d)
# self._Basin = self._Basin.replace(".tif", ".shp")
# self._SubBasins = self._SubBasins.replace(".tif", ".shp")
# gpd_d.to_file(self._Basin)
[docs]
def polygonize_raster(raster_name: str):
"""_summary_
Args:
raster_name (str): _description_
Returns:
_type_: NoData value from the input raster first band.
"""
# Open the file
src_ds = gdal.Open(raster_name)
# Define shp file attributes
srcband = src_ds.GetRasterBand(1)
nodata_value = srcband.GetNoDataValue()
dst_layername = 'polygonized'
drv = ogr.GetDriverByName("ESRI Shapefile")
# Set the export name
dst_ds = drv.CreateDataSource(raster_name.replace(".tif", ".shp"))
# Define the spatial reference based on the TIF attributes
sp_ref = osr.SpatialReference()
epsg = projections.get_proj_code(src_ds)
sp_ref.SetFromUserInput(epsg)
dst_layer = dst_ds.CreateLayer(dst_layername, srs=sp_ref)
# Rasterize the object
fld = ogr.FieldDefn("raster_val", ogr.OFTInteger)
dst_layer.CreateField(fld)
dst_field = dst_layer.GetLayerDefn().GetFieldIndex("raster_val")
gdal.Polygonize(srcband, None, dst_layer, dst_field, [], callback=None)
return nodata_value
[docs]
def fix_geometry(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""_summary_
https://gis.stackexchange.com/questions/430384/using-shapely-methods-explain-validity-and-make-valid-on-shapefile
Args:
gdf (gpd.GeoDataFrame): _description_
Returns:
gpd.GeoDataFrame: _description_
"""
gdf['validity'] = gdf.apply(
lambda row: explain_validity(row.geometry), axis=1)
gdf.geometry = gdf.apply(lambda row: make_valid(
row.geometry) if not row.geometry.is_valid == 'Valid Geometry' else row.geometry, axis=1)
return gdf
# def drop_duplicated_geometries(geoseries: gpd.GeoSeries):
# """
# taken from:
# https://ml-gis-service.com/index.php/2021/09/24/toolbox-drop-duplicated-geometries-from-geodataframe/
# Function drops duplicated geometries from a geoseries. It works as follow:
# 1. Take record from the dataset. Check it's index against list of indexes-to-skip. If it's not there then move to the next step.
# 2. Store record's index in the list of processed indexes (to re-create geoseries without duplicates) and in the list of indexes-to-skip.
# 3. Compare this record to all other records. If any of them is a duplicate then store its index in the indexes-to-skip.
# 4. If all records are checked then re-create dataframe without duplicates based on the list of processed indexes.
# INPUT:
# :param geoseries: (gpd.GeoSeries)
# OUTPUT:
# :returns: (gpd.GeoDataFrame)
# """
# indexes_to_skip = []
# processed_indexes = []
# for index, geom in geoseries.items():
# if index not in indexes_to_skip:
# processed_indexes.append(index)
# indexes_to_skip.append(index)
# for other_index, other_geom in geoseries.items():
# if other_index in indexes_to_skip:
# pass
# else:
# if geom.equals(other_geom):
# indexes_to_skip.append(other_index)
# else:
# pass
# output_gs = geoseries[processed_indexes].copy()
# return indexes_to_skip, processed_indexes
[docs]
def rasterize_shp(grid_shp: str,
ref_name: str,
field: str) -> np.ndarray:
"""_summary_
Args:
grid_shp (str): _description_
ref_name (str): _description_
field (str): _description_
Returns:
np.ndarray: _description_
"""
# Get raster georeference info
raster = gdal.Open(ref_name, gdal.GA_ReadOnly)
# transform = raster.GetGeoTransform()
# xOrigin = transform[0]
# yOrigin = transform[3]
# pixelWidth = transform[1]
# pixelHeight = -transform[5]
x_res = raster.RasterXSize
y_res = raster.RasterYSize
shp = ogr.Open(grid_shp, gdal.GA_ReadOnly)
lyr = shp.GetLayer()
# Get shp info
# proj = lyr.GetSpatialRef()
# xmin, xmax, ymin, ymax = lyr.GetExtent()
# Get the resolution to the new raster
# Create tiff format file
# grid_raster = gdal.GetDriverByName('GTiff').Create(
# grid_shp.replace(".shp", ".tif"),
# x_res, y_res, 1, gdal.GDT_Int32)
# Create the raster in the memory
grid_raster = gdal.GetDriverByName('MEM').Create(
'', x_res, y_res, 1, gdal.GDT_Int32)
grid_raster.SetGeoTransform(raster.GetGeoTransform())
grid_raster.SetProjection(raster.GetProjection())
band = grid_raster.GetRasterBand(1)
band.SetNoDataValue(0)
gdal.RasterizeLayer(grid_raster, [1], lyr, options=[
"ATTRIBUTE="+field])
band = grid_raster.GetRasterBand(1)
data_array = band.ReadAsArray()
return data_array
[docs]
def rasterize_shp_as_byte(grid_shp: str,
ref_name: str, field: str,
name: str) -> None:
"""_summary_
Args:
grid_shp (str): _description_
ref_name (str): _description_
field (str): _description_
name (str): _description_
Returns:
_type_: _description_
"""
# Get raster georeference info
raster = gdal.Open(ref_name, gdal.GA_ReadOnly)
# transform = raster.GetGeoTransform()
# xOrigin = transform[0]
# yOrigin = transform[3]
# pixelWidth = transform[1]
# pixelHeight = -transform[5]
x_res = raster.RasterXSize
y_res = raster.RasterYSize
# Get shp info
shp = ogr.Open(grid_shp, gdal.GA_ReadOnly)
lyr = shp.GetLayer()
# proj = lyr.GetSpatialRef()
# xmin, xmax, ymin, ymax = lyr.GetExtent()
# Get the resolution to the new raster
# Create tiff format file
# grid_raster = gdal.GetDriverByName('GTiff').Create(
# grid_shp.replace(".shp",".tif"),
# x_res, y_res, 1, gdal.GDT_Int32)
# Create the raster in the memory
grid_raster = gdal.GetDriverByName('GTiff').Create(
name, x_res, y_res, 1, gdal.GDT_Byte)
grid_raster.SetGeoTransform(raster.GetGeoTransform())
grid_raster.SetProjection(raster.GetProjection())
band = grid_raster.GetRasterBand(1)
band.SetNoDataValue(0)
gdal.RasterizeLayer(grid_raster, [1], lyr, options=[
"ATTRIBUTE="+field])
band = grid_raster.GetRasterBand(1)
data_array = band.ReadAsArray()
return data_array
[docs]
def get_altitude_point(DEM: gdal.Dataset,
lat_utm: np.array,
lon_utm: np.array):
"""_summary_
Args:
DEM (gdal.Dataset): _description_
lat_utm (np.array): _description_
lon_utm (np.array): _description_
Returns:
_type_: _description_
"""
xtup, ytup, ptup = GetExtent(DEM)
# (xmin, xmax), (ymin, ymax), (xpixel, ypixel)
# xmin, xmax, ymin, ymax, xpixel, ypixel = GetExtent(DEM)
DEM_array = DEM.ReadAsArray()
altitudes = []
for y, x in zip(lat_utm, lon_utm):
col = int((x-np.amin(xtup))/ptup[0])
row = int((np.amax(ytup)-y)/(-ptup[1]))
if col < 0 or row < 0:
altitudes.append(-9999)
elif row >= DEM_array.shape[0] or col > DEM_array.shape[1]:
altitudes.append(-9999)
else:
altitudes.append(DEM_array[row, col])
return altitudes
[docs]
def GetExtent(raster: gdal.Dataset):
""" Return list of corner coordinates from a gdal Dataset """
xmin, xpixel, _, ymax, _, ypixel = raster.GetGeoTransform()
width, height = raster.RasterXSize, raster.RasterYSize
xmax = xmin + width * xpixel
ymin = ymax + height * ypixel
return (xmin, xmax), (ymin, ymax), (xpixel, ypixel)
# return (xmin, ymax), (xmax, ymax), (xmax, ymin), (xmin, ymin)
[docs]
def regrid_CE(raster: gdal.Dataset,
shp: ogr.DataSource,
grid_size: int) -> gdal.Dataset:
"""_summary_
Args:
raster (gdal.Dataset): _description_
shp (ogr.DataSource): _description_
grid_size (int): _description_
Returns:
gdal.Dataset: _description_
"""
xmin, xpixel, _, ymax, _, ypixel = raster.GetGeoTransform()
width, height = raster.RasterXSize, raster.RasterYSize
# transform = raster.GetGeoTransform()
# xOrigin = transform[0]
# yOrigin = transform[3]
# pixelWidth = transform[1]
# pixelHeight = transform[5]
# xmin = transform[0]
# ymin = transform[3]
xmax = xmin + width*xpixel
ymin = ymax + height*ypixel
lyr = shp.GetLayer()
proj = lyr.GetSpatialRef()
xmin, xmax, ymin, ymax = lyr.GetExtent()
# Get the resolution to the new raster
xres = int((xmax-xmin)/grid_size)
yres = int((ymax-ymin)/grid_size)
# yres = raster.RasterYSize
# print(xres,yres)
# GTiff
mem_ds = gdal.GetDriverByName("GTiff").Create(
'test.tif', abs(xres), abs(yres), 1, gdal.GDT_Int32)
mem_ds.SetProjection(proj.ExportToWkt())
mem_ds.SetGeoTransform((xmin, grid_size, 0, ymin, 0, grid_size))
band = mem_ds.GetRasterBand(1)
NoData_value = 0
band.SetNoDataValue(NoData_value)
gdal.RasterizeLayer(mem_ds, [1], lyr, options=["ATTRIBUTE=newid"])
return mem_ds
[docs]
def rasterize_feature(gdf: gpd.GeoDataFrame,
raster_name: str,
att: str) -> np.ndarray:
"""_summary_
Args:
gdf (gpd.GeoDataFrame): _description_
raster_name (str): _description_
att (str): _description_
Returns:
np.ndarray: _description_
"""
# Get raster georeference info
raster = gdal.Open(raster_name, gdal.GA_ReadOnly)
transform = raster.GetGeoTransform()
xOrigin = transform[0]
yOrigin = transform[3]
pixelWidth = transform[1]
pixelHeight = -transform[5]
# yOrigin1 = yOrigin + pixelHeight[5] * raster.RasterYSize
# Get no data value
# no_data = raster.GetRasterBand(1).GetNoDataValue()
# xmin, ymin, xmax, ymax = gdf.bounds
# Specify offset and rows and columns to read
xoff = floor((gdf['minx'] - xOrigin)/pixelWidth)
yoff = floor((yOrigin - gdf['maxy'])/pixelHeight)
xcount = int((gdf['maxx'] - gdf['minx'])/pixelWidth)
ycount = int((gdf['maxy'] - gdf['miny'])/pixelHeight)
# Get the projection
proj = osr.SpatialReference(wkt=raster.GetProjection())
# create the spatial reference system, WGS84
srs = osr.SpatialReference()
epsg = int(proj.GetAttrValue('AUTHORITY', 1))
srs.ImportFromEPSG(epsg)
# Create an in-memory vector dataset from the GeoDataFrame's geometry column
outDriver = ogr.GetDriverByName("MEMORY")
tempSHP = outDriver.CreateDataSource("")
if gdf.geometry.geom_type == "Polygon":
temp_layer = tempSHP.CreateLayer('feat', srs, ogr.wkbPolygon)
elif gdf.geometry.geom_type == "MultiPolygon":
temp_layer = tempSHP.CreateLayer('feat', srs, ogr.wkbMultiPolygon)
else:
sys.exit("Geometry type not valid. Fix it")
# Add an ID field
idField = ogr.FieldDefn(att, ogr.OFTInteger)
geom = ogr.CreateGeometryFromWkb(gdf['geometry'].wkb)
temp_layer.CreateField(idField)
# Create the feature and set values
featureDefn = temp_layer.GetLayerDefn()
feat = ogr.Feature(featureDefn)
feat.SetGeometry(geom)
feat.SetField(att, 1)
temp_layer.CreateFeature(feat)
target_ds = gdal.GetDriverByName('MEM').Create(
"", xcount, ycount, gdal.GDT_Byte)
target_ds.SetGeoTransform((gdf['minx'], pixelWidth, 0,
gdf['miny'], 0, pixelHeight,))
# Rasterize the in-memory vector dataset onto the mask array
gdal.RasterizeLayer(target_ds, [1],
temp_layer, options=["ATTRIBUTE="+att])
bandmask = target_ds.GetRasterBand(1)
datamask = bandmask.ReadAsArray()
banddataraster = raster.GetRasterBand(1)
# no_data = raster.GetRasterBand(1).GetNoDataValue()
# Mask array based on the nondata value
dataraster = banddataraster.ReadAsArray(
xoff, yoff, xcount, ycount)
# masked_dataraster = np.ma.masked_where(dataraster==no_data,dataraster)
# Change
# np.set_printoptions(threshold=sys.maxsize)
# print(datamask)
# print(dataraster)
# print(datamask.shape)
return datamask*dataraster
# def rasterize_shp2(gdf: gpd.GeoDataFrame,
# raster_name: str) -> np.ndarray:
# # Get affine transformation matrix from the raster dataset
# # Get raster georeference info
# raster = gdal.Open(raster_name, gdal.GA_ReadOnly)
# transform = raster.GetGeoTransform()
# xOrigin = transform[0]
# yOrigin = transform[3]
# pixelWidth = transform[1]
# pixelHeight = -transform[5]
# # Get no data value
# # no_data = raster.GetRasterBand(1).GetNoDataValue()
# xmin, ymin, xmax, ymax = gdf.geometry.bounds
# # Specify offset and rows and columns to read
# xoff = ceil((xmin - xOrigin)/pixelWidth)
# yoff = int((yOrigin - ymax)/pixelHeight)
# xcount = int((xmax - xmin)/pixelWidth)
# ycount = int((ymax - ymin)/pixelHeight)
# proj = osr.SpatialReference(wkt=raster.GetProjection())
# # create the spatial reference system, WGS84
# srs = osr.SpatialReference()
# epsg = int(proj.GetAttrValue('AUTHORITY', 1))
# srs.ImportFromEPSG(epsg)
# # Create an in-memory vector dataset from the GeoDataFrame's geometry column
# driver = ogr.GetDriverByName('Memory')
# temp_ds = driver.CreateDataSource('')
# temp_layer = temp_ds.CreateLayer('temp', srs, ogr.wkbMultiPolygon)
# feature = ogr.Feature(temp_layer.GetLayerDefn())
# idField = ogr.FieldDefn("temp", ogr.OFTInteger)
# temp_layer.CreateField(idField)
# feature.SetField("temp", 1)
# # print(feature.GetField("temp"))
# # Make a geometry, from Shapely object
# geom = ogr.CreateGeometryFromWkb(gdf['geometry'].wkb)
# feature.SetGeometry(geom)
# temp_layer.CreateFeature(feature)
# # Create an in-memory temporal raster
# target_ds = gdal.GetDriverByName('MEM').Create(
# '', xcount, ycount, ogr.OFTInteger)
# target_ds.SetGeoTransform((
# xmin, pixelWidth, 0,
# ymax, 0, pixelHeight,
# ))
# bandmask = target_ds.GetRasterBand(1)
# bandmask.SetNoDataValue(0)
# bandmask.Fill(0)
# # Create for target raster the same projection as for the value raster
# raster_srs = osr.SpatialReference()
# raster_srs.ImportFromWkt(raster.GetProjectionRef())
# target_ds.SetProjection(raster_srs.ExportToWkt())
# # Rasterize the in-memory vector dataset onto the mask array
# gdal.RasterizeLayer(target_ds, [1], temp_layer, options=["ATTRIBUTE=temp"])
# bandmask = target_ds.GetRasterBand(1)
# datamask = bandmask.ReadAsArray()
# banddataraster = raster.GetRasterBand(1)
# dataraster = banddataraster.ReadAsArray(
# xoff, yoff, xcount, ycount)
# pass
# def zonal_statistics(feat,
# raster: gdal.Dataset,
# shp: ogr.DataSource,
# field: str):
# # https://pcjericks.github.io/py-gdalogr-cookbook/raster_layers.html#calculate-zonal-statistics
# # Get raster georeference info
# transform = raster.GetGeoTransform()
# xOrigin = transform[0]
# yOrigin = transform[3]
# pixelWidth = transform[1]
# pixelHeight = transform[5]
# # Get no data value
# no_data = raster.GetRasterBand(1).GetNoDataValue()
# burn_value = int(feat.GetField(field))
# lyr = shp.GetLayer()
# # Get extent of feat
# geom = feat.GetGeometryRef()
# if (geom.GetGeometryName() == 'MULTIPOLYGON'):
# count = 0
# pointsX = []
# pointsY = []
# for polygon in geom:
# geomInner = geom.GetGeometryRef(count)
# ring = geomInner.GetGeometryRef(0)
# numpoints = ring.GetPointCount()
# for p in range(numpoints):
# lon, lat, z = ring.GetPoint(p)
# pointsX.append(lon)
# pointsY.append(lat)
# count += 1
# elif (geom.GetGeometryName() == 'POLYGON'):
# ring = geom.GetGeometryRef(0)
# numpoints = ring.GetPointCount()
# pointsX = []
# pointsY = []
# for p in range(numpoints):
# lon, lat, z = ring.GetPoint(p)
# pointsX.append(lon)
# pointsY.append(lat)
# else:
# sys.exit("ERROR: Geometry needs to be either Polygon or Multipolygon")
# xmin = min(pointsX)
# xmax = max(pointsX)
# ymin = min(pointsY)
# ymax = max(pointsY)
# # Specify offset and rows and columns to read
# xoff = ceil((xmin - xOrigin)/pixelWidth)
# yoff = int((yOrigin - ymax)/pixelWidth)
# xcount = int((xmax - xmin)/pixelWidth)
# ycount = int((ymax - ymin)/pixelWidth)
# # print(lyr.GetExtent())
# # print(xmin,xmax,ymin,ymax)
# # print(xoff,yoff,ycount,xcount)
# # Create memory target raster
# # GTiff
# target_ds = gdal.GetDriverByName('MEM').Create(
# '', xcount, ycount, gdal.GDT_Int16)
# target_ds.SetGeoTransform((
# xmin, pixelWidth, 0,
# ymax, 0, pixelHeight,
# ))
# bandmask = target_ds.GetRasterBand(1)
# bandmask.SetNoDataValue(0)
# bandmask.Fill(0)
# # Create for target raster the same projection as for the value raster
# raster_srs = osr.SpatialReference()
# raster_srs.ImportFromWkt(raster.GetProjectionRef())
# target_ds.SetProjection(raster_srs.ExportToWkt())
# # bandmask = target_ds.GetRasterBand(1)
# # Rasterize zone polygon to raster
# # query = field + " = '" + str(burn_value) +"'"
# query = "INTERSECTS(geometry, POLYGON((%s %s, %s %s, %s %s, %s %s, %s %s)))" % \
# (xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax, xmin, ymin)
# # Rasterize the shapefile with a value of 1 for all pixels inside the selected polygons
# gdal.RasterizeLayer(target_ds, [1], lyr, options=[
# 'WHERE="%s"' % query], burn_values=[1])
# # print(shp)
# # gdal.RasterizeLayer(target_ds, [1], lyr, burn_values=[1])
# # Read raster as arrays
# banddataraster = raster.GetRasterBand(1)
# dataraster = banddataraster.ReadAsArray(
# xoff, yoff, xcount, ycount)
# # bandmask = target_ds.GetRasterBand(1)
# datamask = bandmask.ReadAsArray()
# print(datamask)
# zoneraster = np.ma.masked_array(
# dataraster, np.logical_not(datamask))
# # print(dataraster)
# # row, col = centroid_index(feat, target_ds)
# # if zoneraster[abs(row), abs(col)] == 0:
# # return 0
# # else:
# # # Calculate statistics of zonal raster
# # return zoneraster[abs(row), abs(col)]
# # np.mean(zoneraster), np.median(zoneraster), np.std(zoneraster), np.var(zoneraster)
# return zoneraster
# def centroid_index(feat, raster: gdal.Dataset):
# transform = raster.GetGeoTransform()
# xOrigin = transform[0]
# yOrigin = transform[3]
# pixelWidth = transform[1]
# pixelHeight = transform[5]
# # Get the centroid
# geom_poly = feat.GetGeometryRef()
# centroid = geom_poly.Centroid()
# point = centroid.GetPoint(0)
# col = int((point[0] - xOrigin) / pixelWidth)
# row = int((yOrigin - point[1]) / pixelHeight)
# return row, col
[docs]
def get_index_list(raster: gdal.Dataset,
x: np.ndarray,
y: np.ndarray) -> tuple:
"""_summary_
Args:
raster (gdal.Dataset): _description_
x (np.ndarray): _description_
y (np.ndarray): _description_
Returns:
tuple: _description_
"""
transform = raster.GetGeoTransform()
xOrigin = transform[0]
yOrigin = transform[3]
pixelWidth = transform[1]
pixelHeight = -transform[5]
col = np.abs(((x - xOrigin) / pixelWidth)).astype(int)
row = np.abs(((yOrigin - y) / pixelHeight)).astype(int)
# xmin, xpixel, _, ymax, _, ypixel = raster.GetGeoTransform()
# width, height = raster.RasterXSize, raster.RasterYSize
# xmax = xmin + width*xpixel
# ymin = ymax + height*ypixel
# col = np.abs(((x - xmin) / width)).astype(int)
# row = np.abs(((ymax - y) / height)).astype(int)
return row, col
# def loop_zonal_stats(raster: gdal.Dataset,
# shp: ogr.DataSource):
# lyr = shp.GetLayer()
# featList = range(lyr.GetFeatureCount())
# statDict = {}
# # idField = ogr.FieldDefn("newid", ogr.OFTInteger)
# # lyr.CreateField(idField)
# for i, FID in enumerate(featList):
# feat = lyr.GetFeature(FID)
# # Get extent of feat
# CEval = zonal_statistics(feat, raster, shp)
# feat.SetField("newid", int(CEval))
# lyr.SetFeature(feat)
# return statDict
[docs]
def falls_in_extent(extent: tuple,
x: list,
y: list) -> np.ndarray:
"""_summary_
Args:
extent (tuple): _description_
x (list): _description_
y (list): _description_
Returns:
np.ndarray: _description_
"""
# Extract the extent from the given values
x_ext, y_ext = extent
# (lon, lat) -> (x, y)
xy_pairs = np.c_[list(itertools.product(x, y))]
idx, = np.where((xy_pairs[:, 0] <= np.amin(x_ext)) |
(xy_pairs[:, 0] >= np.amax(x_ext)) |
(xy_pairs[:, 1] <= np.amin(y_ext)) |
(xy_pairs[:, 1] >= np.amax(y_ext)))
xy_pairs[idx] = np.nan
return xy_pairs[~np.isnan(xy_pairs).any(axis=1), :]
# def remap_CEgrid(CEgrid: gdal.Dataset,
# fishnet: ogr.DataSource):
# array = CEgrid.ReadAsArray()
# dx, dy = np.where(array == 699)
# pass
[docs]
def find_nearest(array: np.ndarray, value: float) -> np.ndarray:
"""_summary_
https://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array
Args:
array (np.ndarray): _description_
value (float): _description_
Returns:
np.ndarray: _description_
"""
# array = np.asarray(array)
# idx = (np.abs(array - value)).argmin()
return (np.abs(array - value)).argmin()
[docs]
def convert_multi_to_poly(geometry: ogr.Geometry) -> ogr.Geometry:
"""_summary_
Args:
geometry (ogr.Geometry): _description_
Returns:
ogr.Geometry: _description_
"""
# Convert the Multipolygon to a Polygon
# if geometry.GetGeometryCount() == 1:
# polygon = geometry.GetGeometryRef(0)
# else:
polygon = ogr.Geometry(ogr.wkbPolygon)
for i in range(geometry.GetGeometryCount()):
poly = geometry.GetGeometryRef(i)
for ring in range(poly.GetGeometryCount()):
polygon.AddGeometry(poly.GetGeometryRef(ring))
return polygon
[docs]
def ogr_to_gpd(shp: ogr.DataSource) -> gpd.GeoDataFrame:
"""_summary_
This function converst a input OGR object into a GeoDataFrame
The results is taken to be used in the merge function.
Or any other funtion that needs geopandas
Args:
shp (ogr.DataSource): _description_
Returns:
gpd.GeoDataFrame: _description_
"""
layer = shp.GetLayer()
srs = layer.GetSpatialRef()
col_names = [field.name for field in layer.schema]
col_names.append("geometry")
# Convert the layer to a pandas DataFrame
df = pd.DataFrame(columns=col_names,
index=range(len(layer)))
for i, feature in enumerate(layer):
geom2 = feature.GetGeometryRef()
geom2 = geom2.MakeValid()
if geom2.GetGeometryName() == 'MULTIPOLYGON':
polygon = convert_multi_to_poly(geom2)
print(polygon)
#
# num_rings = polygon.GetGeometryCount()
exterior_ring = polygon.GetGeometryRef(0)
else:
# num_rings = geom2.GetGeometryCount()
exterior_ring = geom2.GetGeometryRef(0)
num_points = exterior_ring.GetPointCount()
points = []
# Getting vextexes of each polygon
for j in range(num_points):
point = exterior_ring.GetPoint(j)
points.append(point)
for field in layer.schema:
df[field.name][i] = feature.GetField(field.name)
# print(geom2.GetGeometryName())
# if geom2.GetGeometryName()=="MULTIPOLYGON":
# df["geometry"][i] = MultiPolygon(points)
# else:
df["geometry"][i] = Polygon(points)
EPSG_code = srs.GetAttrValue('AUTHORITY', 1)
EPSG = f"EPSG:{EPSG_code}"
gdf = gpd.GeoDataFrame(df, crs=EPSG)
print(gdf)
return gdf
[docs]
def saveGTIFF(ref_TIF: str,
data_array: np.ndarray,
output_name: str):
ds = gdal.Open(ref_TIF, gdal.GA_ReadOnly)
[rows, cols] = data_array.shape
# Save the file depending on the file type name
type_name = data_array.dtype.base.name
if type_name in ["uint8"]:
gdal_type = gdal.GDT_Byte
if type_name in ["int16"]:
gdal_type = gdal.GDT_UInt16
if type_name in ["int32", "int64"]:
gdal_type = gdal.GDT_UInt32
elif type_name in ["float32", "float64"]:
gdal_type = gdal.GDT_Float32
# type_name = data_type.base.name
driver = gdal.GetDriverByName("GTiff")
outdata = driver.Create(output_name, cols, rows, 1, gdal_type)
# sets same geotransform as input
outdata.SetGeoTransform(ds.GetGeoTransform())
outdata.SetProjection(ds.GetProjection()) # sets same projection as input
outdata.GetRasterBand(1).WriteArray(data_array)
# if you want these values transparent
outdata.GetRasterBand(1).SetNoDataValue(-9999)
outdata.FlushCache() # saves to disk!!
[docs]
def convert_slope(slope_file_path: str) -> np.ndarray:
ds = gdal.Open(slope_file_path, gdal.GA_ReadOnly)
band = ds.GetRasterBand(1)
nan_val = band.GetNoDataValue()
array_d = band.ReadAsArray()
# convert the slope from degrees to m/m -> atan(slope)
array_d[array_d == nan_val] = np.nan
array_d = np.deg2rad(array_d)
array_d = np.arctan(array_d)
return array_d
[docs]
def reclassify_landcover(LC_file_path: str, classes_idx: list, output_tif_name: str):
ds = gdal.Open(LC_file_path, gdal.GA_ReadOnly)
band = ds.GetRasterBand(1)
nan_val = band.GetNoDataValue()
if nan_val == None:
nan_val = 0
array_d = band.ReadAsArray()
array_d[array_d == 0] = nan_val
reclass_array = np.copy(array_d)
# This is class name for cequeau. Just for reference
# classes_name = ["forest","bare","wetlands","water","urban"]
# Loop into each of the classes from the land cover
for land_type, ceq_class in zip(classes_idx, [1, 7, 14, 18, 16]):
# idx = np.stack([land_type])
for idx in land_type:
reclass_array[array_d == idx] = ceq_class
# np.delete(a, idx[:, 1:])
saveGTIFF(LC_file_path, reclass_array, output_tif_name)
return 0
# Function to retrieve the outlet point of the basin
[docs]
def get_outlet_point(FAC_path: str):
ds = gdal.Open(FAC_path, gdal.GA_ReadOnly)
band = ds.GetRasterBand(1)
array_d = band.ReadAsArray()
i, j = np.where(array_d == np.amax(array_d))
xoff, a, b, yoff, d, e = ds.GetGeoTransform()
xp = a * j[0] + b * i[0] + xoff
yp = d * j[0] + e * i[0] + yoff
return xp, yp