Skip to content

Efficient kernel change point detection#

Bases: BaseEstimator

Find optimal change points (using dynamic programming or pelt) for the special case where the cost function derives from a kernel function.

Given a segment model, it computes the best partition for which the sum of errors is minimum.

See the user guide for more information.

Source code in ruptures/detection/kernelcpd.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class KernelCPD(BaseEstimator):
    """Find optimal change points (using dynamic programming or PELT) for the
    special case where the cost function derives from a kernel function.

    Given a segment model, it computes the best partition for which the
    sum of errors is minimum.

    See the [user guide](../../../user-guide/detection/kernelcpd) for
    more information.
    """

    def __init__(self, kernel="linear", min_size=2, jump=1, params=None):
        r"""Creates a KernelCPD instance.

        Available kernels:

        - `linear`: $k(x,y) = x^T y$.
        - `rbf`: $k(x, y) = \exp(-\gamma \|x-y\|^2)$ where $\gamma>0$
        (`gamma`) is a user-defined parameter.
        - `cosine`: $k(x,y)= (x^T y)/(\|x\|\|y\|)$.

        Args:
            kernel (str, optional): name of the kernel, ["linear", "rbf", "cosine"]
            min_size (int, optional): minimum segment length. The value actually
                used is the larger of this argument and the `min_size` of the
                associated cost function.
            jump (int, optional): not considered, set to 1.
            params (dict, optional): a dictionary of parameters for the kernel instance

        Raises:
            AssertionError: if the kernel is not implemented.
        """
        self.kernel_name = kernel
        if self.kernel_name not in ("linear", "rbf", "cosine"):
            # Raised explicitly instead of through an `assert` statement so the
            # validation is not stripped when Python runs with -O.
            raise AssertionError("Kernel not found: {}.".format(self.kernel_name))
        # the linear kernel is served by the "l2" cost function
        self.model_name = "l2" if self.kernel_name == "linear" else self.kernel_name
        self.params = params
        # load the associated cost function
        if self.params is None:
            self.cost = cost_factory(model=self.model_name)
        else:
            self.cost = cost_factory(model=self.model_name, **self.params)
        self.min_size = max(min_size, self.cost.min_size)

        self.jump = 1  # the `jump` argument is ignored, always 1
        self.n_samples = None
        self.segmentations_dict = dict()  # {n_bkps: bkps_list}

    def fit(self, signal) -> "KernelCPD":
        """Update some parameters (no computation in this function).

        Clears the cached segmentations, passes the signal (cast to double
        precision) to the cost function and stores the number of samples.

        Args:
            signal (array): signal. Shape (n_samples, n_features) or (n_samples,).

        Returns:
            self
        """
        # cached segmentations were computed for the previous signal
        self.segmentations_dict = dict()
        self.cost.fit(signal.astype(np.double))
        self.n_samples = signal.shape[0]
        return self

    def predict(self, n_bkps=None, pen=None):
        """Return the optimal breakpoints. Must be called after the fit method.

        The breakpoints are associated with the signal passed to
        [`fit()`][ruptures.detection.kernelcpd.KernelCPD.fit].

        Args:
            n_bkps (int, optional): Number of change points. Defaults to None.
            pen (float, optional): penalty value (>0). Defaults to None. Not considered
                if n_bkps is not None.

        Raises:
            AssertionError: if `pen` or `n_bkps` is not strictly positive.
            BadSegmentationParameters: in case of impossible segmentation
                configuration

        Returns:
            list[int]: sorted list of breakpoints. If both `n_bkps` and `pen`
                are None, nothing is computed and None is returned.
        """
        # Our KernelCPD implementation with Pelt implies that we have at least one change point
        # (hence n_bkps=1 in the check when only a penalty is provided).
        # raise an exception in case of impossible segmentation configuration
        if not sanity_check(
            n_samples=self.cost.signal.shape[0],
            n_bkps=1 if n_bkps is None else n_bkps,
            jump=self.jump,
            min_size=self.min_size,
        ):
            raise BadSegmentationParameters

        # dynamic programming if the user passed a number change points
        if n_bkps is not None:
            n_bkps = int(n_bkps)
            if n_bkps <= 0:
                # explicit raise: an `assert` would be stripped under -O
                raise AssertionError(
                    "The number of changes must be positive: {}".format(n_bkps)
                )
            # if we have already computed it, return it without computations.
            if n_bkps in self.segmentations_dict:
                return self.segmentations_dict[n_bkps]
            # otherwise, call the C function
            if self.kernel_name == "linear":
                path_matrix_flat = ekcpd_L2(self.cost.signal, n_bkps, self.min_size)
            elif self.kernel_name == "rbf":
                path_matrix_flat = ekcpd_Gaussian(
                    self.cost.signal, n_bkps, self.min_size, self.cost.gamma
                )
            elif self.kernel_name == "cosine":
                path_matrix_flat = ekcpd_cosine(self.cost.signal, n_bkps, self.min_size)
            # from the path matrix, get all segmentation for k=1,...,n_bkps changes
            # and cache them so later calls with fewer change points are free
            for k in range(1, n_bkps + 1):
                self.segmentations_dict[k] = from_path_matrix_to_bkps_list(
                    path_matrix_flat, k, self.n_samples, n_bkps, self.jump
                )
            return self.segmentations_dict[n_bkps]

        # Call pelt if the user passed a penalty
        if pen is not None:
            if not pen > 0:
                # explicit raise: an `assert` would be stripped under -O
                raise AssertionError("The penalty must be positive: {}".format(pen))
            if self.kernel_name == "linear":
                path_matrix = ekcpd_pelt_L2(self.cost.signal, pen, self.min_size)
            elif self.kernel_name == "rbf":
                path_matrix = ekcpd_pelt_Gaussian(
                    self.cost.signal, pen, self.min_size, self.cost.gamma
                )
            elif self.kernel_name == "cosine":
                path_matrix = ekcpd_pelt_cosine(self.cost.signal, pen, self.min_size)

            # walk the path backwards from the end of the signal, following
            # path_matrix[ind] until index 0 is reached
            my_bkps = list()
            ind = self.n_samples
            while ind > 0:
                my_bkps.append(ind)
                ind = path_matrix[ind]
            # breakpoints were collected from last to first
            return my_bkps[::-1]

        # neither n_bkps nor pen was provided: nothing to compute
        return None

    def fit_predict(self, signal, n_bkps=None, pen=None):
        """Fit to the signal and return the optimal breakpoints.

        Helper method to call fit and predict once

        Args:
            signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
            n_bkps (int, optional): Number of change points. Defaults to None.
            pen (float, optional): penalty value (>0). Defaults to None. Not considered if n_bkps is not None.

        Returns:
            list: sorted list of breakpoints
        """
        self.fit(signal)
        return self.predict(n_bkps=n_bkps, pen=pen)

