-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy patharrow.py
More file actions
51 lines (45 loc) · 1.24 KB
/
arrow.py
File metadata and controls
51 lines (45 loc) · 1.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from pyarrow import ipc
import pyarrow as pa
from typing import List
def schema_to_bytes(sc) -> bytes:
"""
Convert a schema to bytes
"""
sink = pa.BufferOutputStream()
try:
with pa.ipc.new_stream(sink, sc) as writer:
pass
return sink.getvalue().to_pybytes()
finally:
sink.close()
def new_schema_from_bytes(buf: bytes) -> pa.Schema:
"""
Convert bytes to schema
"""
schema = None
with pa.ipc.open_stream(buf) as reader:
schema = reader.schema
return schema
def schemas_to_bytes(schemas: List[pa.Schema]):
res : List[bytes] = []
for schema in schemas:
res.append(schema_to_bytes(schema))
return res
def new_schemas_from_bytes(bufs: List[bytes]) -> List[pa.Schema]:
res : List[pa.Schema] = []
for buf in bufs:
res.append(new_schema_from_bytes(buf))
return res
def record_to_bytes(rec: pa.RecordBatch) -> bytes:
sink = pa.BufferOutputStream()
try:
with pa.ipc.new_stream(sink, rec.schema) as writer:
writer.write_batch(rec)
return sink.getvalue().to_pybytes()
finally:
sink.close()
def new_record_from_bytes(buf :bytes) -> pa.RecordBatch:
rec = None
with pa.ipc.open_stream(buf) as reader:
rec = reader.read_next_batch()
return rec