-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathcsv_uow.py
More file actions
66 lines (55 loc) · 2.25 KB
/
csv_uow.py
File metadata and controls
66 lines (55 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# pylint: disable=too-few-public-methods, protected-access
import csv
from datetime import datetime
from pathlib import Path
from typing import Dict
from allocation.domain import model
from allocation.adapters import repository
from allocation.service_layer import unit_of_work
class CsvRepository(repository.AbstractRepository):
    """Repository of batches backed by CSV files in a folder.

    Batches are read from ``batches.csv`` (columns ``ref``, ``sku``, ``qty``,
    ``eta``) and existing allocations from ``allocations.csv`` (columns
    ``batchref``, ``orderid``, ``sku``, ``qty``) if that file exists.
    Everything is loaded eagerly at construction time and then served from
    an in-memory dict; this class never writes to disk itself.
    """

    def __init__(self, folder):
        """Load all batches and allocations found in *folder*.

        Args:
            folder: Directory (str or path-like) containing ``batches.csv``
                and, optionally, ``allocations.csv``.
        """
        self._batches_path = Path(folder) / "batches.csv"
        self._allocations_path = Path(folder) / "allocations.csv"
        self._batches = {}  # type: Dict[str, model.Batch]
        self._load()

    def get(self, reference):
        """Return the batch stored under *reference*, or None if unknown."""
        return self._batches.get(reference)

    def add(self, batch):
        """Register *batch* in memory, replacing any batch with the same reference."""
        self._batches[batch.reference] = batch

    def _load(self):
        """Populate the in-memory store from the CSV files."""
        # Batches must be loaded first: allocation rows are attached to them.
        self._load_batches()
        self._load_allocations()

    def _load_batches(self):
        """Read ``batches.csv`` and build one ``model.Batch`` per row."""
        # newline="" lets the csv module do its own line-ending handling, as
        # its documentation requires for file objects.
        with self._batches_path.open(newline="") as f:
            for row in csv.DictReader(f):
                ref, sku = row["ref"], row["sku"]
                qty = int(row["qty"])
                # An empty eta column yields a batch with eta=None.
                eta = (
                    datetime.strptime(row["eta"], "%Y-%m-%d").date()
                    if row["eta"]
                    else None
                )
                self._batches[ref] = model.Batch(ref=ref, sku=sku, qty=qty, eta=eta)

    def _load_allocations(self):
        """Read ``allocations.csv``, if present, and attach lines to batches.

        Raises:
            KeyError: If a row names a ``batchref`` that was not loaded from
                ``batches.csv``.
        """
        if not self._allocations_path.exists():
            return
        with self._allocations_path.open(newline="") as f:
            for row in csv.DictReader(f):
                batchref, orderid, sku = row["batchref"], row["orderid"], row["sku"]
                qty = int(row["qty"])
                line = model.OrderLine(orderid, sku, qty)
                # Add straight to the batch's private allocation set rather
                # than going through any public method on the batch.
                self._batches[batchref]._allocations.add(line)

    def list(self):
        """Return a new list containing every batch currently held."""
        return list(self._batches.values())
class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):
    """Unit of work whose commit rewrites the allocations CSV file.

    Batches are held by a ``CsvRepository`` built for the given folder and
    exposed as ``self.batches``.
    """

    def __init__(self, folder):
        """Create the repository for *folder*, loading its CSV data."""
        self.batches = CsvRepository(folder)

    def commit(self):
        """Overwrite the allocations file with every current allocation.

        One row is written per (order line, batch) pair, under the header
        ``orderid, sku, qty, batchref``. Only allocations are written here;
        the batches file is not touched.
        """
        # Collect the rows before opening the file: opening with "w"
        # truncates it, so a failure while walking the batches must not
        # leave a half-written file behind.
        rows = [
            [line.orderid, line.sku, line.qty, batch.reference]
            for batch in self.batches.list()
            for line in batch._allocations
        ]
        # newline="" prevents the text layer from translating the "\r\n"
        # that csv.writer emits, which would otherwise produce blank rows
        # on platforms that expand "\n".
        with self.batches._allocations_path.open("w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["orderid", "sku", "qty", "batchref"])
            writer.writerows(rows)

    def rollback(self):
        """Do nothing; this implementation has no rollback logic."""
        pass