Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 201 additions & 70 deletions TopoPyScale/fetch_era5.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
import shutil
import glob
from cdo import *
from munch import DefaultMunch


# [ ] remove this dependencie era5_downloader bringing little value
# [ ] create a class to execute download


import era5_downloader as era5down

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

Expand All @@ -53,6 +52,206 @@
'specific_humidity':'q'}



class FetchERA5():
"""
Class to fetch data from ERA5 based on config of loaded by the main class Topoclass.config or config loaded directly from yml file.

Args:
config (str or DefaultMunch): point to a config object (inherited from Topoclass, or filepath to yml)
varoi_surf (list): list of surface variables to download. Default is None which will fetch only necessary for TPS
varoi_plev (list): list of pressure level variables to download. Default is None which will fetch only necessary for TPS
"""

def __init__(self, config, varoi_surf=None, varoi_plev=None):

if type(config) is DefaultMunch:
self.config = config
print("---> Config loaded")
elif type(config) is str:
self.config = self.load_config(config_file=config)
print(f"---> Config from {config} loaded")
else:
raise ValueError('Cannot find config. Check path to config, or config is a Munch.DefaultMunch object.')

self.varoi_surf = varoi_surf
self.varoi_plev = varoi_plev

# if keyword exists in config set realtime to value (True/False)
if self.config.climate[self.config.project.climate].realtime:
self.realtime = self.config.climate[self.config.project.climate].realtime
# make sure end date is correct
lastdate = fe.return_last_fullday()
self.config.project.end = self.config.project.end.replace(year=lastdate.year,
month=lastdate.month,
day=lastdate.day)

# remove existing results (temporary results are ok and updated but cannot append to final files eg .outputs/downscaled/down_pt_0.nc)
try:
shutil.rmtree(self.config.outputs.path / "downscaled")
print('---> Downscaled directory cleaned')
os.makedirs(self.config.outputs.path / "downscaled")
except:
os.makedirs(self.config.outputs.path / "downscaled")

# else set realtime to False
else:
self.realtime = False

self.realtime = False # always false now as redownload of current month handled elsewhere ? key required only to dynamically set config.project.end

@staticmethod
def load_config(config_file):
try:
with open(config_file, 'r') as f:
return DefaultMunch.fromYAML(f)
except IOError:
print(f'ERROR: config file does not exist. \n\t Current file path: {config_file}\n\t Current working directory: {os.getcwd()}')


def go_fetch(self, surf_plev='all'):
"""
Funtion to call fetching of ERA5 data

"""
if surf_plev.lower() == 'surf':
self.get_plev = False
self.get_surf = True
elif surf_plev.lower() == 'plev':
self.get_plev = True
self.get_surf = False

elif surf_plev.lower() == 'all':
self.get_plev = True
self.get_surf = True

lonW = self.config.project.extent.get('lonW') - 0.4
lonE = self.config.project.extent.get('lonE') + 0.4
latN = self.config.project.extent.get('latN') + 0.4
latS = self.config.project.extent.get('latS') - 0.4

# if keyword exists in config set out_format to value or default to netcdf
if self.config.climate[self.config.project.climate].cds_output_format:
output_format = self.config.climate[self.config.project.climate].cds_output_format
else:
output_format = 'netcdf'

if self.config.climate[self.config.project.climate].cds_download_format:
download_format = self.config.climate[self.config.project.climate].cds_download_format
else:
download_format = 'unarchived'

# Check data repository is currently supported
data_repository = self.config.climate[self.config.project.climate].data_repository
if data_repository == 'cds':
pass
else:
raise ValueError(f"Data repository {data_repository} not yet supported.")

if self.get_surf:
# retreive ERA5 surface data
retrieve_era5(
self.config.climate[self.config.project.climate].product,
self.config.project.start,
self.config.project.end,
self.config.climate.path,
latN, latS, lonE, lonW,
self.config.climate[self.config.project.climate].timestep,
self.config.climate[self.config.project.climate].download_threads,
surf_plev='surf',
realtime=self.realtime,
output_format=output_format,
download_format=download_format,
new_CDS_API=True,
rm_daily=self.config.climate[self.config.project.climate].rm_daily,
surf_varoi=self.varoi_surf,
plev_varoi=self.varoi_plev
)
if self.get_plev:
# retrieve era5 plevels
retrieve_era5(
self.config.climate[self.config.project.climate].product,
self.config.project.start,
self.config.project.end,
self.config.climate.path,
latN, latS, lonE, lonW,
self.config.climate[self.config.project.climate].timestep,
self.config.climate[self.config.project.climate].download_threads,
surf_plev='plev',
plevels=self.config.climate[self.config.project.climate].plevels,
realtime=self.realtime,
output_format=output_format,
download_format=download_format,
new_CDS_API=True,
rm_daily=self.config.climate[self.config.project.climate].rm_daily,
surf_varoi=self.varoi_surf,
plev_varoi=self.varoi_plev
)


def get_era5_snowmapper(self, surf_plev, lastday):
"""
Funtion to call fetching of ERA5 data
TODO:
- merge monthly data into one file (cdo?)- this creates massive slow down!
"""
lonW = self.config.project.extent.get('lonW') - 0.4
lonE = self.config.project.extent.get('lonE') + 0.4
latN = self.config.project.extent.get('latN') + 0.4
latS = self.config.project.extent.get('latS') - 0.4


