Skip to content

Pelt#

ruptures.detection.pelt.Pelt #

Penalized change point detection.

For a given model and penalty level, computes the segmentation which minimizes the constrained sum of approximation errors.

__init__(self, model='l2', custom_cost=None, min_size=2, jump=5, params=None) special #

Initialize a Pelt instance.

Parameters:

Name Type Description Default
model str

segment model, ["l1", "l2", "rbf"]. Not used if 'custom_cost' is not None.

'l2'
custom_cost BaseCost

custom cost function. Defaults to None.

None
min_size int

minimum segment length.

2
jump int

subsample (one every jump points).

5
params dict

a dictionary of parameters for the cost instance.

None
Source code in ruptures/detection/pelt.py
def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None):
    """Build a Pelt detector around a cost function.

    Args:
        model (str, optional): name of the segment model passed to the cost
            factory, e.g. "l1", "l2" or "rbf". Ignored when ``custom_cost``
            is a ``BaseCost`` instance.
        custom_cost (BaseCost, optional): ready-made cost object to use
            instead of building one from ``model``. Defaults to None.
        min_size (int, optional): requested minimum segment length. The
            stored value is never smaller than the cost's own ``min_size``.
        jump (int, optional): subsample (one every *jump* points).
        params (dict, optional): extra keyword arguments forwarded to the
            cost factory.
    """
    # isinstance() is already False for None, so one check covers both the
    # "not provided" and the "wrong type" cases; both fall back to the factory.
    if isinstance(custom_cost, BaseCost):
        cost = custom_cost
    elif params is None:
        cost = cost_factory(model=model)
    else:
        cost = cost_factory(model=model, **params)

    self.cost = cost
    # the cost may require longer segments than the caller asked for
    self.min_size = max(min_size, cost.min_size)
    self.jump = jump
    # unknown until fit() is called
    self.n_samples = None

fit(self, signal) #

Fit the cost function to the signal and record the number of samples.

Parameters:

Name Type Description Default
signal array

signal to segment. Shape (n_samples, n_features) or (n_samples,).

required

Returns:

Type Description
Pelt

self

Source code in ruptures/detection/pelt.py
def fit(self, signal) -> "Pelt":
    """Fit the cost function to the signal and store its length.

    Args:
        signal (array): signal to segment. Shape (n_samples, n_features) or (n_samples,).

    Returns:
        self
    """
    # the cost object keeps whatever it needs from the signal
    self.cost.fit(signal)

    # Tuple unpacking (rather than shape[0]) makes any signal that is
    # neither 1-D nor 2-D fail with a ValueError.
    if signal.ndim != 1:
        length, _ = signal.shape
    else:
        (length,) = signal.shape

    self.n_samples = length
    return self

fit_predict(self, signal, pen) #

Fit to the signal and return the optimal breakpoints.

Helper method that calls fit and predict in a single step.

Parameters:

Name Type Description Default
signal array

signal. Shape (n_samples, n_features) or (n_samples,).

required
pen float

penalty value (>0)

required

Returns:

Type Description
list

sorted list of breakpoints

Source code in ruptures/detection/pelt.py
def fit_predict(self, signal, pen):
    """Fit to the signal, then compute the breakpoints for the given penalty.

    Convenience wrapper: calls ``fit(signal)`` followed by ``predict(pen)``.

    Args:
        signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
        pen (float): penalty value (>0)

    Returns:
        list: sorted list of breakpoints
    """
    # fit first so that predict works on this signal
    self.fit(signal)
    bkps = self.predict(pen)
    return bkps

predict(self, pen) #

Return the optimal breakpoints.

Must be called after the fit method. The breakpoints are associated with the signal passed to fit().

Parameters:

Name Type Description Default
pen float

penalty value (>0)

required

Exceptions:

Type Description
BadSegmentationParameters

in case of impossible segmentation configuration

Returns:

Type Description
list

sorted list of breakpoints

Source code in ruptures/detection/pelt.py
def predict(self, pen):
    """Compute the breakpoints for the given penalty.

    Must be called after ``fit()``: the length check below reads the signal
    stored on the cost object, and the segmentation itself is delegated to
    ``self._seg(pen)``.

    Args:
        pen (float): penalty value (>0)

    Raises:
        BadSegmentationParameters: when the sanity check on the number of
            samples, ``jump`` and ``min_size`` fails.

    Returns:
        list: sorted list of breakpoints
    """
    # refuse configurations for which no valid segmentation exists
    n_samples = self.cost.signal.shape[0]
    feasible = sanity_check(
        n_samples=n_samples,
        n_bkps=1,
        jump=self.jump,
        min_size=self.min_size,
    )
    if not feasible:
        raise BadSegmentationParameters

    partition = self._seg(pen)
    # each key is unpacked as a (start, end) pair; only the ends are kept
    ends = [end for _start, end in partition.keys()]
    ends.sort()
    return ends