common module¶
A module with some common functions to be used with ipyleaflet and folium.
csv_to_shp(in_csv, out_shp=None, latitude='latitude', longitude='longitude')
¶
Converts a csv file with latlon info to a point shapefile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_csv |
str |
The input csv file containing longitude and latitude columns. |
required |
out_shp |
str |
The file path to the output shapefile. |
None |
latitude |
str |
The column name of the latitude column. Defaults to 'latitude'. |
'latitude' |
longitude |
str |
The column name of the longitude column. Defaults to 'longitude'. |
'longitude' |
Source code in nclpy/common.py
def csv_to_shp(in_csv, out_shp=None, latitude="latitude", longitude="longitude"):
"""Converts a csv file with latlon info to a point shapefile.
Args:
in_csv (str): The input csv file containing longitude and latitude columns.
out_shp (str): The file path to the output shapefile.
latitude (str, optional): The column name of the latitude column. Defaults to 'latitude'.
longitude (str, optional): The column name of the longitude column. Defaults to 'longitude'.
"""
import shapefile as shp
if in_csv.startswith("http") and in_csv.endswith(".csv"):
out_dir = os.path.join(os.path.expanduser("~"), "Downloads")
out_name = os.path.basename(in_csv)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
download_from_url(in_csv, out_dir=out_dir)
in_csv = os.path.join(out_dir, out_name)
out_dir = os.path.dirname(out_shp)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
try:
points = shp.Writer(out_shp, shapeType=shp.POINT)
with open(in_csv, encoding="utf-8") as csvfile:
csvreader = csv.DictReader(csvfile)
header = csvreader.fieldnames
[points.field(field) for field in header]
for row in csvreader:
points.point((float(row[longitude])), (float(row[latitude])))
points.record(*tuple([row[f] for f in header]))
out_prj = out_shp.replace(".shp", ".prj")
with open(out_prj, "w") as f:
prj_str = 'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137,298.257223563]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]] '
f.write(prj_str)
except Exception as e:
print(e)
ee_initialize(token_name='EARTHENGINE_TOKEN')
¶
Authenticates Earth Engine and initialize an Earth Engine session
Source code in nclpy/common.py
def ee_initialize(token_name="EARTHENGINE_TOKEN"):
"""Authenticates Earth Engine and initialize an Earth Engine session"""
if ee.data._credentials is None:
try:
ee_token = os.environ.get(token_name)
if ee_token is not None:
credential_file_path = os.path.expanduser("~/.config/earthengine/")
if not os.path.exists(credential_file_path):
credential = '{"refresh_token":"%s"}' % ee_token
os.makedirs(credential_file_path, exist_ok=True)
with open(credential_file_path + "credentials", "w") as file:
file.write(credential)
ee.Initialize()
except Exception:
ee.Authenticate()
ee.Initialize()
geojson_to_ee(geo_json, geodesic=True)
¶
Converts a geojson to ee.Geometry()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geo_json |
dict |
A geojson geometry dictionary or file path. |
required |
geodesic |
bool |
Whether line segments should be interpreted as spherical geodesics. If false, indicates that line segments should be interpreted as planar lines in the specified CRS. If absent, defaults to true if the CRS is geographic (including the default EPSG:4326), or to false if the CRS is projected. |
True |
Returns:
Type | Description |
---|---|
ee_object |
An ee.Geometry object |
Source code in nclpy/common.py
def geojson_to_ee(geo_json, geodesic=True):
"""Converts a geojson to ee.Geometry()
Args:
geo_json (dict): A geojson geometry dictionary or file path.
geodesic (bool, optional): Whether line segments should be interpreted as spherical geodesics. If false, indicates that line segments should be interpreted as planar lines in the specified CRS. If absent, defaults to true if the CRS is geographic (including the default EPSG:4326), or to false if the CRS is projected.
Returns:
ee_object: An ee.Geometry object
"""
try:
import json
if not isinstance(geo_json, dict) and os.path.isfile(geo_json):
with open(os.path.abspath(geo_json), encoding="utf-8") as f:
geo_json = json.load(f)
if geo_json["type"] == "FeatureCollection":
features = ee.FeatureCollection(geo_json["features"])
return features
elif geo_json["type"] == "Feature":
geom = None
keys = geo_json["properties"]["style"].keys()
if "radius" in keys: # Checks whether it is a circle
geom = ee.Geometry(geo_json["geometry"])
radius = geo_json["properties"]["style"]["radius"]
geom = geom.buffer(radius)
elif (
geo_json["geometry"]["type"] == "Point"
): # Checks whether it is a point
coordinates = geo_json["geometry"]["coordinates"]
longitude = coordinates[0]
latitude = coordinates[1]
geom = ee.Geometry.Point(longitude, latitude)
else:
geom = ee.Geometry(geo_json["geometry"], "", geodesic)
return geom
else:
raise Exception("Could not convert the geojson to ee.Geometry()")
except Exception as e:
print("Could not convert the geojson to ee.Geometry()")
raise Exception(e)