forked from stumpy-dev/stumpy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmstumped.py
More file actions
124 lines (98 loc) · 4.18 KB
/
mstumped.py
File metadata and controls
124 lines (98 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.
import numpy as np
from . import core
from . import _mstump, _get_first_mstump_profile, _get_multi_QT, _multi_compute_mean_std
import logging
logger = logging.getLogger(__name__)
def mstumped(dask_client, T, m):
    """
    Compute the multi-dimensional matrix profile on a Dask cluster.

    This is a highly distributed implementation around the Numba JIT-compiled
    parallelized `_mstump` function which computes the multi-dimensional matrix
    profile according to STOMP. Note that only self-joins are supported.

    The `k = n - m + 1` subsequence start positions are split into at most one
    contiguous chunk per Dask worker, each chunk is submitted to the cluster as
    one `_mstump` task, and the per-chunk results are merged by keeping the
    element-wise smallest profile value (and its matching index).

    Parameters
    ----------
    dask_client : client
        A Dask Distributed client that is connected to a Dask scheduler and
        Dask workers. Setting up a Dask distributed cluster is beyond the
        scope of this library. Please refer to the Dask Distributed
        documentation.

    T : ndarray
        The time series or sequence for which to compute the multi-dimensional
        matrix profile. `T.shape[0]` is used as the number of dimensions and
        `T.shape[1]` as the number of time points.

    m : int
        Window size

    Returns
    -------
    P : ndarray
        The multi-dimensional matrix profile. Each row of the array corresponds
        to each matrix profile for a given dimension (i.e., the first row is the
        1-D matrix profile and the second row is the 2-D matrix profile).

    I : ndarray
        The multi-dimensional matrix profile index where each row of the array
        corresponds to each matrix profile index for a given dimension.

    Notes
    -----
    DOI: 10.1109/ICDM.2017.66

    See mSTAMP Algorithm
    """
    hosts = list(dask_client.ncores().keys())
    nworkers = len(hosts)

    core.check_dtype(T)

    d = T.shape[0]
    n = T.shape[1]
    k = n - m + 1
    excl_zone = int(np.ceil(m / 4))  # See Definition 3 and Figure 3

    # Self-join: the "query" statistics are computed from the same series T.
    M_T, Σ_T = _multi_compute_mean_std(T, m)
    μ_Q, σ_Q = _multi_compute_mean_std(T, m)

    # One slot per worker; slot 0 doubles as the accumulator for the merge.
    P = np.empty((nworkers, d, k), dtype='float64')
    D = np.zeros((nworkers, d, k), dtype='float64')
    D_prime = np.zeros((nworkers, k), dtype='float64')
    I = np.full((nworkers, d, k), -1, dtype='int64')

    # Scatter data to Dask cluster
    T_future = dask_client.scatter(T, broadcast=True)
    M_T_future = dask_client.scatter(M_T, broadcast=True)
    Σ_T_future = dask_client.scatter(Σ_T, broadcast=True)
    μ_Q_future = dask_client.scatter(μ_Q, broadcast=True)
    σ_Q_future = dask_client.scatter(σ_Q, broadcast=True)

    # `step` is chosen so that the number of chunks never exceeds `nworkers`.
    step = 1 + k // nworkers
    chunk_starts = range(0, k, step)

    QT_futures = []
    QT_first_futures = []
    P_futures = []
    I_futures = []
    D_futures = []
    D_prime_futures = []

    for i, start in enumerate(chunk_starts):
        target = [hosts[i]]

        P[i], I[i] = _get_first_mstump_profile(start, T, m, excl_zone, M_T, Σ_T)

        # `hash=False` gives every scattered array its own unique key. With
        # the default content hashing, identical data maps to the same key,
        # and `D[i]` / `D_prime[i]` are identical all-zero arrays for every
        # chunk, so the chunks would otherwise share a single key instead of
        # each worker receiving its own buffer.
        # NOTE(review): presumably `_mstump` uses D / D_prime as scratch space
        # and writes into them - confirm against its implementation.
        P_futures.append(dask_client.scatter(P[i], workers=target, hash=False))
        I_futures.append(dask_client.scatter(I[i], workers=target, hash=False))
        D_futures.append(dask_client.scatter(D[i], workers=target, hash=False))
        D_prime_futures.append(
            dask_client.scatter(D_prime[i], workers=target, hash=False)
        )

        QT, QT_first = _get_multi_QT(start, T, m)

        QT_futures.append(dask_client.scatter(QT, workers=target, hash=False))
        QT_first_futures.append(
            dask_client.scatter(QT_first, workers=target, hash=False)
        )

    futures = []
    for i, start in enumerate(chunk_starts):
        stop = min(k, start + step)

        # The profile for position `start` was already computed above by
        # `_get_first_mstump_profile`, so the task is given `start + 1` as its
        # final (range start) argument and `stop` as its range stop.
        futures.append(
            dask_client.submit(
                _mstump, T_future, m, P_futures[i], I_futures[i],
                D_futures[i], D_prime_futures[i], stop, excl_zone,
                M_T_future, Σ_T_future, QT_futures[i],
                QT_first_futures[i], μ_Q_future, σ_Q_future, k,
                start + 1
            )
        )

    results = dask_client.gather(futures)

    # Fold every chunk's result into slot 0, keeping the smaller profile value
    # (and the index that goes with it). For i == 0 the mask is all False, so
    # that iteration only stores the gathered result.
    for i, chunk_result in enumerate(results):
        P[i], I[i] = chunk_result
        col_mask = P[0] > P[i]
        P[0, col_mask] = P[i, col_mask]
        I[0, col_mask] = I[i, col_mask]

    return P[0], I[0]