-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutil.py
278 lines (218 loc) · 9.15 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import typing as tp
from copy import deepcopy
import numpy as np
# single-character dtype "kind" codes, as reported by np.dtype.kind
DTYPE_DATETIME_KIND = 'M'
DTYPE_TIMEDELTA_KIND = 'm'
DTYPE_COMPLEX_KIND = 'c'
DTYPE_FLOAT_KIND = 'f'
DTYPE_OBJECT_KIND = 'O'
DTYPE_BOOL_KIND = 'b'
DTYPE_STR_KINDS = ('U', 'S') # S is np.bytes_
DTYPE_INT_KINDS = ('i', 'u') # signed and unsigned
DTYPE_INEXACT_KINDS = (DTYPE_FLOAT_KIND, DTYPE_COMPLEX_KIND) # kinds that support NaN values
DTYPE_NAT_KINDS = (DTYPE_DATETIME_KIND, DTYPE_TIMEDELTA_KIND) # kinds that support NaT values
# shared dtype instances used for comparisons and as return values
DTYPE_OBJECT = np.dtype(object)
DTYPE_BOOL = np.dtype(bool)
DTYPE_STR = np.dtype(str)
# explicit 64-bit defaults (platform-independent, unlike np.int_)
DTYPE_INT_DEFAULT = np.dtype(np.int64)
DTYPE_FLOAT_DEFAULT = np.dtype(np.float64)
DTYPE_COMPLEX_DEFAULT = np.dtype(np.complex128)
DTYPES_BOOL = (DTYPE_BOOL,)
DTYPES_INEXACT = (DTYPE_FLOAT_DEFAULT, DTYPE_COMPLEX_DEFAULT)
def mloc(array: np.ndarray) -> int:
    '''Return the memory location of an array.
    '''
    # the array interface exposes (address, read-only flag); take the address
    data_address = array.__array_interface__['data'][0]
    return tp.cast(int, data_address)
def immutable_filter(src_array: np.ndarray) -> np.ndarray:
    '''Pass an immutable array; otherwise, return an immutable copy of the provided array.
    '''
    if not src_array.flags.writeable:
        # already immutable: return unchanged
        return src_array
    copied = src_array.copy()
    copied.flags.writeable = False
    return copied
def name_filter(name: tp.Hashable) -> tp.Hashable:
    '''
    For name attributes on containers, only permit recursively hashable objects.

    Args:
        name: candidate name attribute; must be hashable (hash() recurses into tuples).

    Returns:
        The name unchanged, if hashable.

    Raises:
        TypeError: if the name (or any element it contains) is unhashable.
    '''
    try:
        # hash() recurses into tuple contents, so this rejects e.g. (1, [2])
        hash(name)
    except TypeError as e:
        # chain the original exception so the unhashable element is visible
        raise TypeError('unhashable name attribute', name) from e
    return name
def shape_filter(array: np.ndarray) -> tp.Tuple[int, int]:
    '''Represent a 1D array as a 2D array with length as rows of a single-column array.
    Return:
        row, column count for a block of ndim 1 or ndim 2.
    '''
    if array.ndim != 1:
        # ndim 2: shape is already (rows, columns)
        return array.shape #type: ignore
    return len(array), 1
def column_2d_filter(array: np.ndarray) -> np.ndarray:
    '''Reshape a flat ndim 1 array into a 2D array with one columns and rows of length. This is used (a) for getting string representations and (b) for using np.concatenate and np binary operators on 1D arrays.
    '''
    if array.ndim != 1:
        return array
    # NOTE: reshape may return either a view or a copy
    return array.reshape(len(array), 1)
def column_1d_filter(array: np.ndarray) -> np.ndarray:
    '''
    Ensure that a column that might be 2D or 1D is returned as a 1D array.
    '''
    if array.ndim != 2:
        return array
    # an (n, 1) column collapses to (n,); reshape raises if there is more than one column
    return array.reshape(array.shape[0])
def row_1d_filter(array: np.ndarray) -> np.ndarray:
    '''
    Ensure that a row that might be 2D or 1D is returned as a 1D array.
    '''
    if array.ndim != 2:
        return array
    # a (1, n) row collapses to (n,); reshape raises if there is more than one row
    return array.reshape(array.shape[1])
#-------------------------------------------------------------------------------
def resolve_dtype(dt1: np.dtype, dt2: np.dtype) -> np.dtype:
    '''
    Given two dtypes, return a compatible dtype that can hold both contents without truncation.
    '''
    # NOTE: endianness is not taken into account; it is not clear if this is important
    # NOTE: np.dtype(object) == np.object_, so we can return np.object_
    if dt1 == dt2:
        # identical dtypes resolve to themselves
        return dt1
    kind1, kind2 = dt1.kind, dt2.kind
    if DTYPE_OBJECT_KIND in (kind1, kind2):
        # object absorbs everything
        return DTYPE_OBJECT
    str1 = kind1 in DTYPE_STR_KINDS
    str2 = kind2 in DTYPE_STR_KINDS
    if str1 and str2:
        # both string or string-like: result_type widens to the longest string
        return np.result_type(dt1, dt2)
    datetime1 = kind1 == DTYPE_DATETIME_KIND
    datetime2 = kind2 == DTYPE_DATETIME_KIND
    if datetime1 and datetime2:
        # both datetime64: result_type finds a common unit
        return np.result_type(dt1, dt2)
    tdelta1 = kind1 == DTYPE_TIMEDELTA_KIND
    tdelta2 = kind2 == DTYPE_TIMEDELTA_KIND
    if tdelta1 and tdelta2:
        # this may or may not work:
        # TypeError: Cannot get a common metadata divisor for NumPy datetime metadata [D] and [Y] because they have incompatible nonlinear base time units
        try:
            return np.result_type(dt1, dt2)
        except TypeError:
            return DTYPE_OBJECT
    bool1 = dt1.type is np.bool_
    bool2 = dt2.type is np.bool_
    # a string, bool, datetime, or timedelta mixed with any other kind must go to
    # object; same-kind pairs were handled above, and result_type would otherwise
    # produce a (lossy) string in mixed cases
    if any((str1, str2, bool1, bool2,
            datetime1, datetime2, tdelta1, tdelta2)):
        return DTYPE_OBJECT
    # remaining numeric kinds can be safely resolved with result_type
    return np.result_type(dt1, dt2)
