data

Functions for loading realizations and saving aggregations.

get_batch_table(dataset, compatibility_key, sources_digests, gmms_digests, vs30, nloc_0, imts)

Get the realization datatable for a batch of aggregation jobs.

Filtering is done for compatibility key, branch digests, vs30, nloc_0, and (multiple) imts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | `Dataset` | the realization dataset. | required |
| `compatibility_key` | `str` | the toshi-hazard-store compatibility key. | required |
| `sources_digests` | `list[str]` | the digests of the source branches. | required |
| `gmms_digests` | `list[str]` | the digests of the gmcm branches. | required |
| `vs30` | `int` | the vs30 of the sites. | required |
| `nloc_0` | `str` | the nloc_0 (1.0 degree location code). | required |
| `imts` | `list[str]` | the intensity measure types. | required |

Returns:

| Type | Description |
| --- | --- |
| `Table` | The filtered datatable. |

Source code in toshi_hazard_post/data.py, lines 23-68:

def get_batch_table(
    dataset: ds.Dataset,
    compatibility_key: str,
    sources_digests: list[str],
    gmms_digests: list[str],
    vs30: int,
    nloc_0: str,
    imts: list[str],
) -> pa.Table:
    """Get the realization datatable for a batch of aggregation jobs.

    Filtering is done for compatibility key, branch digests, vs30, nloc_0, and (multiple) imts.

    Args:
        dataset: the realization dataset.
        compatibility_key: the toshi-hazard-store compatibility key.
        sources_digests: the digests of the source branches.
        gmms_digests: the digests of the gmcm branches.
        vs30: the vs30 of the sites.
        nloc_0: the nloc_0 (1.0 degree location code).
        imts: the intensity measure types.

    Returns:
        The filtered datatable.
    """
    t0 = time.perf_counter()
    columns = ['nloc_001', 'imt', 'sources_digest', 'gmms_digest', 'values']
    flt = (
        (pc.field('compatible_calc_id') == pc.scalar(compatibility_key))
        & (pc.is_in(pc.field('sources_digest'), pa.array(sources_digests)))
        & (pc.is_in(pc.field('gmms_digest'), pa.array(gmms_digests)))
        & (pc.is_in(pc.field('imt'), pa.array(imts)))
    )

    # if we used the partitioning when fetching the dataset then vs30 and nloc_0 will not be in the
    # schema (we will have already implicitly filtered on them)
    dataset_columns = dataset.schema.names
    if 'vs30' in dataset_columns:
        flt = flt & (pc.field('vs30') == pc.scalar(vs30))
    if 'nloc_0' in dataset_columns:
        flt = flt & (pc.field('nloc_0') == pc.scalar(nloc_0))

    batch_datatable = dataset.to_table(columns=columns, filter=flt)
    t1 = time.perf_counter()
    log.debug("time to create batch table: %0.1f seconds" % (t1 - t0))
    return batch_datatable
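
A minimal usage sketch, assuming a locally available realization dataset; the digest and key values below are hypothetical placeholders for values that come from your logic tree and toshi-hazard-store configuration:

```python
from toshi_hazard_post.data import get_batch_table, get_realizations_dataset

# hypothetical partition values and digests, for illustration only
dataset = get_realizations_dataset(vs30=400, nloc_0="-41.0~175.0")
table = get_batch_table(
    dataset,
    compatibility_key="A",             # hypothetical compatibility key
    sources_digests=["abc123def456"],  # hypothetical source branch digests
    gmms_digests=["fed654cba321"],     # hypothetical gmcm branch digests
    vs30=400,
    nloc_0="-41.0~175.0",
    imts=["PGA", "SA(0.5)"],
)
print(table.num_rows)
```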

get_job_datatable(batch_datatable, location, imt, n_expected)

Get the realization datatable for a specific aggregation job (one IMT, location, etc.).

The batch_datatable is expected to be produced by get_batch_table, which will have applied broader filters on the dataset for vs30, nloc_0, etc.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch_datatable` | `Table` | the pre-filtered datatable to be further filtered for a specific aggregation job. | required |
| `location` | `CodedLocation` | the location of the site. | required |
| `imt` | `str` | the intensity measure type. | required |
| `n_expected` | `int` | the number of records expected (typically the number of branches). | required |

Returns:

| Type | Description |
| --- | --- |
| `Table` | The filtered data table. |

Source code in toshi_hazard_post/data.py, lines 71-112:

def get_job_datatable(
    batch_datatable: pa.Table,
    location: 'CodedLocation',
    imt: str,
    n_expected: int,
) -> pa.Table:
    """Get the realization datatable for a specific aggregation job (one IMT, location, etc.).

    The batch_datatable is expected to be produced by get_batch_table which will have applied broader
    filters on the dataset for vs30, nloc_0, etc.

    Args:
        batch_datatable: the pre-filtered datatable to be further filtered for a specific aggregation job.
        location: the location of the site.
        imt: the intensity measure type.
        n_expected: the number of records expected (typically the number of branches).

    Returns:
        The filtered data table.
    """
    t0 = time.perf_counter()
    table = batch_datatable.filter((pc.field("imt") == imt) & (pc.field("nloc_001") == location.downsample(0.001).code))
    table = pa.table(
        {
            "sources_digest": table['sources_digest'].to_pylist(),
            "gmms_digest": table['gmms_digest'].to_pylist(),
            "values": table['values'],
        }
    )

    if len(table) == 0:
        raise KeyError(f"no records found for location: {location}, imt: {imt}")
    if len(table) != n_expected:
        msg = (
            f"incorrect number of records found for location: "
            f"{location}, imt: {imt}. Expected {n_expected}, got {len(table)}"
        )
        raise KeyError(msg)

    t1 = time.perf_counter()
    log.debug("time to create job table: %0.5f seconds" % (t1 - t0))
    return table
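
Continuing the sketch above, a hedged example of extracting one job's rows. The CodedLocation import path is assumed from the nzshm-common package used in this ecosystem, and n_expected is a hypothetical branch count:

```python
from nzshm_common.location.code_location import CodedLocation  # assumed import path

location = CodedLocation(lat=-41.30, lon=174.78, resolution=0.001)
# n_expected should equal the number of logic tree branches; 972 is hypothetical
job_table = get_job_datatable(table, location, imt="PGA", n_expected=972)
# job_table now holds only the sources_digest, gmms_digest, and values columns
```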

get_realizations_dataset(vs30=None, nloc_0=None, rlz_dir=None)

Get a pyarrow Dataset for realizations.

Optional parameters take advantage of the dataset's partitioning for faster retrieval. The partitioning is assumed to be vs30/nloc_0; see the toshi-hazard-store documentation for details.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `vs30` | `Optional[int]` | the site vs30 | `None` |
| `nloc_0` | `Optional[str]` | the 1 degree grid location (e.g. '-41.0~175.0') | `None` |
| `rlz_dir` | `Optional[str \| Path]` | location of the realization dataset. If not passed, the function will use an env var. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dataset` | `Dataset` | the realization dataset |

Source code in toshi_hazard_post/data.py, lines 158-186:

def get_realizations_dataset(
    vs30: Optional[int] = None, nloc_0: Optional[str] = None, rlz_dir: Optional[str | Path] = None
) -> ds.Dataset:
    """Get a pyarrow Dataset for realizations.

    Optional parameters take advantage of partitioning of dataset for faster retrieval. The partitioning is
    assumed to be vs30/nloc_0. See toshi-hazard-store documentation for details.

    Args:
        vs30: the site vs30
        nloc_0: the 1 degree grid location (e.g. '-41.0~175.0')
        rlz_dir: location of realization dataset. If not passed, function will use env var.

    Returns:
        dataset: the realization dataset
    """
    rlz_dir_tmp = str(RLZ_DIR) if rlz_dir is None else str(rlz_dir)
    if vs30 is not None:
        rlz_dir_tmp += f"/vs30={vs30}"
        if nloc_0 is not None:
            rlz_dir_tmp += f"/nloc_0={nloc_0}"
    rlz_dir, filesystem = pyarrow_dataset.configure_output(rlz_dir_tmp)

    t0 = time.monotonic()
    dataset = ds.dataset(rlz_dir, format='parquet', filesystem=filesystem, partitioning='hive')
    t1 = time.monotonic()
    log.debug("time to get realizations dataset %0.6f" % (t1 - t0))

    return dataset
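
A brief sketch of the two retrieval modes (the directory path is hypothetical). Passing vs30 and nloc_0 points pyarrow at a single hive partition, so the other partitions are never scanned:

```python
# full dataset: vs30 and nloc_0 appear as columns in the schema
full_dataset = get_realizations_dataset(rlz_dir="/data/realizations")

# partitioned retrieval: the partition keys are implied by the path and are
# absent from the schema, which is why get_batch_table checks for them
# before adding vs30/nloc_0 filters
partitioned = get_realizations_dataset(
    vs30=400, nloc_0="-41.0~175.0", rlz_dir="/data/realizations"
)
print(partitioned.schema.names)
```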

save_aggregations(hazard, location, vs30, imt, agg_types, hazard_model_id, compatibility_key)

Save the aggregated hazard to the database.

Converts the hazard from rates to probabilities before saving.
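
The conversion itself is not shown in this excerpt; as a hedged sketch, a step like this typically applies the standard Poisson relation p = 1 - exp(-r * t):

```python
import numpy as np

# Hypothetical helper for illustration, not the library's actual implementation:
# convert exceedance rates to probabilities of exceedance over time inv_time.
def rates_to_probabilities(rates: np.ndarray, inv_time: float = 1.0) -> np.ndarray:
    return 1.0 - np.exp(-rates * inv_time)  # p = 1 - exp(-r * t)
```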

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hazard` | `NDArray` | the aggregate hazard rates (not probabilities) | required |
| `location` | `CodedLocation` | the site location | required |
| `vs30` | `int` | the site vs30 | required |
| `imt` | `str` | the intensity measure type (e.g. "PGA", "SA(1.5)") | required |
| `agg_types` | `list[str]` | the statistical aggregate types (e.g. "mean", "0.5") | required |
| `hazard_model_id` | `str` | the model id for storing in the database | required |
| `compatibility_key` | `str` | the toshi-hazard-store compatibility key. | required |
Source code in toshi_hazard_post/data.py, lines 115-155:

def save_aggregations(
    hazard: 'npt.NDArray',
    location: 'CodedLocation',
    vs30: int,
    imt: str,
    agg_types: list[str],
    hazard_model_id: str,
    compatibility_key: str,
) -> None:
    """Save the aggregated hazard to the database.

    Converts the hazard from rates to probabilities before saving.

    Args:
        hazard: the aggregate hazard rates (not probabilities)
        location: the site location
        vs30: the site vs30
        imt: the intensity measure type (e.g. "PGA", "SA(1.5)")
        agg_types: the statistical aggregate types (e.g. "mean", "0.5")
        hazard_model_id: the model id for storing in the database
        compatibility_key: the toshi-hazard-store compatibility key.
    """

    def generate_models():
        for i, agg in enumerate(agg_types):
            yield HazardAggregateCurve(
                compatible_calc_id=compatibility_key,
                hazard_model_id=hazard_model_id,
                nloc_001=location.code,
                nloc_0=location.downsample(1.0).code,
                imt=imt,
                vs30=vs30,
                aggr=agg,
                values=hazard[i, :],
            )

    agg_dir, filesystem = pyarrow_dataset.configure_output(AGG_DIR)
    partitioning = ['vs30', 'imt', 'nloc_001']
    pyarrow_aggr_dataset.append_models_to_dataset(
        models=generate_models(), base_dir=agg_dir, filesystem=filesystem, partitioning=partitioning
    )
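
A minimal usage sketch with hypothetical values; hazard must have one row of rates per entry in agg_types:

```python
import numpy as np

hazard = np.full((3, 44), 1e-4)  # hypothetical rates for 44 hazard levels
save_aggregations(
    hazard,
    location=location,              # CodedLocation from the sketch above
    vs30=400,
    imt="PGA",
    agg_types=["mean", "0.1", "0.9"],
    hazard_model_id="MY_MODEL_v1",  # hypothetical model id
    compatibility_key="A",          # hypothetical compatibility key
)
```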