Inversion solution file

inversion_solution_file

An InversionSolution archive file helper.

Supports files in OpenSHA InversionSolution archive format.

It provides conversions from the original file formats to pandas dataframe instances with caching and some error handling.
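As a rough illustration of the kind of conversion described above, the sketch below reads one CSV member of a zip archive into a pandas DataFrame. It is not the solvis implementation: the archive is built in memory for the demo, and the `Annual Rate` column name and the sample values are assumptions made for illustration only.

```python
import io
import zipfile

import pandas as pd

# Build a tiny in-memory archive so the example is self-contained.
# The column names and values here are illustrative assumptions.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as z:
    z.writestr("solution/rates.csv", "Rupture Index,Annual Rate\n0,0.0001\n1,0.00002\n")

# Open the archive for reading and parse the CSV member directly,
# without extracting it to disk.
with zipfile.ZipFile(buffer, "r") as z:
    with z.open("solution/rates.csv") as member:
        rates = pd.read_csv(member)
```

Reading straight from the open member avoids writing temporary files, which is one plausible reason for a helper like this to keep the archive in memory.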

Attributes

ZIP_METHOD = zipfile.ZIP_STORED module-attribute

Classes

InversionSolutionFile()

Class to handle the OpenSHA modular archive file format.

Methods:

Name Description
to_archive

serialise an instance to a zip archive.

Initializes the InversionSolutionFile object.

Source code in solvis/solution/inversion_solution/inversion_solution_file.py
def __init__(self) -> None:
    """Initializes the InversionSolutionFile object."""
    self._rates: Optional[pd.DataFrame] = None
    self._ruptures: Optional[pd.DataFrame] = None
    self._indices: Optional[pd.DataFrame] = None
    self._section_target_slip_rates: Optional[pd.DataFrame] = None
    self._average_slips: Optional[pd.DataFrame] = None
    self._archive_path: Optional[Path] = None
    self._archive: Optional[io.BytesIO] = None
Attributes
RATES_PATH = 'solution/rates.csv' class-attribute instance-attribute

Path of the rupture rates CSV file within the archive.

RUPTS_PATH = 'ruptures/properties.csv' class-attribute instance-attribute
INDICES_PATH = 'ruptures/indices.csv' class-attribute instance-attribute
AVG_SLIPS_PATH = 'ruptures/average_slips.csv' class-attribute instance-attribute
FAULTS_PATH = 'ruptures/fault_sections.geojson' class-attribute instance-attribute
METADATA_PATH = 'metadata.json' class-attribute instance-attribute
LOGIC_TREE_PATH = 'ruptures/logic_tree_branch.json' class-attribute instance-attribute
SECT_SLIP_RATES_PATH = 'ruptures/sect_slip_rates.csv' class-attribute instance-attribute
DATAFRAMES = [RATES_PATH, RUPTS_PATH, INDICES_PATH, AVG_SLIPS_PATH] class-attribute instance-attribute
archive_path: Optional[Path] property

Get the path to the current archive file.

Returns:

Type Description
Optional[Path]

The path to the archive file, or None if the archive is in a buffer.

archive: zipfile.ZipFile property

Open and cache the zip archive.

This property opens the zip archive from the specified path or buffer, caches it in memory for efficient access, and returns a zipfile.ZipFile object. If the archive is already cached, it simply returns the cached version.

Returns:

Type Description
zipfile.ZipFile

A zipfile.ZipFile object representing the open archive.
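The open-and-cache behaviour described for this property could look something like the sketch below. `LazyArchive` and its `disk_reads` counter are hypothetical names introduced here for illustration; this is an assumption about the general pattern, not the actual solvis code.

```python
import io
import zipfile
from pathlib import Path
from typing import Optional


class LazyArchive:
    """Hypothetical stand-in showing a lazily loaded, memory-cached zip archive."""

    def __init__(self, path: Optional[Path] = None, buffer: Optional[io.BytesIO] = None) -> None:
        self._path = path
        self._buffer = buffer
        self.disk_reads = 0  # demo-only counter of reads from disk

    @property
    def archive(self) -> zipfile.ZipFile:
        if self._buffer is None:
            if self._path is None:
                raise RuntimeError("no archive path or buffer has been set")
            # First access: read the whole file once and keep the bytes in memory.
            self._buffer = io.BytesIO(Path(self._path).read_bytes())
            self.disk_reads += 1
        # Later accesses reuse the cached bytes instead of touching the disk.
        return zipfile.ZipFile(self._buffer, "r")
```

With this pattern, repeated accesses to `archive` cost one disk read in total, at the price of holding the full archive in memory.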

fault_sections: DataFrame[FaultSectionSchema] cached property

Get the fault sections with target slip rates.

Returns:

Type Description
DataFrame[FaultSectionSchema]

A DataFrame containing fault section data with target slip rates.

fault_regime: str cached property

Get the fault regime from the logic tree branches.

Returns:

Type Description
str

The fault regime name as a string: either CRUSTAL or SUBDUCTION.

rupture_rates: DataFrame[RuptureRateSchema] property

Get the rupture rates from the archive.

Returns:

Type Description
DataFrame[RuptureRateSchema]

A DataFrame containing rupture rate data.

ruptures: DataFrame[RuptureSchema] cached property

Get the ruptures from the archive.

Returns:

Type Description
DataFrame[RuptureSchema]

A DataFrame containing rupture data.

indices: gpd.GeoDataFrame cached property

Get the rupture indices from the archive.

Returns:

Type Description
gpd.GeoDataFrame

A GeoDataFrame containing rupture index data.

average_slips: gpd.GeoDataFrame cached property

Get the average slips from the archive.

Returns:

Type Description
gpd.GeoDataFrame

A GeoDataFrame containing average slip data.

section_target_slip_rates: gpd.GeoDataFrame property

Get the section target slip rates from the archive.

Returns:

Type Description
gpd.GeoDataFrame

A GeoDataFrame containing section target slip rate data.

Functions
to_archive(archive_path_or_buffer: Union[Path, str, io.BytesIO], base_archive_path=None, compat=False)

Write the current solution file to a new zip archive, optionally cloning data from a base archive.

In non-compatible mode (the default), rupture ids may not be a contiguous, 0-based sequence, so the archive will not be suitable for use with opensha. Compatible mode reindexes the rupture tables, so the original rupture ids are lost.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| archive_path_or_buffer | Union[Path, str, io.BytesIO] | Path or buffer to write. | required |
| base_archive_path | | Path to an InversionSolution archive to clone data from. | None |
| compat | | If True, reindex the dataframes so that the archive remains compatible with opensha. | False |
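The copy-and-overwrite approach used by `to_archive` can be sketched with the standard library alone. In the example below, `copy_archive_skipping` is a hypothetical helper name, and the archive contents (including the `Annual Rate` column) are made-up sample data; it is a minimal sketch of the technique, not the library's code.

```python
import io
import zipfile
from typing import Iterable


def copy_archive_skipping(zin: zipfile.ZipFile, zout: zipfile.ZipFile, skip: Iterable[str]) -> None:
    """Copy every member of zin into zout except the names listed in skip."""
    skip_names = set(skip)
    for item in zin.infolist():
        if item.filename in skip_names:
            continue
        zout.writestr(item, zin.read(item.filename))


# Build a small in-memory source archive (illustrative content only).
source = io.BytesIO()
with zipfile.ZipFile(source, "w") as z:
    z.writestr("metadata.json", "{}")
    z.writestr("solution/rates.csv", "Rupture Index,Annual Rate\n0,0.1\n")

# Copy everything except the rates table, then write a replacement table.
target = io.BytesIO()
with zipfile.ZipFile(source, "r") as zin, zipfile.ZipFile(target, "w", zipfile.ZIP_STORED) as zout:
    copy_archive_skipping(zin, zout, skip=["solution/rates.csv"])
    zout.writestr("solution/rates.csv", "Rupture Index,Annual Rate\n0,0.2\n")

with zipfile.ZipFile(target, "r") as z:
    copied_names = z.namelist()
    new_rates = z.read("solution/rates.csv").decode()
```

Skipping the members first matters because the zip format does not replace entries in place; writing the same name twice would leave a duplicate entry in the output.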
Source code in solvis/solution/inversion_solution/inversion_solution_file.py
def to_archive(self, archive_path_or_buffer: Union[Path, str, io.BytesIO], base_archive_path=None, compat=False):
    """Write the current solution file to a new zip archive.

    Optionally clones data from a base archive.

    In non-compatible mode (the default) rupture ids may not be a contiguous, 0-based sequence,
    so the archive will not be suitable for use with opensha. Compatible mode reindexes the rupture tables,
    so the original rupture ids are lost.

    Args:
        archive_path_or_buffer: path or buffer to write.
        base_archive_path: path to an InversionSolution archive to clone data from.
        compat: if True, reindex the dataframes so that the archive remains compatible with opensha.
    """
    if base_archive_path is None:
        # try to use this archive, rather than a base archive
        zin = self.archive
    else:
        zin = zipfile.ZipFile(base_archive_path, 'r')

    log.debug('create zipfile %s with method %s' % (archive_path_or_buffer, ZIP_METHOD))
    zout = zipfile.ZipFile(archive_path_or_buffer, 'w', ZIP_METHOD)

    log.debug('to_archive: skipping files: %s' % self.DATAFRAMES)
    # this copies in memory, skipping the dataframe files we'll want to overwrite
    for item in zin.infolist():
        if item.filename in self.DATAFRAMES:
            continue
        log.debug("writing to zipfile: %s" % item.filename)
        buffer = zin.read(item.filename)
        zout.writestr(item, buffer)

    if compat:
        self._write_dataframes(zout, reindex=True)
    else:
        self._write_dataframes(zout, reindex=False)

    data_to_zip_direct(zout, WARNING, "WARNING.md")

    if isinstance(archive_path_or_buffer, io.BytesIO):
        self._archive = archive_path_or_buffer
    else:
        self._archive_path = cast(Path, archive_path_or_buffer)
set_props(rates: pd.DataFrame, ruptures: pd.DataFrame, indices: pd.DataFrame, fault_sections: pd.DataFrame, average_slips: pd.DataFrame)

Set the properties for this object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rates | pd.DataFrame | A DataFrame containing rupture rate data. | required |
| ruptures | pd.DataFrame | A DataFrame containing rupture data. | required |
| indices | pd.DataFrame | A DataFrame containing rupture index data. | required |
| fault_sections | pd.DataFrame | A DataFrame containing fault section data with target slip rates. | required |
| average_slips | pd.DataFrame | A GeoDataFrame containing average slip data. | required |
Source code in solvis/solution/inversion_solution/inversion_solution_file.py
def set_props(
    self,
    rates: pd.DataFrame,
    ruptures: pd.DataFrame,
    indices: pd.DataFrame,
    fault_sections: pd.DataFrame,
    average_slips: pd.DataFrame,
):
    """
    Set the properties for this object.

    Args:
        rates: A DataFrame containing rupture rate data.
        ruptures: A DataFrame containing rupture data.
        indices: A DataFrame containing rupture index data.
        fault_sections: A DataFrame containing fault section data with target slip rates.
        average_slips: A GeoDataFrame containing average slip data.
    """
    self._rates = rates
    self._ruptures = ruptures
    self._fault_sections = fault_sections
    self._indices = indices
    self._average_slips = average_slips

Functions

data_to_zip_direct(z, data, name)

Source code in solvis/solution/inversion_solution/inversion_solution_file.py
def data_to_zip_direct(z, data, name):
    """Write data into the open zip archive z as a deflate-compressed entry called name."""
    log.debug('data_to_zip_direct %s' % name)
    zinfo = zipfile.ZipInfo(name, time.localtime()[:6])
    zinfo.compress_type = zipfile.ZIP_DEFLATED
    z.writestr(zinfo, data)

reindex_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame

Source code in solvis/solution/inversion_solution/inversion_solution_file.py
def reindex_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with a fresh 0-based index named 'Rupture Index', replacing the original column."""
    new_df = dataframe.copy().reset_index(drop=True).drop(columns=['Rupture Index'])
    new_df.index = new_df.index.rename('Rupture Index')
    return new_df
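
To make the effect of reindexing concrete, the sketch below applies the same pandas steps as `reindex_dataframe` to a small made-up table. The `Annual Rate` column and all values are assumptions for illustration; only the `Rupture Index` name comes from the source above.

```python
import pandas as pd

# Illustrative data: four ruptures, two of which have a zero rate.
all_rates = pd.DataFrame(
    {"Rupture Index": [0, 1, 2, 3], "Annual Rate": [1e-4, 0.0, 2e-5, 0.0]}
)

# Filtering leaves non-contiguous rupture ids (0 and 2).
filtered = all_rates[all_rates["Annual Rate"] > 0]

# Same steps as reindex_dataframe: discard the old index and the old id column,
# then name the fresh 0-based index 'Rupture Index'.
reindexed = filtered.copy().reset_index(drop=True).drop(columns=["Rupture Index"])
reindexed.index = reindexed.index.rename("Rupture Index")
```

After this, the surviving ruptures are numbered 0 and 1, so the original id 2 can no longer be recovered from the table. This is the trade-off that compatible mode accepts.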