Skip to content

Bottom-up segmentation#

Bases: BaseEstimator

Bottom-up segmentation.

Source code in ruptures/detection/bottomup.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class BottomUp(BaseEstimator):
    """Bottom-up segmentation."""

    def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None):
        """Initialize a BottomUp instance.

        Args:
            model (str, optional): segment model, ["l1", "l2", "rbf"]. Not used if ``'custom_cost'`` is not None.
            custom_cost (BaseCost, optional): custom cost function. Defaults to None.
            min_size (int, optional): minimum segment length. Defaults to 2 samples.
            jump (int, optional): subsample (one every *jump* points). Defaults to 5 samples.
            params (dict, optional): a dictionary of parameters for the cost instance.
        """
        if custom_cost is not None and isinstance(custom_cost, BaseCost):
            self.cost = custom_cost
        else:
            if params is None:
                self.cost = cost_factory(model=model)
            else:
                self.cost = cost_factory(model=model, **params)
        self.min_size = max(min_size, self.cost.min_size)
        self.jump = jump
        self.n_samples = None
        self.signal = None
        self.leaves = None

    def _grow_tree(self):
        """Grow the entire binary tree."""
        partition = [(-self.n_samples, (0, self.n_samples))]
        stop = False
        while not stop:  # recursively divide the signal
            stop = True
            _, (start, end) = partition[0]
            mid = (start + end) * 0.5
            bkps = list()
            for bkp in range(start, end):
                if bkp % self.jump == 0:
                    if bkp - start >= self.min_size and end - bkp >= self.min_size:
                        bkps.append(bkp)
            if len(bkps) > 0:  # at least one admissible breakpoint was found
                bkp = min(bkps, key=lambda x: abs(x - mid))
                heapq.heappop(partition)
                heapq.heappush(partition, (-bkp + start, (start, bkp)))
                heapq.heappush(partition, (-end + bkp, (bkp, end)))
                stop = False

        partition.sort(key=lambda x: x[1])
        # compute segment costs
        leaves = list()
        for _, (start, end) in partition:
            val = self.cost.error(start, end)
            leaf = Bnode(start, end, val)
            leaves.append(leaf)
        return leaves

    @lru_cache(maxsize=None)
    def merge(self, left, right):
        """Merge two contiguous segments."""
        assert left.end == right.start, "Segments are not contiguous."
        start, end = left.start, right.end
        val = self.cost.error(start, end)
        node = Bnode(start, end, val, left=left, right=right)
        return node

    def _seg(self, n_bkps=None, pen=None, epsilon=None):
        """Compute the bottom-up segmentation.

        The stopping rule depends on the parameter passed to the function.

        Args:
            n_bkps (int): number of breakpoints to find before stopping.
            penalty (float): penalty value (>0)
            epsilon (float): reconstruction budget (>0)

        Returns:
            dict: partition dict {(start, end): cost value,...}
        """
        leaves = sorted(self.leaves)
        keys = [leaf.start for leaf in leaves]
        removed = set()
        merged = []
        for left, right in pairwise(leaves):
            candidate = self.merge(left, right)
            heapq.heappush(merged, (candidate.gain, candidate))
        # bottom up fusion
        stop = False
        while not stop:
            stop = True

            try:
                gain, leaf = heapq.heappop(merged)
                # Ignore any merge candidates whose left or right children
                # no longer exist (because they were merged with another node).
                # It's cheaper to do this here than during the initial merge.
                while leaf.left in removed or leaf.right in removed:
                    gain, leaf = heapq.heappop(merged)
            # if merged is empty (all nodes have been merged).
            except IndexError:
                break

            if n_bkps is not None:
                if len(leaves) > n_bkps + 1:
                    stop = False
            elif pen is not None:
                if gain < pen:
                    stop = False
            elif epsilon is not None:
                if sum(leaf_tmp.val for leaf_tmp in leaves) < epsilon:
                    stop = False

            if not stop:
                # updates the list of leaves (i.e. segments of the partitions)
                # find the merged segments indexes
                left_idx = bisect_left(keys, leaf.left.start)
                # replace leaf.left
                leaves[left_idx] = leaf
                keys[left_idx] = leaf.start
                # remove leaf.right
                del leaves[left_idx + 1]
                del keys[left_idx + 1]
                # add to the set of removed segments.
                removed.add(leaf.left)
                removed.add(leaf.right)
                # add new merge candidates
                if left_idx > 0:
                    left_candidate = self.merge(leaves[left_idx - 1], leaf)
                    heapq.heappush(merged, (left_candidate.gain, left_candidate))
                if left_idx < len(leaves) - 1:
                    right_candidate = self.merge(leaf, leaves[left_idx + 1])
                    heapq.heappush(merged, (right_candidate.gain, right_candidate))

        partition = {(leaf.start, leaf.end): leaf.val for leaf in leaves}
        return partition

    def fit(self, signal) -> "BottomUp":
        """Compute params to segment signal.

        Args:
            signal (array): signal to segment. Shape (n_samples, n_features) or (n_samples,).

        Returns:
            self
        """
        # update some params
        self.cost.fit(signal)
        self.merge.cache_clear()
        if signal.ndim == 1:
            (n_samples,) = signal.shape
        else:
            n_samples, _ = signal.shape
        self.n_samples = n_samples
        self.leaves = self._grow_tree()
        return self

    def predict(self, n_bkps=None, pen=None, epsilon=None):
        """Return the optimal breakpoints.

        Must be called after the fit method. The breakpoints are associated with the signal passed
        to [`fit()`][ruptures.detection.bottomup.BottomUp.fit].
        The stopping rule depends on the parameter passed to the function.

        Args:
            n_bkps (int): number of breakpoints to find before stopping.
            pen (float): penalty value (>0)
            epsilon (float): reconstruction budget (>0)

        Raises:
            AssertionError: if none of `n_bkps`, `pen`, `epsilon` is set.
            BadSegmentationParameters: in case of impossible segmentation
                configuration

        Returns:
            list: sorted list of breakpoints
        """
        msg = "Give a parameter."
        assert any(param is not None for param in (n_bkps, pen, epsilon)), msg

        # raise an exception in case of impossible segmentation configuration
        if not sanity_check(
            n_samples=self.cost.signal.shape[0],
            n_bkps=0 if n_bkps is None else n_bkps,
            jump=self.jump,
            min_size=self.min_size,
        ):
            raise BadSegmentationParameters

        partition = self._seg(n_bkps=n_bkps, pen=pen, epsilon=epsilon)
        bkps = sorted(e for s, e in partition.keys())
        return bkps

    def fit_predict(self, signal, n_bkps=None, pen=None, epsilon=None):
        """Fit to the signal and return the optimal breakpoints.

        Helper method to call fit and predict once

        Args:
            signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
            n_bkps (int): number of breakpoints.
            pen (float): penalty value (>0)
            epsilon (float): reconstruction budget (>0)

        Returns:
            list: sorted list of breakpoints
        """
        self.fit(signal)
        return self.predict(n_bkps=n_bkps, pen=pen, epsilon=epsilon)

