-
Notifications
You must be signed in to change notification settings - Fork 145
Expand file tree
/
Copy pathmask_future.rs
More file actions
94 lines (81 loc) · 2.56 KB
/
mask_future.rs
File metadata and controls
94 lines (81 loc) · 2.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::future::Future;
use std::ops::Range;
use std::sync::Arc;
use futures::FutureExt;
use futures::TryFutureExt;
use futures::future::BoxFuture;
use futures::future::Shared;
use vortex_error::SharedVortexResult;
use vortex_error::VortexError;
use vortex_error::VortexResult;
use vortex_error::vortex_panic;
use vortex_mask::Mask;
/// A future that resolves to a mask.
#[derive(Clone)]
pub struct MaskFuture {
inner: Shared<BoxFuture<'static, SharedVortexResult<Mask>>>,
len: usize,
}
impl MaskFuture {
/// Create a new MaskFuture from a future that returns a mask.
pub fn new<F>(len: usize, fut: F) -> Self
where
F: Future<Output = VortexResult<Mask>> + Send + 'static,
{
Self {
inner: fut
.inspect(move |r| {
if let Ok(mask) = r
&& mask.len() != len {
vortex_panic!("MaskFuture created with future that returned mask of incorrect length (expected {}, got {})", len, mask.len());
}
})
.map_err(Arc::new)
.boxed()
.shared(),
len,
}
}
/// Returns the length of the mask.
pub fn len(&self) -> usize {
self.len
}
/// Returns true if the mask is empty.
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Create a MaskFuture from a ready mask.
pub fn ready(mask: Mask) -> Self {
Self::new(mask.len(), async move { Ok(mask) })
}
/// Create a MaskFuture that resolves to a mask with all values set to true.
pub fn new_true(row_count: usize) -> Self {
Self::ready(Mask::new_true(row_count))
}
/// Create a MaskFuture that resolves to a slice of the original mask.
pub fn slice(&self, range: Range<usize>) -> Self {
let inner = self.inner.clone();
Self::new(range.len(), async move { Ok(inner.await?.slice(range)) })
}
pub fn inspect(
self,
f: impl FnOnce(&SharedVortexResult<Mask>) + 'static + Send + Sync,
) -> Self {
let len = self.len;
Self {
inner: self.inner.inspect(f).boxed().shared(),
len,
}
}
}
impl Future for MaskFuture {
type Output = VortexResult<Mask>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.inner.poll_unpin(cx).map_err(VortexError::from)
}
}