-
Notifications
You must be signed in to change notification settings - Fork 145
Expand file tree
/
Copy pathpolars_.py
More file actions
166 lines (134 loc) · 6.38 KB
/
polars_.py
File metadata and controls
166 lines (134 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors
import json
import operator
from collections.abc import Callable
from typing import Any
import polars as pl
import vortex.expr as ve
from ._lib import dtype as _dtype # pyright: ignore[reportMissingModuleSource]
def polars_to_vortex(expr: pl.Expr) -> ve.Expr:
    """Translate a Polars expression into a Vortex expression.

    The expression is serialized to JSON via ``expr.meta.serialize``; the decoded
    document must be a dict and is handed to ``_polars_to_vortex`` for conversion.
    """
    serialized = expr.meta.serialize(format="json")
    tree = json.loads(serialized)  # pyright: ignore[reportAny]
    assert isinstance(tree, dict)
    return _polars_to_vortex(tree)  # pyright: ignore[reportUnknownArgumentType]
# Lookup table from the operator name found in a serialized Polars "BinaryExpr"
# to the Python operator applied to the two converted operands.
_OPS = dict(
    Eq=operator.eq,
    NotEq=operator.ne,
    Lt=operator.lt,
    LtEq=operator.le,
    Gt=operator.gt,
    GtEq=operator.ge,
    # "And"/"LogicalAnd" and "Or"/"LogicalOr" are bound to the same operators.
    And=operator.and_,
    Or=operator.or_,
    LogicalAnd=operator.and_,
    LogicalOr=operator.or_,
)
# Dtype factories keyed by Polars literal type name. Each factory is called with
# the literal's value and marks the resulting dtype nullable only when that value
# is None; the "Null" factory ignores its argument.
_LITERAL_TYPES: dict[str, Callable[[Any | None], _dtype.DType]] = {  # pyright: ignore[reportExplicitAny]
    "Boolean": lambda value: _dtype.bool_(nullable=value is None),
    # Signed integers ("Int" is given the same 64-bit width as "Int64").
    "Int": lambda value: _dtype.int_(64, nullable=value is None),
    "Int8": lambda value: _dtype.int_(8, nullable=value is None),
    "Int16": lambda value: _dtype.int_(16, nullable=value is None),
    "Int32": lambda value: _dtype.int_(32, nullable=value is None),
    "Int64": lambda value: _dtype.int_(64, nullable=value is None),
    # Unsigned integers.
    "UInt8": lambda value: _dtype.uint(8, nullable=value is None),
    "UInt16": lambda value: _dtype.uint(16, nullable=value is None),
    "UInt32": lambda value: _dtype.uint(32, nullable=value is None),
    "UInt64": lambda value: _dtype.uint(64, nullable=value is None),
    # Floating point.
    "Float32": lambda value: _dtype.float_(32, nullable=value is None),
    "Float64": lambda value: _dtype.float_(64, nullable=value is None),
    "Null": lambda _value: _dtype.null(),
    "String": lambda value: _dtype.utf8(nullable=value is None),
    "Binary": lambda value: _dtype.binary(nullable=value is None),
}
def _polars_to_vortex(expr: dict[str, Any]) -> ve.Expr:  # pyright: ignore[reportExplicitAny]
    """Convert one JSON-decoded Polars expression node to a Vortex expression.

    Args:
        expr: A node of the document obtained by decoding a serialized Polars
            expression. The node kind is given by one of its keys:
            ``"BinaryExpr"``, ``"Column"``, ``"Scalar"``, ``"Literal"`` or
            ``"Function"``.

    Returns:
        The Vortex expression built from ``expr``.

    Raises:
        NotImplementedError: If the node kind, binary operator, literal type,
            date-time unit or function is not handled.
        ValueError: For a scalar of an unhandled value type, a ``Series``
            literal, an ``IsIn`` function with ``nulls_equal`` set, or a
            ``StringExpr`` ``Contains`` function.
    """
    if "BinaryExpr" in expr:
        expr = expr["BinaryExpr"]  # pyright: ignore[reportAny]
        lhs = _polars_to_vortex(expr["left"])  # pyright: ignore[reportAny]
        rhs = _polars_to_vortex(expr["right"])  # pyright: ignore[reportAny]
        op = expr["op"]  # pyright: ignore[reportAny]

        if op not in _OPS:
            raise NotImplementedError(f"Unsupported Polars binary operator: {op}")
        return _OPS[op](lhs, rhs)  # pyright: ignore[reportAny]

    if "Column" in expr:
        return ve.column(expr["Column"])  # pyright: ignore[reportAny]

    # See https://github.com/pola-rs/polars/pull/21849
    if "Scalar" in expr:
        scalar = expr["Scalar"]  # pyright: ignore[reportAny]

        # Pick the value and the _LITERAL_TYPES key for the scalar; the order
        # of the checks is significant only if several keys are present.
        if "Null" in scalar:
            value = None
            dtype = "Null"
        elif "String" in scalar:
            value = scalar["String"]  # pyright: ignore[reportAny]
            dtype = "String"
        elif "Int" in scalar:
            value = scalar["Int"]  # pyright: ignore[reportAny]
            dtype = "Int64"
        elif "Float" in scalar:
            value = scalar["Float"]  # pyright: ignore[reportAny]
            dtype = "Float64"
        elif "Float32" in scalar:
            value = scalar["Float32"]  # pyright: ignore[reportAny]
            dtype = "Float32"
        elif "Float64" in scalar:
            value = scalar["Float64"]  # pyright: ignore[reportAny]
            dtype = "Float64"
        elif "Int32" in scalar:
            value = scalar["Int32"]  # pyright: ignore[reportAny]
            dtype = "Int32"
        elif "Int64" in scalar:
            value = scalar["Int64"]  # pyright: ignore[reportAny]
            dtype = "Int64"
        else:
            raise ValueError(f"Cannot convert to Vortex: unsupported Polars scalar value type {scalar}")

        return ve.literal(_LITERAL_TYPES[dtype](value), value)

    if "Literal" in expr:
        expr = expr["Literal"]  # pyright: ignore[reportAny]

        literal_type = next(iter(expr.keys()), None)

        # The literal dict itself carries the "Scalar" key, so the scalar
        # branch above can handle it directly.
        if literal_type == "Scalar":
            return _polars_to_vortex(expr)

        # Special-case Series
        if literal_type == "Series":
            raise ValueError("Cannot convert to Vortex: unsupported Polars literal type Series")

        # Special-case date-times
        if literal_type == "DateTime":
            (value, unit, tz) = expr[literal_type]  # pyright: ignore[reportAny, reportAny]
            if unit == "Nanoseconds":
                unit = "ns"
            elif unit == "Microseconds":
                unit = "us"
            elif unit == "Milliseconds":
                unit = "ms"
            elif unit == "Seconds":
                unit = "s"
            else:
                raise NotImplementedError(f"Unsupported Polars date time unit: {unit}")

            # As for every other literal dtype, the dtype is nullable only when
            # the literal itself is null; the raw value must not be passed as
            # the flag.
            dtype = _dtype.timestamp(unit, tz=tz, nullable=value is None)  # pyright: ignore[reportAny]
            return ve.literal(dtype, value)  # pyright: ignore[reportAny]

        # Unwrap 'Dyn' scalars, whose type hasn't been established yet.
        # (post https://github.com/pola-rs/polars/pull/21849)
        if literal_type == "Dyn":
            expr = expr["Dyn"]  # pyright: ignore[reportAny]
            literal_type = next(iter(expr.keys()), None)

        if literal_type not in _LITERAL_TYPES:
            raise NotImplementedError(f"Unsupported Polars literal type: {literal_type}")
        value = expr[literal_type]  # pyright: ignore[reportAny]
        return ve.literal(_LITERAL_TYPES[literal_type](value), value)  # pyright: ignore[reportAny]

    if "Function" in expr:
        expr = expr["Function"]  # pyright: ignore[reportAny]
        # The converted inputs are not used yet, but converting them still
        # raises for any unsupported input expression before the function
        # itself is inspected.
        _inputs = [_polars_to_vortex(e) for e in expr["input"]]  # pyright: ignore[reportAny]

        fn = expr["function"]  # pyright: ignore[reportAny]
        if "Boolean" in fn:
            fn = fn["Boolean"]  # pyright: ignore[reportAny]

            if "IsIn" in fn:
                fn = fn["IsIn"]  # pyright: ignore[reportAny]
                if fn["nulls_equal"]:
                    raise ValueError(f"Unsupported nulls_equal argument in fn {expr}")
                # Vortex doesn't support is-in, so we need to construct a series of ORs?

        if "StringExpr" in fn:
            fn = fn["StringExpr"]  # pyright: ignore[reportAny]
            if "Contains" in fn:
                raise ValueError("Unsupported Polars StringExpr.Contains")

        # No function is converted at present: every path through this branch raises.
        raise NotImplementedError(f"Unsupported Polars function: {fn}")

    raise NotImplementedError(f"Unsupported Polars expression: {expr}")