Source code for geoserver_pyadm.layer

import requests, json

from . import _auth as a
from ._auth import auth


[docs] @auth def publish_layer(workspace_name, store_name, layer_name): """Publish a layer in the data store :param workspace_name: the name of the workspace in which the data store is :param store_name: the name of the data store in which the layer you would like to publish is :param layer_name: the name of layer which you would like to publish. the name could be the shapefiles name without .shp """ # a.username, a.passwd, a.server_url = get_cfg() url = f"{a.server_url}/rest/workspaces/{workspace_name}/datastores/{store_name}/featuretypes/" layer_xml = f"<featureType><name>{layer_name}</name></featureType>" headers = {"content-type": "text/xml"} r = requests.post( url, data=layer_xml, auth=(a.username, a.passwd), headers=headers, ) if r.status_code not in [200, 201]: print(f"Unable to publish layer {layer_name}. {r.status_code}, {r.content}") return r
[docs] @auth def publish_raster_layer(workspace_name, store_name, layer_name): """Publish a coverage/raster layer from a coverage store. It seems ,for some reason, that only one raster is allowed per coverage store. :param workspace_name: the name of the workspace :param store_name: the name of the coverage store in which the raster layer reside :param layer_name: the name of the raster layer """ # a.username, a.passwd, a.server_url = get_cfg() url = f"{a.server_url}/rest/workspaces/{workspace_name}/coveragestores/{store_name}/coverages/" layer_xml = f"<coverage><name>{layer_name}</name><nativeName>{layer_name}</nativeName></coverage>" headers = {"content-type": "text/xml"} r = requests.post( url, data=layer_xml, auth=(a.username, a.passwd), headers=headers, ) if r.status_code not in [200, 201]: print(f"Unable to publish layer {layer_name}. {r.status_code}, {r.content}") return r
[docs] @auth def get_layer(layer_name: str, workspace=None): """Get the definition of the layer :param layer_name: str: the name of the layer to retrieve :param workspace: (Default value = None) if the workspace name is not given, return the first layer which matches the given layer name. This is odd! """ # a.username, a.passwd, a.server_url = get_cfg() if workspace: url = f"{a.server_url}/rest/workspaces/{workspace}/layers/{layer_name}" else: url = f"{a.server_url}/rest/layers/{layer_name}" r = requests.get( url, auth=(a.username, a.passwd), ) # print(r.json()) if r.status_code in [200, 201]: return r.json() else: return None
[docs] @auth def get_layers(workspace=None): """Get all the layers in geoserver if workspace name is not given. Return all the layers in a workspace if workspace name is given. :param workspace: (Default value = None) If workspace name is not given, the function will return all the layers in the geoserver. """ # a.username, a.passwd, a.server_url = get_cfg() if workspace: url = f"{a.server_url}/rest/workspaces/{workspace}/layers" else: url = f"{a.server_url}/rest/layers" r = requests.get( url, auth=(a.username, a.passwd), ) # print(r.json()) if r.status_code in [200, 201]: ret = [] data = r.json() if "layer" in data["layers"]: ret = [d["name"] for d in data["layers"]["layer"]] return ret return None
[docs] @auth def delete_layer(layer_name, workspace=None): """Delete a layer/layers by name :param layer_name: the name of the layer which you would like to delete :param workspace: (Default value = None) If the workspace name is not given, delete all layers with the given name. """ payload = {"recurse": "True", "quietOnNotFound": "True"} if workspace: url = f"{a.server_url}/rest/workspaces/{workspace}/layers/{layer_name}" r = requests.delete( url=url, params=payload, auth=(a.username, a.passwd), ) if r.status_code in [200, 201]: print(f"The layer {layer_name} has been deleted!") return r.content else: print(f"Failed to delete layer:{layer_name}") return r.content else: url = f"{a.server_url}/rest/layers/{layer_name}" while True: r = requests.delete( url=url, params=payload, auth=(a.username, a.passwd), ) if r.status_code not in [200, 201]: break else: print(f"The layer {layer_name} has been deleted!") # print(r.content) return "done"
[docs] @auth def get_layer_styles(full_layer_name: str): """Get the styles associated with a layer :param full_layer_name: str: the layer name including the workspace_name, such as workspace_name:layer_name """ # a.username, a.passwd, a.server_url = get_cfg() url = f"{a.server_url}/rest/layers/{full_layer_name}/styles" r = requests.get( url, auth=(a.username, a.passwd), ) # print(r.content) if r.status_code in [200, 201]: ret = [] data = r.json() if "style" in data["styles"]: ret = [d["name"] for d in data["styles"]["style"]] return ret else: return None
[docs] @auth def publish_geopackage_layer( workspace_name, store_name, layer_name, geom_type="polygon" ): """Publish a geopackage layer in the data store :param workspace_name: the name of the workspace in which the data store is :param store_name: the name of the data store in which the layer you would like to publish is :param layer_name: the name of geopackage layer which you would like to publish. """ url = f"{a.server_url}/rest/workspaces/{workspace_name}/datastores/{store_name}/featuretypes/" layer_cfg = { "featureType": { "name": f"{layer_name}_{geom_type}", "nativeName": f"{layer_name}_{geom_type}", "title": f"{layer_name}_{geom_type}", } } headers = {"content-type": "application/json"} r = requests.post( url, data=json.dumps(layer_cfg), auth=(a.username, a.passwd), headers=headers, ) if r.status_code not in [200, 201]: print(f"Unable to publish layer {layer_name}. {r.status_code}, {r.content}") return r