__init__(kernel='linear', min_size=2, jump=1, params=None) #

Creates a KernelCPD instance.

Available kernels:

  • linear: \(k(x,y) = x^T y\).
  • rbf: \(k(x, y) = \exp(-\gamma \|x-y\|^2)\) where \(\gamma>0\) (gamma) is a user-defined parameter.
  • cosine: \(k(x,y)= (x^T y)/(\|x\|\|y\|)\).

Parameters:

Name Type Description Default
kernel str

name of the kernel, ["linear", "rbf", "cosine"]

'linear'
min_size int

minimum segment length.

2
jump int

not considered, set to 1.

1
params dict

a dictionary of parameters for the kernel instance

None

Raises:

Type Description
AssertionError

if the kernel is not implemented.

Source code in ruptures/detection/kernelcpd.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(self, kernel="linear", min_size=2, jump=1, params=None):
    r"""Creates a KernelCPD instance.

    Available kernels:

    - `linear`: $k(x,y) = x^T y$.
    - `rbf`: $k(x, y) = \exp(-\gamma \|x-y\|^2)$ where $\gamma>0$
    (`gamma`) is a user-defined parameter.
    - `cosine`: $k(x,y)= (x^T y)/(\|x\|\|y\|)$.

    Args:
        kernel (str, optional): name of the kernel, ["linear", "rbf", "cosine"]
        min_size (int, optional): minimum segment length. The value actually
            used is the larger of this argument and the `min_size` of the
            associated cost function.
        jump (int, optional): not considered, set to 1.
        params (dict, optional): a dictionary of parameters for the kernel instance

    Raises:
        AssertionError: if the kernel is not implemented.
    """
    self.kernel_name = kernel
    if self.kernel_name not in ("linear", "rbf", "cosine"):
        # Raised explicitly instead of through an `assert` statement so the
        # validation is not stripped when Python runs with -O.
        raise AssertionError("Kernel not found: {}.".format(self.kernel_name))
    # the linear kernel is served by the "l2" cost function
    self.model_name = "l2" if self.kernel_name == "linear" else self.kernel_name
    self.params = params
    # load the associated cost function
    if self.params is None:
        self.cost = cost_factory(model=self.model_name)
    else:
        self.cost = cost_factory(model=self.model_name, **self.params)
    self.min_size = max(min_size, self.cost.min_size)

    self.jump = 1  # the `jump` argument is ignored, always 1
    self.n_samples = None
    self.segmentations_dict = dict()  # {n_bkps: bkps_list}

