aggregation_calc

Primary functions for calculating an aggregation for a single site, IMT, etc.

AggSharedArgs dataclass

A class to store arguments shared by multiple aggregation jobs (used for parallelization).

Attributes:
    agg_types (list[str]): the types of aggregation to perform (e.g. 'mean', '0.9', etc.).
    compatibility_key (str): the toshi-hazard-store compatibility key.
    hazard_model_id (str): the name of the model to use when storing the result.
    weights_shape (tuple[int, ...]): the shape of the weights array.
    branch_hash_table_shape (tuple[int, ...]): the shape of the branch hash table array.
    skip_save (bool): set to True to skip saving the aggregations (used when debugging to avoid writing to a database).
Source code in toshi_hazard_post/aggregation_calc.py
@dataclass
class AggSharedArgs:
    """A class to store arguments shared by multiple aggregation jobs (used for parallelization).

    Attributes:
        agg_types: the types of aggregation to perform (e.g. 'mean', '0.9', etc.).
        compatibility_key: the toshi-hazard-store compatibility key.
        hazard_model_id: the name of the model to use when storing the result.
        weights_shape: the shape of the weights array.
        branch_hash_table_shape: the shape of the branch hash table array.
        skip_save: set to True to skip saving the aggregations (used when debugging to avoid writing to a database).
    """

    agg_types: list[str]
    compatibility_key: str
    hazard_model_id: str
    weights_shape: tuple[int, ...]
    branch_hash_table_shape: tuple[int, ...]
    skip_save: bool
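
For illustration, a minimal sketch of constructing the shared arguments. All values here are hypothetical, and the two shapes must match the arrays the parent process places in shared memory:

from toshi_hazard_post.aggregation_calc import AggSharedArgs

shared_args = AggSharedArgs(
    agg_types=['mean', 'std', '0.1', '0.5', '0.9'],
    compatibility_key='A_A',  # hypothetical toshi-hazard-store key
    hazard_model_id='MY_MODEL_V1',  # hypothetical model name
    weights_shape=(960_000,),
    branch_hash_table_shape=(960_000, 12),
    skip_save=True,  # avoid writing to the database while debugging
)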

AggTaskArgs dataclass

The arguments for a specific aggregation task.

An aggregation task is for a single location, vs30, imt, etc.

Attributes:
    location (CodedLocation): the site location.
    vs30 (int): the site vs30.
    imt (str): the intensity measure type.
    table_filepath (Path): the location of the realization data ORC format file.

Source code in toshi_hazard_post/aggregation_calc.py
@dataclass
class AggTaskArgs:
    """The arguments for a specific aggregation task.

    An aggregation task is for a single location, vs30, imt, etc.

    Attributes:
        location: the site location.
        vs30: the site vs30.
        imt: the intensity measure type.
        table_filepath: the location of the realization data ORC format file.
    """

    location: 'CodedLocation'
    vs30: int
    imt: str
    table_filepath: Path
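
A minimal sketch of one task's arguments. The CodedLocation import path and all of the values are assumptions for illustration:

from pathlib import Path

from nzshm_common.location.coded_location import CodedLocation  # assumed import path
from toshi_hazard_post.aggregation_calc import AggTaskArgs

task_args = AggTaskArgs(
    location=CodedLocation(lat=-41.30, lon=174.78, resolution=0.001),  # roughly Wellington
    vs30=400,
    imt='PGA',
    table_filepath=Path('/tmp/realizations.orc'),  # hypothetical ORC file
)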

build_branch_rates(branch_hash_table, component_rates)

Calculate the rate for the composite branches in the logic tree.

Parameters:
    branch_hash_table (NDArray, required): composite branches represented as a list of hashes of the component branches.
    component_rates (Dict[str, NDArray], required): component realization rates keyed by component branch hash.

Returns:
    NDArray: The rates array with shape (n branches, n IMTL).

Source code in toshi_hazard_post/aggregation_calc.py
def build_branch_rates(branch_hash_table: 'npt.NDArray', component_rates: Dict[str, 'npt.NDArray']) -> 'npt.NDArray':
    """Calculate the rate for the composite branches in the logic tree.

    Args:
        branch_hash_table: composite branches represented as a list of hashes of the component branches
        component_rates: component realization rates keyed by component branch hash

    Returns:
        The rates array with shape (n branches, n IMTL)
    """
    nimtl = len(next(iter(component_rates.values())))
    return np.array([calc_composite_rates(branch, component_rates, nimtl) for branch in branch_hash_table])
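
A minimal sketch of the expected inputs: each row of the hash table is one composite branch, listing the hashes of its component branches. The hashes and rate values here are made up:

import numpy as np

from toshi_hazard_post.aggregation_calc import build_branch_rates

component_rates = {
    'aaa': np.array([0.10, 0.01]),
    'bbb': np.array([0.20, 0.02]),
    'ccc': np.array([0.30, 0.03]),
}
# Two composite branches, each built from two component branches.
branch_hash_table = np.array([['aaa', 'bbb'], ['aaa', 'ccc']])
rates = build_branch_rates(branch_hash_table, component_rates)
# rates.shape == (2, 2); rates[0] is approximately [0.3, 0.03] and rates[1] [0.4, 0.04]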

calc_aggregation(task_args, shared_args)

Calculate the hazard aggregation for a single site and IMT and save the result.

Parameters:
    task_args (AggTaskArgs, required): The arguments for the specific aggregation calculation.
    shared_args (AggSharedArgs, required): The arguments shared among all workers.
Source code in toshi_hazard_post/aggregation_calc.py
def calc_aggregation(task_args: AggTaskArgs, shared_args: AggSharedArgs) -> None:
    """Calculate hazard aggregation for a single site and imt and save result.

    Args:
        task_args: The arguments for the specific aggregation calculation.
        shared_args: The arguments shared among all workers.
    """
    time0 = time.perf_counter()
    worker_name = os.getpid()

    location = task_args.location
    vs30 = task_args.vs30
    imt = task_args.imt

    agg_types = shared_args.agg_types
    compatibility_key = shared_args.compatibility_key
    hazard_model_id = shared_args.hazard_model_id

    branch_hash_table_shm = shared_memory.SharedMemory(name=constants.BRANCH_HASH_TABLE_SHM_NAME)
    branch_hash_table: 'npt.NDArray' = np.ndarray(
        shared_args.branch_hash_table_shape, dtype='<U24', buffer=branch_hash_table_shm.buf
    )

    weights_shm = shared_memory.SharedMemory(name=constants.WEIGHTS_SHM_NAME)
    weights: 'npt.NDArray' = np.ndarray(shared_args.weights_shape, dtype=np.float64, buffer=weights_shm.buf)

    log.info("worker %s: loading realizations from %s. . ." % (worker_name, task_args.table_filepath))
    component_probs = load_realizations(task_args.table_filepath)
    log.debug("worker %s: %s rlz_table " % (worker_name, component_probs.shape))

    # convert probabilities to rates
    time1 = time.perf_counter()
    component_rates = convert_probs_to_rates(component_probs)
    del component_probs
    time2 = time.perf_counter()
    log.debug('worker %s: time to convert_probs_to_rates() % 0.2f' % (worker_name, time2 - time1))

    component_rates = create_component_dict(component_rates)

    time3 = time.perf_counter()
    log.debug('worker %s: time to convert to dict and set digest index %0.2f seconds' % (worker_name, time3 - time2))
    log.debug('worker %s: rates_table %d' % (worker_name, len(component_rates)))

    composite_rates = build_branch_rates(branch_hash_table, component_rates)
    time4 = time.perf_counter()
    log.debug('worker %s: time to build_branch_rates %0.2f seconds' % (worker_name, time4 - time3))

    log.info("worker %s:  calculating aggregates . . . " % worker_name)
    hazard = calculate_aggs(composite_rates, weights, agg_types)
    time5 = time.perf_counter()
    log.debug('worker %s: time to calculate aggs %0.2f seconds' % (worker_name, time5 - time4))

    probs = calculators.rate_to_prob(hazard, 1.0)
    if shared_args.skip_save:
        log.info("worker %s SKIPPING SAVE . . . " % worker_name)
    else:
        log.info("worker %s saving result . . . " % worker_name)
        save_aggregations(probs, location, vs30, imt, agg_types, hazard_model_id, compatibility_key)
    task_args.table_filepath.unlink()
    time6 = time.perf_counter()
    log.info('worker %s time to perform one aggregation %0.2f seconds' % (worker_name, time6 - time0))
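
Note that calc_aggregation does not receive the weights or branch hash table directly: it attaches to shared memory blocks created by the parent process under the names in constants. A hedged sketch of that setup for the weights array (the constants import path, sizes and values are assumptions):

import numpy as np
from multiprocessing import shared_memory

from toshi_hazard_post import constants  # assumed import path for the SHM names

# Create the shared block under the name the workers will look up,
# then copy the weights into it.
weights = np.full(1000, 1e-3, dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, name=constants.WEIGHTS_SHM_NAME, size=weights.nbytes)
shared = np.ndarray(weights.shape, dtype=np.float64, buffer=shm.buf)
shared[:] = weights[:]
# ... launch workers that call calc_aggregation(), then clean up:
# shm.close(); shm.unlink()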

calc_composite_rates(branch_hashes, component_rates, nlevels)

Calculate the rate for a single composite branch of the logic tree.

The rate for a composite branch is the sum of rates of the component branches.

Parameters:
    branch_hashes (list[str], required): the branch hashes for the component branches that comprise the composite branch.
    component_rates (Dict[str, NDArray], required): component realization rates keyed by component branch hash.
    nlevels (int, required): the number of levels (IMTLs) in the rate array.

Returns:
    rates (NDArray): hazard rates for the composite realization with shape (nlevels,).

Source code in toshi_hazard_post/aggregation_calc.py
def calc_composite_rates(
    branch_hashes: list[str], component_rates: Dict[str, 'npt.NDArray'], nlevels: int
) -> 'npt.NDArray':
    """Calculate the rate for a single composite branch of the logic tree.

    The rate for a composite branch is the sum of rates of the component branches.

    Args:
        branch_hashes: the branch hashes for the component branches that comprise the composite branch
        component_rates: component realization rates keyed by component branch hash
        nlevels: the number of levels (IMTLs) in the rate array

    Returns:
        rates: hazard rates for the composite realization with shape (nlevels,)
    """
    # option 1, iterate and lookup on dict or pd.Series
    rates = np.zeros((nlevels,))
    for branch_hash in branch_hashes:
        rates += component_rates[branch_hash]
    return rates
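
A minimal sketch: the composite rate is just the element-wise sum of its component rate curves. The hashes and values are made up:

import numpy as np

from toshi_hazard_post.aggregation_calc import calc_composite_rates

component_rates = {'abc': np.array([0.1, 0.01]), 'def': np.array([0.2, 0.02])}
rates = calc_composite_rates(['abc', 'def'], component_rates, nlevels=2)
# rates is approximately [0.3, 0.03]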

calculate_aggs(branch_rates, weights, agg_types)

Calculate weighted aggregate statistics of the composite realizations.

Parameters:
    branch_rates (NDArray, required): hazard rates for every composite realization of the model with dimensions (branch, IMTL).
    weights (NDArray, required): one dimensional array of weights for composite branches with dimensions (branch,).
    agg_types (Sequence[str], required): the aggregate statistics to be calculated (e.g., "mean", "0.5") with dimension (agg_type,).

Returns:
    hazard (NDArray): aggregate rates array with dimension (agg_type, IMTL).

Source code in toshi_hazard_post/aggregation_calc.py
def calculate_aggs(branch_rates: 'npt.NDArray', weights: 'npt.NDArray', agg_types: Sequence[str]) -> 'npt.NDArray':
    """Calculate weighted aggregate statistics of the composite realizations.

    Args:
        branch_rates: hazard rates for every composite realization of the model with dimensions (branch, IMTL)
        weights: one dimensional array of weights for composite branches with dimensions (branch,)
        agg_types: the aggregate statistics to be calculated (e.g., "mean", "0.5") with dimension (agg_type,)

    Returns:
        hazard: aggregate rates array with dimension (agg_type, IMTL)
    """
    log.debug(f"branch_rates with shape {branch_rates.shape}")
    log.debug(f"weights with shape {weights.shape}")
    log.debug(f"agg_types {agg_types}")

    def is_float(value):
        try:
            float(value)
            return True
        except ValueError:
            return False

    def index(lst, value):
        try:
            return lst.index(value)
        except ValueError:
            pass
        return None

    idx_mean = index(agg_types, "mean")
    idx_std = index(agg_types, "std")
    idx_cov = index(agg_types, "cov")
    idx_quantile = [is_float(agg) for agg in agg_types]
    quantile_points = [float(pt) for pt in agg_types if is_float(pt)]

    nlevels = branch_rates.shape[1]
    naggs = len(agg_types)
    aggs = np.empty((naggs, nlevels))

    if (idx_mean is not None) or (idx_std is not None) or (idx_cov is not None):
        mean, std = calculators.weighted_avg_and_std(branch_rates, weights)
        cov = calculators.cov(mean, std)
    if quantile_points:
        #  Have not figured out a faster way to do this than a loop. Each level has an independent interpolation
        for i in range(nlevels):
            aggs[idx_quantile, i] = calculators.weighted_quantiles(branch_rates[:, i], weights, quantile_points)

    if idx_mean is not None:
        aggs[idx_mean, :] = mean
    if idx_std is not None:
        aggs[idx_std, :] = std
    if idx_cov is not None:
        aggs[idx_cov, :] = cov

    log.debug(f"agg with shape {aggs.shape}")
    return aggs
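
A minimal sketch with three equally weighted composite branches and two IMTLs (the values are made up); the row order of the output follows agg_types:

import numpy as np

from toshi_hazard_post.aggregation_calc import calculate_aggs

branch_rates = np.array([[0.1, 0.01], [0.2, 0.02], [0.3, 0.03]])
weights = np.full(3, 1 / 3)
aggs = calculate_aggs(branch_rates, weights, ['mean', '0.5'])
# aggs.shape == (2, 2); aggs[0] is the weighted mean curve,
# aggs[1] the weighted median curve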

convert_probs_to_rates(probs)

Convert probabilities to rates assuming the probabilities are Poissonian.

The 'values' column in the input dataframe will be used to calculate rates assuming they are probabilities in one year. The output dataframe will have a 'rates' column.

Parameters:
    probs (DataFrame, required): the probabilities dataframe.

Returns:
    DataFrame: the rates dataframe.

Source code in toshi_hazard_post/aggregation_calc.py
def convert_probs_to_rates(probs: 'pd.DataFrame') -> 'pd.DataFrame':
    """Convert probabilies to rates assuming probabilies are Poissonian.

    The 'values' column in the input dataframe will be used to calculate rates assuming they are probabilities
    in one year. The output dataframe will have a 'rates' column.

    Args:
        probs: the probabilities dataframe.

    Returns:
        the rates dataframe.
    """
    probs['rates'] = probs['values'].apply(calculators.prob_to_rate, inv_time=1.0)
    return probs.drop('values', axis=1)
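
Under the Poisson assumption, a one-year exceedance probability p corresponds to an annual rate r = -ln(1 - p). A minimal sketch, where the column layout follows the function's contract and the values are made up:

import numpy as np
import pandas as pd

from toshi_hazard_post.aggregation_calc import convert_probs_to_rates

probs = pd.DataFrame({'values': [np.array([0.05, 0.01])]})
rates = convert_probs_to_rates(probs)
# rates['rates'].iloc[0] == -log(1 - [0.05, 0.01]), approximately [0.0513, 0.0101]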

create_component_dict(component_rates)

Convert component branch rates DataFrame to dict.

The 'digest' is constructed by concatenating sources digest and gmms digest. The source and gmm digests are then dropped.

Parameters:
    component_rates (DataFrame, required): data frame with the component branch rates.

Returns:
    Dict[str, NDArray]: The dictionary of rates keyed by the branch digest.

Source code in toshi_hazard_post/aggregation_calc.py
def create_component_dict(component_rates: 'pd.DataFrame') -> Dict[str, 'npt.NDArray']:
    """Convert component branch rates DataFrame to dict.

    The 'digest' is constructed by concatenating sources digest and gmms digest. The source and gmm digests
    are then dropped.

    Args:
        component_rates: data frame with the component branch rates.

    Returns:
        The dictionary of rates keyed by the branch digest.
    """
    component_rates['digest'] = component_rates['sources_digest'] + component_rates['gmms_digest']
    component_rates.drop(['sources_digest', 'gmms_digest'], axis=1, inplace=True)
    component_rates.set_index('digest', inplace=True)

    return component_rates['rates'].to_dict()
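
A minimal sketch of the expected input frame; the digests and rates are made up:

import numpy as np
import pandas as pd

from toshi_hazard_post.aggregation_calc import create_component_dict

df = pd.DataFrame(
    {
        'sources_digest': ['aaa', 'bbb'],
        'gmms_digest': ['xxx', 'yyy'],
        'rates': [np.array([0.1, 0.01]), np.array([0.2, 0.02])],
    }
)
component_rates = create_component_dict(df)
# {'aaaxxx': array([0.1, 0.01]), 'bbbyyy': array([0.2, 0.02])}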

load_realizations(filepath)

Load the realizations from an Apache ORC format file.

Parameters:
    filepath (Path, required): the path of the ORC file.

Returns:
    DataFrame: The realization data.

Source code in toshi_hazard_post/aggregation_calc.py
def load_realizations(filepath: Path) -> 'pd.DataFrame':
    """Load the realizations from an Appache ORC format file.

    Args:
        filepath: the path of the ORC file.

    Returns:
        The realization data.
    """
    data_table = orc.read_table(filepath)
    return data_table.to_pandas()
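
A hedged usage sketch; the filepath would normally come from AggTaskArgs.table_filepath, and the file itself is hypothetical:

from pathlib import Path

from toshi_hazard_post.aggregation_calc import load_realizations

rlz = load_realizations(Path('/tmp/realizations.orc'))
# Downstream code expects (at least) the columns 'values', 'sources_digest'
# and 'gmms_digest' (see convert_probs_to_rates and create_component_dict).
print(rlz.shape)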