__init__(model='l2', custom_cost=None, min_size=2, jump=5, params=None) #

Initialize a BottomUp instance.

Parameters:

Name Type Description Default
model str

segment model, ["l1", "l2", "rbf"]. Not used if 'custom_cost' is not None.

'l2'
custom_cost BaseCost

custom cost function. Defaults to None.

None
min_size int

minimum segment length. Defaults to 2 samples.

2
jump int

subsample (one every jump points). Defaults to 5 samples.

5
params dict

a dictionary of parameters for the cost instance.

None
Source code in ruptures/detection/bottomup.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None):
    """Initialize a BottomUp instance.

    Args:
        model (str, optional): segment model, ["l1", "l2", "rbf"]. Not used if ``'custom_cost'`` is not None.
        custom_cost (BaseCost, optional): custom cost function. Defaults to None.
        min_size (int, optional): minimum segment length. Defaults to 2 samples.
        jump (int, optional): subsample (one every *jump* points). Defaults to 5 samples.
        params (dict, optional): a dictionary of parameters for the cost instance.
    """
    if custom_cost is not None and isinstance(custom_cost, BaseCost):
        self.cost = custom_cost
    else:
        if params is None:
            self.cost = cost_factory(model=model)
        else:
            self.cost = cost_factory(model=model, **params)
    self.min_size = max(min_size, self.cost.min_size)
    self.jump = jump
    self.n_samples = None
    self.signal = None
    self.leaves = None