def resolve_dtype_iter(dtypes: tp.Iterable[np.dtype]) -> np.dtype:
    '''Given an iterable of one or more dtypes, do pairwise comparisons to determine compatible overall type. Once we get to object we can stop checking and return object.
    Args:
        dtypes: iterable of one or more dtypes.
    '''
    iterator = iter(dtypes)
    resolved = next(iterator) # raises StopIteration if the iterable is empty
    for dt in iterator:
        resolved = resolve_dtype(resolved, dt)
        if resolved == DTYPE_OBJECT:
            # object cannot widen any further: stop early
            break
    return resolved
def array_deepcopy(
        array: np.ndarray,
        memo: tp.Optional[tp.Dict[int, tp.Any]],
        ) -> np.ndarray:
    '''
    Create a deepcopy of an array, handling memo lookup, insertion, and object arrays.
    '''
    key = id(array)
    if memo is not None and key in memo:
        # already copied during this deepcopy pass
        return memo[key]
    if array.dtype == DTYPE_OBJECT:
        # object arrays may hold mutable references that must themselves be copied
        result = deepcopy(array, memo)
    else:
        result = array.copy()
    if result.ndim > 0:
        # preserve the mutability of the source array
        result.flags.writeable = array.flags.writeable
    if memo is not None:
        memo[key] = result
    return result
def isna_element(value: tp.Any) -> bool:
    '''Return Boolean if value is an NA. This does not yet handle pd.NA
    '''
    # datetime64/timedelta64 values are not supported by isnan; use isnat
    if isinstance(value, (np.datetime64, np.timedelta64)):
        return np.isnat(value) #type: ignore
    try:
        # handles float and complex NaN, including NumPy scalars
        return np.isnan(value) #type: ignore
    except TypeError:
        # non-numeric values: only None is NA
        return value is None
def dtype_from_element(value: tp.Optional[tp.Hashable]) -> np.dtype:
    '''Given an arbitrary hashable to be treated as an element, return the appropriate dtype. This was created to avoid using np.array(value).dtype, which for a Tuple does not return object.
    '''
    if value is np.nan:
        # identity check catches only the canonical NaN singleton on the np namespace,
        # such as default NaN arguments in function signatures; other NaN instances
        # fall through to the np.array call below
        return DTYPE_FLOAT_DEFAULT
    if value is None or isinstance(value, tuple):
        # None and tuples must map to object; np.array would mis-handle a tuple
        return DTYPE_OBJECT
    if hasattr(value, 'dtype'):
        # NumPy scalars and arrays carry their own dtype
        return value.dtype #type: ignore
    # NOTE: calling array and getting dtype on np.nan is faster than combining isinstance, isnan calls
    return np.array(value).dtype
#-------------------------------------------------------------------------------
# tools for handling duplicates
def array_to_duplicated_hashable(
        array: np.ndarray,
        axis: int = 0,
        exclude_first: bool = False,
        exclude_last: bool = False,
        ) -> np.ndarray:
    '''
    Algorithm for finding duplicates in unsortable arrays for hashables. This will always be an object array.

    Args:
        array: 1D array of hashable elements, or 2D array whose rows/columns are made hashable as tuples.
        axis: for 2D arrays, 0 compares rows, 1 compares columns.
        exclude_first: do not mark the first occurrence of each duplicated value.
        exclude_last: do not mark the last occurrence of each duplicated value.

    Returns:
        1D Boolean array of length ``array.shape[axis]`` marking duplicated positions.
    '''
    # np.unique fails under the same conditions that sorting fails, so there is no need to try np.unique: must go to set directly.
    len_axis = array.shape[axis]
    if array.ndim == 1:
        value_source = array
        to_hashable = None # elements are assumed already hashable
    else:
        if axis == 0:
            value_source = array # will iterate rows
        else:
            value_source = (array[:, i] for i in range(len_axis))
        # values will be arrays; must convert to tuples to make hashable
        to_hashable = tuple
    is_dupe = np.full(len_axis, False)
    # could exit early with a set, but would have to hash all array twice to go to set and dictionary
    # creating a list for each entry and tracking indices would be very expensive
    unique_to_first: tp.Dict[tp.Hashable, int] = {} # value to first occurrence
    dupe_to_first: tp.Dict[tp.Hashable, int] = {} # duplicated value to its first-seen index
    dupe_to_last: tp.Dict[tp.Hashable, int] = {} # duplicated value to its most-recent index
    for idx, v in enumerate(value_source):
        if to_hashable:
            v = to_hashable(v)
        if v not in unique_to_first:
            unique_to_first[v] = idx
        else:
            # v has been seen before; update Boolean array
            is_dupe[idx] = True
            # if no entry in dupe to first, update with value in unique to first, which is the index this value was first seen
            if v not in dupe_to_first:
                dupe_to_first[v] = unique_to_first[v]
            # always update last
            dupe_to_last[v] = idx
    # post-process: the loop marked all occurrences except the first of each value
    if exclude_last: # overwrite with False
        is_dupe[list(dupe_to_last.values())] = False
    if not exclude_first: # add in first values
        is_dupe[list(dupe_to_first.values())] = True
    return is_dupe