fit(signal) #

Update some parameters (no computation in this function).

Parameters:

Name Type Description Default
signal array

signal. Shape (n_samples, n_features) or (n_samples,).

required

Returns:

Type Description
KernelCPD

self

Source code in ruptures/detection/kernelcpd.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def fit(self, signal) -> "KernelCPD":
    """Attach a signal to the estimator; no segmentation is computed here.

    Clears any cached segmentations, hands the signal (cast to double
    precision) to the cost function and records the number of samples.

    Args:
        signal (array): signal. Shape (n_samples, n_features) or (n_samples,).

    Returns:
        self
    """
    # cached results belong to the previously fitted signal: drop them
    self.segmentations_dict = {}
    signal_as_double = signal.astype(np.double)
    self.cost.fit(signal_as_double)
    n_samples = signal.shape[0]
    self.n_samples = n_samples
    return self

fit_predict(signal, n_bkps=None, pen=None) #

Fit to the signal and return the optimal breakpoints.

Helper method to call fit and predict once

Parameters:

Name Type Description Default
signal array

signal. Shape (n_samples, n_features) or (n_samples,).

required
n_bkps int

Number of change points. Defaults to None.

None
pen float

penalty value (>0). Defaults to None. Not considered if n_bkps is not None.

None

Returns:

Name Type Description
list

sorted list of breakpoints

Source code in ruptures/detection/kernelcpd.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def fit_predict(self, signal, n_bkps=None, pen=None):
    """Run `fit` on the signal, then `predict`, and return the breakpoints.

    Convenience wrapper so both steps can be done with a single call.

    Args:
        signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
        n_bkps (int, optional): Number of change points. Defaults to None.
        pen (float, optional): penalty value (>0). Defaults to None. Not considered if n_bkps is not None.

    Returns:
        list: sorted list of breakpoints
    """
    # step 1: register the signal
    self.fit(signal)
    # step 2: segment it with the requested criterion
    breakpoints = self.predict(n_bkps=n_bkps, pen=pen)
    return breakpoints

predict(n_bkps=None, pen=None) #

Return the optimal breakpoints. Must be called after the fit method.

The breakpoints are associated with the signal passed to fit().

Parameters:

Name Type Description Default
n_bkps int

Number of change points. Defaults to None.

None
pen float

penalty value (>0). Defaults to None. Not considered if n_bkps is not None.

None

Raises:

Type Description
AssertionError

if pen or n_bkps is not strictly positive.

BadSegmentationParameters

in case of impossible segmentation configuration

Returns:

Type Description

list[int]: sorted list of breakpoints

Source code in ruptures/detection/kernelcpd.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def predict(self, n_bkps=None, pen=None):
    """Return the optimal breakpoints. Must be called after the fit method.

    The breakpoints are associated with the signal passed to
    [`fit()`][ruptures.detection.kernelcpd.KernelCPD.fit].

    Args:
        n_bkps (int, optional): Number of change points. Defaults to None.
        pen (float, optional): penalty value (>0). Defaults to None. Not considered
            if n_bkps is not None.

    Raises:
        AssertionError: if `pen` or `n_bkps` is not strictly positive.
        BadSegmentationParameters: in case of impossible segmentation
            configuration

    Returns:
        list[int]: sorted list of breakpoints. If both `n_bkps` and `pen`
            are None, nothing is computed and None is returned.
    """
    # Our KernelCPD implementation with Pelt implies that we have at least one change point
    # (hence n_bkps=1 in the check when only a penalty is provided).
    # raise an exception in case of impossible segmentation configuration
    if not sanity_check(
        n_samples=self.cost.signal.shape[0],
        n_bkps=1 if n_bkps is None else n_bkps,
        jump=self.jump,
        min_size=self.min_size,
    ):
        raise BadSegmentationParameters

    # dynamic programming if the user passed a number change points
    if n_bkps is not None:
        n_bkps = int(n_bkps)
        if n_bkps <= 0:
            # explicit raise: an `assert` would be stripped under -O
            raise AssertionError(
                "The number of changes must be positive: {}".format(n_bkps)
            )
        # if we have already computed it, return it without computations.
        if n_bkps in self.segmentations_dict:
            return self.segmentations_dict[n_bkps]
        # otherwise, call the C function
        if self.kernel_name == "linear":
            path_matrix_flat = ekcpd_L2(self.cost.signal, n_bkps, self.min_size)
        elif self.kernel_name == "rbf":
            path_matrix_flat = ekcpd_Gaussian(
                self.cost.signal, n_bkps, self.min_size, self.cost.gamma
            )
        elif self.kernel_name == "cosine":
            path_matrix_flat = ekcpd_cosine(self.cost.signal, n_bkps, self.min_size)
        # from the path matrix, get all segmentation for k=1,...,n_bkps changes
        # and cache them so later calls with fewer change points are free
        for k in range(1, n_bkps + 1):
            self.segmentations_dict[k] = from_path_matrix_to_bkps_list(
                path_matrix_flat, k, self.n_samples, n_bkps, self.jump
            )
        return self.segmentations_dict[n_bkps]

    # Call pelt if the user passed a penalty
    if pen is not None:
        if not pen > 0:
            # explicit raise: an `assert` would be stripped under -O
            raise AssertionError("The penalty must be positive: {}".format(pen))
        if self.kernel_name == "linear":
            path_matrix = ekcpd_pelt_L2(self.cost.signal, pen, self.min_size)
        elif self.kernel_name == "rbf":
            path_matrix = ekcpd_pelt_Gaussian(
                self.cost.signal, pen, self.min_size, self.cost.gamma
            )
        elif self.kernel_name == "cosine":
            path_matrix = ekcpd_pelt_cosine(self.cost.signal, pen, self.min_size)

        # walk the path backwards from the end of the signal, following
        # path_matrix[ind] until index 0 is reached
        my_bkps = list()
        ind = self.n_samples
        while ind > 0:
            my_bkps.append(ind)
            ind = path_matrix[ind]
        # breakpoints were collected from last to first
        return my_bkps[::-1]

    # neither n_bkps nor pen was provided: nothing to compute
    return None