data_creation

This package encapsulates Anne Hulsey's data processing pipeline.

Modules:

Name Description
constants

defined constants for this module.

query_NSHM

get hazard data from the NSHM hazard API.

NSHM_to_hdf5

helper functions for saving hazard data as an HDF5 file.

extract_data

helper functions to read the NSHM hdf5.

sa_parameter_generation

derives the PGA, Sa,s, Tc, and Td parameters from the NSHM hazard curves.

dm_parameter_generation

produces the magnitude and distances values for the parameter table.

mean_magnitudes

retrieves magnitude data from the NSHM hazard API.

gis_data

geospatial analysis for the distance to faults.

util

helper function for formatting the latitude and longitude labels.

constants

This module contains constants.

AGG_LIST = ['mean', '0.9'] module-attribute

ALL_SITES: Dict[str, Dict[str, Any]] = {location_by_id(_id)['name']: location_by_id(_id) for _id in LOCATION_LISTS[location_list]['locations']} module-attribute

CFM_URL = 'https://www.gns.cri.nz/assets/Data-and-Resources/Download-files/Community-Hazard-Model/NZ_CFM_v1_0_shapefile.zip' module-attribute

DEFAULT_RPS = [25, 50, 100, 250, 500, 1000, 2500] module-attribute

IMTL_LIST = [0.0001, 0.0002, 0.0004, 0.0006, 0.0008, 0.001, 0.002, 0.004, 0.006, 0.008, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.2, 2.4, 2.6, 2.8, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] module-attribute

IMT_LIST = ['PGA', 'SA(0.1)', 'SA(0.15)', 'SA(0.2)', 'SA(0.25)', 'SA(0.3)', 'SA(0.35)', 'SA(0.4)', 'SA(0.5)', 'SA(0.6)', 'SA(0.7)', 'SA(0.8)', 'SA(0.9)', 'SA(1.0)', 'SA(1.25)', 'SA(1.5)', 'SA(1.75)', 'SA(2.0)', 'SA(2.5)', 'SA(3.0)', 'SA(3.5)', 'SA(4.0)', 'SA(4.5)', 'SA(5.0)', 'SA(6.0)', 'SA(7.5)', 'SA(10.0)'] module-attribute

LOCATION_REPLACEMENTS: dict[str, LocationReplacement] = {'Auckland': LocationReplacement('Auckland', ['Manukau City']), 'Tauranga': LocationReplacement('Tauranga', ['Mount Maunganui']), 'Wellington': LocationReplacement('Wellington', ['Wellington CBD']), 'Lower Hutt': LocationReplacement('Lower Hutt', ['Wainuiomata', 'Eastbourne'])} module-attribute

LOWER_BOUND_PARAMETERS: dict[str, Any] = {'controlling_site': 'Auckland', 'controlling_percentile': 0.9} module-attribute

PGA_N_DP = 2 module-attribute

PGA_REDUCTIONS: dict[str, PGA_reductions] = {'IV': PGA_reductions('IV', 0.076, 0.123, 0.198), 'V': PGA_reductions('V', 0.114, 0.227, 0.137), 'VI': PGA_reductions('VI', 0.085, 0.171, 0.133)} module-attribute

POE_TO_RP = {poe: rp for (rp, poe) in RP_TO_POE.items()} module-attribute

POLYGON_PATH = Path(RESOURCES_FOLDER) / 'pipeline/v1/input_data' / 'polygons_locations.geojson' module-attribute

PSV_N_DP = 2 module-attribute

RP_TO_POE = {10000: ProbabilityEnum._05_PCT_IN_50YRS, 5000: ProbabilityEnum._1_PCT_IN_50YRS, 2500: ProbabilityEnum._2_PCT_IN_50YRS, 1000: ProbabilityEnum._5_PCT_IN_50YRS, 500: ProbabilityEnum._10_PCT_IN_50YRS, 250: ProbabilityEnum._18_PCT_IN_50YRS, 100: ProbabilityEnum._39_PCT_IN_50YRS, 50: ProbabilityEnum._63_PCT_IN_50YRS, 25: ProbabilityEnum._86_PCT_IN_50YRS} module-attribute

SAS_N_DP = 2 module-attribute

SITE_CLASSES: dict[str, SiteClass] = {'I': SiteClass('I', 750, 'Site Class I', 750, np.nan), 'II': SiteClass('II', 525, 'Site Class II', 450, 750), 'III': SiteClass('III', 375, 'Site Class III', 300, 450), 'IV': SiteClass('IV', 275, 'Site Class IV', 250, 300), 'V': SiteClass('V', 225, 'Site Class V', 200, 250), 'VI': SiteClass('VI', 175, 'Site Class VI', 150, 200)} module-attribute

TC_N_SF = 2 module-attribute

VS30_LIST = [SITE_CLASSES[sc].representative_vs30 for sc in SITE_CLASSES.keys()] module-attribute

akl_location_id = 'srg_29' module-attribute

location_grid = 'NZ_0_1_NB_1_1' module-attribute

location_list = 'SRWG214' module-attribute

LocationReplacement

Bases: NamedTuple

Source code in nzssdt_2023/data_creation/constants.py
class LocationReplacement(NamedTuple):
    preferred_location: str
    replaced_locations: list[str]

preferred_location: str instance-attribute

replaced_locations: list[str] instance-attribute

PGA_reductions

Bases: NamedTuple

Source code in nzssdt_2023/data_creation/constants.py
class PGA_reductions(NamedTuple):
    site_class: str
    A0: float
    A1: float
    PGA_threshold: float

A0: float instance-attribute

A1: float instance-attribute

PGA_threshold: float instance-attribute

site_class: str instance-attribute

SiteClass

Bases: NamedTuple

Source code in nzssdt_2023/data_creation/constants.py
class SiteClass(NamedTuple):
    site_class: str
    representative_vs30: int
    label: str
    lower_bound: float
    upper_bound: float

label: str instance-attribute

lower_bound: float instance-attribute

representative_vs30: int instance-attribute

site_class: str instance-attribute

upper_bound: float instance-attribute

lat_lon_from_id(id)

Source code in nzssdt_2023/data_creation/constants.py
def lat_lon_from_id(id):
    return (location_by_id(id)["latitude"], location_by_id(id)["longitude"])
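
A minimal usage sketch (not part of the module) showing how a few of these constants relate; it assumes only that the package and its nzshm-common location data are installed:

from nzssdt_2023.data_creation import constants

# representative vs30 for each site class, in SITE_CLASSES order
print(constants.VS30_LIST)  # [750, 525, 375, 275, 225, 175]

# the return-period and probability-of-exceedance lookups are inverses
rp = 500
poe = constants.RP_TO_POE[rp]
assert constants.POE_TO_RP[poe] == rp

# latitude/longitude of the Auckland SRG location
print(constants.lat_lon_from_id(constants.akl_location_id))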

query_NSHM

Functions to get hazard data from the NSHM hazard API.

log = logging.getLogger(__name__) module-attribute

create_sites_df(named_sites: bool = True, site_list: Optional[List[str]] = None, cropped_grid: bool = False, grid_limits: Tuple[float, float, float, float] = (-np.inf, np.inf, -np.inf, np.inf), site_limit: int = 0) -> pdt.DataFrame

creates a pd dataframe of the sites of interest

Parameters:

Name Type Description Default
named_sites bool

if True returns SRG sites, False returns lat/lon sites

True
site_list Optional[List[str]]

specifies a subset of SRG sites

None
cropped_grid bool

if True, crops the lat/lon grid to grid_limits; if False, returns all lat/lon sites

False
grid_limits Tuple[float, float, float, float]

set outer bound coordinates of the grid [min_lat, max_lat, min_lon, max_lon]

(-np.inf, np.inf, -np.inf, np.inf)
site_limit int

for creating test fixtures

0

Returns: a dataframe idx: sites, cols: ['latlon', 'lat', 'lon']

Source code in nzssdt_2023/data_creation/query_NSHM.py
def create_sites_df(
    named_sites: bool = True,
    site_list: Optional[List[str]] = None,
    cropped_grid: bool = False,
    grid_limits: Tuple[float, float, float, float] = (-np.inf, np.inf, -np.inf, np.inf),
    site_limit: int = 0,  # for creating test artefacts
) -> "pdt.DataFrame":
    """
    creates a pd dataframe of the sites of interest

    Args:
        named_sites: if True returns SRG sites, False returns lat/lon sites
        site_list:  specifies a subset of SRG sites
        cropped_grid:  if True, crops the lat/lon grid to grid_limits
        grid_limits: set outer bound coordinates of the grid [min_lat, max_lat, min_lon, max_lon]
        site_limit: for creating test fixtures
    Returns:
        a dataframe idx: sites, cols: ['latlon', 'lat', 'lon']
    """

    # create a dataframe with named sites
    if named_sites:
        id_list = LOCATION_LISTS["SRWG214"]["locations"]

        # if no list is passed, include all named sites
        if site_list is None:
            site_list = [location_by_id(loc_id)["name"] for loc_id in id_list]

        if site_limit:
            site_list = site_list[:site_limit]

        controlling_site = LOWER_BOUND_PARAMETERS["controlling_site"]
        if controlling_site not in site_list:
            site_list = [
                controlling_site
            ] + site_list  # ensure the controlling site is included

        # collect the ids for the relevant sites
        id_list = [
            loc_id for loc_id in id_list if location_by_id(loc_id)["name"] in site_list
        ]

        # create the df of named sites
        sites = pd.DataFrame(index=site_list, dtype="str")

        for loc_id in id_list:
            latlon = CodedLocation(
                location_by_id(loc_id)["latitude"],
                location_by_id(loc_id)["longitude"],
                0.001,
            ).code
            lat, lon = latlon.split("~")
            sites.loc[location_by_id(loc_id)["name"], ["latlon", "lat", "lon"]] = [
                latlon,
                lat,
                lon,
            ]

    # create a dataframe with latlon sites
    else:
        site_list_id = "NZ_0_1_NB_1_1"
        # resample = 0.1
        grid = RegionGrid[site_list_id]
        grid_locs = grid.load()

        # remove empty location
        i_loc = grid_locs.index((-34.7, 172.7))
        grid_locs = grid_locs[0:i_loc] + grid_locs[i_loc + 1 :]

        # if no list is passed, include all gridded sites
        if site_list is None:
            site_list = [
                CodedLocation(*gloc, resolution=0.001).code for gloc in grid_locs
            ]
        else:
            # remove named sites
            new_site_list = []
            for latlon in site_list:
                try:
                    lat, lon = latlon.split("~")
                    new_site_list.append(latlon)
                except ValueError as exc:
                    print(f"got {exc} for {latlon}")
                    continue
                site_list = new_site_list

        if site_limit:
            site_list = site_list[:site_limit]

        # print(site_list)
        # create the df of gridded locations
        sites = pd.DataFrame(index=site_list, dtype="str")
        for latlon in site_list:
            try:
                lat, lon = latlon.split("~")
            except ValueError as exc:
                print(f"got {exc} for {latlon}")
                continue
            sites.loc[latlon, ["latlon", "lat", "lon"]] = [latlon, lat, lon]

        # remove sites based on latlon
        if cropped_grid:
            min_lat, max_lat, min_lon, max_lon = grid_limits
            sites["float_lat"] = [float(lat) for lat in sites["lat"]]
            sites = sites[
                (sites["float_lat"] >= min_lat) & (sites["float_lat"] <= max_lat)
            ].drop(["float_lat"], axis=1)
            sites["float_lon"] = [float(lon) for lon in sites["lon"]]
            sites = sites[
                (sites["float_lon"] >= min_lon) & (sites["float_lon"] <= max_lon)
            ].drop(["float_lon"], axis=1)

        if site_limit:
            sites = sites[:site_limit]

    return sites.sort_values(by=["lat", "lon"])
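
A minimal usage sketch; the site names are examples from the named (SRWG214) list and the bounding box values are arbitrary:

from nzssdt_2023.data_creation.query_NSHM import create_sites_df

# named SRG sites; the controlling site ("Auckland") is added automatically
# if it is not already in site_list
sites = create_sites_df(named_sites=True, site_list=["Wellington", "Tauranga"])
print(sites[["lat", "lon"]])

# gridded lat/lon sites, cropped to an arbitrary bounding box and limited
# to 10 sites (as for a test fixture)
grid_sites = create_sites_df(
    named_sites=False,
    cropped_grid=True,
    grid_limits=(-42.0, -41.0, 174.0, 176.0),  # min_lat, max_lat, min_lon, max_lon
    site_limit=10,
)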

retrieve_hazard_curves(sites: pdt.DataFrame, vs30_list: List[int], imt_list: List[str], agg_list: List[str], hazard_id: str) -> Tuple[npt.NDArray, List[float]]

retrieves the hazard curves for the sites, vs30s, imts, and aggs of interest

Parameters:

Name Type Description Default
sites pdt.DataFrame

idx: sites, cols: ['latlon', 'lat', 'lon']

required
vs30_list List[int]

vs30s of interest

required
imt_list List[str]

imts of interest

required
agg_list List[str]

agg types of interest (e.g., "mean" or "0.f" where f is a fractile)

required
hazard_id str

NSHM model id to query

required

Returns:

Type Description
Tuple[npt.NDArray, List[float]]

np.array: hazard curves indexed by [n_vs30s, n_sites, n_imts, n_imtls, n_aggs]
list: intensity levels included

Source code in nzssdt_2023/data_creation/query_NSHM.py
def retrieve_hazard_curves(
    sites: "pdt.DataFrame",
    vs30_list: List[int],
    imt_list: List[str],
    agg_list: List[str],
    hazard_id: str,
) -> Tuple["npt.NDArray", List[float]]:
    """
    retrieves the hazard curves for the sites, vs30s, imts, and aggs of interest

    Args:
        sites: idx: sites, cols: ['latlon', 'lat', 'lon']
        vs30_list:  vs30s of interest
        imt_list:   imts of interest
        agg_list:   agg types of interest (e.g., mean or "0.f" where f is a fractile
        hazard_id:  query the NSHM

    Returns:
        np.array   hazard curves indexed by [n_vs30s, n_sites, n_imts, n_imtls, n_aggs]
             list   intensities included
    """

    log.info(
        f"begin retrieve_hazard_curves for {len(sites)} sites; {len(vs30_list)} vs30;  {len(agg_list)} aggs;"
    )

    # call a location to get the imtls that are returned
    res = next(
        get_hazard_curves(
            sites["latlon"][:1], vs30_list[:1], [hazard_id], imt_list[:1], agg_list[:1]
        )
    )
    imtl_list = [float(val.lvl) for val in res.values]

    # initialize hcurves
    hcurves = -1 * np.ones(
        [len(vs30_list), len(sites), len(imt_list), len(imtl_list), len(agg_list)]
    )

    log.info("Querying hazard curves...")

    # cycle through all hazard parameters
    count = 0
    CHUNK = 1000
    expected_count = (
        len(sites["latlon"]) * len(vs30_list) * len(imt_list) * len(agg_list)
    )
    timings = []
    t0 = dt.datetime.now()
    for res in get_hazard_curves(
        sites["latlon"], vs30_list, [hazard_id], imt_list, agg_list
    ):
        count += 1
        delta = dt.datetime.now() - t0
        duration = delta.seconds + delta.microseconds / 1e6
        timings.append(duration)  # time.delta
        if count % CHUNK == 0:
            eta = dt.datetime.now() + dt.timedelta(
                seconds=(expected_count - count) * sum(timings[-10:]) / 10
            )
            log.info(
                f"retrieve_hazard_curves progress: {count} of {expected_count}. "
                f"Approx {(count/expected_count) * 100:.1f} % progress. "
                f"Last {CHUNK} curves took {sum(timings):.4f}s. "
                f"ETA: {eta}"
            )
            timings = []

        lat = res.lat
        lon = res.lon
        vs30 = res.vs30
        imt = res.imt
        agg = res.agg

        i_site = np.where(sites["latlon"] == f"{lat:.3f}~{lon:.3f}")
        i_vs30 = vs30_list.index(vs30)
        i_imt = imt_list.index(imt)
        i_agg = agg_list.index(agg)

        hcurves[i_vs30, i_site, i_imt, :, i_agg] = [val.val for val in res.values]
        t0 = dt.datetime.now()

    # # identify any missing data and produce a warning
    site_list = list(sites.index)

    if np.sum(hcurves < 0) != 0:
        vs30_idx, site_idx, imt_idx, imtl_idx, agg_idx = np.where(hcurves < 0)

        print("\nMissing NSHM data from:")
        print(f"\t{[vs30_list[idx] for idx in np.unique(vs30_idx)]}")
        if len(np.unique(site_idx)) > 5:
            print(
                f"\t{[site_list[idx] for idx in np.unique(site_idx)[:5]]} and more..."
            )
        else:
            print(f"\t{[site_list[idx] for idx in np.unique(site_idx)]}")
        print(f"\t{[imt_list[idx] for idx in np.unique(imt_idx)]}")
        print(f"\t{[agg_list[idx] for idx in np.unique(agg_idx)]}")

    return hcurves, imtl_list
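
A minimal query sketch; the hazard_id value is a placeholder (supply a valid NSHM hazard model id) and network access to the NSHM hazard API is required:

from nzssdt_2023.data_creation.constants import AGG_LIST, IMT_LIST, VS30_LIST
from nzssdt_2023.data_creation.query_NSHM import create_sites_df, retrieve_hazard_curves

sites = create_sites_df(site_list=["Wellington"])
hazard_id = "<NSHM hazard model id>"  # placeholder

# restrict to one vs30 and two imts to keep the query small
hcurves, imtl_list = retrieve_hazard_curves(
    sites, VS30_LIST[:1], IMT_LIST[:2], AGG_LIST, hazard_id
)
print(hcurves.shape)  # (n_vs30s, n_sites, n_imts, n_imtls, n_aggs)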

NSHM_to_hdf5

helper functions for producing an HDF5 file for the NZSSDT tables

g = 9.80665 module-attribute

acc_to_disp(acc, t)

:param acc: float intensity in acceleration [g]
:param t: float time in seconds

:return: float intensity in displacement [m]

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def acc_to_disp(acc, t):
    """

    :param acc: float   intensity in acceleration [g]
    :param t: float     time in seconds

    :return: float      intensity in displacement [m]
    """

    return (acc * g) * (t / (2 * np.pi)) ** 2

acc_to_vel(acc, t)

:param acc: float intensity in acceleration [g]
:param t: float time in seconds

:return: float intensity in velocity [m/s]

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def acc_to_vel(acc, t):
    """

    :param acc: float   intensity in acceleration [g]
    :param t: float     time in seconds

    :return: float      intensity in velocity [m/s]
    """

    return (acc * g) * (t / (2 * np.pi))
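
A worked example of the two conversions above, with arbitrary input values: a 0.5 g spectral acceleration at a 2.0 s period corresponds to roughly 1.56 m/s spectral velocity and roughly 0.50 m spectral displacement.

from nzssdt_2023.data_creation.NSHM_to_hdf5 import acc_to_disp, acc_to_vel

acc, t = 0.5, 2.0
print(acc_to_vel(acc, t))   # 0.5 * 9.80665 * (2.0 / (2 * pi))    ~ 1.56 m/s
print(acc_to_disp(acc, t))  # 0.5 * 9.80665 * (2.0 / (2 * pi))**2 ~ 0.50 m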

add_uniform_hazard_spectra(data: Dict[str, Any], hazard_rps: Optional[List[int]] = None) -> Dict[str, Any]

Adds uniform hazard spectra to the data dictionary, based on the input hazard_rps

Parameters:

Name Type Description Default
data Dict[str, Any]

dictionary containing hazard curves and metadata for vs30, sites, intensity measures

required
hazard_rps Optional[List[int]]

list of return periods of interest (inverse of annual probability of exceedance, apoe)

None

Returns:

Type Description
Dict[str, Any]

updated dictionary includes design intensities

If hazard_rps is None (default) or empty, the default return periods, constants.DEFAULT_RPS, will be used.

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def add_uniform_hazard_spectra(
    data: Dict[str, Any],
    hazard_rps: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """
    Adds uniform hazard spectra to the data dictionary, based on the input hazard_rps

    Args:
        data: dictionary containing hazard curves and metadata for vs30, sites, intensity measures
        hazard_rps: list of return periods of interest (inverse of annual probability of exceedance, apoe)

    Returns:
        updated dictionary includes design intensities

    If hazard_rps is None (default) or empty, the default return periods, `constants.DEFAULT_RPS`, will be used.
    """

    hazard_rps = hazard_rps or DEFAULT_RPS

    imtls = data["metadata"]["acc_imtls"]
    data["metadata"]["disp_imtls"] = convert_imtls_to_disp(imtls)

    # get poe values
    print("Calculating APoE intensities.")
    data["hazard_design"] = {}
    data["hazard_design"]["hazard_rps"] = hazard_rps

    for intensity_type in ["acc", "disp"]:
        data["hazard_design"][intensity_type] = {}
        data["hazard_design"][intensity_type][
            "stats_im_hazard"
        ] = calculate_hazard_design_intensities(data, hazard_rps, intensity_type)

    return data

calculate_hazard_design_intensities(data: Dict[str, Any], hazard_rps: Union[List[int], npt.NDArray], intensity_type='acc')

calculate design intensities based on an annual probability of exceedance (APoE)

Todo

The hazard curve is interpolated linearly in logspace. However, this function does that in natural log and converts back with the exponent; it should be done in log base 10 and converted back through 10**x.

:param data: dictionary containing hazard curves and metadata for vs30, sites, intensity measures
:param hazard_rps: list containing the desired return periods (1 / APoE)

:return: np array of design intensities derived from the hazard curve stats (mean and quantiles)

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def calculate_hazard_design_intensities(
    data: Dict[str, Any],
    hazard_rps: Union[List[int], "npt.NDArray"],
    intensity_type="acc",
):
    """
    calculate design intensities based on an annual probability of exceedance (APoE)

    Todo:
        The hazard curve is interpolated linearly in logspace.
        However, this function does that in natural log and converts back with the exponent
        It should be done in log base10 and converted back through 10**x

    :param data: dictionary containing hazard curves and metadata for vs30, sites, intensity measures
    :param hazard_rps: list containing the desired return periods (1 / APoE)

    :return: np arrays for all intensities from the hazard curve realizations and stats (mean and quantiles)
    """

    hazard_rps = np.array(hazard_rps)
    imtls = data["metadata"][f"{intensity_type}_imtls"]
    hcurves_stats = np.array(data["hcurves"]["hcurves_stats"])

    [n_vs30, n_sites, n_imts, _, n_stats] = hcurves_stats.shape

    n_rps = len(hazard_rps)

    stats_im_hazard = np.zeros([n_vs30, n_sites, n_imts, n_rps, n_stats])

    for i_vs30 in range(n_vs30):
        for i_site in range(n_sites):
            for i_imt, imt in enumerate(imtls.keys()):

                # loop over the median and any quantiles
                for i_stat in range(n_stats):
                    # the interpolation is done as a linear interpolation in logspace
                    # all inputs are converted to the natural log and the output is converted back via the exponent
                    stats_im_hazard[i_vs30, i_site, i_imt, :, i_stat] = np.exp(
                        np.interp(
                            np.log(1 / hazard_rps),
                            np.log(
                                np.flip(hcurves_stats[i_vs30, i_site, i_imt, :, i_stat])
                            ),
                            np.log(np.flip(imtls[imt])),
                        )
                    )
    return stats_im_hazard
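
The interpolation step can be illustrated on its own with numpy; the hazard curve values below are made up for the sketch, whereas the real curves come from data["hcurves"]["hcurves_stats"]:

import numpy as np

# toy hazard curve: intensity levels (g) and their annual probabilities of exceedance
imtls = np.array([0.01, 0.05, 0.1, 0.2, 0.5, 1.0])
apoe = np.array([0.1, 0.02, 0.008, 0.002, 0.0003, 0.00005])

hazard_rps = np.array([25, 100, 500])

# np.interp needs increasing x values, so both arrays are flipped
# (APoE decreases as intensity increases), exactly as in the function above
im_at_rp = np.exp(
    np.interp(np.log(1 / hazard_rps), np.log(np.flip(apoe)), np.log(np.flip(imtls)))
)
print(dict(zip(hazard_rps.tolist(), np.round(im_at_rp, 3).tolist())))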

convert_imtls_to_disp(acc_imtls)

converts the acceleration intensity measure types and levels to spectral displacements

:param acc_imtls: dictionary keys: acc intensity measure names, values: intensity levels

:return: dictionary keys: disp intensity measure names, values: intensity levels

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def convert_imtls_to_disp(acc_imtls):
    """
    converts the acceleration intensity measure types and levels to spectral displacements

    :param acc_imtls: dictionary   keys: acc intensity measure names, values: intensity levels

    :return: dictionary   keys: disp intensity measure names, values: intensity levels
    """

    disp_imtls = {}
    for acc_imt in acc_imtls.keys():
        period = period_from_imt(acc_imt)
        disp_imt = acc_imt.replace("A", "D")

        disp_imtls[disp_imt] = acc_to_disp(
            np.array(acc_imtls[acc_imt]), period
        ).tolist()

    return disp_imtls
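
A small sketch with a two-entry imtls dictionary (the intensity levels are arbitrary):

from nzssdt_2023.data_creation.NSHM_to_hdf5 import convert_imtls_to_disp

acc_imtls = {"PGA": [0.1, 0.2, 0.5], "SA(1.0)": [0.1, 0.2, 0.5]}
disp_imtls = convert_imtls_to_disp(acc_imtls)

print(list(disp_imtls))       # ['PGD', 'SD(1.0)']
print(disp_imtls["PGD"])      # all zeros: PGA maps to period 0.0 s
print(disp_imtls["SD(1.0)"])  # each level scaled by g * (1.0 / (2 * pi))**2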

create_hcurve_dictionary(sites, vs30_list, imt_list, imtl_list, agg_list, hcurves)

compile hazard data into a dictionary

:param sites: pd dataframe idx: sites, cols: ['latlon', 'lat', 'lon']
:param vs30_list: list vs30s of interest
:param imt_list: list imts of interest
:param imtl_list: list intensity levels
:param agg_list: list agg types of interest (e.g., "mean" or "0.f" where f is a fractile)
:param hcurves: np.array hazard curves indexed by [n_vs30s, n_sites, n_imts, n_imtls, n_aggs]

:return: dictionary hazard curves including metadata

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def create_hcurve_dictionary(sites, vs30_list, imt_list, imtl_list, agg_list, hcurves):
    """
    compile hazard data into a dictionary

    :param sites: pd dataframe  idx: sites, cols: ['latlon', 'lat', 'lon']
    :param vs30_list: list  vs30s of interest
    :param imt_list:  list  imts of interest
    :param imtl_list: list  intensity levels
    :param agg_list:  list  agg types of interest (e.g., mean or "0.f" where f is a fractile
    :param hcurves:

    :return: dictionary   hazard curves including metadata
    """

    # create dictionary
    data = {}

    # prep metadata
    imtls = {}
    for imt in imt_list:
        imtls[imt] = imtl_list

    data["metadata"] = {}
    data["metadata"]["quantiles"] = [float(q) for q in agg_list if q != "mean"]
    data["metadata"]["acc_imtls"] = imtls
    data["metadata"]["disp_imtls"] = convert_imtls_to_disp(imtls)
    data["metadata"]["sites"] = sites
    data["metadata"]["vs30s"] = vs30_list

    # add hcurves
    data["hcurves"] = {}
    data["hcurves"]["hcurves_stats"] = hcurves

    return data

period_from_imt(imt)

retrieves period in seconds from the intensity measure type

:param imt: string intensity measure type

:return: float period

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def period_from_imt(imt):
    """
    retrieves period in seconds from the intensity measure type

    :param imt: string  intensity measure type

    :return: float  period
    """

    if imt in ["PGA", "PGD"]:
        period = 0
    else:
        period = float(re.split(r"\(|\)", imt)[1])

    return period

query_NSHM_to_hdf5(hf_name: Path, hazard_id: str, site_list: pd.DataFrame, site_limit: int = 0)

Query the NSHM and save the results to an hdf5

Parameters:

Name Type Description Default
hf_name Path

name of hdf5 file with hazard curve data

required
hazard_id str

NSHM model id

required
site_list pd.DataFrame

sites to include in the sa parameter table

required
site_limit int

for building test fixtures

0
Todo
  • Chris BC, the default hazard_id should actually be part of your version control
Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def query_NSHM_to_hdf5(
    hf_name: Path,
    hazard_id: str,
    site_list: pd.DataFrame,
    site_limit: int = 0,
):
    """Query the NSHM and save the results to an hdf5

    Args:
        hf_name: name of hdf5 file with hazard curve data
        hazard_id: NSHM model id
        site_list: sites to include in the sa parameter table
        site_limit: for building test fixtures

    Todo:
        - Chris BC, the default hazard_id should actually be part of your version control

    """

    # query NSHM
    hcurves, _ = q_haz.retrieve_hazard_curves(
        site_list, VS30_LIST, IMT_LIST, AGG_LIST, hazard_id
    )

    # prep hcurves dictionary
    data = create_hcurve_dictionary(
        site_list, VS30_LIST, IMT_LIST, IMTL_LIST, AGG_LIST, hcurves
    )

    # add uhs spectra
    data = add_uniform_hazard_spectra(data)

    # save file
    save_hdf(hf_name, data)
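
A minimal end-to-end sketch combining create_sites_df with query_NSHM_to_hdf5; the output path and hazard_id are placeholders, and network access to the NSHM hazard API is required:

from pathlib import Path

from nzssdt_2023.data_creation.NSHM_to_hdf5 import query_NSHM_to_hdf5
from nzssdt_2023.data_creation.query_NSHM import create_sites_df

site_list = create_sites_df(site_list=["Wellington", "Tauranga"])

query_NSHM_to_hdf5(
    hf_name=Path("hazard_curves.hdf5"),   # placeholder output path
    hazard_id="<NSHM hazard model id>",   # placeholder model id
    site_list=site_list,
)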

save_hdf(hf_name, data)

Saves the data dictionary as an hdf5 file for later use.

:param hf_name: name of the hdf5 file
:param data: dictionary containing hazard curves and metadata for vs30, sites, intensity measures, and design intensities

Source code in nzssdt_2023/data_creation/NSHM_to_hdf5.py
def save_hdf(hf_name, data):
    """
    Saves the data dictionary as an hdf5 file for later use.

    :param hf_name: name of the hdf5 file
    :param data: dictionary containing hazard curves and metadata for
                     vs30, sites, intensity measures, and design intensities
    """
    with h5py.File(hf_name, "w") as hf:

        # add metadata
        grp = hf.create_group("metadata")
        grp.attrs["vs30s"] = data["metadata"]["vs30s"]
        grp.attrs["quantiles"] = data["metadata"]["quantiles"]
        grp.attrs["acc_imtls"] = str(data["metadata"]["acc_imtls"])
        grp.attrs["disp_imtls"] = str(data["metadata"]["disp_imtls"])
        grp.attrs["sites"] = str(data["metadata"]["sites"].to_dict())

        # add hazard curves
        grp = hf.create_group("hcurves")
        for dset_name in ["hcurves_stats"]:
            dset = grp.create_dataset(
                dset_name, np.array(data["hcurves"][dset_name]).shape
            )
            dset[:] = np.array(data["hcurves"][dset_name])

        # add poe values
        if "hazard_design" in data.keys():
            grp = hf.create_group("hazard_design")
            grp.attrs["hazard_rps"] = data["hazard_design"]["hazard_rps"]
            for intensity_type in ["acc", "disp"]:
                subgrp = grp.create_group(intensity_type)
                for dset_name in ["stats_im_hazard"]:
                    dset = subgrp.create_dataset(
                        dset_name,
                        np.array(
                            data["hazard_design"][intensity_type][dset_name]
                        ).shape,
                    )
                    dset[:] = np.array(data["hazard_design"][intensity_type][dset_name])

    print(f"\nHazard curve data is saved as {hf_name}")

extract_data

This module extracts the data and metadata from the hdf5 file containing the NSHM data.

extract_APoEs(data_file: str | Path) -> Tuple[List[str], List[int]]

Extract uniform hazard spectra annual probabilities of exceedance from the hdf5

Parameters:

Name Type Description Default
data_file str | Path

name of hazard hdf5 file

required

Returns:

Name Type Description
APoEs List[str]

list of APoE strings

hazard_rp_list List[int]

list of return periods

Source code in nzssdt_2023/data_creation/extract_data.py
def extract_APoEs(data_file: str | Path) -> Tuple[List[str], List[int]]:
    """Extract uniform hazard spectra annual probabilities of exceedance from the hdf5

    Args:
        data_file: name of hazard hdf5 file

    Returns:
        APoEs: list of APoE strings
        hazard_rp_list: list of return periods

    """
    with h5py.File(data_file, "r") as hf:
        hazard_rp_list = list(hf["hazard_design"].attrs["hazard_rps"])
    APoEs = [f"APoE: 1/{hazard_rp}" for hazard_rp in hazard_rp_list]

    return APoEs, hazard_rp_list

extract_quantiles(data_file: str | Path) -> List[float]

Extract hazard quantiles from the hdf5

Parameters:

Name Type Description Default
data_file str | Path

name of hazard hdf5 file

required

Returns:

Name Type Description
quantiles List[float]

list of quantiles

Source code in nzssdt_2023/data_creation/extract_data.py
def extract_quantiles(data_file: str | Path) -> List[float]:
    """Extract hazard quantiles from the hdf5

    Args:
        data_file: name of hazard hdf5 file

    Returns:
        quantiles: list of quantiles

    """
    with h5py.File(data_file, "r") as hf:
        quantiles = list(hf["metadata"].attrs["quantiles"])

    return quantiles

extract_sites(data_file: str | Path) -> pdt.DataFrame

Extract sites from the hdf5

Parameters:

Name Type Description Default
data_file str | Path

name of hazard hdf5 file

required

Returns:

Name Type Description
sites pdt.DataFrame

dataframe of sites with lat/lons

Source code in nzssdt_2023/data_creation/extract_data.py
def extract_sites(data_file: str | Path) -> "pdt.DataFrame":
    """Extract sites from the hdf5

    Args:
        data_file: name of hazard hdf5 file

    Returns:
        sites: dataframe of sites with lat/lons

    """
    with h5py.File(data_file, "r") as hf:
        sites = pd.DataFrame(ast.literal_eval(hf["metadata"].attrs["sites"]))

    return sites

extract_spectra(data_file: str | Path) -> Tuple[npt.NDArray, dict]

Extract the uniform hazard spectra from the hdf5

Parameters:

Name Type Description Default
data_file str | Path

name of hazard hdf5 file

required

Returns:

Name Type Description
acc_spectra npt.NDArray

acceleration spectra (dimensions: vs30, site, intensity measure, return period, statistic)

imtls dict

keys: intensity measures e.g., SA(1.0), values: list of intensity levels

Source code in nzssdt_2023/data_creation/extract_data.py
def extract_spectra(data_file: str | Path) -> Tuple["npt.NDArray", dict]:
    """Extract the uniform hazard spectra from the hdf5

    Args:
        data_file: name of hazard hdf5 file

    Returns:
        acc_spectra: acceleration spectra (dimensions: vs30, site, return period, statistic)
        imtls: keys: intensity measures e.g., SA(1.0), values: list of intensity levels

    """
    with h5py.File(data_file, "r") as hf:
        imtls = ast.literal_eval(hf["metadata"].attrs["acc_imtls"])
        acc_spectra = hf["hazard_design"]["acc"]["stats_im_hazard"][:]

    return acc_spectra, imtls

extract_vs30s(data_file: str | Path) -> List[int]

Extract the vs30 values from the hdf5

Parameters:

Name Type Description Default
data_file str | Path

name of hazard hdf5 file

required

Returns:

Name Type Description
vs30_list List[int]

list of vs30s included in hdf5

Source code in nzssdt_2023/data_creation/extract_data.py
def extract_vs30s(data_file: str | Path) -> List[int]:
    """Extract the vs30 values from the hdf5

    Args:
        data_file: name of hazard hdf5 file

    Returns:
        vs30_list: list of vs30s included in hdf5

    """
    with h5py.File(data_file, "r") as hf:
        vs30_list = list(hf["metadata"].attrs["vs30s"])

    return vs30_list
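
The extract functions are typically used together on a file produced by NSHM_to_hdf5.query_NSHM_to_hdf5; a minimal sketch with a placeholder filename:

from nzssdt_2023.data_creation.extract_data import (
    extract_APoEs,
    extract_sites,
    extract_spectra,
    extract_vs30s,
)

data_file = "hazard_curves.hdf5"  # placeholder path

APoEs, hazard_rps = extract_APoEs(data_file)
sites = extract_sites(data_file)
vs30s = extract_vs30s(data_file)
acc_spectra, imtls = extract_spectra(data_file)

# acc_spectra is indexed by [vs30, site, intensity measure, return period, statistic]
print(acc_spectra.shape, len(vs30s), len(sites), len(imtls), len(hazard_rps))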

sa_parameter_generation

This module derives the PGA, Sa,s, Tc, and Td parameters from the NSHM hazard curves.

PGA_REDUCTION_ENABLED = True module-attribute

PGA_ROUNDING_ENABLED = True module-attribute

log = logging.getLogger(__name__) module-attribute

Td_fit_error(td: float, relevant_periods: npt.NDArray, relevant_spectrum: npt.NDArray, pga: float, sas: float, tc: float) -> float

Calculate the spectral fit error term for the selected td value. The error is evaluated as the sum of the squared differences between the fitted spectrum and the original spectrum, at the relevant periods.

Parameters:

Name Type Description Default
td float

spectral-velocity-plateau corner period [seconds]

required
relevant_periods npt.NDArray

periods included in the relevant domain [seconds]

required
relevant_spectrum npt.NDArray

spectrum over the relevant periods [g]

required
pga float

peak ground acceleration [g]

required
sas float

short-period spectral acceleration [g]

required
tc float

spectral-acceleration-plateau corner period [seconds]

required

Returns:

Name Type Description
error float

error term

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def Td_fit_error(
    td: float,
    relevant_periods: "npt.NDArray",
    relevant_spectrum: "npt.NDArray",
    pga: float,
    sas: float,
    tc: float,
) -> float:
    """Calculate the spectral fit error term for the selected td value.
    The error is evaluated as the sum of the squared differences between
    the fitted spectrum and the original spectrum, at the relevant periods.

    Args:
        td: spectral-velocity-plateau corner period [seconds]
        relevant_periods: periods included in the relevant domain [seconds]
        relevant_spectrum: spectrum over the relevant periods [g]
        pga: peak ground acceleration [g]
        sas: short-period spectral acceleration [g]
        tc: spectral-acceleration-plateau corner period [seconds]

    Returns:
        error: error term
    """
    fitted_spectrum = np.array(
        [uhs_value(period, pga, sas, tc, td) for period in relevant_periods]
    )
    error_array = np.abs(relevant_spectrum - fitted_spectrum)
    error = np.sum(error_array**2)
    return error

acc_spectra_to_vel(acc_spectra: npt.NDArray, imtls: dict) -> npt.NDArray

Convert uniform hazard spectra from acceleration to velocity

Parameters:

Name Type Description Default
acc_spectra npt.NDArray

acceleration spectra

required
imtls dict

keys: intensity measures e.g., SA(1.0), values: list of intensity levels

required

Returns:

Name Type Description
vel_spectra npt.NDArray

velocity spectra

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def acc_spectra_to_vel(acc_spectra: "npt.NDArray", imtls: dict) -> "npt.NDArray":
    """Convert uniform hazard spectra from acceleration to velocity

    Args:
        acc_spectra: acceleration spectra
        imtls: keys: intensity measures e.g., SA(1.0), values: list of intensity levels

    Returns:
        vel_spectra: velocity spectra

    """
    vel_spectra = np.zeros_like(acc_spectra)
    period_list = [period_from_imt(imt) for imt in list(imtls.keys())]

    for i_p, period in enumerate(period_list):
        vel_spectra[:, :, i_p, :, :] = acc_to_vel(acc_spectra[:, :, i_p, :, :], period)

    return vel_spectra

calc_R_PGA(pga: float, site_class: str) -> float

Calculate the reduction factor for the peak ground acceleration (Eq. C3.15)

Parameters:

Name Type Description Default
pga float

peak ground acceleration [g]

required
site_class str

roman numeral

required

Returns:

Name Type Description
r_pga float

reduction factor for peak ground acceleration

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def calc_R_PGA(pga: float, site_class: str) -> float:
    """Calculate the reduction factor for the peak ground acceleration (Eq. C3.15)

    Args:
        pga: peak ground acceleration [g]
        site_class: roman numeral

    Returns:
        r_pga: reduction factor for peak ground acceleration

    """
    r_pga = 0

    if site_class in PGA_REDUCTIONS.keys():
        A0 = PGA_REDUCTIONS[site_class].A0
        A1 = PGA_REDUCTIONS[site_class].A1
        PGA_threshold = PGA_REDUCTIONS[site_class].PGA_threshold

        if pga >= PGA_threshold:
            r_pga = A0 * np.log(pga) + A1

    return r_pga

calc_reduced_PGA(pga: float, site_class: str) -> float

Calculate the adjusted peak ground acceleration (Eq. C3.14)

Parameters:

Name Type Description Default
pga float

peak ground acceleration [g]

required
site_class str

roman numeral

required

Returns:

Name Type Description
reduced_pga float

adjusted peak ground acceleration

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def calc_reduced_PGA(pga: float, site_class: str) -> float:
    """Calculate the adjusted peak ground acceleration (Eq. C3.14)

    Args:
        pga: peak ground acceleration [g]
        site_class: roman numeral

    Returns:
        reduced_pga: adjusted peak ground acceleration
    """
    r_pga = calc_R_PGA(pga, site_class)
    reduced_pga = pga * (1 - r_pga)

    return reduced_pga
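
A worked example of Eqs C3.14/C3.15 using the PGA_REDUCTIONS constants for site class IV (A0 = 0.076, A1 = 0.123, PGA_threshold = 0.198 g); the input PGA values are arbitrary:

from nzssdt_2023.data_creation.sa_parameter_generation import calc_R_PGA, calc_reduced_PGA

print(calc_R_PGA(0.5, "IV"))        # 0.076 * ln(0.5) + 0.123 ~ 0.070
print(calc_reduced_PGA(0.5, "IV"))  # 0.5 * (1 - 0.070) ~ 0.465 g
print(calc_reduced_PGA(0.1, "IV"))  # below the 0.198 g threshold: unchanged
print(calc_reduced_PGA(0.5, "I"))   # no reduction defined for site class I: unchanged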

calculate_parameter_arrays(data_file: str | Path) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray, npt.NDArray]

Calculate PGA, Sa,s, and Tc values for uniform hazard spectra in hdf5

Parameters:

Name Type Description Default
data_file str | Path

name of hazard hdf5 file

required

Returns:

Name Type Description
PGA npt.NDArray

adjusted peak ground acceleration [g] (Eqn C3.14)

Sas npt.NDArray

short-period spectral acceleration [g] (90% of maximum spectral acceleration)

PSV npt.NDArray

95% of maximum spectral velocity [m/s]

Tc npt.NDArray

spectral-acceleration-plateau corner period [seconds]

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def calculate_parameter_arrays(
    data_file: str | Path,
) -> Tuple["npt.NDArray", "npt.NDArray", "npt.NDArray", "npt.NDArray"]:
    """Calculate PGA, Sa,s, and Tc values for uniform hazard spectra in hdf5

    Args:
        data_file: name of hazard hdf5 file

    Returns:
        PGA: adjusted peak ground acceleration [g] (Eqn C3.14)
        Sas: short-period spectral acceleration [g] (90% of maximum spectral acceleration)
        PSV: 95% of maximum spectral velocity [m/s]
        Tc : spectral-acceleration-plateau corner period [seconds]
    """

    acc_spectra, imtls = extract_spectra(data_file)
    vel_spectra = acc_spectra_to_vel(acc_spectra, imtls)

    PGA = acc_spectra[:, :, IMT_LIST.index("PGA"), :, :]

    if PGA_REDUCTION_ENABLED:
        log.debug(f"PGA array {PGA}")
        PGA = reduce_PGAs(PGA)
    else:
        log.warning(
            f"PGA reduction skipped because `PGA_REDUCTION_ENABLED` == {PGA_REDUCTION_ENABLED}"
        )

    if PGA_ROUNDING_ENABLED:
        PGA = np.round(PGA, PGA_N_DP)
    else:
        log.warning(
            f"PGA rounding skipped because `PGA_ROUNDING_ENABLED` == {PGA_ROUNDING_ENABLED}"
        )

    Sas: npt.NDArray = 0.9 * np.max(acc_spectra, axis=2)
    Sas = np.round(Sas, SAS_N_DP)

    PSV: npt.NDArray = 0.95 * np.max(vel_spectra, axis=2)
    Tc: npt.NDArray = 2 * np.pi * PSV / (Sas * g)
    Tc = sig_figs(Tc, TC_N_SF)

    return PGA, Sas, PSV, Tc
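
A numeric sketch of how the Sas, PSV, and Tc definitions above combine; the peak spectral values are arbitrary:

import numpy as np

g = 9.80665
Sas = 0.9 * 1.2    # 90% of a 1.2 g peak spectral acceleration -> 1.08 g
PSV = 0.95 * 1.5   # 95% of a 1.5 m/s peak spectral velocity   -> 1.425 m/s
Tc = 2 * np.pi * PSV / (Sas * g)
print(round(Tc, 2))  # ~0.85 s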

choose_site_class(vs30: Union[int, float], lower_bound: bool = False) -> str

Returns the site class for the selected vs30 value. Site class definitions can be found in TS Table 3.1.

Note that the site class returned here is based on vs30 alone, i.e. the a) clauses in Table 3.1

Parameters:

Name Type Description Default
vs30 Union[int, float]

vs30 of interest [m/s]

required
lower_bound bool

False returns the site class as defined in TS Table 3.1; True returns the alternate site class at the vs30 discontinuity

False

Returns:

Name Type Description
sc str

site class

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def choose_site_class(vs30: Union[int, float], lower_bound: bool = False) -> str:
    """Returns the site class for the selected vs30 value
    Site class definitions can be found in TS Table 3.1

    Note that the site class returned here is based on vs30 alone, i.e. the a) clauses in Table 3.1

    Args:
        vs30: vs30 of interest [m/s]
        lower_bound: False returns the site class as defined in TS Table 3.1
                     True returns the alternate site class at the vs30 discontinuity

    Returns:
        sc: site class
    """

    min_vs30 = SITE_CLASSES["VI"].lower_bound

    if vs30 > min_vs30:
        boundaries = np.array([SITE_CLASSES[sc].lower_bound for sc in SITE_CLASSES])

        # switches which SC is selected at the boundary
        if lower_bound:
            side = "left"
        else:
            side = "right"

        sc_idx = np.searchsorted(-boundaries, -vs30, side=side)  # type: ignore
        sc = [SITE_CLASSES[sc].site_class for sc in SITE_CLASSES][sc_idx]

    elif (vs30 == min_vs30) & lower_bound:
        sc = "VI"

    else:
        sc = "VII"

    return sc
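
A small sketch of the boundary behaviour, based on the SITE_CLASSES lower bounds (750, 450, 300, 250, 200, 150 m/s):

from nzssdt_2023.data_creation.sa_parameter_generation import choose_site_class

print(choose_site_class(400))                    # 'III' (300 <= vs30 < 450)
print(choose_site_class(450))                    # 'III' by default at the II/III boundary
print(choose_site_class(450, lower_bound=True))  # 'II', the alternate class at the boundary
print(choose_site_class(100))                    # 'VII' (below the 150 m/s class VI bound)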

create_mean_sa_table(PGA, Sas, PSV, Tc, mean_Td, site_list, vs30_list, hazard_rp_list)

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def create_mean_sa_table(
    PGA, Sas, PSV, Tc, mean_Td, site_list, vs30_list, hazard_rp_list
):
    i_stat = 0  # spectra index for stats in ['mean'] + quantiles

    index = site_list
    APoEs = [f"APoE: 1/{rp}" for rp in hazard_rp_list]
    site_class_list = [f"{SITE_CLASSES[sc].label}" for sc in SITE_CLASSES]
    parameters = ["PGA", "Sas", "PSV", "Tc", "Td"]
    columns = pd.MultiIndex.from_product([APoEs, site_class_list, parameters])
    df = pd.DataFrame(index=index, columns=columns).astype(float)

    log.info("create_mean_sa_table() processing site classes")
    for sc in SITE_CLASSES.keys():
        sc_label = SITE_CLASSES[sc].label
        vs30 = int(SITE_CLASSES[sc].representative_vs30)
        i_vs30 = vs30_list.index(vs30)

        log.info(f"site class {sc} label: {sc_label} vs30: {vs30}")

        for APoE, rp in zip(APoEs, hazard_rp_list):
            i_rp = hazard_rp_list.index(rp)

            df.loc[:, (APoE, sc_label, "PGA")] = PGA[i_vs30, :, i_rp, i_stat]
            df.loc[:, (APoE, sc_label, "Sas")] = Sas[i_vs30, :, i_rp, i_stat]
            df.loc[:, (APoE, sc_label, "PSV")] = PSV[i_vs30, :, i_rp, i_stat]
            df.loc[:, (APoE, sc_label, "Tc")] = Tc[i_vs30, :, i_rp, i_stat]
            df.loc[:, (APoE, sc_label, "Td")] = mean_Td[i_vs30, :, i_rp]

    return df

create_sa_table(hf_path: Path, lower_bound_flags: bool = True) -> pdt.DataFrame

Creates a pandas dataframe with the sa parameters

Parameters:

Name Type Description Default
hf_path Path

hdf5 filename, containing the hazard data

required
lower_bound_flags bool

True includes the metadata for updating the lower bound hazard

True

Returns:

Name Type Description
df pdt.DataFrame

dataframe of sa parameters

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def create_sa_table(hf_path: Path, lower_bound_flags: bool = True) -> "pdt.DataFrame":
    """Creates a pandas dataframe with the sa parameters

    Args:
        hf_path: hdf5 filename, containing the hazard data
        lower_bound_flags: True includes the metadata for updating the lower bound hazard

    Returns:
        df: dataframe of sa parameters
    """
    site_list = list(extract_sites(hf_path).index)
    APoEs, hazard_rp_list = extract_APoEs(hf_path)
    quantile_list = extract_quantiles(hf_path)
    vs30_list = VS30_LIST

    log.info("begin calculate_parameter_arrays")
    PGA, Sas, PSV, Tc = calculate_parameter_arrays(hf_path)

    acc_spectra, imtls = extract_spectra(hf_path)

    log.info("begin fit_Td_array for mean Tds")
    mean_Td = fit_Td_array(
        PGA, Sas, Tc, acc_spectra, imtls, site_list, vs30_list, hazard_rp_list
    )

    log.info("begin create_mean_sa_table")
    mean_df = create_mean_sa_table(
        PGA, Sas, PSV, Tc, mean_Td, site_list, vs30_list, hazard_rp_list
    )

    log.info("begin update_lower_bound_sa")
    df = update_lower_bound_sa(
        mean_df,
        PGA,
        Sas,
        Tc,
        PSV,
        acc_spectra,
        imtls,
        vs30_list,
        hazard_rp_list,
        quantile_list,
    )

    df = replace_relevant_locations(df)

    # if not lower_bound_flags:
    #     print('before', df)
    #     print()
    #     df = remove_lower_bound_metadata(df)
    #     print('after', df)

    df = set_coded_location_resolution(df)
    return df
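
A minimal sketch of building the sa parameter table from a previously saved hazard hdf5; the path is a placeholder and the file is assumed to have been created with the default return periods (constants.DEFAULT_RPS):

from pathlib import Path

from nzssdt_2023.data_creation.sa_parameter_generation import create_sa_table

sa_df = create_sa_table(Path("hazard_curves.hdf5"))

# columns form a MultiIndex of (APoE, site class label, parameter)
print(sa_df.loc[:, ("APoE: 1/500", "Site Class IV")].head())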

fit_Td(spectrum: npt.NDArray, periods: npt.NDArray, pga: float, sas: float, tc: float) -> float

Fit the Td value to obtain the best fit over the response spectrum

Parameters:

Name Type Description Default
spectrum npt.NDArray

acceleration spectrum [g]

required
periods npt.NDArray

periods over which spectrum is defined [seconds]

required
pga float

peak ground acceleration [g]

required
sas float

short-period spectral acceleration [g]

required
tc float

spectral-acceleration-plateau corner period [seconds]

required

Returns:

Name Type Description
td float

spectral-velocity-plateau corner period [seconds]

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def fit_Td(
    spectrum: "npt.NDArray", periods: "npt.NDArray", pga: float, sas: float, tc: float
) -> float:
    """Fit the Td value to obtain the best fit over the response spectrum

    Args:
        spectrum: acceleration spectrum [g]
        periods:  periods over which spectrum is defined [seconds]
        pga: peak ground acceleration [g]
        sas: short-period spectral acceleration [g]
        tc: spectral-acceleration-plateau corner period [seconds]

    Returns:
        td: spectral-velocity-plateau corner period [seconds]
    """
    relevant_spectrum, relevant_periods = relevant_spectrum_domain(
        spectrum, periods, tc
    )

    # select period with the minimum error
    td_error = [
        Td_fit_error(td, relevant_periods, relevant_spectrum, pga, sas, tc)
        for td in relevant_periods
    ]
    td = relevant_periods[np.argmin(td_error)]

    return td

fit_Td_array(PGA: npt.NDArray, Sas: npt.NDArray, Tc: npt.NDArray, acc_spectra: npt.NDArray, imtls: dict, site_list: List[str], vs30_list: List[int], hazard_rp_list: List[int], i_stat: int = 0, sites_of_interest: Optional[List[str]] = None) -> npt.NDArray

Fit the Td values for all sites, site classes and APoE of interest

Parameters:

Name Type Description Default
PGA npt.NDArray

adjusted peak ground acceleration [g] (Eqn C3.14)

required
Sas npt.NDArray

short-period spectral acceleration [g] (90% of maximum spectral acceleration)

required
Tc npt.NDArray

spectral-acceleration-plateau corner period [seconds]

required
acc_spectra npt.NDArray

acceleration spectra [g]

required
imtls dict

keys: intensity measures e.g., SA(1.0), values: list of intensity levels

required
site_list List[str]

sites included in acc_spectra

required
vs30_list List[int]

vs30s included in acc_spectra

required
hazard_rp_list List[int]

return periods included in acc_spectra

required
i_stat int

spectra index for stats in ['mean'] + quantiles

0
sites_of_interest Optional[List[str]]

subset of sites

None

Returns:

Name Type Description
Td npt.NDArray

spectral-velocity-plateau corner period [seconds]

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def fit_Td_array(
    PGA: "npt.NDArray",
    Sas: "npt.NDArray",
    Tc: "npt.NDArray",
    acc_spectra: "npt.NDArray",
    imtls: dict,
    site_list: List[str],
    vs30_list: List[int],
    hazard_rp_list: List[int],
    i_stat: int = 0,
    sites_of_interest: Optional[List[str]] = None,
) -> "npt.NDArray":
    """Fit the Td values for all sites, site classes and APoE of interest

    Args:
        PGA: adjusted peak ground acceleration [g] (Eqn C3.14)
        Sas: short-period spectral acceleration [g] (90% of maximum spectral acceleration)
        Tc : spectral-acceleration-plateau corner period [seconds]
        acc_spectra: acceleration spectra [g]
        imtls: keys: intensity measures e.g., SA(1.0), values: list of intensity levels
        site_list: sites included in acc_spectra
        vs30_list: vs30s included in acc_spectra
        hazard_rp_list: return periods included in acc_spectra
        i_stat: spectra index for stats in ['mean'] + quantiles
        sites_of_interest: subset of sites

    Returns:
        Td: spectral-velocity-plateau corner period [seconds]
    """
    if sites_of_interest is None:
        sites_of_interest = site_list

    interpolated_spectra, periods = interpolate_spectra(acc_spectra, imtls)
    n_vs30s, _, n_periods, n_apoes, n_stats = interpolated_spectra.shape
    n_sites = len(sites_of_interest)
    Td = np.zeros([n_vs30s, n_sites, n_apoes])
    # print(type(Td))

    # cycle through all hazard parameters
    count = 0
    expected_count = n_sites
    for i_site_int, site in enumerate(sites_of_interest):
        count += 1
        log.info(
            f"fit_Td_array progress: Site #{count} of {expected_count}. "
            f"Approx {(count / expected_count) * 100:.1f} % progress. "
        )

        for vs30 in vs30_list:
            for rp in hazard_rp_list:
                i_site = site_list.index(site)
                i_vs30 = vs30_list.index(vs30)
                i_rp = hazard_rp_list.index(rp)

                spectrum = interpolated_spectra[i_vs30, i_site, :, i_rp, i_stat]

                pga = PGA[i_vs30, i_site, i_rp, i_stat]
                sas = Sas[i_vs30, i_site, i_rp, i_stat]
                tc = Tc[i_vs30, i_site, i_rp, i_stat]

                Td[i_vs30, i_site_int, i_rp] = fit_Td(spectrum, periods, pga, sas, tc)

    return Td

interpolate_spectra(spectra: npt.NDArray, imtls: dict, dt: float = 0.1) -> Tuple[npt.NDArray, npt.NDArray]

Linearly interpolate spectra over the original domain, in increments of dt

Parameters:

Name Type Description Default
spectra npt.NDArray

acceleration spectra

required
imtls dict

keys: intensity measures e.g., SA(1.0), values: list of intensity levels

required
dt float

period increments at which to interpolate

0.1

Returns:

Name Type Description
new_spectra npt.NDArray

interpolated spectra

new_period_list npt.NDArray

periods at which the spectra are defined

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def interpolate_spectra(
    spectra: "npt.NDArray", imtls: dict, dt: float = 0.1
) -> Tuple["npt.NDArray", "npt.NDArray"]:
    """Linearly interpolate spectra over the original domain, in increments of dt

    Inputs:
        spectra: acceleration spectra
        imtls: keys: intensity measures e.g., SA(1.0), values: list of intensity levels
        dt: period increments at which to interpolate

    Returns:
        new_spectra: interpolated spectra
        new_period_list: periods at which the spectra are defined
    """
    n_vs30s, n_sites, n_periods, n_apoes, n_stats = spectra.shape

    period_list = np.array([period_from_imt(imt) for imt in imtls.keys()])
    new_period_list = np.arange(min(period_list), max(period_list) + dt, dt)

    new_spectra = np.zeros([n_vs30s, n_sites, len(new_period_list), n_apoes, n_stats])
    for i_vs30 in range(n_vs30s):
        for i_site in range(n_sites):
            for i_apoe in range(n_apoes):
                for i_stat in range(n_stats):
                    new_spectra[i_vs30, i_site, :, i_apoe, i_stat] = np.interp(
                        new_period_list,
                        period_list,
                        spectra[i_vs30, i_site, :, i_apoe, i_stat],
                    )

    return new_spectra, new_period_list

reduce_PGAs(PGA: npt.NDArray) -> npt.NDArray

Apply peak ground acceleration adjustments to all PGA values (Eq. C3.14)

Parameters:

Name Type Description Default
PGA npt.NDArray

peak ground acceleration

required

Returns:

Name Type Description
reduced_PGA npt.NDArray

adjusted peak ground acceleration

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def reduce_PGAs(PGA: "npt.NDArray") -> "npt.NDArray":
    """Apply peak ground acceleration adjustments to all PGA values (Eq. C3.14)

    Args:
        PGA: peak ground acceleration

    Returns:
        reduced_PGA: adjusted peak ground acceleration
    """
    n_vs30s, n_sites, n_rps, n_stats = PGA.shape
    reduced_PGA = PGA.copy()

    for sc in PGA_REDUCTIONS.keys():
        vs30 = int(SITE_CLASSES[sc].representative_vs30)
        i_vs30 = VS30_LIST.index(vs30)
        for i_site in range(n_sites):
            for i_rp in range(n_rps):
                for i_stat in range(n_stats):
                    pga = PGA[i_vs30, i_site, i_rp, i_stat]
                    reduced_PGA[i_vs30, i_site, i_rp, i_stat] = calc_reduced_PGA(
                        pga, sc
                    )

    return reduced_PGA

relevant_spectrum_domain(spectrum: npt.NDArray, periods: npt.NDArray, tc: float, inclusive: bool = False) -> Tuple[npt.NDArray, npt.NDArray]

Return the spectrum over the relevant domain for potential values of Td

Parameters:

Name Type Description Default
spectrum npt.NDArray

acceleration spectrum [g]

required
periods npt.NDArray

periods at which spectrum is defined [seconds]

required
tc float

spectral-acceleration-plateau corner period [seconds]

required
inclusive bool

if true, the range includes the tc+0.5 value (e.g. for Tc+0.5 = 0.75, 0.7 <= Td rather than 0.8 <= Td)

False

Returns:

Name Type Description
relevant_spectrum npt.NDArray

spectrum over the relevant periods

relevant_periods npt.NDArray

periods included in the relevant domain

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def relevant_spectrum_domain(
    spectrum: "npt.NDArray", periods: "npt.NDArray", tc: float, inclusive: bool = False
) -> Tuple["npt.NDArray", "npt.NDArray"]:
    """Return the spectrum over the relevant domain for potential values of Td

    Args:
        spectrum: acceleration spectrum [g]
        periods:  periods as which spectrum is defined [seconds]
        tc:       spectral-acceleration-plateau corner period [seconds]
        inclusive: if true, the range includes the tc+0.5 value
                   (e.g. for Tc+0.5 = 0.75, 0.7 <= Td rather than 0.8 <= Td)

    Returns:
        relevant_spectrum: spectrum over the relevant periods
        relevant_periods: periods included in the relevant domain
    """
    max_T = 6
    max_idx = np.searchsorted(periods, max_T, side="right")

    min_T = tc + 0.5
    min_idx = np.searchsorted(periods, min_T)
    if (sig_figs(periods[min_idx], 2) > min_T) & inclusive:
        min_idx -= 1

    relevant_periods = periods[min_idx:max_idx]
    relevant_spectrum = spectrum[min_idx:max_idx]

    return relevant_spectrum, relevant_periods
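
A minimal usage sketch (made-up spectrum values, not NSHM output) showing how the domain is clipped to periods between Tc + 0.5 and 6 seconds:

```python
import numpy as np

periods = np.array([0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 6.0, 10.0])
spectrum = np.array([0.4, 0.9, 0.6, 0.4, 0.3, 0.2, 0.15, 0.1, 0.05])

# with tc = 0.5, the domain starts at tc + 0.5 = 1.0 s and ends at 6.0 s
rel_spectrum, rel_periods = relevant_spectrum_domain(spectrum, periods, tc=0.5)
# rel_periods -> [1.0, 1.5, 2.0, 3.0, 4.0, 6.0]
```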

remove_irrelevant_location_replacements(site_list: List[str], location_replacements: dict[str, LocationReplacement]) -> dict[str, LocationReplacement]

Parameters:

Name Type Description Default
site_list List[str]

list of sites included in the sa tables

required
location_replacements dict[str, LocationReplacement]

dictionary of the location replacements

required

Returns:

Name Type Description
location_replacements dict[str, LocationReplacement]

a new dictionary of replacements

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def remove_irrelevant_location_replacements(
    site_list: List[str], location_replacements: dict[str, LocationReplacement]
) -> dict[str, LocationReplacement]:
    """

    Args:
        site_list: list of sites included in the sa tables
        location_replacements: dictionary of the location replacements

    Returns:
        location_replacements: a new dictionary of replacements

    """
    new_location_replacements = {}

    for key in location_replacements.keys():
        if key in site_list:
            replace_list = location_replacements[key].replaced_locations
            new_location_replacements[key] = LocationReplacement(
                key, [site for site in replace_list if site in site_list]
            )

    return new_location_replacements

replace_relevant_locations(df: pdt.DataFrame, print_locations: bool = False) -> pdt.DataFrame

replace parameters for locations that are tied to other locations

Parameters:

Name Type Description Default
df pdt.DataFrame

multi-index dataframe including all sites, annual probabilities of exceedance, and site classes

required
print_locations bool

if True, print relevant locations before and after replacements

False

Returns:

Name Type Description
df pdt.DataFrame

multi-index dataframe with replaced locations

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def replace_relevant_locations(
    df: "pdt.DataFrame", print_locations: bool = False
) -> "pdt.DataFrame":
    """replace parameters for locations that are tied to other locations

    Args:
        df: multi-index dataframe including all sites, annual probabilities of exceedance, and site classes
        print_locations: if True, print relevant locations before and after replacements

    Returns:
        df: multi-index dataframe with replaced locations

    """
    site_list = list(df.index)
    location_replacements = remove_irrelevant_location_replacements(
        site_list, LOCATION_REPLACEMENTS
    )

    if print_locations:
        check_replaced_locations = []
        for location in location_replacements:
            check_replaced_locations.append(location)
            for replaced_location in location_replacements[location].replaced_locations:
                check_replaced_locations.append(replaced_location)
        print("\n\noriginal values for replaced locations:")
        print(df.loc[check_replaced_locations, :])

    for location in location_replacements:
        for replaced_location in location_replacements[location].replaced_locations:
            df.loc[replaced_location, :] = df.loc[location, :]

    if print_locations:
        print("\n\nnew values for replaced locations:")
        print(df.loc[check_replaced_locations, :])

    return df

sig_figs(x: Union[float, List[float], npt.NDArray], n: int) -> npt.NDArray

Rounds all values of x to n significant figures

Inputs:

x: float value(s)
n: number of significant digits

Returns:

Name Type Description
x_rounded npt.NDArray

x rounded to n digits

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def sig_figs(x: Union[float, List[float], "npt.NDArray"], n: int) -> "npt.NDArray":
    """Rounds all values of x to n significant figures

    Inputs:
        x: float value(s)
        n: number of significant digits

    Returns:
        x_rounded: x rounded to n digits
    """
    x = np.asarray(x)
    x_positive = np.where(np.isfinite(x) & (x != 0), np.abs(x), 10 ** (n - 1))
    mags = 10 ** (n - 1 - np.floor(np.log10(x_positive)))
    x_rounded: "npt.NDArray" = np.round(x * mags) / mags
    return x_rounded
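
For example (illustrative values only):

```python
import numpy as np

x = np.array([0.01234, 1.234, 123.4])
sig_figs(x, 2)  # two significant figures: 0.012, 1.2, 120.0
```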

uhs_value(period: Union[float, npt.NDArray], PGA: float, Sas: float, Tc: float, Td: float) -> float

Derive the spectral acceleration Sa(T) at a given period (T), based on the seismic demand parameters. Sa(T) equations come from TS Eq. 3.2-3.5

Parameters:

Name Type Description Default
period Union[float, npt.NDArray]

period, T, at which the spectral acceleration is calculated [seconds]

required
PGA float

peak ground acceleration [g]

required
Sas float

short-period spectral acceleration [g]

required
Tc float

spectral-acceleration-plateau corner period [seconds]

required
Td float

spectral-velocity-plateau corner period [seconds]

required
Todo

this works (i.e. it handles the code as it's called now) - but I think there's a better way to do this sort of thing when your data is in numpy arrays; see: https://stackoverflow.com/questions/42594695/how-to-apply-a-function-map-values-of-each-element-in-a-2d-numpy-array-matrix

Returns:

Name Type Description
SaT float

spectral acceleration [g]

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def uhs_value(
    period: Union[float, "npt.NDArray"], PGA: float, Sas: float, Tc: float, Td: float
) -> float:
    """Derive the spectral acceleration Sa(T) at a given period (T), based on the seismic demand parameters.
    Sa(T) equations come from TS Eq. 3.2-3.5

    Args:
        period: period, T, at which the spectral acceleration is calculated [seconds]
        PGA: peak ground acceleration [g]
        Sas: short-period spectral acceleration [g]
        Tc: spectral-acceleration-plateau corner period [seconds]
        Td: spectral-velocity-plateau corner period [seconds]

    Todo:
        this works (i.e. it handles the code as it's called now) - but
        I think there's a better way to do this sort of thing when your data is in numpy arrays
        see: https://stackoverflow.com/questions/42594695/
        how-to-apply-a-function-map-values-of-each-element-in-a-2d-numpy-array-matrix

    Returns:
        SaT: spectral acceleration [g]
    """
    if isinstance(period, np.ndarray):
        assert len(period) == 1
        period = float(period[0])

    if period == 0:
        SaT = PGA
    elif period < 0.1:
        SaT = PGA + (Sas - PGA) * (period / 0.1)
    elif period < Tc:
        SaT = Sas
    elif period < Td:
        SaT = Sas * Tc / period
    else:
        SaT = Sas * Tc / period * (Td / period) ** 0.5

    return float(SaT)
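
A short example evaluating each branch of the piecewise spectrum (the parameter values below are illustrative only, not taken from any published table):

```python
PGA, Sas, Tc, Td = 0.3, 0.7, 0.5, 3.0  # illustrative values

uhs_value(0.0, PGA, Sas, Tc, Td)   # 0.3    (T = 0 returns PGA)
uhs_value(0.05, PGA, Sas, Tc, Td)  # 0.5    (linear ramp from PGA to Sas below 0.1 s)
uhs_value(0.3, PGA, Sas, Tc, Td)   # 0.7    (plateau for T < Tc)
uhs_value(1.0, PGA, Sas, Tc, Td)   # 0.35   (Sas * Tc / T for Tc <= T < Td)
uhs_value(4.0, PGA, Sas, Tc, Td)   # ~0.076 (additional (Td / T) ** 0.5 factor beyond Td)
```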

update_lower_bound_sa(mean_df, PGA, Sas, Tc, PSV, acc_spectra, imtls, vs30_list, hazard_rp_list, quantile_list)

Source code in nzssdt_2023/data_creation/sa_parameter_generation.py
def update_lower_bound_sa(
    mean_df,
    PGA,
    Sas,
    Tc,
    PSV,
    acc_spectra,
    imtls,
    vs30_list,
    hazard_rp_list,
    quantile_list,
):
    site_list = list(mean_df.index)
    index = site_list

    APoEs = [f"APoE: 1/{rp}" for rp in hazard_rp_list]
    site_class_list = [f"{SITE_CLASSES[sc].label}" for sc in SITE_CLASSES]
    parameters = ["PGA Floor", "Sas Floor", "PSV Floor", "Td Floor", "PSV adjustment"]
    columns = pd.MultiIndex.from_product([APoEs, site_class_list, parameters])
    df = pd.concat([mean_df, pd.DataFrame(index=index, columns=columns)], axis=1)

    controlling_site = LOWER_BOUND_PARAMETERS["controlling_site"]
    i_site = site_list.index(controlling_site)
    i_stat = 1 + quantile_list.index(
        float(LOWER_BOUND_PARAMETERS["controlling_percentile"])
    )
    log.debug(
        f"update_lower_bound_sa() controlling_site: {controlling_site};"
        f' controlling_percentile {LOWER_BOUND_PARAMETERS["controlling_percentile"]}'
    )
    lower_bound_Td = fit_Td_array(
        PGA,
        Sas,
        Tc,
        acc_spectra,
        imtls,
        site_list,
        vs30_list,
        hazard_rp_list,
        i_stat,
        [controlling_site],
    )

    log.info("update_lower_bound_sa() processesing site classes")
    for sc in SITE_CLASSES.keys():
        sc_label = SITE_CLASSES[sc].label
        vs30 = int(SITE_CLASSES[sc].representative_vs30)
        i_vs30 = vs30_list.index(vs30)

        log.info(f"site class {sc} label: {sc_label} vs30: {vs30}")

        for APoE, rp in zip(APoEs, hazard_rp_list):
            i_rp = hazard_rp_list.index(rp)

            # update the controlling site to use the qth %ile
            df.loc[controlling_site, (APoE, sc_label, "PGA")] = PGA[
                i_vs30, i_site, i_rp, i_stat
            ]
            df.loc[controlling_site, (APoE, sc_label, "Sas")] = Sas[
                i_vs30, i_site, i_rp, i_stat
            ]
            df.loc[controlling_site, (APoE, sc_label, "PSV")] = PSV[
                i_vs30, i_site, i_rp, i_stat
            ]
            df.loc[controlling_site, (APoE, sc_label, "Tc")] = Tc[
                i_vs30, i_site, i_rp, i_stat
            ]
            df.loc[controlling_site, (APoE, sc_label, "Td")] = lower_bound_Td[
                i_vs30, 0, i_rp
            ]

            # apply lower bound to all sites for PGA, Sas, and PSV
            df.loc[:, (APoE, sc_label, "PGA")] = np.maximum(
                df.loc[:, (APoE, sc_label, "PGA")],
                df.loc[controlling_site, (APoE, sc_label, "PGA")],
            )
            df.loc[:, (APoE, sc_label, "Sas")] = np.maximum(
                df.loc[:, (APoE, sc_label, "Sas")],
                df.loc[controlling_site, (APoE, sc_label, "Sas")],
            )
            df.loc[:, (APoE, sc_label, "PSV")] = np.maximum(
                df.loc[:, (APoE, sc_label, "PSV")],
                df.loc[controlling_site, (APoE, sc_label, "PSV")],
            )

            # record locations that were controlled by the lower bound
            df.loc[:, (APoE, sc_label, "PGA Floor")] = ~(
                df.loc[:, (APoE, sc_label, "PGA")]
                > df.loc[controlling_site, (APoE, sc_label, "PGA")]
            )
            df.loc[:, (APoE, sc_label, "Sas Floor")] = ~(
                df.loc[:, (APoE, sc_label, "Sas")]
                > df.loc[controlling_site, (APoE, sc_label, "Sas")]
            )
            df.loc[:, (APoE, sc_label, "PSV Floor")] = ~(
                df.loc[:, (APoE, sc_label, "PSV")]
                > df.loc[controlling_site, (APoE, sc_label, "PSV")]
            )

            # set new Tc values
            tc = (
                2
                * np.pi
                * df.loc[:, (APoE, sc_label, "PSV")]
                / (df.loc[:, (APoE, sc_label, "Sas")] * g)
            )
            df.loc[:, (APoE, sc_label, "Tc")] = sig_figs(tc, TC_N_SF)

            # infer new rounded PSV values from rounded Tcs
            psv = (
                df.loc[:, (APoE, sc_label, "Tc")]
                * df.loc[:, (APoE, sc_label, "Sas")]
                * g
            ) / (2 * np.pi)
            df.loc[:, (APoE, sc_label, "PSV")] = np.round(psv, PSV_N_DP)
            psv_original = df.loc[:, (APoE, sc_label, "PSV")]
            df.loc[:, (APoE, sc_label, "PSV adjustment")] = (
                df.loc[:, (APoE, sc_label, "PSV")] - psv_original
            )
            # log.info(f"site class {sc}, APoE: {APoE}, max PSV adjustment: "
            #    "{df.loc[:, (APoE, sc_label, 'PSV adjustment')]}")
            # log.info(df.loc[:, (APoE, sc_label, slice(None))])

            # set new Td if PSV is controlled by the lower bound
            df.loc[:, (APoE, sc_label, "Td Floor")] = False
            for site in site_list:
                if df.loc[site, (APoE, sc_label, "PSV Floor")]:
                    if (
                        df.loc[controlling_site, (APoE, sc_label, "Td")]
                        >= df.loc[site, (APoE, sc_label, "Td")]
                    ):
                        df.loc[site, (APoE, sc_label, "Td")] = df.loc[
                            controlling_site, (APoE, sc_label, "Td")
                        ]
                        df.loc[site, (APoE, sc_label, "Td Floor")] = True

    return df
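
The flooring step in update_lower_bound_sa() raises each parameter to at least the controlling site's value (the Auckland 90th-percentile case defined in LOWER_BOUND_PARAMETERS) and records where that floor governed. A minimal sketch of the pattern, with made-up numbers:

```python
import numpy as np
import pandas as pd

# made-up PGA values for three sites and a made-up controlling (floor) value
df = pd.DataFrame({"PGA": [0.12, 0.25, 0.40]}, index=["Site A", "Site B", "Site C"])
floor_value = 0.20

df["PGA"] = np.maximum(df["PGA"], floor_value)  # apply the lower bound
df["PGA Floor"] = ~(df["PGA"] > floor_value)    # True where the lower bound governs
# Site A is floored to 0.20 and flagged; Sites B and C keep their own values
```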

dm_parameter_generation

This module compiles the magnitude and distances values for the parameter table.

TODO
  • extract_m_values uses/builds entire mean mag tables independent of what locations and PoEs are requested in function args.
  • Consolidate the mean mag csv files into one cache rather than 3 separate files. Any locations/poes not available can be looked up and added to the cache.

create_D_and_M_df(site_list: List[str], rp_list: List[int] = DEFAULT_RPS, no_cache: bool = False, legacy: bool = False) -> pdt.DataFrame

Compiles the D and M parameter tables

Parameters:

Name Type Description Default
site_list List[str]

list of sites of interest

required
rp_list

list of return periods of interest

DEFAULT_RPS
no_cache bool

if True, ignore the cache file

False
legacy bool

if True, double-rounds magnitudes to match the original mean mags from v1 of the workflow.

False

Returns:

Name Type Description
D_and_M pdt.DataFrame

dataframe of the d and m tables

Source code in nzssdt_2023/data_creation/dm_parameter_generation.py
def create_D_and_M_df(
    site_list: List[str],
    rp_list: List[int] = DEFAULT_RPS,
    no_cache: bool = False,
    legacy: bool = False,
) -> "pdt.DataFrame":
    """Compiles the D and M parameter tables

    Args:
        site_list: list of sites of interest
        rp_list: list of return periods of interest
        no_cache: if True, ignore the cache file
        legacy: if True, double-rounds magnitudes to match the original mean mags from v1 of the workflow.

    Returns:
        D_and_M: dataframe of the d and m tables
    """

    d_values_path = Path(WORKING_FOLDER, "D_values.json")
    if not d_values_path.exists():
        D_values = build_d_value_dataframe()
        D_values.to_json(d_values_path)
    else:
        D_values = pd.read_json(d_values_path)

    D_sites = [site for site in list(D_values.index) if site in site_list]

    APoEs = [f"APoE: 1/{rp}" for rp in rp_list]

    M_mean = extract_m_values(site_list, APoEs, AggregationEnum.MEAN, no_cache, legacy)
    M_p90 = extract_m_values(["Auckland"], APoEs, AggregationEnum._90, no_cache, legacy)

    D_and_M = pd.DataFrame(index=site_list, columns=["D"] + APoEs)

    # include D values
    D_and_M.loc[D_sites, "D"] = D_values.loc[D_sites, "D"]

    # include M values
    for APoE in APoEs:
        # M value is >= the 90th %ile values for Auckland
        D_and_M.loc[site_list, APoE] = np.maximum(
            M_mean.loc[site_list, APoE], M_p90.loc["Auckland", APoE]
        )

    D_and_M = replace_relevant_locations(D_and_M)
    D_and_M = set_coded_location_resolution(D_and_M)
    return D_and_M
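
A minimal usage sketch (assumes the named sites are in the NSHM location list and that the working folder / disaggregation cache is available, since missing values trigger NSHM queries):

```python
sites = ["Auckland", "Wellington"]
d_and_m = create_D_and_M_df(sites, rp_list=[500, 2500])

d_and_m.loc["Wellington", "D"]            # distance to the nearest major fault [km], None if >= 20 km
d_and_m.loc["Wellington", "APoE: 1/500"]  # mean magnitude, floored at Auckland's 90th-percentile value
```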

extract_m_values(site_names: List[str], freqs: List[str], agg: AggregationEnum, no_cache: bool = False, legacy: bool = False) -> pdt.DataFrame

Extracts the magnitudes from the input .csv folder into a manageable dataframe

Parameters:

Name Type Description Default
site_names List[str]

names of sites of interest

required
freqs List[str]

list of the frequencies (1/return period) of interest

required
agg AggregationEnum

the aggregate hazard curve at which to calculate mean magnitude (e.g., "mean", "0.9", ...)

required
no_cache bool

if True, ignore the cache file

False
legacy bool

if True, double-rounds magnitudes to match the original mean mags from v1 of the workflow.

False

Returns:

Name Type Description
mags pdt.DataFrame

mean magnitudes

The format of the frequencies entries is e.g. "APoE: 1/25"

If no_cache is False then the mean magnitudes will be looked up in a cache file. If not found there, they will be calculated and added to the cache. The cache file is in the WORKING_FOLDER named mags_{agg}.csv where {agg} is the hazard curve aggregate.

site names are location names or lat~lon codes e.g. "Pukekohe" or "-45.500~166.600"

Source code in nzssdt_2023/data_creation/dm_parameter_generation.py
def extract_m_values(
    site_names: List[str],
    freqs: List[str],
    agg: AggregationEnum,
    no_cache: bool = False,
    legacy: bool = False,
) -> "pdt.DataFrame":
    """Extracts the magnitudes from the input .csv folder into a manageable dataframe

    Args:
        site_names: names of sites of interest
        freqs: list of the frequencies (1/return period) of interest
        agg: the aggregate hazard curve at which to calculate mean magnitude (e.g., "mean", "0.9", ...)
        no_cache: if True, ignore the cache file
        legacy: if True, double-rounds magnitudes to match the original mean mags from v1 of the workflow.

    Returns:
        mags: mean magnitudes

    The format of the frequencies entries is e.g. "APoE: 1/25"

    If no_cache is False then the mean magnitudes will be looked up in a cache file. If not found
    there, they will be calculated and added to the cache. The cache file is in the WORKING_FOLDER
    named mags_{agg}.csv where {agg} is the hazard curve aggregate.

    site names are location names or lat~lon codes e.g. "Pukekohe" or "-45.500~166.600"
    """

    locations = [site_name_to_coded_location(name) for name in site_names]
    poes = [frequency_to_poe(freq) for freq in freqs]

    if no_cache:
        return get_mean_mag_df(DISAGG_HAZARD_ID, locations, poes, agg, legacy)

    cache_filepath = Path(WORKING_FOLDER) / f"mag_agg-{agg.name}.csv"
    if cache_filepath.exists():
        mags = read_mean_mag_df(cache_filepath)
    else:
        mags = get_mean_mag_df(DISAGG_HAZARD_ID, locations, poes, agg, legacy)
        mags.to_csv(cache_filepath)
        return mags.loc[site_names, freqs]

    # fill in the missing values one at a time
    for site, freq in itertools.product(site_names, freqs):
        if (
            (site not in mags.index)
            or (freq not in mags.columns)
            or (pd.isnull(mags.loc[site, freq]))
        ):
            location = site_name_to_coded_location(site)
            poe = frequency_to_poe(freq)
            mag = get_mean_mag(DISAGG_HAZARD_ID, location, poe, agg, legacy)
            mags.loc[site, freq] = mag

    mags.to_csv(cache_filepath)
    return mags.loc[site_names, freqs]
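
A minimal usage sketch; AggregationEnum is the enum this module already imports, and the first call without a populated cache queries the NSHM disaggregation database:

```python
mags = extract_m_values(
    ["Auckland", "-45.500~166.600"],   # a named site and a lat~lon grid code
    ["APoE: 1/500", "APoE: 1/2500"],   # frequency strings
    AggregationEnum.MEAN,
)
mags.loc["Auckland", "APoE: 1/500"]    # mean magnitude for Auckland at 1/500
```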

mean_magnitudes

This module contains functions for extracting mean magnitudes from disaggregations and packaging into DataFrame objects.

DTYPE = float module-attribute

IMT = 'PGA' module-attribute

INV_TIME = 1.0 module-attribute

VS30 = 275 module-attribute

coded_locations_with_id = [CodedLocation(*lat_lon_from_id(_id), 0.001) for _id in LOCATION_LISTS['ALL']['locations']] module-attribute

df = get_mean_mag_df(hazard_id, locations, poes, hazard_agg) module-attribute

hazard_agg = model.AggregationEnum.MEAN module-attribute

hazard_id = 'NSHM_v1.0.4_mag' module-attribute

imts = ['PGA'] module-attribute

location_codes_with_id = [loc.code for loc in coded_locations_with_id] module-attribute

locations = [CodedLocation(*lat_lon_from_id(_id), 0.001) for _id in LOCATION_LISTS['SRWG214']['locations']] module-attribute

mean_mag_filepath = 'mean_mag.csv' module-attribute

poes = [model.ProbabilityEnum._2_PCT_IN_50YRS, model.ProbabilityEnum._5_PCT_IN_50YRS, model.ProbabilityEnum._10_PCT_IN_50YRS, model.ProbabilityEnum._18_PCT_IN_50YRS, model.ProbabilityEnum._39_PCT_IN_50YRS, model.ProbabilityEnum._63_PCT_IN_50YRS, model.ProbabilityEnum._86_PCT_IN_50YRS] module-attribute

vs30s = [275] module-attribute

calculate_mean_magnitude(disagg: npt.NDArray, bins: npt.NDArray)

Calculate the mean magnitude for a single site using the magnitude disaggregation.

Parameters:

Name Type Description Default
disagg npt.NDArray

The probability contribution to hazard of the magnitude bins (in apoe).

required
bins npt.NDArray

The bin centers of the magnitude disaggregation.

required

Returns:

Type Description

the mean magnitude

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def calculate_mean_magnitude(disagg: "npt.NDArray", bins: "npt.NDArray"):
    """
    Calculate the mean magnitude for a single site using the magnitude disaggregation.

    Parameters:
        disagg: The probability contribution to hazard of the magnitude bins (in apoe).
        bins: The bin centers of the magnitude disaggregation.

    Returns:
        the mean magnitude
    """
    shape_bins = bins.shape
    shape_disagg = disagg.shape
    if (
        len(shape_bins) != 2
        or shape_bins[0] != 1
        or shape_bins[1] != len(disagg)
        or len(shape_disagg) != 1
    ):
        raise Exception(
            """Disaggregation does not have the correct shape.
            This function assumes that disaggregations are for magnitude only."""
        )

    disagg = prob_to_rate(disagg)

    return np.sum(disagg / np.sum(disagg) * bins[0])
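
A worked example with an illustrative three-bin magnitude disaggregation (the bins argument must have shape (1, n_bins)):

```python
import numpy as np

bins = np.array([[5.75, 6.25, 6.75]])     # magnitude bin centres, shape (1, 3)
disagg = np.array([0.010, 0.006, 0.002])  # apoe contribution of each bin, shape (3,)

calculate_mean_magnitude(disagg, bins)    # rate-weighted mean magnitude, approximately 6.03
```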

empty_mean_mag_df(poes: List[model.ProbabilityEnum]) -> pd.DataFrame

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def empty_mean_mag_df(poes: List[model.ProbabilityEnum]) -> pd.DataFrame:
    rp_strs = get_sorted_rp_strs(poes)
    return pd.DataFrame(
        index=pd.Series([], name="site_name"), columns=rp_strs, dtype=DTYPE
    )

frequency_to_poe(frequency: str) -> model.ProbabilityEnum

converts a frequency string (e.g., "APoE: 1/25") to an annual probability of exceedance

Parameters:

Name Type Description Default
frequency str

the frequency string

required

Returns:

Type Description
model.ProbabilityEnum

the annual probability of exceedance

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def frequency_to_poe(frequency: str) -> model.ProbabilityEnum:
    """
    converts a frequency string (e.g., "APoE: 1/25") to an annual probability of exceedance

    Args:
        frequency: the frequency string

    Returns:
        the annual probability of exceedance
    """

    rp = int(frequency.split("/")[1])
    return RP_TO_POE[rp]
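
For example, using the RP_TO_POE mapping defined in constants:

```python
frequency_to_poe("APoE: 1/25")   # ProbabilityEnum._86_PCT_IN_50YRS
frequency_to_poe("APoE: 1/500")  # ProbabilityEnum._10_PCT_IN_50YRS
```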

get_loc_id_and_name(location_code)

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def get_loc_id_and_name(location_code):

    if location_code in location_codes_with_id:
        location_id = LOCATION_LISTS["ALL"]["locations"][
            location_codes_with_id.index(location_code)
        ]
        name = location_by_id(location_id)["name"]
    else:
        location_id = location_code
        name = location_code

    return location_id, name

get_mean_mag(hazard_id: str, location: CodedLocation, poe: model.ProbabilityEnum, hazard_agg: model.AggregationEnum, legacy: bool = False) -> float

Get the mean magnitude for a single location and poe.

Parameters:

Name Type Description Default
hazard_id str

the toshi-hazard-post ID of the hazard model from which to get disaggregations.

required
location CodedLocation

the location at which to calculate mean magnitude.

required
poe model.ProbabilityEnum

the annual probability of exceedance at which to calculate mean magnitude.

required
hazard_agg model.AggregationEnum

the hazard aggregate (e.g. mean or a fractile) at which to calculate mean magnitude.

required
legacy bool

if True, double-rounds the magnitude to match the original mean mag from v1 of the workflow.

False

Returns:

Type Description
float

the mean magnitude

The legacy calculation is necessary to match the original mean magnitude dataframe because the original csv file had magnitudes rounded to 2 decimal places. When the final output is rounded to one decimal place, this results in rounding errors. For example:

>>> round(5.948071422587211, 1)
5.9
>>> round(round(5.948071422587211, 2), 1)
6.0

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def get_mean_mag(
    hazard_id: str,
    location: CodedLocation,
    poe: model.ProbabilityEnum,
    hazard_agg: model.AggregationEnum,
    legacy: bool = False,
) -> float:
    """
    Get the mean magnitude for a single location and poe.

    Args:
        hazard_id: the toshi-hazard-post ID of the hazard model from which to get disaggregations.
        location: the location at which to calculate mean magnitude.
        poe: the annual probability of exceedance at which to calculate mean magnitude.
        hazard_agg: the hazard aggregate (e.g. mean or a fractile) at which to calculate mean magnitude.
        legacy: if True, double-rounds the magnitude to match the original mean mag from v1 of the workflow.

    Returns:
        the mean magnitude

    The legacy calculation is necessary to match the original mean magnitude dataframe because the original
    csv file had magnitudes rounded to 2 decimal places. When the final output is rounded to one decimal
    place, this results in rounding errors. For example:
    ```python
    >>> round(5.948071422587211, 1)
    5.9
    >>> round(round(5.948071422587211, 2), 1)
    6.0
    ```
    """

    disagg = next(
        get_mean_mags(hazard_id, [location], [VS30], [IMT], [poe], hazard_agg)
    )
    if legacy:
        return np.round(np.round(disagg["mag"], 2), 1)
    else:
        return np.round(disagg["mag"], 1)

get_mean_mag_df(hazard_id: str, locations: List[CodedLocation], poes: List[model.ProbabilityEnum], hazard_agg: model.AggregationEnum, legacy: bool = False) -> pd.DataFrame

Get the mean magnitude table for the requested locations and annual probabilities.

Parameters:

Name Type Description Default
hazard_id str

the toshi-hazard-post ID of the hazard model from which to get disaggregations.

required
locations List[CodedLocation]

the locations at which to calculate mean magnitudes.

required
poes List[model.ProbabilityEnum]

the annual probabilities of exceedance at which to calculate mean magnitudes.

required
hazard_agg model.AggregationEnum

the hazard aggregate (e.g. mean or a fractile) at which to calculate mean magnitudes.

required
legacy bool

if True, double-rounds magnitudes to match the original mean mags from v1 of the workflow.

False

Returns:

Type Description
pd.DataFrame

the mean magnitudes. The DataFrame index is the location name and the columns are frequencies.

The legacy calculation is necessary to match the original mean magnitude dataframe because the original csv file had magnitudes rounded to 2 decimal places. When the final output is rounded to one decimal place, this results in rounding errors. For example:

>>> round(5.948071422587211, 1)
5.9
>>> round(round(5.948071422587211, 2), 1)
6.0

NB: "APoE in the column name is a misnomer as they are approximate return frequencies not probabilities. Magnitudes are rounded to the nearest decimal. The rounding error introduced in the original workflow (incurred by rounding to the nearest 2 decimal places and then nearest 1) have been reproduced here to ensure output is stable.

The format of the output DataFrame is:

                APoE: 1/25 APoE: 1/50 APoE: 1/100 APoE: 1/250 APoE: 1/500 APoE: 1/1000 APoE: 1/2500
Kaitaia             5.7        5.8         5.8         5.9         6.0          6.0          6.1
Kerikeri            5.7        5.8         5.9         5.9         6.0          6.0          6.1
...                 ...        ...         ...         ...         ...          ...          ...
Bluff               6.7        6.8         6.9         7.0         7.0          7.1          7.1
Oban                6.7        6.8         6.9         7.0         7.1          7.2          7.3
Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def get_mean_mag_df(
    hazard_id: str,
    locations: List[CodedLocation],
    poes: List[model.ProbabilityEnum],
    hazard_agg: model.AggregationEnum,
    legacy: bool = False,
) -> pd.DataFrame:
    """
    Get the mean magnitude table for the requested locations and annual probabilities.

    Args:
        hazard_id: the toshi-hazard-post ID of the hazard model from which to get disaggregations.
        locations: the locations at which to calculate mean magnitudes.
        poes: the annual probabilities of exceedance at which to calculate mean magnitudes.
        hazard_agg: the hazard aggregate (e.g. mean or a fractile) at which to calculate mean magnitudes.
        legacy: if True, double-rounds magnitudes to match the original mean mags from v1 of the workflow.

    Returns:
        the mean magnitudes. The DataFrame index is the location name and the columns are frequencies.

    The legacy calculation is necessary to match the original mean magnitude dataframe because the original
    csv file had magnitudes rounded to 2 decimal places. When the final output is rounded to one decimal
    place, this results in rounding errors. For example:
    ```python
    >>> round(5.948071422587211, 1)
    5.9
    >>> round(round(5.948071422587211, 2), 1)
    6.0
    ```

    NB: "APoE in the column name is a misnomer as they are approximate return frequencies not probabilities.
    Magnitudes are rounded to the nearest decimal. The rounding error introduced in the original workflow
    (incurred by rounding to the nearest 2 decimal places and then nearest 1) have been reproduced
    here to ensure output is stable.

    The format of the output DataFrame is:

    ```
                    APoE: 1/25 APoE: 1/50 APoE: 1/100 APoE: 1/250 APoE: 1/500 APoE: 1/1000 APoE: 1/2500
    Kaitaia             5.7        5.8         5.8         5.9         6.0          6.0          6.1
    Kerikeri            5.7        5.8         5.9         5.9         6.0          6.0          6.1
    ...                 ...        ...         ...         ...         ...          ...          ...
    Bluff               6.7        6.8         6.9         7.0         7.0          7.1          7.1
    Oban                6.7        6.8         6.9         7.0         7.1          7.2          7.3
    ```
    """

    rp_strs = get_sorted_rp_strs(poes)
    site_names = [
        get_loc_id_and_name(loc.downsample(0.001).code)[1] for loc in locations
    ]
    df = pd.DataFrame(index=site_names, columns=rp_strs, dtype=DTYPE)
    for disagg in get_mean_mags(hazard_id, locations, [VS30], [IMT], poes, hazard_agg):
        rp = poe_to_rp(disagg["poe"])
        rp_str = rp_to_freqstr(rp)
        site_name = disagg["name"]
        if legacy:
            df.loc[site_name, rp_str] = np.round(np.round(disagg["mag"], 2), 1)
        else:
            df.loc[site_name, rp_str] = np.round(disagg["mag"], 1)

    df.index.name = "site_name"
    return df

get_mean_mags(hazard_id: str, locations: Iterable[CodedLocation], vs30s: Iterable[int], imts: Iterable[str], poes: Iterable[model.ProbabilityEnum], hazard_agg: model.AggregationEnum) -> Generator[Dict[str, Any], None, None]

The mean magnitudes for a collection of sites.

This function uses toshi-hazard-store to retrieve disaggregations from the database and therefore requires read access. It assumes all disaggregations are done for magnitude only (i.e., not distance, epsilon, etc.)

Parameters:

Name Type Description Default
hazard_id str

the hazard id of the model in the database.

required
locations Iterable[CodedLocation]

the site locations for which to obtain the mean magnitude.

required
vs30s Iterable[int]

the site vs30 values for which to obtain the mean magnitude.

required
imts Iterable[str]

the intensity measure types for which to obtain the mean magnitude.

required
poes Iterable[model.ProbabilityEnum]

the probabilities of exceedance for which to obtain the mean magnitude.

required
hazard_agg model.AggregationEnum

the hazard curve aggregation at which to obtain the mean magnitude (e.g. mean, 20th percentile, etc.)

required

Yields:

Type Description
Dict[str, Any]

A dict for each location, vs30, imt, poe, aggregate with keys:
    location_id: str, the location id from nzshm-common if the location has one
    name: str, the location name from nzshm-common if the location has one
    lat: float, the latitude
    lon: float, the longitude
    vs30: int, the vs30
    poe: model.ProbabilityEnum, the probability of exceedance
    imt: str, the intensity measure type
    imtl: float, the intensity level of the hazard curve at the poe of interest
    mag: float, the mean magnitude

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def get_mean_mags(
    hazard_id: str,
    locations: Iterable[CodedLocation],
    vs30s: Iterable[int],
    imts: Iterable[str],
    poes: Iterable[model.ProbabilityEnum],
    hazard_agg: model.AggregationEnum,
) -> Generator[Dict[str, Any], None, None]:
    """
    The mean magnitudes for a collection of sites.

    This function uses `toshi-hazard-store` to retrieve disaggregations from the database and therefore requires read
    access. It assumes all disaggregations are done for magnitude only (i.e., not distance, epsilon, etc.)

    Parameters:
        hazard_id: the hazard id of the model in the database.
        locations: the site locations for which to obtain the mean magnitude.
        vs30s: the site vs30 values for which to obtain the mean magnitude.
        imts: the intensity measure types for which to obtain the mean magnitude.
        poes: the probabilities of exceedance for which to obtain the mean magnitude.
        hazard_agg: the hazard curve aggregation at which to obtain the mean magnitude (e.g. mean, 20th percentile, etc.)

    Yields:
        A dict for each location, vs30, imt, poe, aggregate with keys:
            location_id: str, the location id from `nzshm-common` if the location has one
            name: str, the location name from `nzshm-common` if the location has one
            lat: float, the latitude
            lon: float, the longitude
            vs30: int, the vs30
            poe: model.ProbabilityEnum, the probability of exceedance
            imt: str, the intensity measure type
            imtl: float, the intensity level of the hazard curve at the poe of interest
            mag: float, the mean magnitude
    """

    clocs = [loc.code for loc in locations]
    disaggs = query.get_disagg_aggregates(
        hazard_model_ids=[hazard_id],
        disagg_aggs=[model.AggregationEnum.MEAN],
        hazard_aggs=[hazard_agg],
        locs=clocs,
        vs30s=vs30s,
        imts=imts,
        probabilities=poes,
    )
    for disagg in disaggs:
        mean_mag = calculate_mean_magnitude(disagg.disaggs, disagg.bins)
        location_id, name = get_loc_id_and_name(disagg.nloc_001)

        d = dict(
            location_id=location_id,
            name=name,
            lat=disagg.lat,
            lon=disagg.lon,
            vs30=disagg.vs30,
            poe=disagg.probability,
            imt=disagg.imt,
            imtl=disagg.shaking_level,
            mag=mean_mag,
        )
        yield d
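
A minimal usage sketch; `model` is the toshi-hazard-store model module imported by this file, and running it requires read access to the NSHM disaggregation store:

```python
records = get_mean_mags(
    hazard_id="NSHM_v1.0.4_mag",
    locations=[site_name_to_coded_location("Auckland")],
    vs30s=[275],
    imts=["PGA"],
    poes=[model.ProbabilityEnum._10_PCT_IN_50YRS],
    hazard_agg=model.AggregationEnum.MEAN,
)
for record in records:
    print(record["name"], record["poe"], record["mag"])
```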

get_sorted_rp_strs(poes: List[model.ProbabilityEnum]) -> List[str]

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def get_sorted_rp_strs(poes: List[model.ProbabilityEnum]) -> List[str]:
    return_periods = np.array([poe_to_rp(poe) for poe in poes])
    return_periods = np.sort(return_periods)
    return [rp_to_freqstr(rp) for rp in return_periods]

poe_to_rp(apoe: model.ProbabilityEnum) -> int

Converts annual probability to a rounded return period. The return periods are "conventional" return periods used by the hazard and risk community that are roughly equivalent to the (more exact) annual probabilities.

Parameters:

Name Type Description Default
apoe model.ProbabilityEnum

annual probability of exceedance

required

Returns:

Type Description
int

the approximate return period rounded to the nearest "conventional" number.

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def poe_to_rp(apoe: model.ProbabilityEnum) -> int:
    """
    Converts annual probability to a rounded return period. The return periods are "conventional"
    return periods used by the hazard and risk community that are roughly equivalent to the
    (more exact) annual probabilities.

    Args:
        apoe: annual probability of exceedance

    Returns:
        the approximate return period rounded to the nearest "conventional" number.
    """

    return POE_TO_RP[apoe]

prob_to_rate(prob: npt.NDArray) -> npt.NDArray

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def prob_to_rate(prob: "npt.NDArray") -> "npt.NDArray":
    return -np.log(1.0 - prob) / INV_TIME
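
prob_to_rate converts an annual probability of exceedance into the equivalent annual rate, r = -ln(1 - p) / INV_TIME, with INV_TIME = 1.0 year. For example:

```python
import numpy as np

prob_to_rate(np.array([0.01, 0.63]))  # -> approximately [0.01005, 0.99425]
```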

read_mean_mag_df(filepath: Union[Path, str]) -> pd.DataFrame

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def read_mean_mag_df(filepath: Union[Path, str]) -> pd.DataFrame:
    df = pd.read_csv(Path(filepath), index_col=["site_name"])
    return df.astype(DTYPE)

rp_to_freqstr(rp: int)

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def rp_to_freqstr(rp: int):
    return f"APoE: 1/{rp}"

site_name_to_coded_location(site_name: str) -> CodedLocation

get the coded location from the site name

Parameters:

Name Type Description Default
site_name str

the site name

required

Returns:

Type Description
CodedLocation

the coded location

site names are e.g. "Pukekohe", "Tairua", "-45.600~166.600", or "-45.500~166.600"

Source code in nzssdt_2023/data_creation/mean_magnitudes.py
def site_name_to_coded_location(site_name: str) -> CodedLocation:
    """get the coded location from the site name

    Args:
        site_name: the site name

    Returns:
        the coded location

    site names are e.g. "Pukekohe", "Tairua", "-45.600~166.600", or "-45.500~166.600"
    """

    if not ("~" in site_name or site_name in ALL_SITES):
        raise ValueError(
            "site_name must be in list of site names or take form `lat~lon`"
        )

    if "~" in site_name:
        lat, lon = map(float, site_name.split("~"))
        return CodedLocation(lat, lon, resolution=0.001)

    loc = ALL_SITES[site_name]
    return CodedLocation(loc["latitude"], loc["longitude"], resolution=0.001)
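
For example:

```python
site_name_to_coded_location("Pukekohe")         # CodedLocation at Pukekohe's lat/lon, resolution 0.001
site_name_to_coded_location("-45.500~166.600")  # CodedLocation(-45.5, 166.6, resolution=0.001)
site_name_to_coded_location("not a site")       # raises ValueError
```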

gis_data

This module creates .geojson versions of all GIS data and calculates D (distance) values between polygons and faults

log = logging.getLogger(__name__) module-attribute

build_d_value_dataframe() -> pdt.DataFrame

Calculates the distance from faults to each named location and grid point

The number of sites considered is always the full amount, regardless of whether the site_list has been reduced for the rest of the table generation

Returns:

D_values pdt.DataFrame

dataframe of distance-to-fault (D) values indexed by location
Source code in nzssdt_2023/data_creation/gis_data.py
def build_d_value_dataframe() -> "pdt.DataFrame":
    """Calculates the distance from faults to each named location and grid point

    The number of sites considered is always the full amount, regardless of whether
    the site_list has been reduced for the rest of the table generation

    Returns:
        D_values: dataframe of distance-to-fault (D) values indexed by location
    """
    log.info("build_d_value_dataframe() processesing distance to fault for each site")

    faults, polygons = create_fault_and_polygon_gpds()

    D_polygons = calc_distance_to_faults(polygons, faults)

    grid = create_grid_gpd()
    D_grid = calc_distance_to_faults(grid, faults)

    D_values = pd.concat([D_polygons, D_grid])
    D_values.index.name = "Location"

    return D_values

calc_distance_to_faults(gdf: gpdt.DataFrame, faults: gpdt.DataFrame) -> pdt.DataFrame

Calculates the closest distance of polygons or points to a set of fault lines

Parameters:

Name Type Description Default
gdf gpdt.DataFrame

geodataframe of locations (polygons or points)

required
faults gpdt.DataFrame

geodataframe of fault lines

required

Returns:

Name Type Description
df pdt.DataFrame

dataframe of distance from each location to the closest fault

Source code in nzssdt_2023/data_creation/gis_data.py
def calc_distance_to_faults(
    gdf: "gpdt.DataFrame", faults: "gpdt.DataFrame"
) -> "pdt.DataFrame":
    """Calculates the closest distance of polygons or points to a set of fault lines

    Args:
        gdf: geodataframe of locations (polygons or points)
        faults: geodataframe of fault lines

    Returns:
        df: dataframe of distance from each location to the closest fault
    """
    meter_epsg = 2193
    faults.to_crs(epsg=meter_epsg, inplace=True)
    gdf.to_crs(epsg=meter_epsg, inplace=True)

    gdf["distance"] = round(
        gdf.geometry.apply(lambda x: faults.distance(x).min()) / 1000.0
    )

    gdf["D"] = gdf["distance"].astype("int")
    gdf.loc[gdf["D"] >= 20, "D"] = None

    wgs_epsg = 4326
    gdf.to_crs(epsg=wgs_epsg, inplace=True)

    gdf.index.names = [""]

    return gdf[["D"]]
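
A minimal sketch with made-up geometries (one site point and one fault trace, both in WGS84); the real inputs come from create_fault_and_polygon_gpds() and create_grid_gpd():

```python
import geopandas as gpd
from shapely.geometry import LineString, Point

sites = gpd.GeoDataFrame(
    index=["Example site"], geometry=[Point(174.78, -41.29)], crs="EPSG:4326"
)
faults = gpd.GeoDataFrame(
    {"Name": ["Example fault"]},
    geometry=[LineString([(174.70, -41.20), (174.90, -41.40)])],
    crs="EPSG:4326",
)

calc_distance_to_faults(sites, faults)  # one-column dataframe of D values [km]
```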

cleanup_polygon_gpd(polygon_path) -> gpdt.DataFrame

Removes extra columns from input polygon file

Parameters:

Name Type Description Default
polygon_path

path to the original polygon file

required

Returns:

Name Type Description
gdf gpdt.DataFrame

geodataframe with only relevant columns

Source code in nzssdt_2023/data_creation/gis_data.py
def cleanup_polygon_gpd(polygon_path) -> "gpdt.DataFrame":
    """Removes extra columns from input polygon file

    Args:
        polygon_path: path to the original polygon file

    Returns:
        gdf: geodataframe with only relevant columns
    """

    # read original file
    gdf = gpd.read_file(polygon_path).set_index("UR2022_V_2")[["geometry"]]
    gdf["Name"] = gdf.index
    gdf.set_index("Name", inplace=True)

    # sort the order
    polygon_list = polygon_location_list()
    gdf = gdf.loc[polygon_list, :]

    # convert to WGS84
    wgs_epsg = 4326
    gdf = gdf.to_crs(epsg=wgs_epsg)

    return gdf

create_fault_and_polygon_gpds() -> Tuple[gpdt.DataFrame, gpdt.DataFrame]

Creates the two geodataframes for resource output

Returns:

Name Type Description
faults gpdt.DataFrame

geodataframe of major faults

polygons gpdt.DataFrame

geodataframe of urban area polygons

Source code in nzssdt_2023/data_creation/gis_data.py
def create_fault_and_polygon_gpds() -> Tuple["gpdt.DataFrame", "gpdt.DataFrame"]:
    """Creates the two geodataframes for resource output

    Returns:
        faults: geodataframe of major faults
        polygons: geodataframe of urban area polygons
    """

    polygons = cleanup_polygon_gpd(POLYGON_PATH)
    faults = filter_cfm_by_sliprate(CFM_URL)

    return faults, polygons

create_geojson_files(polygons_path: Union[str | Path], faults_path: Union[str | Path], grid_path: Union[str | Path], override: bool = False)

Create the .geojsons for the version resources

Parameters:

Name Type Description Default
polygons_path Union[str | Path]

path to polygon .geojson

required
faults_path Union[str | Path]

path to faults .geojson

required
grid_path Union[str | Path]

path to grid points .geojson

required
override bool

if True, rewrite all files

False
Source code in nzssdt_2023/data_creation/gis_data.py
def create_geojson_files(
    polygons_path: Union[str | Path],
    faults_path: Union[str | Path],
    grid_path: Union[str | Path],
    override: bool = False,
):
    """Create the .geojsons for the version resources

    Args:
        polygons_path: path to polygon .geojson
        faults_path: path to faults .geojson
        grid_path: path to grid points .geojson
        override: if True, rewrite all files

    """

    if (
        override
        | (not Path(polygons_path).exists())
        | (not Path(faults_path).exists())
        | (not Path(grid_path).exists())
    ):

        faults, polygons = create_fault_and_polygon_gpds()

        save_gdf_to_geojson(faults, faults_path)
        save_gdf_to_geojson(polygons, polygons_path, include_idx=True)

        grid = create_grid_gpd()
        save_gdf_to_geojson(grid, grid_path, include_idx=True)

    if override:
        d_values_path = Path(WORKING_FOLDER, "D_values.json")
        if d_values_path.exists():
            os.remove(d_values_path)

create_grid_gpd() -> gpdt.DataFrame

Creates one geodataframe for resource output

Returns:

Name Type Description
grid gpdt.DataFrame

geodataframe of lat/lon grid points

Source code in nzssdt_2023/data_creation/gis_data.py
def create_grid_gpd() -> "gpdt.DataFrame":
    """Creates one geodataframe for resource output

    Returns:
        grid: geodataframe of lat/lon grid points
    """

    grid_df = create_sites_df(named_sites=False)
    grid_df = set_coded_location_resolution(grid_df)
    grid_df.index.name = "Name"

    grid = gpd.GeoDataFrame(
        geometry=gpd.points_from_xy(grid_df.lon, grid_df.lat, crs="EPSG:4326"),
        data=grid_df,
    )

    return grid[["geometry"]]

filter_cfm_by_sliprate(cfm_url, slip_rate: float = 5.0) -> gpdt.DataFrame

Filters the original Community Fault Model (CFM) .shp file

The faults are filtered by the Slip Rate Preferred >= slip_rate criterion (default 5 mm/yr).

Parameters:

Name Type Description Default
cfm_url

URL or path to the NZ CFM shapefile archive

required
slip_rate float

slip rate for filter criterion, >= slip_rate

5.0

Returns:

Name Type Description
gdf gpdt.DataFrame

geodataframe of filtered faults

Source code in nzssdt_2023/data_creation/gis_data.py
def filter_cfm_by_sliprate(cfm_url, slip_rate: float = 5.0) -> "gpdt.DataFrame":
    """Filters the original Community Fault Model (CFM) .shp file

    The faults are filtered by the Slip Rate Preferred >= slip_rate criterion (default 5 mm/yr).

    Args:
        cfm_url: URL or path to the NZ CFM shapefile archive
        slip_rate: slip rate for filter criterion, >= slip_rate

    Returns:
        gdf: geodataframe of filtered faults
    """

    gdf = gpd.read_file(cfm_url)

    idx = gdf["SR_pref"] >= slip_rate
    gdf = gdf[idx].sort_values("Name").reset_index()
    gdf.drop("index", axis=1, inplace=True)

    wgs_epsg = 4326
    gdf = gdf.to_crs(epsg=wgs_epsg)

    gdf["Slip rate preferred value"] = gdf["SR_pref"]
    gdf["Slip rate filter"] = f"≥{slip_rate} mm/yr"
    gdf[
        "Source for linework and slip rate assessment"
    ] = "NZ CFM v1.0 (Seebeck et al. 2022, 2023)"
    gdf = gdf[
        [
            "Name",
            "Slip rate preferred value",
            "Slip rate filter",
            "Source for linework and slip rate assessment",
            "geometry",
        ]
    ]

    return gdf

polygon_location_list() -> List[str]

Returns the urban and rural settlement names, excluding those that do not have their own polygon

Returns:

Name Type Description
polygon_list List[str]

ordered list of polygon names

Source code in nzssdt_2023/data_creation/gis_data.py
def polygon_location_list() -> List[str]:
    """Returns the urban and rural settlement names, excluding those that do not have their own polygon

    Returns:
        polygon_list: ordered list of polygon names
    """
    ts_urban_locations_list = list(create_sites_df().index)

    replaced_locations = []
    for location in LOCATION_REPLACEMENTS:
        for replaced_location in LOCATION_REPLACEMENTS[location].replaced_locations:
            replaced_locations.append(replaced_location)

    polygon_list = [
        loc for loc in ts_urban_locations_list if loc not in replaced_locations
    ]

    return polygon_list

save_gdf_to_geojson(gdf: gpdt.DataFrame, path, include_idx=False)

Saves a geodataframe to a .geojson file

TODO: set up typing for the path input

Parameters:

Name Type Description Default
gdf gpdt.DataFrame

geodataframe to save as .geojson

required
path

path of new geojson

required
include_idx

if False, drops the index from the GeoDataFrame (e.g. when the index is just a range, 0 -> n)

False
Source code in nzssdt_2023/data_creation/gis_data.py
def save_gdf_to_geojson(gdf: "gpdt.DataFrame", path, include_idx=False):
    """Saves a geodataframe to a .geojson file

    TODO: set up typing for the path input

    Args:
        gdf: geodataframe to save as .geojson
        path: path of new geojson
        include_idx: if False, drops the index from the GeoDataFrame (e.g. when the index is just a range, 0 -> n)
    """

    gdf.to_file(path, driver="GeoJSON", index=include_idx)

util

utility functions

set_coded_location_resolution(dataframe_with_location: pd.DataFrame)

Sets the dataframe index to coded_locations with res 0.1

Source code in nzssdt_2023/data_creation/util.py
def set_coded_location_resolution(dataframe_with_location: pd.DataFrame):
    """Sets the dataframe index to coded_locations with res 0.1"""

    def replace_coded_location(loc):
        """user function (ufunc) to update coded location values"""
        if "~" not in loc:
            return loc
        lat, lon = map(float, loc.split("~"))
        return CodedLocation(lat, lon, resolution=0.1).code

    loc_series = dataframe_with_location.index.to_series().apply(replace_coded_location)

    return dataframe_with_location.set_index(loc_series)
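
For example (named sites pass through unchanged; lat~lon codes are re-coded at 0.1 degree resolution):

```python
import pandas as pd

df = pd.DataFrame({"D": [12, 3]}, index=["Wellington", "-45.500~166.600"])
set_coded_location_resolution(df)  # the lat~lon index entry is replaced by its 0.1-resolution code
```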