Source code for pycequeau.core.netcdf
import xarray as xr
import numpy as np
import pandas as pd
import os
[docs]
def clip_netcdf(nc_files_path: str,
                bounds: list,
                output_path: str):
    """Clip every NetCDF file in a folder to a lon/lat box and save the result.

    Each file directly inside ``nc_files_path`` whose name ends with ``.nc``
    is opened, cropped to ``bounds`` and written to ``output_path`` under the
    same file name. An output file that already exists is deleted first.

    Args:
        nc_files_path (str): Directory scanned (non-recursively) for ``.nc``
            files.
        bounds (list): Box limits, read by position. ``bounds[0]`` and
            ``bounds[1]`` are the start/stop of the longitude slice.
            ``bounds[2]`` and ``bounds[3]`` are the latitude limits: the
            slice runs ``bounds[3] -> bounds[2]`` when the file's latitude
            axis is decreasing and ``bounds[2] -> bounds[3]`` otherwise
            (presumably ``[west, east, south, north]`` -- confirm with
            callers).
        output_path (str): Directory that receives the clipped files.
    """
    file_names = [name for name in os.listdir(nc_files_path)
                  if name.endswith(".nc")]
    for file_name in file_names:
        source = os.path.join(nc_files_path, file_name)
        # Opening through a context manager guarantees the file handle is
        # released, even when clipping or writing raises.
        with xr.open_dataset(source, engine="netcdf4") as opened:
            nc_data = opened
            # Normalise the coordinate names to lat/lon.
            if "longitude" in nc_data.coords:
                nc_data = nc_data.rename({"longitude": "lon",
                                          "latitude": "lat"})

            min_lon, max_lon = bounds[0], bounds[1]
            if nc_data["lon"].min() > 0:
                # The grid holds only positive longitudes, so negative
                # (west-of-Greenwich) limits are moved onto the 0..360 range.
                # Limits that are already non-negative are left untouched;
                # shifting them too would push them past 360 and select
                # nothing.
                if min_lon < 0:
                    min_lon = min_lon + 360
                if max_lon < 0:
                    max_lon = max_lon + 360

            # Label-based slicing must follow the direction of the axis.
            lat_axis = nc_data["lat"]
            if lat_axis[0] > lat_axis[-1]:
                lat_window = slice(bounds[3], bounds[2])
            else:
                lat_window = slice(bounds[2], bounds[3])

            sliced_nc = nc_data.sel(lat=lat_window,
                                    lon=slice(min_lon, max_lon))

            # Files named tmax.nc get their "tmin" variable relabelled as
            # "tmax" (presumably the upstream files are mislabelled). The
            # rename is skipped when there is no "tmin" to relabel or when
            # "tmax" already exists, instead of raising.
            if (file_name == "tmax.nc"
                    and "tmin" in sliced_nc.variables
                    and "tmax" not in sliced_nc.variables):
                sliced_nc = sliced_nc.rename({"tmin": "tmax"})

            target = os.path.join(output_path, file_name)
            if os.path.isfile(target):
                os.remove(target)
            # Written inside the ``with`` block: the data is loaded lazily
            # and needs the source file to still be open.
            sliced_nc.to_netcdf(target)
[docs]
def intermidiate_interpolation(ds: xr.Dataset, scale: int):
    """Resample ``ds`` onto a lon/lat grid ``scale`` times finer.

    The spacing of each axis is taken from its first two coordinate values
    and divided by ``scale``. The new axes start at the first original
    coordinate and stop before the last one (``np.arange`` excludes its end
    point). Values are picked with nearest-neighbour interpolation; the time
    axis is passed through unchanged.

    Args:
        ds (xr.Dataset): Dataset with ``lon``, ``lat`` and ``time``
            coordinates.
        scale (int): Refinement factor applied to both horizontal axes.

    Returns:
        The dataset interpolated onto the refined grid.
    """
    refined = {}
    for axis in ("lon", "lat"):
        coord = ds[axis].values
        # Target spacing: original step between the first two nodes / scale.
        step = (coord[1] - coord[0]) / scale
        refined[axis] = np.arange(coord[0], coord[-1], step)

    return ds.interp(time=ds["time"],
                     lat=refined["lat"],
                     lon=refined["lon"],
                     method="nearest")
[docs]
def fix_calendar(ds: xr.Dataset):
    """Return ``ds`` with a daily time axis in the gregorian calendar.

    When the calendar reported by ``ds.time.dt.calendar`` is not
    ``"gregorian"``, the dataset is converted with ``convert_calendar``
    using NaN for dates missing from the source calendar, and every variable
    other than ``time``, ``lat`` and ``lon`` is then filled by linear
    interpolation along ``time``. Finally the time coordinate is replaced by
    a daily cftime range spanning the first to the last date.

    Args:
        ds (xr.Dataset): Dataset with a ``time`` coordinate.

    Returns:
        The dataset with the rebuilt time coordinate.
    """
    calendar_name = ds.time.dt.calendar
    # Only the 360_day calendar gets an explicit alignment for the
    # conversion; every other calendar uses the library default.
    align_on = "year" if calendar_name == "360_day" else None

    if calendar_name != "gregorian":
        # np.nan, not np.NaN: the capitalised alias was removed in NumPy 2.0.
        ds = ds.convert_calendar(
            "gregorian", missing=np.nan, align_on=align_on)
        # Fill the gaps created by the calendar change. The names are
        # collected up front so the dataset is not mutated while iterating.
        coordinate_names = ("time", "lat", "lon")
        var_list = [name for name in ds.variables
                    if name not in coordinate_names]
        for var in var_list:
            ds[var] = ds[var].interpolate_na("time", method="linear")

    # NOTE(review): the listing this was recovered from had lost its
    # indentation; running the time rebuild for every dataset (outside the
    # ``if`` above) is a reconstruction -- confirm against upstream.
    day_labels = ds["time"].dt.strftime("%Y-%m-%d").values
    date_time = xr.cftime_range(start=str(day_labels[0]),
                                end=str(day_labels[-1]),
                                freq="1D", calendar="gregorian")
    ds["time"] = date_time
    return ds