fit(signal) #

Compute params to segment signal.

Parameters:

Name Type Description Default
signal array

signal to segment. Shape (n_samples, n_features) or (n_samples,).

required

Returns:

Type Description
BottomUp

self

Source code in ruptures/detection/bottomup.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def fit(self, signal) -> "BottomUp":
    """Compute params to segment signal.

    Args:
        signal (array): signal to segment. Shape (n_samples, n_features) or (n_samples,).

    Returns:
        self
    """
    # update some params
    self.cost.fit(signal)
    self.merge.cache_clear()
    if signal.ndim == 1:
        (n_samples,) = signal.shape
    else:
        n_samples, _ = signal.shape
    self.n_samples = n_samples
    self.leaves = self._grow_tree()
    return self

fit_predict(signal, n_bkps=None, pen=None, epsilon=None) #

Fit to the signal and return the optimal breakpoints.

Helper method to call fit and predict once

Parameters:

Name Type Description Default
signal array

signal. Shape (n_samples, n_features) or (n_samples,).

required
n_bkps int

number of breakpoints.

None
pen float

penalty value (>0)

None
epsilon float

reconstruction budget (>0)

None

Returns:

Name Type Description
list

sorted list of breakpoints

Source code in ruptures/detection/bottomup.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def fit_predict(self, signal, n_bkps=None, pen=None, epsilon=None):
    """Fit to the signal and return the optimal breakpoints.

    Helper method to call fit and predict once

    Args:
        signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
        n_bkps (int): number of breakpoints.
        pen (float): penalty value (>0)
        epsilon (float): reconstruction budget (>0)

    Returns:
        list: sorted list of breakpoints
    """
    self.fit(signal)
    return self.predict(n_bkps=n_bkps, pen=pen, epsilon=epsilon)

merge(left, right) cached #

Merge two contiguous segments.

Source code in ruptures/detection/bottomup.py
67
68
69
70
71
72
73
74
@lru_cache(maxsize=None)
def merge(self, left, right):
    """Merge two contiguous segments."""
    assert left.end == right.start, "Segments are not contiguous."
    start, end = left.start, right.end
    val = self.cost.error(start, end)
    node = Bnode(start, end, val, left=left, right=right)
    return node

predict(n_bkps=None, pen=None, epsilon=None) #

Return the optimal breakpoints.

Must be called after the fit method. The breakpoints are associated with the signal passed to fit(). The stopping rule depends on the parameter passed to the function.

Parameters:

Name Type Description Default
n_bkps int

number of breakpoints to find before stopping.

None
pen float

penalty value (>0)

None
epsilon float

reconstruction budget (>0)

None

Raises:

Type Description
AssertionError

if none of n_bkps, pen, epsilon is set.

BadSegmentationParameters

in case of impossible segmentation configuration

Returns:

Name Type Description
list

sorted list of breakpoints

Source code in ruptures/detection/bottomup.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def predict(self, n_bkps=None, pen=None, epsilon=None):
    """Return the optimal breakpoints.

    Must be called after the fit method. The breakpoints are associated with the signal passed
    to [`fit()`][ruptures.detection.bottomup.BottomUp.fit].
    The stopping rule depends on the parameter passed to the function.

    Args:
        n_bkps (int): number of breakpoints to find before stopping.
        pen (float): penalty value (>0)
        epsilon (float): reconstruction budget (>0)

    Raises:
        AssertionError: if none of `n_bkps`, `pen`, `epsilon` is set.
        BadSegmentationParameters: in case of impossible segmentation
            configuration

    Returns:
        list: sorted list of breakpoints
    """
    msg = "Give a parameter."
    assert any(param is not None for param in (n_bkps, pen, epsilon)), msg

    # raise an exception in case of impossible segmentation configuration
    if not sanity_check(
        n_samples=self.cost.signal.shape[0],
        n_bkps=0 if n_bkps is None else n_bkps,
        jump=self.jump,
        min_size=self.min_size,
    ):
        raise BadSegmentationParameters

    partition = self._seg(n_bkps=n_bkps, pen=pen, epsilon=epsilon)
    bkps = sorted(e for s, e in partition.keys())
    return bkps