-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathbuffered_writer.py
60 lines (45 loc) · 1.62 KB
/
buffered_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from fastparquet import write
import pandas as pd
from pathlib import Path
class BufferedParquetWriter:
    """Accumulate ``(key, value)`` rows in memory and append them to a
    Parquet file in fixed-size batches.

    Rows are written via fastparquet under the columns ``item_id`` and
    ``item_json`` with ZSTD compression.  Call :meth:`close` when done so
    a partially filled buffer is not lost.
    """

    def __init__(self, filename: str, buffer_size: int = 1000) -> None:
        """
        Args:
            filename: Path of the Parquet file to create or append to.
            buffer_size: Number of buffered rows that triggers a flush.
        """
        self.keys: list = []
        self.values: list = []
        self.filename = filename
        self.buffer_size = buffer_size

    def add(self, key: int, value: str) -> None:
        """Buffer one row; flush to disk once the buffer is full."""
        self.keys.append(key)
        self.values.append(value)
        # >= (not ==) so a buffer_size shrunk after construction still drains.
        if len(self.keys) >= self.buffer_size:
            self.flush_to_file()

    def flush_to_file(self) -> None:
        """Write all buffered rows to the Parquet file and clear the buffer."""
        path = Path(self.filename)
        # BUG FIX: bare mkdir() failed when the parent's own ancestors were
        # missing, and raced with concurrent creators; parents=True +
        # exist_ok=True makes directory creation idempotent.
        path.parent.mkdir(parents=True, exist_ok=True)
        df = pd.DataFrame(data={"item_id": self.keys, "item_json": self.values})
        write(path, df, compression="ZSTD", append=path.exists())
        self.keys.clear()
        self.values.clear()

    def close(self) -> None:
        """Flush any remaining buffered rows; a no-op when the buffer is empty."""
        if self.keys:
            self.flush_to_file()
class BucketedBufferedParquetWriter:
    """Fan ``(key, value)`` rows out across one :class:`BufferedParquetWriter`
    per id bucket, creating writers lazily on first use.

    Each bucket's output file is named ``{basename}-{bucket}.parquet``.
    Use as a context manager so every underlying writer is flushed on exit.
    """

    def __init__(self, basename: str, bucket_size: int = 1_000_000) -> None:
        """
        Args:
            basename: Filename prefix for the per-bucket Parquet files.
            bucket_size: How many consecutive ids share one bucket.  The
                default preserves the original hard-coded 1M-per-bucket layout.
        """
        self.basename = basename
        self.bucket_size = bucket_size
        self.writers: dict = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Flush every partially filled buffer even when an exception
        # propagates out of the with-block.
        for writer in self.writers.values():
            writer.close()

    def add(self, key: int, value: str) -> None:
        """Route one row to its bucket's writer, creating the writer lazily."""
        data_bucket = self.id_to_bucket(key, self.bucket_size)
        if data_bucket not in self.writers:
            self.writers[data_bucket] = BufferedParquetWriter(
                f"{self.basename}-{data_bucket}.parquet"
            )
        self.writers[data_bucket].add(key, value)

    @staticmethod
    def id_to_bucket(data_id: int, bucket_size: int = 1_000_000) -> str:
        """Return the zero-padded (width 4) bucket label for *data_id*."""
        return f"{data_id // bucket_size:04}"