# if keyword exists in config set out_format to value or default to netcdf
if self.config.climate[self.config.project.climate].output_format:
output_format = self.config.climate[self.config.project.climate].output_format
# else set realtime to False
else:
output_format = 'netcdf'

if surf_plev == 'surf':
# retreive ERA5 surface data
era5_request_surf_snowmapper(
lastday,
latN, latS, lonE, lonW,
self.config.climate.path,
output_format=output_format
)

if surf_plev == 'plev':
# retrieve era5 plevels
era5_request_plev_snowmapper(
lastday,
latN, latS, lonE, lonW,
self.config.climate.path,
plevels=self.config.climate[self.config.project.climate].plevels,
output_format=output_format
)

def to_zarr(self):

# extract chunk size for time and pressure level
nlevels = len(self.config.climate[self.config.project.climate].plevels)
ntime = int(self.config.climate.era5.timestep[:-1]) * 365

print("Be aware that the conversion to Zarr requires plenty of memory, and may crash the console if resources are not sufficient")

convert_netcdf_stack_to_zarr(
path_to_netcdfs= str(self.config.climate.path / 'yearly'),
zarrout= str(self.config.climate.path/'ERA5.zarr'),
chunks={'latitude':3, 'longitude':3, 'time':ntime, 'level':nlevels},
compressor=None,
plev_name='PLEV_*.nc',
surf_name='SURF_*.nc',
parallelize=False,
n_cores=self.config.project.parallelization.setting.multicore.CPU_cores
)







def append_nc_to_zarr_region(year, ncfile, zarrstore, surf=False):
ncfile = str(ncfile)
print(f"---> Appending {ncfile} to {zarrstore}")
Expand Down Expand Up @@ -176,74 +375,6 @@ def to_zarr(ds, fout, chuncks={'x':3, 'y':3, 'time':8760, 'level':7}, compressor
ds.chunk(chuncks).to_zarr(fout, zarr_format=3, encoding=encoder, mode=mode)


def fetch_era5_google(eraDir, startDate, endDate, lonW, latS, lonE, latN, plevels, step='3H',num_threads=1):

print('\n')
print(f'---> Downloading ERA5 climate forcing from Google Cloud Storage')


bbox = (lonW, latS, lonE, latN)
time_step_dict = {'1H': (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23),
'3H': (0,3,6,9,12,15,18,21),
'6H': (0,6,12,18)}

# prepare download of PLEVEL variables
local_plev_path = eraDir + "PLEV_{time:%Y%m%d}.nc"
era5_plevel = era5down.ERA5Downloader(
bbox_WSEN=bbox, # Bounding box: West, South, East, North
dest_path=local_plev_path, # can also be an S3 bucket with format "s3://bucket-name/era5-{region_name}/...
levels=plevels, # Pressure levels (can also be None)
region_name='dummy',
variables=[
'geopotential', 'temperature', 'u_component_of_wind',
'v_component_of_wind', 'specific_humidity'
],
time_steps = time_step_dict.get(step),
)

# prepare download of SURFACE variables
local_surf_path = eraDir + "SURF_{time:%Y%m%d}.nc"
era5_surf = era5down.ERA5Downloader(
bbox_WSEN=bbox, # Bounding box: West, South, East, North
dest_path=local_surf_path, # can also be an S3 bucket with format "s3://bucket-name/era5-{region_name}/...
levels=plevels, # Pressure levels (can also be None)
region_name='dummy',
variables=['geopotential_at_surface','2m_dewpoint_temperature', 'surface_thermal_radiation_downwards',
'surface_solar_radiation_downwards','surface_pressure',
'total_precipitation', '2m_temperature', 'toa_incident_solar_radiation',
'friction_velocity', 'instantaneous_moisture_flux', 'instantaneous_surface_sensible_heat_flux'
],
time_steps = time_step_dict.get(step),
)


# date_range will make sure to include the month of the latest date (endDate) provided
date_vec = pd.date_range(startDate, pd.Timestamp(endDate)-pd.offsets.Day()+pd.offsets.MonthEnd(), freq='D', inclusive='both')
date_list = list(date_vec.strftime("%Y-%m-%d").values)

if num_threads==1:
print("Downloading data sequentially")
for date in date_list:
era5_surf.download(date)
era5_plevel.download(date)

elif num_threads>1:
print(f"Downloading data in parallel with {num_threads} threads.")
# pool multithread for fetching surface data
pool = ThreadPool(num_threads)
pool.starmap(era5_surf.download, zip(date_list))
pool.close()
pool.join()

# pool multithread for fetching surface data
pool = ThreadPool(num_threads)
pool.starmap(era5_plev.download, zip(date_list))
pool.close()
pool.join()
else:
raise ValueError("download_type not available. Currently implemented: sequential, parallel")



def fetch_era5_google_from_zarr(eraDir, startDate, endDate, lonW, latS, lonE, latN, plevels, bucket='gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3'):
'''
Expand Down
Loading