import asyncio
import concurrent.futures
import functools
import glob
import json
import os
import shutil
from pathlib import Path
from . import download_utils
METADATA_FILENAME = ".metadata.json"
FILE_EXT = [
"gpml",
"gpmlz",
"gpml.gz",
"dat",
"pla",
"shp",
"geojson",
"json",
".gpkg",
"gmt",
"vgp",
]
[docs]class PlateModel:
"""Class to manage a plate model"""
def __init__(self, model_name: str, model_cfg=None, data_dir=".", readonly=False):
"""Constructor
:param model_name: model name
:param model_cfg: model configuration in JSON format
:param data_dir: the folder path of the model data
:param readonly: this will return whatever local folder has. Will not attempt to download data from internet
"""
self.model_name = model_name.lower()
self.meta_filename = METADATA_FILENAME
self.model = model_cfg
self.readonly = readonly
self.data_dir = data_dir
self.model_dir = f"{self.data_dir}/{self.model_name}/"
if readonly:
if not PlateModel.is_model_dir(self.model_dir):
raise Exception(
f"{self.model_dir} must be valid model dir in readonly mode."
)
else:
with open(f"{self.model_dir}/.metadata.json", "r") as f:
self.model = json.load(f)
if not readonly:
# async and concurrent things
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=15)
self.loop = asyncio.new_event_loop()
self.run = functools.partial(self.loop.run_in_executor, self.executor)
asyncio.set_event_loop(self.loop)
def __getstate__(self):
attributes = self.__dict__.copy()
del attributes["executor"]
del attributes["loop"]
del attributes["run"]
return attributes
def __setstate__(self, state):
self.__dict__ = state
if not self.readonly:
# async and concurrent things
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=15)
self.loop = asyncio.new_event_loop()
self.run = functools.partial(self.loop.run_in_executor, self.executor)
asyncio.set_event_loop(self.loop)
def __del__(self):
if not self.readonly:
self.loop.close()
[docs] def get_cfg(self):
return self.model
[docs] def get_model_dir(self):
if PlateModel.is_model_dir(self.model_dir):
return self.model_dir
elif not self.readonly:
return self.create_model_dir()
else:
raise Exception(
f"The model dir {self.model_dir} is invalid and could not create it (in readonly mode)."
)
[docs] def get_data_dir(self):
return self.data_dir
[docs] def set_data_dir(self, new_dir):
self.data_dir = new_dir
self.model_dir = f"{self.data_dir}/{self.model_name}/"
[docs] def get_big_time(self):
return self.model["BigTime"]
[docs] def get_small_time(self):
return self.model["SmallTime"]
[docs] def get_avail_layers(self):
"""get all available layers in this plate model"""
if not self.model:
raise Exception("Fatal: No model configuration found!")
return list(self.model["Layers"].keys())
[docs] def get_rotation_model(self):
"""return a list of rotation files"""
if not self.readonly:
rotation_folder = self.download_layer_files("Rotations")
else:
rotation_folder = f"{self.model_dir}/Rotations"
rotation_files = glob.glob(f"{rotation_folder}/*.rot")
rotation_files.extend(glob.glob(f"{rotation_folder}/*.grot"))
# print(rotation_files)
return rotation_files
[docs] def get_coastlines(self):
"""return coastlines feature collection"""
return self.get_layer("Coastlines")
[docs] def get_static_polygons(self):
"""return StaticPolygons feature collection"""
return self.get_layer("StaticPolygons")
[docs] def get_continental_polygons(self):
"""return ContinentalPolygons feature collection"""
return self.get_layer("ContinentalPolygons")
[docs] def get_topologies(self):
"""return Topologies feature collection"""
return self.get_layer("Topologies")
[docs] def get_COBs(self):
"""return COBs feature collection"""
return self.get_layer("COBs")
[docs] def get_layer(self, layer_name):
"""get layer files by name
:param layer_name: layer name
:returns: a list of file names
"""
if not self.readonly:
layer_folder = self.download_layer_files(layer_name)
else:
layer_folder = f"{self.model_dir}/{layer_name}"
files = []
for ext in FILE_EXT:
files.extend(glob.glob(f"{layer_folder}/*.{ext}"))
return files
[docs] def get_raster(self, raster_name, time):
"""return a local path for the raster
:returns: a local path of the raster file
"""
if not "TimeDepRasters" in self.model:
raise Exception("No time-dependent rasters found in this model.")
if not raster_name in self.model["TimeDepRasters"]:
raise Exception(
f"Time-dependent rasters ({raster_name}) not found in this model. {self.model['TimeDepRasters']}"
)
url = self.model["TimeDepRasters"][raster_name].format(time)
if not self.readonly:
self.download_raster(url, f"{self.get_model_dir()}/Rasters/{raster_name}")
file_name = url.split("/")[-1]
local_path = f"{self.get_model_dir()}/Rasters/{raster_name}/{file_name}"
if os.path.isfile(local_path):
return local_path
elif self.readonly:
raise Exception(
f"You are in readonly mode and the raster {url} has not been downloaded yet."
)
else:
raise Exception(f"Failed to download {url}")
[docs] def get_rasters(self, raster_name, times):
"""return local paths for the raster files
:param times: a list of times
:returns: a list of local paths
"""
if not "TimeDepRasters" in self.model:
raise Exception("No time-dependent rasters found in this model.")
if not raster_name in self.model["TimeDepRasters"]:
raise Exception(
f"Time-dependent rasters ({raster_name}) not found in this model. {self.model['TimeDepRasters']}"
)
if not self.readonly:
self.download_time_dependent_rasters(raster_name, times)
paths = []
for time in times:
url = self.model["TimeDepRasters"][raster_name].format(time)
file_name = url.split("/")[-1]
local_path = f"{self.get_model_dir()}/Rasters/{raster_name}/{file_name}"
if os.path.isfile(local_path):
paths.append(local_path)
elif self.readonly:
raise Exception(
f"You are in readonly mode and the raster {url} has not been downloaded yet."
)
else:
raise Exception(f"Failed to download {url}")
return paths
[docs] def create_model_dir(self):
"""create a model folder with a .metadata.json file in it"""
if self.readonly:
raise Exception("Unable to create model dir in readonly mode.")
if not self.model_dir:
raise Exception(f"Error: model dir is {self.model_dir}")
# model dir already exists
if PlateModel.is_model_dir(self.model_dir):
return self.model_dir
model_path = self.model_dir
if os.path.isfile(model_path):
raise Exception(
f"Fatal: the model folder {model_path} already exists and is a file!! Remove the file or use another path."
)
Path(model_path).mkdir(parents=True, exist_ok=True)
metadata_file = f"{model_path}/.metadata.json"
if not os.path.isfile(metadata_file):
with open(metadata_file, "w+") as f:
json.dump(self.model, f)
return model_path
[docs] @staticmethod
def is_model_dir(folder_path):
"""return True if it is a model dir, otherwise False"""
return os.path.isdir(folder_path) and os.path.isfile(
f"{folder_path}/.metadata.json"
)
[docs] def purge(self):
"""remove the model folder and everything inside it"""
if os.path.isdir(self.model_dir):
shutil.rmtree(self.model_dir)
[docs] def purge_layer(self, layer_name):
"""remove the layer folder of the given layer name"""
layer_path = f"{self.model_dir}/{layer_name}"
if os.path.isdir(layer_path):
shutil.rmtree(layer_path)
[docs] def purge_time_dependent_rasters(self, raster_name):
"""remove the raster folder of the given raster name"""
raster_path = f"{self.model_dir}/{raster_name}"
if os.path.isdir(raster_path):
shutil.rmtree(raster_path)
[docs] def download_layer_files(self, layer_name):
"""given the layer name, download the layer files.
The layer files are in a .zip file. download and unzip it.
:param layer_name: such as "Rotations","Coastlines", "StaticPolygons", "ContinentalPolygons", "Topologies", etc
:returns: the folder path which contains the layer files
"""
if self.readonly:
raise Exception("Unable to download layer files in readonly mode.")
# print(f"downloading {layer_name}")
# find layer file url. two parts. one is the rotation, the other is all other geometry layers
if layer_name in self.model:
layer_file_url = self.model[layer_name]
elif "Layers" in self.model and layer_name in self.model["Layers"]:
layer_file_url = self.model["Layers"][layer_name]
else:
raise Exception(f"Fatal: No {layer_name} files in configuration file!")
model_folder = self.create_model_dir()
layer_folder = f"{model_folder}/{layer_name}"
metadata_file = f"{layer_folder}/{self.meta_filename}"
download_utils.download_file(layer_file_url, metadata_file, model_folder)
return layer_folder
[docs] def download_all_layers(self):
"""download all layers. Call download_layer_files() on every layer"""
if self.readonly:
raise Exception("Unable to download all layers in readonly mode.")
async def f():
tasks = []
if "Rotations" in self.model:
tasks.append(self.run(self.download_layer_files, "Rotations"))
if "Layers" in self.model:
for layer in self.model["Layers"]:
tasks.append(self.run(self.download_layer_files, layer))
# print(tasks)
await asyncio.wait(tasks)
try:
self.loop.run_until_complete(f())
except RuntimeError:
import nest_asyncio
nest_asyncio.apply()
self.loop.run_until_complete(f())
[docs] def get_avail_time_dependent_raster_names(self):
"""return the names of all time dependent rasters which have been configurated in this model."""
if not "TimeDepRasters" in self.model:
return []
else:
return [name for name in self.model["TimeDepRasters"]]
[docs] def download_time_dependent_rasters(self, raster_name, times=None):
"""download time dependent rasters, such agegrids
:param raster_name: raster name, such as AgeGrids. see the models.json
:param times: if not given, download from begin to end with 1My interval
"""
if self.readonly:
raise Exception(
"Unable to download time dependent rasters in readonly mode."
)
if (
"TimeDepRasters" in self.model
and raster_name in self.model["TimeDepRasters"]
):
async def f():
nonlocal times
tasks = []
dst_path = f"{self.get_model_dir()}/Rasters/{raster_name}"
if not times:
times = range(self.model["SmallTime"], self.model["BigTime"])
for time in times:
tasks.append(
self.run(
self.download_raster,
self.model["TimeDepRasters"][raster_name].format(time),
dst_path,
)
)
# print(tasks)
await asyncio.wait(tasks)
try:
self.loop.run_until_complete(f())
except RuntimeError:
import nest_asyncio
nest_asyncio.apply()
self.loop.run_until_complete(f())
else:
raise Exception(
f"Unable to find {raster_name} configuration in this model {self.model_name}."
)
[docs] def download_raster(self, url, dst_path):
"""download a single raster file from "url" and save the file in "dst_path"
a metadata file will also be created for the raster file in folder f"{dst_path}/metadata"
:param url: the url to the raster file
:param dst_path: the folder path to save the raster file
"""
if self.readonly:
raise Exception("Unable to download raster in readonly mode.")
filename = url.split("/")[-1]
metadata_folder = f"{dst_path}/.metadata"
metadata_file = f"{metadata_folder}/{filename}.json"
download_utils.download_file(url, metadata_file, dst_path)
[docs] def download_all(self):
"""download everything in this plate model"""
if self.readonly:
raise Exception("Unable to download all in readonly mode.")
self.download_all_layers()
if "TimeDepRasters" in self.model:
for raster in self.model["TimeDepRasters"]:
self.download_time_dependent_rasters(raster)