#!/usr/bin/env python
"""
utilities.py
Written by Tyler Sutterley (11/2024)
Download and management utilities
PYTHON DEPENDENCIES:
boto3: Amazon Web Services (AWS) SDK for Python
https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
s3fs: FUSE-based file system backed by Amazon S3
https://s3fs.readthedocs.io/en/latest/
UPDATE HISTORY:
Updated 11/2024: change default request type to application/netcdf
Updated 10/2024: update CMR search utility to replace deprecated scrolling
https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html
Updated 08/2024: generalize hash function to use any available algorithm
Updated 05/2024: add wrapper to importlib for optional dependencies
Updated 11/2023: updated ssl context to fix deprecation error
Updated 10/2023: filter CMR request type using regular expressions
Updated 08/2023: added ATL14/15 Release-03 data products
add option to specify the start and end cycle for a local granule
Updated 07/2023: use logging instead of warnings for import attempts
Updated 06/2023: using pathlib to define and expand paths
add functions to retrieve and revoke NASA Earthdata User tokens
updated netCDF4 request type for NSIDC s3 bucket CMR queries
Updated 12/2022: functions for managing and maintaining git repositories
Updated 11/2022: can query for zarr datasets
Updated 10/2022: public release of NSIDC s3 access
Written 07/2022
"""
from __future__ import print_function, division, annotations
import sys
import os
import re
import io
import ssl
import json
import netrc
import shutil
import base64
import getpass
import hashlib
import inspect
import logging
import pathlib
import builtins
import warnings
import importlib
import posixpath
import traceback
import subprocess
import calendar, time
if sys.version_info[0] == 2:
from cookielib import CookieJar
from urllib import urlencode
import urllib2
else:
from http.cookiejar import CookieJar
from urllib.parse import urlencode
import urllib.request as urllib2
# PURPOSE: get absolute path within a package from a relative path
[docs]
def get_data_path(relpath: list | str | pathlib.Path):
"""
Get the absolute path within a package from a relative path
Parameters
----------
relpath: list, str or pathlib.Path
relative path
"""
# current file path
filename = inspect.getframeinfo(inspect.currentframe()).filename
filepath = pathlib.Path(filename).absolute().parent
if isinstance(relpath, list):
# use *splat operator to extract from list
return filepath.joinpath(*relpath)
elif isinstance(relpath, (str, pathlib.Path)):
return filepath.joinpath(relpath)
[docs]
def import_dependency(
name: str, extra: str = "", raise_exception: bool = False
):
"""
Import an optional dependency
Adapted from ``pandas.compat._optional::import_optional_dependency``
Parameters
----------
name: str
Module name
extra: str, default ""
Additional text to include in the ``ImportError`` message
raise_exception: bool, default False
Raise an ``ImportError`` if the module is not found
Returns
-------
module: obj
Imported module
"""
# check if the module name is a string
msg = f"Invalid module name: '{name}'; must be a string"
assert isinstance(name, str), msg
# default error if module cannot be imported
err = f"Missing optional dependency '{name}'. {extra}"
module = type("module", (), {})
# try to import the module
try:
module = importlib.import_module(name)
except (ImportError, ModuleNotFoundError) as exc:
if raise_exception:
raise ImportError(err) from exc
else:
logging.debug(err)
# return the module
return module
[docs]
def dependency_available(name: str, minversion: str | None = None):
"""
Checks whether a module is installed without importing it
Adapted from ``xarray.namedarray.utils.module_available``
Parameters
----------
name: str
Module name
minversion : str, optional
Minimum version of the module
Returns
-------
available : bool
Whether the module is installed
"""
# check if module is available
if importlib.util.find_spec(name) is None:
return False
# check if the version is greater than the minimum required
if minversion is not None:
version = importlib.metadata.version(name)
return version >= minversion
# return if both checks are passed
return True
# PURPOSE: get the hash value of a file
[docs]
def get_hash(local: str | io.IOBase | pathlib.Path, algorithm: str = "md5"):
"""
Get the hash value from a local file or ``BytesIO`` object
Parameters
----------
local: obj, str or pathlib.Path
BytesIO object or path to file
algorithm: str, default 'md5'
hashing algorithm for checksum validation
"""
# check if open file object or if local file exists
if isinstance(local, io.IOBase):
# generate checksum hash for a given type
if algorithm in hashlib.algorithms_available:
return hashlib.new(algorithm, local.getvalue()).hexdigest()
else:
raise ValueError(f"Invalid hashing algorithm: {algorithm}")
elif isinstance(local, (str, pathlib.Path)):
# generate checksum hash for local file
local = pathlib.Path(local).expanduser()
# if file currently doesn't exist, return empty string
if not local.exists():
return ""
# open the local_file in binary read mode
with local.open(mode="rb") as local_buffer:
# generate checksum hash for a given type
if algorithm in hashlib.algorithms_available:
return hashlib.new(algorithm, local_buffer.read()).hexdigest()
else:
raise ValueError(f"Invalid hashing algorithm: {algorithm}")
else:
return ""
# PURPOSE: get the git hash value
def get_git_revision_hash(refname: str = "HEAD", short: bool = False):
"""
Get the ``git`` hash value for a particular reference
Parameters
----------
refname: str, default HEAD
Symbolic reference name
short: bool, default False
Return the shorted hash value
"""
# get path to .git directory from current file path
filename = inspect.getframeinfo(inspect.currentframe()).filename
basepath = pathlib.Path(filename).absolute().parent.parent
gitpath = basepath.joinpath(".git")
# build command
cmd = ["git", f"--git-dir={gitpath}", "rev-parse"]
cmd.append("--short") if short else None
cmd.append(refname)
# get output
with warnings.catch_warnings():
return str(subprocess.check_output(cmd), encoding="utf8").strip()
# PURPOSE: get the current git status
def get_git_status():
"""Get the status of a ``git`` repository as a boolean value"""
# get path to .git directory from current file path
filename = inspect.getframeinfo(inspect.currentframe()).filename
basepath = pathlib.Path(filename).absolute().parent.parent
gitpath = basepath.joinpath(".git")
# build command
cmd = ["git", f"--git-dir={gitpath}", "status", "--porcelain"]
with warnings.catch_warnings():
return bool(subprocess.check_output(cmd))
# PURPOSE: recursively split a url path
[docs]
def url_split(s: str):
"""
Recursively split a url path into a list
Parameters
----------
s: str
url string
"""
head, tail = posixpath.split(s)
if head in ("http:", "https:", "ftp:", "s3:"):
return (s,)
elif head in ("", posixpath.sep):
return (tail,)
return url_split(head) + (tail,)
# PURPOSE: returns the Unix timestamp value for a formatted date string
[docs]
def get_unix_time(time_string: str, format: str = "%Y-%m-%d %H:%M:%S"):
"""
Get the Unix timestamp value for a formatted date string
Parameters
----------
time_string: str
formatted time string to parse
format: str, default '%Y-%m-%d %H:%M:%S'
format for input time string
"""
try:
parsed_time = time.strptime(time_string.rstrip(), format)
except (TypeError, ValueError):
pass
else:
return calendar.timegm(parsed_time)
# NASA on-prem DAAC providers
_daac_providers = {
"gesdisc": "GES_DISC",
"ghrcdaac": "GHRC_DAAC",
"lpdaac": "LPDAAC_ECS",
"nsidc": "NSIDC_ECS",
"ornldaac": "ORNL_DAAC",
"podaac": "PODAAC",
}
# NASA Cumulus AWS providers
_s3_providers = {
"gesdisc": "GES_DISC",
"ghrcdaac": "GHRC_DAAC",
"lpdaac": "LPCLOUD",
"nsidc": "NSIDC_CPRD",
"ornldaac": "ORNL_CLOUD",
"podaac": "POCLOUD",
}
# NASA Cumulus AWS S3 credential endpoints
_s3_endpoints = {
"gesdisc": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
"ghrcdaac": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
"lpdaac": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
"nsidc": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
"ornldaac": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
"podaac": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
}
# NASA Cumulus AWS S3 buckets
_s3_buckets = {
"gesdisc": "gesdisc-cumulus-prod-protected",
"ghrcdaac": "ghrc-cumulus-dev",
"lpdaac": "lp-prod-protected",
"nsidc": "nsidc-cumulus-prod-protected",
"ornldaac": "ornl-cumulus-prod-protected",
"podaac": "podaac-ops-cumulus-protected",
}
# PURPOSE: get AWS s3 client for NSIDC Cumulus
[docs]
def s3_client(
HOST: str = _s3_endpoints["nsidc"],
timeout: int | None = None,
region_name: str = "us-west-2",
):
"""
Get AWS s3 client for NSIDC data in the cloud
https://data.nsidc.earthdatacloud.nasa.gov/s3credentials
Parameters
----------
HOST: str
NSIDC AWS S3 credential host
timeout: int or NoneType, default None
timeout in seconds for blocking operations
region_name: str, default 'us-west-2'
AWS region name
Returns
-------
client: obj
AWS s3 client for NSIDC Cumulus
"""
request = urllib2.Request(HOST)
response = urllib2.urlopen(request, timeout=timeout)
cumulus = json.loads(response.read())
# get AWS client object
boto3 = import_dependency("boto3")
client = boto3.client(
"s3",
aws_access_key_id=cumulus["accessKeyId"],
aws_secret_access_key=cumulus["secretAccessKey"],
aws_session_token=cumulus["sessionToken"],
region_name=region_name,
)
# return the AWS client for region
return client
# PURPOSE: get AWS s3 file system for NSIDC Cumulus
[docs]
def s3_filesystem(
HOST: str = _s3_endpoints["nsidc"],
timeout: int | None = None,
region_name: str = "us-west-2",
):
"""
Get AWS s3 file system object for NSIDC data in the cloud
https://data.nsidc.earthdatacloud.nasa.gov/s3credentials
Parameters
----------
HOST: str
NSIDC AWS S3 credential host
timeout: int or NoneType, default None
timeout in seconds for blocking operations
region_name: str, default 'us-west-2'
AWS region name
Returns
-------
session: obj
AWS s3 file system session for NSIDC Cumulus
"""
request = urllib2.Request(HOST)
response = urllib2.urlopen(request, timeout=timeout)
cumulus = json.loads(response.read())
# get AWS file system session object
s3fs = import_dependency("s3fs")
session = s3fs.S3FileSystem(
anon=False,
key=cumulus["accessKeyId"],
secret=cumulus["secretAccessKey"],
token=cumulus["sessionToken"],
client_kwargs=dict(region_name=region_name),
)
# return the AWS session for region
return session
# PURPOSE: get a s3 bucket name from a presigned url
[docs]
def s3_bucket(presigned_url: str):
"""
Get a s3 bucket name from a presigned url
Parameters
----------
presigned_url: str
s3 presigned url
Returns
-------
bucket: str
s3 bucket name
"""
host = url_split(presigned_url)
bucket = re.sub(r"s3:\/\/", r"", host[0], re.IGNORECASE)
return bucket
# PURPOSE: get a s3 bucket key from a presigned url
[docs]
def s3_key(presigned_url: str):
"""
Get a s3 bucket key from a presigned url
Parameters
----------
presigned_url: str
s3 presigned url or https url
Returns
-------
key: str
s3 bucket key for object
"""
host = url_split(presigned_url)
# check if url is https url or s3 presigned url
if presigned_url.startswith("http"):
# use NSIDC format for s3 keys from https
parsed = [p for part in host[-4:-1] for p in part.split(".")]
# join parsed url parts to form bucket key
key = posixpath.join(*parsed, host[-1])
else:
# join presigned url to form bucket key
key = posixpath.join(*host[1:])
# return the s3 bucket key for object
return key
# PURPOSE: get a s3 presigned url from a bucket and key
[docs]
def s3_presigned_url(bucket: str, key: str):
"""
Get a s3 presigned url from a bucket and object key
Parameters
----------
bucket: str
s3 bucket name
key: str
s3 bucket key for object
Returns
-------
presigned_url: str
s3 presigned url
"""
return posixpath.join("s3://", bucket, key)
# PURPOSE: generate a s3 presigned https url from a bucket and key
[docs]
def generate_presigned_url(bucket: str, key: str, expiration: int = 3600):
"""
Generate a presigned https URL to share an S3 object
Parameters
----------
bucket: str
s3 bucket name
key: str
s3 bucket key for object
expiration: int
Time in seconds for the presigned URL to remain valid
Returns
-------
presigned_url: str
s3 presigned https url
"""
# generate a presigned URL for S3 object
boto3 = import_dependency("boto3")
s3 = boto3.client("s3")
try:
response = s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": key},
ExpiresIn=expiration,
)
except Exception as exc:
logging.error(exc)
return None
# The response contains the presigned URL
return response
[docs]
def _create_default_ssl_context() -> ssl.SSLContext:
"""Creates the default SSL context"""
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
_set_ssl_context_options(context)
context.options |= ssl.OP_NO_COMPRESSION
return context
[docs]
def _create_ssl_context_no_verify() -> ssl.SSLContext:
"""Creates an SSL context for unverified connections"""
context = _create_default_ssl_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
[docs]
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
"""Sets the default options for the SSL context"""
if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7):
context.minimum_version = ssl.TLSVersion.TLSv1_2
else:
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
# default ssl context
_default_ssl_context = _create_ssl_context_no_verify()
# PURPOSE: attempt to build an opener with netrc
[docs]
def attempt_login(
urs: str = "urs.earthdata.nasa.gov",
context=_default_ssl_context,
password_manager: bool = False,
get_ca_certs: bool = False,
redirect: bool = False,
authorization_header: bool = True,
**kwargs,
):
"""
attempt to build a ``urllib`` opener for NASA Earthdata
Parameters
----------
urs: str, default urs.earthdata.nasa.gov
Earthdata login URS 3 host
context: obj, default IS2view.utilities._default_ssl_context
SSL context for ``urllib`` opener object
password_manager: bool, default True
Create password manager context using default realm
get_ca_certs: bool, default False
Get list of loaded “certification authority” certificates
redirect: bool, default False
Create redirect handler object
authorization_header: bool, default False
Add base64 encoded authorization header to opener
username: str, default from environmental variable
NASA Earthdata username
password: str, default from environmental variable
NASA Earthdata password
retries: int, default 5
number of retry attempts
netrc: str, default ~/.netrc
path to .netrc file for authentication
Returns
-------
opener: obj
OpenerDirector instance
"""
# set default keyword arguments
kwargs.setdefault("username", os.environ.get("EARTHDATA_USERNAME"))
kwargs.setdefault("password", os.environ.get("EARTHDATA_PASSWORD"))
kwargs.setdefault("retries", 5)
kwargs.setdefault("netrc", pathlib.Path.home().joinpath(".netrc"))
try:
# only necessary on jupyterhub
kwargs["netrc"].chmod(mode=0o600)
# try retrieving credentials from netrc
username, _, password = netrc.netrc(kwargs["netrc"]).authenticators(urs)
except Exception as exc:
logging.error(exc)
# try retrieving credentials from environmental variables
username, password = (kwargs["username"], kwargs["password"])
pass
# if username or password are not available
if not username:
username = builtins.input(f"Username for {urs}: ")
if not password:
password = getpass.getpass(prompt=f"Password for {username}@{urs}: ")
# for each retry
for retry in range(kwargs["retries"]):
# build an opener for urs with credentials
opener = build_opener(
username,
password,
context=context,
password_manager=password_manager,
get_ca_certs=get_ca_certs,
redirect=redirect,
authorization_header=authorization_header,
urs=urs,
)
# try logging in by check credentials
try:
check_credentials()
except Exception as exc:
logging.error(exc)
pass
else:
return opener
# reattempt login
username = builtins.input(f"Username for {urs}: ")
password = getpass.getpass(prompt=f"Password for {username}@{urs}: ")
# reached end of available retries
raise RuntimeError("End of Retries: Check NASA Earthdata credentials")
# PURPOSE: "login" to NASA Earthdata with supplied credentials
[docs]
def build_opener(
username: str,
password: str,
context=_default_ssl_context,
password_manager: bool = True,
get_ca_certs: bool = False,
redirect: bool = False,
authorization_header: bool = False,
urs: str = "https://urs.earthdata.nasa.gov",
):
"""
Build ``urllib`` opener for NASA Earthdata with supplied credentials
Parameters
----------
username: str or NoneType, default None
NASA Earthdata username
password: str or NoneType, default None
NASA Earthdata password
context: obj, default IS2view.utilities._default_ssl_context
SSL context for ``urllib`` opener object
password_manager: bool, default True
Create password manager context using default realm
get_ca_certs: bool, default False
Get list of loaded “certification authority” certificates
redirect: bool, default False
Create redirect handler object
authorization_header: bool, default False
Add base64 encoded authorization header to opener
urs: str, default 'https://urs.earthdata.nasa.gov'
Earthdata login URS 3 host
Returns
-------
opener: obj
``OpenerDirector`` instance
"""
# https://docs.python.org/3/howto/urllib2.html#id5
handler = []
# create a password manager
if password_manager:
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
# Add the username and password for NASA Earthdata Login system
password_mgr.add_password(None, urs, username, password)
handler.append(urllib2.HTTPBasicAuthHandler(password_mgr))
# Create cookie jar for storing cookies. This is used to store and return
# the session cookie given to use by the data server (otherwise will just
# keep sending us back to Earthdata Login to authenticate).
cookie_jar = CookieJar()
handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
# SSL context handler
if get_ca_certs:
context.get_ca_certs()
handler.append(urllib2.HTTPSHandler(context=context))
# redirect handler
if redirect:
handler.append(urllib2.HTTPRedirectHandler())
# create "opener" (OpenerDirector instance)
opener = urllib2.build_opener(*handler)
# Encode username/password for request authorization headers
# add Authorization header to opener
if authorization_header:
b64 = base64.b64encode(f"{username}:{password}".encode())
opener.addheaders = [("Authorization", f"Basic {b64.decode()}")]
# Now all calls to urllib2.urlopen use our opener.
urllib2.install_opener(opener)
# All calls to urllib2.urlopen will now use handler
# Make sure not to include the protocol in with the URL, or
# HTTPPasswordMgrWithDefaultRealm will be confused.
return opener
# PURPOSE: generate a NASA Earthdata user token
[docs]
def get_token(
HOST: str = "https://urs.earthdata.nasa.gov/api/users/token",
username: str | None = None,
password: str | None = None,
build: bool = True,
urs: str = "urs.earthdata.nasa.gov",
):
"""
Generate a NASA Earthdata User Token
Parameters
----------
HOST: str or list
NASA Earthdata token API host
username: str or NoneType, default None
NASA Earthdata username
password: str or NoneType, default None
NASA Earthdata password
build: bool, default True
Build opener and check credentials
timeout: int or NoneType, default None
timeout in seconds for blocking operations
urs: str, default 'urs.earthdata.nasa.gov'
NASA Earthdata URS 3 host
Returns
-------
token: dict
JSON response with NASA Earthdata User Token
"""
# attempt to build urllib2 opener and check credentials
if build:
attempt_login(
urs,
username=username,
password=password,
password_manager=False,
authorization_header=True,
)
# create post response with Earthdata token API
try:
request = urllib2.Request(HOST, method="POST")
response = urllib2.urlopen(request)
except urllib2.HTTPError as exc:
logging.debug(exc.code)
raise RuntimeError(exc.reason) from exc
except urllib2.URLError as exc:
logging.debug(exc.reason)
raise RuntimeError("Check internet connection") from exc
# read and return JSON response
return json.loads(response.read())
# PURPOSE: generate a NASA Earthdata user token
[docs]
def list_tokens(
HOST: str = "https://urs.earthdata.nasa.gov/api/users/tokens",
username: str | None = None,
password: str | None = None,
build: bool = True,
urs: str = "urs.earthdata.nasa.gov",
):
"""
List the current associated NASA Earthdata User Tokens
Parameters
----------
HOST: str
NASA Earthdata list token API host
username: str or NoneType, default None
NASA Earthdata username
password: str or NoneType, default None
NASA Earthdata password
build: bool, default True
Build opener and check credentials
timeout: int or NoneType, default None
timeout in seconds for blocking operations
urs: str, default 'urs.earthdata.nasa.gov'
NASA Earthdata URS 3 host
Returns
-------
tokens: list
JSON response with NASA Earthdata User Tokens
"""
# attempt to build urllib2 opener and check credentials
if build:
attempt_login(
urs,
username=username,
password=password,
password_manager=False,
authorization_header=True,
)
# create get response with Earthdata list tokens API
try:
request = urllib2.Request(HOST)
response = urllib2.urlopen(request)
except urllib2.HTTPError as exc:
logging.debug(exc.code)
raise RuntimeError(exc.reason) from exc
except urllib2.URLError as exc:
logging.debug(exc.reason)
raise RuntimeError("Check internet connection") from exc
# read and return JSON response
return json.loads(response.read())
# PURPOSE: revoke a NASA Earthdata user token
[docs]
def revoke_token(
token: str,
HOST: str = f"https://urs.earthdata.nasa.gov/api/users/revoke_token",
username: str | None = None,
password: str | None = None,
build: bool = True,
urs: str = "urs.earthdata.nasa.gov",
):
"""
Generate a NASA Earthdata User Token
Parameters
----------
token: str
NASA Earthdata token to be revoked
HOST: str
NASA Earthdata revoke token API host
username: str or NoneType, default None
NASA Earthdata username
password: str or NoneType, default None
NASA Earthdata password
build: bool, default True
Build opener and check credentials
timeout: int or NoneType, default None
timeout in seconds for blocking operations
urs: str, default 'urs.earthdata.nasa.gov'
NASA Earthdata URS 3 host
"""
# attempt to build urllib2 opener and check credentials
if build:
attempt_login(
urs,
username=username,
password=password,
password_manager=False,
authorization_header=True,
)
# full path for NASA Earthdata revoke token API
url = f"{HOST}?token={token}"
# create post response with Earthdata revoke tokens API
try:
request = urllib2.Request(url, method="POST")
response = urllib2.urlopen(request)
except urllib2.HTTPError as exc:
logging.debug(exc.code)
raise RuntimeError(exc.reason) from exc
except urllib2.URLError as exc:
logging.debug(exc.reason)
raise RuntimeError("Check internet connection") from exc
# verbose response
logging.debug(f"Token Revoked: {token}")
# PURPOSE: check that entered NASA Earthdata credentials are valid
[docs]
def check_credentials():
"""
Check that entered NASA Earthdata credentials are valid
"""
try:
remote_path = "https://urs.earthdata.nasa.gov/api/users/tokens"
request = urllib2.Request(url=remote_path)
response = urllib2.urlopen(request, timeout=20)
except urllib2.HTTPError as exc:
raise RuntimeError("Check your NASA Earthdata credentials") from exc
except urllib2.URLError as exc:
raise RuntimeError("Check internet connection") from exc
else:
return True
# PURPOSE: download a file from a NSIDC https server
[docs]
def from_nsidc(
HOST: str | list,
username: str | None = None,
password: str | None = None,
build: bool = True,
timeout: int | None = None,
urs: str = "urs.earthdata.nasa.gov",
local: str | pathlib.Path | None = None,
hash: str = "",
chunk: int = 16384,
verbose: bool = False,
fid=sys.stdout,
mode: oct = 0o775,
):
"""
Download a file from a NSIDC https server
Parameters
----------
HOST: str or list
remote https host
username: str or NoneType, default None
NASA Earthdata username
password: str or NoneType, default None
NASA Earthdata password
build: bool, default True
Build opener and check credentials
timeout: int or NoneType, default None
timeout in seconds for blocking operations
urs: str, default 'urs.earthdata.nasa.gov'
NASA Earthdata URS 3 host
local: str or NoneType, default None
path to local file
hash: str, default ''
MD5 hash of local file
chunk: int, default 16384
chunk size for transfer encoding
verbose: bool, default False
print file transfer information
fid: obj, default sys.stdout
open file object to print if verbose
mode: oct, default 0o775
permissions mode of output local file
Returns
-------
remote_buffer: obj
BytesIO representation of file
response_error: str or None
notification for response error
"""
# create logger
loglevel = logging.INFO if verbose else logging.CRITICAL
logging.basicConfig(stream=fid, level=loglevel)
# attempt to build urllib2 opener and check credentials
if build:
attempt_login(urs, username=username, password=password)
# verify inputs for remote https host
if isinstance(HOST, str):
HOST = url_split(HOST)
# try downloading from https
try:
# Create and submit request.
request = urllib2.Request(posixpath.join(*HOST))
response = urllib2.urlopen(request, timeout=timeout)
except (urllib2.HTTPError, urllib2.URLError) as exc:
logging.error(exc)
response_error = "Download error from {0}".format(posixpath.join(*HOST))
return (False, response_error)
else:
# copy remote file contents to bytesIO object
remote_buffer = io.BytesIO()
shutil.copyfileobj(response, remote_buffer, chunk)
remote_buffer.seek(0)
# save file basename with bytesIO object
remote_buffer.filename = HOST[-1]
# generate checksum hash for remote file
remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
# compare checksums
if local and (hash != remote_hash):
# convert to absolute path
local = pathlib.Path(local).expanduser().absolute()
# create directory if non-existent
local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
# print file information
args = (posixpath.join(*HOST), str(local))
logging.info("{0} -->\n\t{1}".format(*args))
# store bytes to file using chunked transfer encoding
remote_buffer.seek(0)
with local.open(mode="wb") as f:
shutil.copyfileobj(remote_buffer, f, chunk)
# change the permissions mode
local.chmod(mode=mode)
# return the bytesIO object
remote_buffer.seek(0)
return (remote_buffer, None)
# available regions and resolutions
_products = ("ATL14", "ATL15")
_regions = ("AA", "A1", "A2", "A3", "A4", "CN", "CS", "GL", "IS", "RA", "SV")
_atl14_resolutions = ("100m",)
_atl15_resolutions = ("01km", "10km", "20km", "40km")
_resolutions = _atl14_resolutions + _atl15_resolutions
# PURPOSE: build formatted query string for ICESat-2 release
[docs]
def cmr_query_release(release: str | int | None):
"""
Build formatted query string for ICESat-2 release
Parameters
----------
release: str
ICESat-2 data release
Returns
-------
query_params: str
formatted string for CMR queries
"""
if release is None:
return ""
# maximum length of version in CMR queries
desired_pad_length = 3
if len(str(release)) > desired_pad_length:
raise RuntimeError(f'Release string too long: "{release}"')
# Strip off any leading zeros
release = str(release).lstrip("0")
query_params = ""
while len(release) <= desired_pad_length:
padded_release = release.zfill(desired_pad_length)
query_params += f"&version={padded_release}"
desired_pad_length -= 1
return query_params
# PURPOSE: check if the submitted ATL14/15 regions are valid
[docs]
def cmr_regions(region: str | list | None):
"""
Check if the submitted ATL14/15 regions are valid
Parameters
----------
region: str, list or NoneType, default None
Returns
-------
region_list: list
formatted available ATL14/15 regions
"""
# all available ICESat-2 ATL14/15 regions
if region is None:
return ["??"]
else:
if isinstance(region, str):
assert region in _regions
region_list = [str(region)]
elif isinstance(region, list):
region_list = []
for r in region:
assert r in _regions
region_list.append(str(r))
else:
raise TypeError("Please enter the region as a list or string")
# check if user-entered region is currently not available
if not set(_regions) & set(region_list):
warnings.filterwarnings("module")
warnings.warn("Listed region is not presently available")
return region_list
# PURPOSE: check if the submitted ATL14/15 regions are valid
[docs]
def cmr_resolutions(resolution: str | list | None):
"""
Check if the submitted ATL14/15 resolutions are valid
Parameters
----------
resolution: str, list or NoneType, default None
ICESat-2 ATL14/15 spatial resolution
Returns
-------
resolution_list: list
formatted available ATL14/15 resolutions
"""
# all available ICESat-2 ATL14/15 resolutions
if resolution is None:
return ["????"]
else:
if isinstance(resolution, str):
assert resolution in _resolutions
resolution_list = [str(resolution)]
elif isinstance(resolution, list):
resolution_list = []
for r in resolution:
assert r in _resolutions
resolution_list.append(str(r))
else:
raise TypeError("Please enter the resolution as a list or string")
# check if user-entered resolution is currently not available
if not set(_resolutions) & set(resolution_list):
warnings.filterwarnings("module")
warnings.warn("Listed resolution is not presently available")
return resolution_list
[docs]
def cmr_readable_granules(product: str, **kwargs):
"""
Create list of readable granule names for CMR queries
Parameters
----------
product: str
ICESat-2 data product
regions: str, list or NoneType, default None
ICESat-2 ATL14/15 region name
resolutions: str, list or NoneType, default None
ICESat-2 ATL14/15 spatial resolution
Returns
-------
readable_granule_list: list
readable granule names for CMR queries
"""
# default keyword arguments
kwargs.setdefault("regions", None)
kwargs.setdefault("resolutions", None)
# list of readable granule names
readable_granule_list = []
# verify inputs
assert product in _products
# gridded land ice products
# for each ATL14/15 parameter
for r in cmr_regions(kwargs["regions"]):
for s in cmr_resolutions(kwargs["resolutions"]):
pattern = f"{product}_{r}_????_{s}_*"
# append the granule pattern
readable_granule_list.append(pattern)
# return readable granules list
return readable_granule_list
# PURPOSE: filter the CMR json response for desired data files
[docs]
def cmr_filter_json(
search_results: dict,
endpoint: str = "data",
request_type: str = r"application/x-hdfeos",
):
"""
Filter the CMR json response for desired data files
Parameters
----------
search_results: dict
json response from CMR query
endpoint: str, default 'data'
url endpoint type
- ``'data'``: NASA Earthdata https archive
- ``'opendap'``: NASA Earthdata OPeNDAP archive
- ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
request_type: str, default 'application/x-hdfeos'
data type for reducing CMR query
Returns
-------
producer_granule_ids: list
ICESat-2 granules
granule_urls: list
ICESat-2 granule urls from NSIDC
"""
# output list of granule ids and urls
producer_granule_ids = []
granule_urls = []
# check that there are urls for request
if ("feed" not in search_results) or (
"entry" not in search_results["feed"]
):
return (producer_granule_ids, granule_urls)
# descriptor links for each endpoint
rel = {}
rel["data"] = "http://esipfed.org/ns/fedsearch/1.1/data#"
rel["opendap"] = "http://esipfed.org/ns/fedsearch/1.1/service#"
rel["s3"] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
# iterate over references and get cmr location
for entry in search_results["feed"]["entry"]:
producer_granule_ids.append(entry["producer_granule_id"])
for link in entry["links"]:
# skip links without descriptors
if "rel" not in link.keys():
continue
if "type" not in link.keys():
continue
# append if selected endpoint and request type
if (link["rel"] == rel[endpoint]) and re.match(
request_type, link["type"]
):
granule_urls.append(link["href"])
break
# return the list of urls and granule ids
return (producer_granule_ids, granule_urls)
# PURPOSE: cmr queries for gridded land ice products
[docs]
def cmr(
product: str = None,
release: str = None,
regions: str | list | None = None,
resolutions: str | list | None = None,
provider: str = "NSIDC_CPRD",
endpoint: str = "data",
request_type: str = r"application/(x-)?netcdf",
opener=None,
context: ssl.SSLContext = _default_ssl_context,
verbose: bool = False,
fid=sys.stdout,
):
"""
Query the NASA Common Metadata Repository (CMR) for ICESat-2 data
Parameters
----------
product: str or NoneType, default None
ICESat-2 data product
release: str or NoneType, default None
ICESat-2 data release
regions: str, list or NoneType, default None
ICESat-2 ATL14/15 region name
resolutions: str, list or NoneType, default None
ICESat-2 ATL14/15 spatial resolution
provider: str, default 'NSIDC_CPRD'
CMR data provider
endpoint: str, default 'data'
url endpoint type
- ``'data'``: NASA Earthdata https archive
- ``'opendap'``: NASA Earthdata OPeNDAP archive
- ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
request_type: str, default 'application/(x-)?netcdf'
data type for reducing CMR query
opener: obj or NoneType, default None
``OpenerDirector`` instance
context: obj, default IS2view.utilities._default_ssl_context
SSL context for ``urllib`` opener object
verbose: bool, default False
print file transfer information
fid: obj, default sys.stdout
open file object to print if verbose
Returns
-------
producer_granule_ids: list
ICESat-2 granules
granule_urls: list
ICESat-2 granule urls from NSIDC
"""
# create logger
loglevel = logging.INFO if verbose else logging.CRITICAL
logging.basicConfig(stream=fid, level=loglevel)
# attempt to build urllib2 opener
if opener is None:
# build urllib2 opener with SSL context
# https://docs.python.org/3/howto/urllib2.html#id5
handler = []
# Create cookie jar for storing cookies
cookie_jar = CookieJar()
handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
handler.append(urllib2.HTTPSHandler(context=context))
# create "opener" (OpenerDirector instance)
opener = urllib2.build_opener(*handler)
# build CMR query
cmr_query_type = "granules"
cmr_format = "json"
cmr_page_size = 2000
CMR_HOST = [
"https://cmr.earthdata.nasa.gov",
"search",
f"{cmr_query_type}.{cmr_format}",
]
# build list of CMR query parameters
CMR_KEYS = []
CMR_KEYS.append(f"?provider={provider}")
CMR_KEYS.append("&sort_key[]=start_date")
CMR_KEYS.append("&sort_key[]=producer_granule_id")
CMR_KEYS.append(f"&page_size={cmr_page_size}")
# append product string
CMR_KEYS.append(f"&short_name={product}")
# append release strings
CMR_KEYS.append(cmr_query_release(release))
# append keys for querying specific granules
CMR_KEYS.append("&options[readable_granule_name][pattern]=true")
CMR_KEYS.append("&options[spatial][or]=true")
# set as subregions for Release-3+ for merging into single dataset
if (int(release) > 2) and (regions == "AA"):
regions = ["A1", "A2", "A3", "A4"]
# get the list of readable granules
readable_granule_list = cmr_readable_granules(
product, regions=regions, resolutions=resolutions
)
for gran in readable_granule_list:
CMR_KEYS.append(f"&readable_granule_name[]={gran}")
# full CMR query url
cmr_query_url = "".join([posixpath.join(*CMR_HOST), *CMR_KEYS])
logging.info(f"CMR request={cmr_query_url}")
# output list of granule names and urls
producer_granule_ids = []
granule_urls = []
cmr_search_after = None
while True:
req = urllib2.Request(cmr_query_url)
# add CMR search after header
if cmr_search_after:
req.add_header("CMR-Search-After", cmr_search_after)
logging.debug(f"CMR-Search-After: {cmr_search_after}")
response = opener.open(req)
# get search after index for next iteration
headers = {k.lower(): v for k, v in dict(response.info()).items()}
cmr_search_after = headers.get("cmr-search-after")
# read the CMR search as JSON
search_page = json.loads(response.read().decode("utf-8"))
ids, urls = cmr_filter_json(
search_page, endpoint=endpoint, request_type=request_type
)
if not urls or cmr_search_after is None:
break
# extend lists
producer_granule_ids.extend(ids)
granule_urls.extend(urls)
# return the list of granule ids and urls
return (producer_granule_ids, granule_urls)
# available assets for finding data
_assets = ("nsidc-s3", "atlas-s3", "nsidc-https", "atlas-local")
# available formats for accessing data
_formats = ("nc", "zarr")
# PURPOSE: queries CMR or s3 for available granules
[docs]
def query_resources(**kwargs):
"""
Queries CMR or s3 for available granules
Parameters
----------
asset: str, default 'nsidc-https'
Location to get the ICESat-2 gridded land ice data
- ``nsidc-https`` : NSIDC on-prem DAAC
- ``nsidc-s3`` : NSIDC AWS protected s3 bucket
- ``atlas-s3`` : s3 bucket in `us-west-2`
- ``atlas-local`` : local directory
bucket: str, default 'is2view'
Project AWS s3 bucket name
directory: str or NoneType, default None
Working data directory if local
product: str, default 'ATL15'
ICESat-2 gridded land ice product
- ``ATL14`` : land ice height
- ``ATL15`` : land ice height change
release: str, default '004'
ICESat-2 data release
version: str, default '01'
ICESat-2 data version
region: str, default 'AA'
ICESat-2 ATL14/15 region name
- ``AA`` : Antarctic
- ``CN`` : Northern Canadian Archipelagobb
- ``CS`` : Southern Canadian Archipelago
- ``GL`` : Greenland
- ``IS`` : Iceland
- ``SV`` : Svalbard
- ``RA`` : Russian High Arctic
resolution: str, default '01km'
ICESat-2 ATL14/15 spatial resolution
- ``100m`` : 100 meters horizontal
- ``01km`` : 1 kilometer horizontal
- ``10km`` : 10 kilometers horizontal
- ``20km`` : 20 kilometers horizontal
- ``40km`` : 40 kilometers horizontal
format: str, default 'nc'
Data format for ICESat-2 granules
- ``nc`` : Native netCDF4
- ``zarr`` : Cloud-optimized zarr
Returns
-------
granule: str
presigned url or path for granule
"""
kwargs.setdefault("asset", "nsidc-https")
kwargs.setdefault("bucket", "is2view")
kwargs.setdefault("directory", None)
kwargs.setdefault("product", "ATL15")
kwargs.setdefault("release", "004")
kwargs.setdefault("version", "01")
kwargs.setdefault("cycles", None)
kwargs.setdefault("region", "AA")
kwargs.setdefault("resolution", "01km")
kwargs.setdefault("format", "nc")
# CMR providers
provider = {}
provider["nsidc-s3"] = _s3_providers["nsidc"]
provider["atlas-s3"] = _s3_providers["nsidc"]
provider["nsidc-https"] = _s3_providers["nsidc"]
provider["atlas-local"] = _s3_providers["nsidc"]
# CMR endpoints
endpoint = {}
endpoint["nsidc-s3"] = "s3"
endpoint["atlas-s3"] = "s3"
endpoint["nsidc-https"] = "data"
endpoint["atlas-local"] = "data"
# CMR request types
request_type = {}
request_type["nsidc-s3"] = r"application/(x-)?netcdf"
request_type["atlas-s3"] = r"application/(x-)?netcdf"
request_type["nsidc-https"] = r"application/(x-)?netcdf"
request_type["atlas-local"] = r"application/(x-)?netcdf"
# convert region variable to list
if (int(kwargs["release"]) > 2) and (kwargs["region"].upper() == "AA"):
# set Antarctic sub-regions for Release-3+
kwargs["region"] = ["A1", "A2", "A3", "A4"]
elif kwargs["region"].lower() == "north":
# set for merging Arctic regions into a single dataset
kwargs["region"] = ["CS", "CN", "GL", "IS", "RA", "SV"]
elif isinstance(kwargs["region"], str):
kwargs["region"] = [kwargs["region"]]
# verify inputs
assert kwargs["asset"] in _assets
assert kwargs["product"] in _products
assert int(kwargs["release"]) > 0
if kwargs["cycles"] is not None:
assert len(kwargs["cycles"]) == 2, "cycles should be length 2"
for r in kwargs["region"]:
assert r in _regions
assert kwargs["resolution"] in _resolutions
assert kwargs["format"] in _formats
# attempt to get resource
granules = []
try:
# query CMR
ids, urls = cmr(
product=kwargs["product"],
release=kwargs["release"],
regions=kwargs["region"],
resolutions=kwargs["resolution"],
provider=provider[kwargs["asset"]],
endpoint=endpoint[kwargs["asset"]],
request_type=request_type[kwargs["asset"]],
)
# check if granule is available
if not (ids or urls):
raise Exception("Granule not found in asset")
# check if available on s3 or locally
if kwargs["asset"] == "nsidc-s3":
# submit request to create AWS session
attempt_login("urs.earthdata.nasa.gov", authorization_header=True)
# get AWS s3 file system object
session = s3_filesystem(_s3_endpoints["nsidc"])
# append to granules
for i, url in enumerate(urls):
granules.append(session.open(url, mode="rb"))
elif kwargs["asset"] == "atlas-s3":
# for each url from CMR
for i, url in enumerate(urls):
# update url if using different format
if kwargs["format"] in ("zarr",):
prefix, _ = posixpath.splitext(url)
url = f"{prefix}.{kwargs['format']}"
# get presigned url for granule
key = s3_key(url)
# append to granules
granules.append(s3_presigned_url(kwargs["bucket"], key))
elif kwargs["asset"] == "nsidc-https":
# for each url from CMR
for i, url in enumerate(urls):
# verify that granule exists locally
granule = pathlib.Path(ids[i])
if not granule.exists():
from_nsidc(url, local=granule)
# append to granules
granules.append(granule)
elif kwargs["asset"] == "atlas-local":
# for each url from CMR
for i, url in enumerate(urls):
# update url if using different format
if kwargs["format"] in ("zarr",):
prefix, _ = posixpath.splitext(ids[i])
ids[i] = f"{prefix}.{kwargs['format']}"
# verify that granule exists locally
directory = pathlib.Path(kwargs["directory"] or ".")
granule = directory.joinpath(ids[i]).expanduser().absolute()
if not granule.exists() and (kwargs["format"] == "nc"):
from_nsidc(url, local=granule)
elif not granule.exists():
raise FileNotFoundError(str(granule))
# append to granules
granules.append(granule)
except Exception:
# unavailable granule
logging.debug(traceback.format_exc())
pass
else:
# return the granules
if len(granules) == 1:
# return as string for a single granule
return granules[0]
else:
# return as list for multiple granules
return granules
# try getting unreleased or old granule
try:
# file formatting string for ATL14/15 granules
# 1: product
# 2: region identification
# 3-4: start and end cycle
# 5: horizontal spatial resolution
# 6: data release
# 7: data version
file_format = "{0}_{1}_{2:02d}{3:02d}_{4}_{5:03d}_{6:02d}.{7}"
# use default start and end cycle
if kwargs["cycles"] is None:
# start and end cycle for releases
cycles = {}
cycles["001"] = (3, 11)
cycles["002"] = (3, 14)
cycles["003"] = (3, 18)
cycles["004"] = (3, 21)
kwargs["cycles"] = cycles[kwargs["release"]]
# for each requested region
for region in kwargs["region"]:
# format granule for unreleased data
file = file_format.format(
kwargs["product"],
region,
int(kwargs["cycles"][0]),
int(kwargs["cycles"][1]),
kwargs["resolution"],
int(kwargs["release"]),
int(kwargs["version"]),
kwargs["format"],
)
# unreleased granule from local or project s3
if kwargs["asset"] == "atlas-local":
# local granule for unreleased data
directory = pathlib.Path(kwargs["directory"] or ".")
granule = directory.joinpath(file).expanduser().absolute()
# verify that unreleased granule exists locally
if not granule.exists():
raise FileNotFoundError(str(granule))
elif kwargs["asset"] == "atlas-s3":
# s3 urls for unreleased data
# full s3 key path
key = posixpath.join(
"ATLAS", kwargs["product"], kwargs["release"], "2019", file
)
# get presigned url for granule
granule = s3_presigned_url(kwargs["bucket"], key)
# append to granules
granules.append(granule)
except Exception:
# unavailable granule
logging.debug(traceback.format_exc())
pass
else:
# return the granules
if len(granules) == 1:
# return as string for a single granule
return granules[0]
else:
# return as list for multiple granules
return granules
# raise exception if no granule available
if len(granules) == 0:
raise ValueError("Unavailable granule")