diff --git a/TopoPyScale/fetch_era5.py b/TopoPyScale/fetch_era5.py index 62db1bb..c5472ed 100644 --- a/TopoPyScale/fetch_era5.py +++ b/TopoPyScale/fetch_era5.py @@ -26,14 +26,13 @@ import shutil import glob from cdo import * +from munch import DefaultMunch # [ ] remove this dependencie era5_downloader bringing little value # [ ] create a class to execute download -import era5_downloader as era5down - import warnings warnings.filterwarnings("ignore", category=UserWarning) @@ -53,6 +52,206 @@ 'specific_humidity':'q'} + +class FetchERA5(): + """ + Class to fetch data from ERA5 based on config of loaded by the main class Topoclass.config or config loaded directly from yml file. + + Args: + config (str or DefaultMunch): point to a config object (inherited from Topoclass, or filepath to yml) + varoi_surf (list): list of surface variables to download. Default is None which will fetch only necessary for TPS + varoi_plev (list): list of pressure level variables to download. Default is None which will fetch only necessary for TPS + """ + + def __init__(self, config, varoi_surf=None, varoi_plev=None): + + if type(config) is DefaultMunch: + self.config = config + print("---> Config loaded") + elif type(config) is str: + self.config = self.load_config(config_file=config) + print(f"---> Config from {config} loaded") + else: + raise ValueError('Cannot find config. 
Check path to config, or config is a Munch.DefaultMunch object.') + + self.varoi_surf = varoi_surf + self.varoi_plev = varoi_plev + + # if keyword exists in config set realtime to value (True/False) + if self.config.climate[self.config.project.climate].realtime: + self.realtime = self.config.climate[self.config.project.climate].realtime + # make sure end date is correct + lastdate = fe.return_last_fullday() + self.config.project.end = self.config.project.end.replace(year=lastdate.year, + month=lastdate.month, + day=lastdate.day) + + # remove existing results (temporary results are ok and updated but cannot append to final files eg .outputs/downscaled/down_pt_0.nc) + try: + shutil.rmtree(self.config.outputs.path / "downscaled") + print('---> Downscaled directory cleaned') + os.makedirs(self.config.outputs.path / "downscaled") + except: + os.makedirs(self.config.outputs.path / "downscaled") + + # else set realtime to False + else: + self.realtime = False + + self.realtime = False # always false now as redownload of current month handled elsewhere ? key required only to dynamically set config.project.end + + @staticmethod + def load_config(config_file): + try: + with open(config_file, 'r') as f: + return DefaultMunch.fromYAML(f) + except IOError: + print(f'ERROR: config file does not exist. 
\n\t Current file path: {config_file}\n\t Current working directory: {os.getcwd()}') + + + def go_fetch(self, surf_plev='all'): + """ + Funtion to call fetching of ERA5 data + + """ + if surf_plev.lower() == 'surf': + self.get_plev = False + self.get_surf = True + elif surf_plev.lower() == 'plev': + self.get_plev = True + self.get_surf = False + + elif surf_plev.lower() == 'all': + self.get_plev = True + self.get_surf = True + + lonW = self.config.project.extent.get('lonW') - 0.4 + lonE = self.config.project.extent.get('lonE') + 0.4 + latN = self.config.project.extent.get('latN') + 0.4 + latS = self.config.project.extent.get('latS') - 0.4 + + # if keyword exists in config set out_format to value or default to netcdf + if self.config.climate[self.config.project.climate].cds_output_format: + output_format = self.config.climate[self.config.project.climate].cds_output_format + else: + output_format = 'netcdf' + + if self.config.climate[self.config.project.climate].cds_download_format: + download_format = self.config.climate[self.config.project.climate].cds_download_format + else: + download_format = 'unarchived' + + # Check data repository is currently supported + data_repository = self.config.climate[self.config.project.climate].data_repository + if data_repository == 'cds': + pass + else: + raise ValueError(f"Data repository {data_repository} not yet supported.") + + if self.get_surf: + # retreive ERA5 surface data + retrieve_era5( + self.config.climate[self.config.project.climate].product, + self.config.project.start, + self.config.project.end, + self.config.climate.path, + latN, latS, lonE, lonW, + self.config.climate[self.config.project.climate].timestep, + self.config.climate[self.config.project.climate].download_threads, + surf_plev='surf', + realtime=self.realtime, + output_format=output_format, + download_format=download_format, + new_CDS_API=True, + rm_daily=self.config.climate[self.config.project.climate].rm_daily, + surf_varoi=self.varoi_surf, + 
plev_varoi=self.varoi_plev + ) + if self.get_plev: + # retrieve era5 plevels + retrieve_era5( + self.config.climate[self.config.project.climate].product, + self.config.project.start, + self.config.project.end, + self.config.climate.path, + latN, latS, lonE, lonW, + self.config.climate[self.config.project.climate].timestep, + self.config.climate[self.config.project.climate].download_threads, + surf_plev='plev', + plevels=self.config.climate[self.config.project.climate].plevels, + realtime=self.realtime, + output_format=output_format, + download_format=download_format, + new_CDS_API=True, + rm_daily=self.config.climate[self.config.project.climate].rm_daily, + surf_varoi=self.varoi_surf, + plev_varoi=self.varoi_plev + ) + + + def get_era5_snowmapper(self, surf_plev, lastday): + """ + Funtion to call fetching of ERA5 data + TODO: + - merge monthly data into one file (cdo?)- this creates massive slow down! + """ + lonW = self.config.project.extent.get('lonW') - 0.4 + lonE = self.config.project.extent.get('lonE') + 0.4 + latN = self.config.project.extent.get('latN') + 0.4 + latS = self.config.project.extent.get('latS') - 0.4 + + + # if keyword exists in config set out_format to value or default to netcdf + if self.config.climate[self.config.project.climate].output_format: + output_format = self.config.climate[self.config.project.climate].output_format + # else set realtime to False + else: + output_format = 'netcdf' + + if surf_plev == 'surf': + # retreive ERA5 surface data + era5_request_surf_snowmapper( + lastday, + latN, latS, lonE, lonW, + self.config.climate.path, + output_format=output_format + ) + + if surf_plev == 'plev': + # retrieve era5 plevels + era5_request_plev_snowmapper( + lastday, + latN, latS, lonE, lonW, + self.config.climate.path, + plevels=self.config.climate[self.config.project.climate].plevels, + output_format=output_format + ) + + def to_zarr(self): + + # extract chunk size for time and pressure level + nlevels = 
len(self.config.climate[self.config.project.climate].plevels) + ntime = int(self.config.climate.era5.timestep[:-1]) * 365 + + print("Be aware that the conversion to Zarr requires plenty of memory, and may crash the console if resources are not sufficient") + + convert_netcdf_stack_to_zarr( + path_to_netcdfs= str(self.config.climate.path / 'yearly'), + zarrout= str(self.config.climate.path/'ERA5.zarr'), + chunks={'latitude':3, 'longitude':3, 'time':ntime, 'level':nlevels}, + compressor=None, + plev_name='PLEV_*.nc', + surf_name='SURF_*.nc', + parallelize=False, + n_cores=self.config.project.parallelization.setting.multicore.CPU_cores + ) + + + + + + + def append_nc_to_zarr_region(year, ncfile, zarrstore, surf=False): ncfile = str(ncfile) print(f"---> Appending {ncfile} to {zarrstore}") @@ -176,74 +375,6 @@ def to_zarr(ds, fout, chuncks={'x':3, 'y':3, 'time':8760, 'level':7}, compressor ds.chunk(chuncks).to_zarr(fout, zarr_format=3, encoding=encoder, mode=mode) -def fetch_era5_google(eraDir, startDate, endDate, lonW, latS, lonE, latN, plevels, step='3H',num_threads=1): - - print('\n') - print(f'---> Downloading ERA5 climate forcing from Google Cloud Storage') - - - bbox = (lonW, latS, lonE, latN) - time_step_dict = {'1H': (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23), - '3H': (0,3,6,9,12,15,18,21), - '6H': (0,6,12,18)} - - # prepare download of PLEVEL variables - local_plev_path = eraDir + "PLEV_{time:%Y%m%d}.nc" - era5_plevel = era5down.ERA5Downloader( - bbox_WSEN=bbox, # Bounding box: West, South, East, North - dest_path=local_plev_path, # can also be an S3 bucket with format "s3://bucket-name/era5-{region_name}/... 
- levels=plevels, # Pressure levels (can also be None) - region_name='dummy', - variables=[ - 'geopotential', 'temperature', 'u_component_of_wind', - 'v_component_of_wind', 'specific_humidity' - ], - time_steps = time_step_dict.get(step), - ) - - # prepare download of SURFACE variables - local_surf_path = eraDir + "SURF_{time:%Y%m%d}.nc" - era5_surf = era5down.ERA5Downloader( - bbox_WSEN=bbox, # Bounding box: West, South, East, North - dest_path=local_surf_path, # can also be an S3 bucket with format "s3://bucket-name/era5-{region_name}/... - levels=plevels, # Pressure levels (can also be None) - region_name='dummy', - variables=['geopotential_at_surface','2m_dewpoint_temperature', 'surface_thermal_radiation_downwards', - 'surface_solar_radiation_downwards','surface_pressure', - 'total_precipitation', '2m_temperature', 'toa_incident_solar_radiation', - 'friction_velocity', 'instantaneous_moisture_flux', 'instantaneous_surface_sensible_heat_flux' - ], - time_steps = time_step_dict.get(step), - ) - - - # date_range will make sure to include the month of the latest date (endDate) provided - date_vec = pd.date_range(startDate, pd.Timestamp(endDate)-pd.offsets.Day()+pd.offsets.MonthEnd(), freq='D', inclusive='both') - date_list = list(date_vec.strftime("%Y-%m-%d").values) - - if num_threads==1: - print("Downloading data sequentially") - for date in date_list: - era5_surf.download(date) - era5_plevel.download(date) - - elif num_threads>1: - print(f"Downloading data in parallel with {num_threads} threads.") - # pool multithread for fetching surface data - pool = ThreadPool(num_threads) - pool.starmap(era5_surf.download, zip(date_list)) - pool.close() - pool.join() - - # pool multithread for fetching surface data - pool = ThreadPool(num_threads) - pool.starmap(era5_plev.download, zip(date_list)) - pool.close() - pool.join() - else: - raise ValueError("download_type not available. 
Currently implemented: sequential, parallel") - - def fetch_era5_google_from_zarr(eraDir, startDate, endDate, lonW, latS, lonE, latN, plevels, bucket='gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3'): ''' diff --git a/TopoPyScale/topoclass.py b/TopoPyScale/topoclass.py index 522ea46..8ccd2ab 100644 --- a/TopoPyScale/topoclass.py +++ b/TopoPyScale/topoclass.py @@ -147,6 +147,7 @@ def __init__(self, config_file): self.toposub.project_directory = self.config.project.directory if self.config.project.climate.lower() == 'era5': + self.get_era5 = fe.FetchERA5(config=self.config) print("era5 download no longer automatically done in class decalaration - please declare in run.py file.") if self.config.project.climate.lower() == 'ifs_forecast': @@ -288,9 +289,6 @@ def extract_pts_param(self, method='nearest', **kwargs): self.toposub.df_centroids = df_centroids - def extract_grid_param(self): - return - def extract_topo_cluster_param(self): """ Function to segment a DEM in clusters and retain only the centroids of each cluster. 
@@ -732,100 +730,8 @@ def downscale_climate(self): shutil.rmtree(self.config.climate.tmp_path, ignore_errors=True) def get_era5(self): - """ - Funtion to call fetching of ERA5 data - - """ - lonW = self.config.project.extent.get('lonW') - 0.4 - lonE = self.config.project.extent.get('lonE') + 0.4 - latN = self.config.project.extent.get('latN') + 0.4 - latS = self.config.project.extent.get('latS') - 0.4 - - # if keyword exists in config set realtime to value (True/False) - if self.config.climate[self.config.project.climate].realtime: - realtime = self.config.climate[self.config.project.climate].realtime - # else set realtime to False - else: - realtime = False - - if realtime: # make sure end date is correct - lastdate = fe.return_last_fullday() - # 5th day will always be incomplete so we got to last fullday - self.config.project.end = self.config.project.end.replace(year=lastdate.year, month=lastdate.month, - day=lastdate.day) - - # remove existing results (temporary results are ok and updated but cannot append to final files eg .outputs/downscaled/down_pt_0.nc) - try: - shutil.rmtree(self.config.outputs.path / "downscaled") - print('---> Downscaled directory cleaned') - os.makedirs(self.config.outputs.path / "downscaled") - except: - os.makedirs(self.config.outputs.path / "downscaled") - - realtime = False # always false now as redownload of current month handled elsewhere ? 
key required only to dynamically set config.project.end - - - # if keyword exists in config set out_format to value or default to netcdf - if self.config.climate[self.config.project.climate].cds_output_format: - output_format = self.config.climate[self.config.project.climate].cds_output_format - else: - output_format = 'netcdf' - - if self.config.climate[self.config.project.climate].cds_download_format: - download_format = self.config.climate[self.config.project.climate].cds_download_format - else: - download_format = 'unarchived' - - data_repository = self.config.climate[self.config.project.climate].data_repository - if data_repository == 'cds': - # retreive ERA5 surface data - fe.retrieve_era5( - self.config.climate[self.config.project.climate].product, - self.config.project.start, - self.config.project.end, - self.config.climate.path, - latN, latS, lonE, lonW, - self.config.climate[self.config.project.climate].timestep, - self.config.climate[self.config.project.climate].download_threads, - surf_plev='surf', - realtime=realtime, - output_format=output_format, - download_format=download_format, - new_CDS_API=True, - rm_daily=self.config.climate[self.config.project.climate].rm_daily, - ) - # retrieve era5 plevels - fe.retrieve_era5( - self.config.climate[self.config.project.climate].product, - self.config.project.start, - self.config.project.end, - self.config.climate.path, - latN, latS, lonE, lonW, - self.config.climate[self.config.project.climate].timestep, - self.config.climate[self.config.project.climate].download_threads, - surf_plev='plev', - plevels=self.config.climate[self.config.project.climate].plevels, - realtime=realtime, - output_format=output_format, - download_format=download_format, - new_CDS_API=True, - rm_daily=self.config.climate[self.config.project.climate].rm_daily - ) - - elif data_repository == 'google_cloud_storage': - raise ValueError("WARNING: fetching ERA5 from Google Storage Cloud is not fully implemented yet") - - 
fe.fetch_era5_google(self.config.climate.path, self.config.project.start, - self.config.project.end, - lonW, - latS, - lonE, - latN, - self.config.climate[self.config.project.climate].plevels, - step=self.config.climate[self.config.project.climate].timestep, - num_threads=self.config.climate[self.config.project.climate].download_threads) - else: - raise ValueError("Config file for climate data specification issues.") + # to maintain backward compatibility and simplicity of use. + self.get_era5.go_fetch(surf_plev='all') def get_era5_snowmapper(self, surf_plev, lastday): diff --git a/doc/docs/07_features.md b/doc/docs/07_features.md index 1b38ccd..bc20171 100644 --- a/doc/docs/07_features.md +++ b/doc/docs/07_features.md @@ -12,6 +12,41 @@ name,x,y Using the setting spatial in `toposub` will lead you using the clustering method decribed in more details here below. +## Fetching ERA5 data from CDS + +As of now, TPS includes a class to implementing all the functionalities in dealing with the ERA5 data independent from `Topoclass`. This makes the process, now very long, of fetching data more flexible and robuts. The class eith inherits the config from `Topoclass`, or can be directly pointed towards the `config.yml` file. + +```python +from TopoPyScale import fetch_era5 as fe +from TopoPyScale import topoclass as tc + +config_file = 'config.yml' +mp = tc.Topoclass(config_file) +era = fe.FetchERA5(config=mp.config) +era.go_fetch() +``` + +The class integrate a number of options to make the downloading more flexible. For instance only download Surface Level data, with a given set of variables. 

```python
from TopoPyScale import fetch_era5 as fe
era = fe.FetchERA5(config='config.yml')

varoi = ['geopotential_at_surface',
         'surface_thermal_radiation_downwards',
         'surface_solar_radiation_downwards',
         'surface_pressure',
         'total_precipitation',
         '2m_temperature'
         ]

era.varoi_surf = varoi
era.go_fetch()
```

This way these routines can be used both within a TPS project and on their own.


## Convert ERA5 to Zarr

As explained in [Datasets](04_datasetSources.md), TPS relies on ERA5 data. In the future more datasets could be included. The minimum data required are at pressure levels `t,p,q,u,v`, and the surface level: