forked from stumpy-dev/stumpy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstumped.py
More file actions
156 lines (120 loc) · 5.81 KB
/
stumped.py
File metadata and controls
156 lines (120 loc) · 5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.
import numpy as np
from . import core, _stump, _get_first_stump_profile, _get_QT
import logging
logger = logging.getLogger(__name__)
def stumped(dask_client, T_A, m, T_B=None, ignore_trivial=True):
"""
This is highly distributed implementation around the Numba JIT-compiled
parallelized `_stump` function which computes the matrix profile according
to STOMP.
Parameters
----------
dask_client : client
A Dask Distributed client that is connected to a Dask scheduler and
Dask workers. Setting up a Dask distributed cluster is beyond the
scope of this library. Please refer to the Dask Distributed
documentation.
T_A : ndarray
The time series or sequence for which to compute the matrix profile
m : int
Window size
T_B : ndarray
The time series or sequence that contain your query subsequences
of interest. Default is `None` which corresponds to a self-join.
ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this
to `False`. Default is `True`.
Returns
-------
out : ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices, the third column consists of
the left matrix profile indices, and the fourth column consists of
the right matrix profile indices.
Notes
-----
DOI: 10.1109/ICDM.2016.0085
See Table II
This is a Dask distributed implementation of stump that scales
across multiple servers and is a convenience wrapper around the
parallelized `stump._stump` function
Timeseries, T_B, will be annotated with the distance location
(or index) of all its subsequences in another times series, T_A.
Return: For every subsequence, Q, in T_B, you will get a distance
and index for the closest subsequence in T_A. Thus, the array
returned will have length T_B.shape[0]-m+1. Additionally, the
left and right matrix profiles are also returned.
Note: Unlike in the Table II where T_A.shape is expected to be equal
to T_B.shape, this implementation is generalized so that the shapes of
T_A and T_B can be different. In the case where T_A.shape == T_B.shape,
then our algorithm reduces down to the same algorithm found in Table II.
Additionally, unlike STAMP where the exclusion zone is m/2, the default
exclusion zone for STOMP is m/4 (See Definition 3 and Figure 3).
For self-joins, set `ignore_trivial = True` in order to avoid the
trivial match.
Note that left and right matrix profiles are only available for self-joins.
"""
core.check_dtype(T_A)
if T_B is None:
T_B = T_A
core.check_dtype(T_B)
if ignore_trivial == False and core.are_arrays_equal(T_A, T_B): # pragma: no cover
logger.warning("Arrays T_A, T_B are equal, which implies a self-join.")
logger.warning("Try setting `ignore_trivial = True`.")
if ignore_trivial and core.are_arrays_equal(T_A, T_B) == False: # pragma: no cover
logger.warning("Arrays T_A, T_B are not equal, which implies an AB-join.")
logger.warning("Try setting `ignore_trivial = False`.")
n = T_B.shape[0]
k = T_A.shape[0]-m+1
l = n-m+1
excl_zone = int(np.ceil(m/4)) # See Definition 3 and Figure 3
M_T, Σ_T = core.compute_mean_std(T_A, m)
μ_Q, σ_Q = core.compute_mean_std(T_B, m)
out = np.empty((l, 4), dtype=object)
profile = np.empty((l,), dtype='float64')
indices = np.empty((l, 3), dtype='int64')
hosts = list(dask_client.ncores().keys())
nworkers = len(hosts)
# Scatter data to Dask cluster
T_A_future = dask_client.scatter(T_A, broadcast=True)
T_B_future = dask_client.scatter(T_B, broadcast=True)
M_T_future = dask_client.scatter(M_T, broadcast=True)
Σ_T_future = dask_client.scatter(Σ_T, broadcast=True)
μ_Q_future = dask_client.scatter(μ_Q, broadcast=True)
σ_Q_future = dask_client.scatter(σ_Q, broadcast=True)
step = 1+l//nworkers
QT_futures = []
QT_first_futures = []
for i, start in enumerate(range(0, l, step)):
profile[start], indices[start, :] = \
_get_first_stump_profile(start, T_A, T_B, m, excl_zone, M_T,
Σ_T, ignore_trivial)
QT, QT_first = _get_QT(start, T_A, T_B, m)
QT_future = dask_client.scatter(QT, workers=[hosts[i]])
QT_first_future = dask_client.scatter(QT_first, workers=[hosts[i]])
QT_futures.append(QT_future)
QT_first_futures.append(QT_first_future)
futures = []
for i, start in enumerate(range(0, l, step)):
stop = min(l, start + step)
futures.append(
dask_client.submit(_stump, T_A_future, T_B_future, m, stop,
excl_zone, M_T_future, Σ_T_future,
QT_futures[i], QT_first_futures[i], μ_Q_future,
σ_Q_future, k, ignore_trivial, start+1
)
)
results = dask_client.gather(futures)
for i, start in enumerate(range(0, l, step)):
stop = min(l, start + step)
profile[start+1:stop], indices[start+1:stop, :] = results[i]
out[:, 0] = profile
out[:, 1:4] = indices
threshold = 10e-6
if core.are_distances_too_small(out[:, 0], threshold=threshold): # pragma: no cover
logger.warning(f"A large number of values are smaller than {threshold}.")
logger.warning("For a self-join, try setting `ignore_trivial = True`.")
return out