Skip to content

Commit ba97ee4

Browse files
committed
Improve performance with efficient use of FieldMappingCache
1 parent 76a52b7 commit ba97ee4

File tree

3 files changed

+43
-52
lines changed

3 files changed

+43
-52
lines changed

eland/field_mappings.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -653,29 +653,25 @@ def date_field_format(self, es_field_name: str) -> str:
653653
self._mappings_capabilities.es_field_name == es_field_name
654654
].es_date_format.squeeze()
655655

656-
def field_name_pd_dtype(self, es_field_name: str) -> str:
656+
def field_name_pd_dtype(self, es_field_name: str) -> Tuple[bool, Optional[str]]:
657657
"""
658658
Parameters
659659
----------
660660
es_field_name: str
661661
662662
Returns
663663
-------
664-
pd_dtype: str
665-
The pandas data type we map to
664+
Tuple[bool, Optional[str]]
665+
If es_field_name is source field and the pandas data type we map to
666666
667-
Raises
668-
------
669-
KeyError
670-
If es_field_name does not exist in mapping
671667
"""
672668
if es_field_name not in self._mappings_capabilities.es_field_name:
673-
raise KeyError(f"es_field_name {es_field_name} does not exist")
669+
return False, "object"
674670

675-
pd_dtype = self._mappings_capabilities.loc[
671+
df: pd.DataFrame = self._mappings_capabilities.loc[
676672
self._mappings_capabilities.es_field_name == es_field_name
677-
].pd_dtype.squeeze()
678-
return pd_dtype
673+
]
674+
return df.is_source.squeeze(), df.pd_dtype.squeeze()
679675

680676
def add_scripted_field(
681677
self, scripted_field_name: str, display_name: str, pd_dtype: str

eland/query_compiler.py

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,20 @@ def __init__(
8181
Union[str, List[str], Tuple[str, ...], "Elasticsearch"]
8282
] = None,
8383
index_pattern: Optional[str] = None,
84-
display_names=None,
85-
index_field=None,
86-
to_copy=None,
84+
display_names: Optional[List[str]] = None,
85+
index_field: Optional[str] = None,
86+
to_copy: Optional["QueryCompiler"] = None,
8787
) -> None:
8888
# Implement copy as we don't deep copy the client
8989
if to_copy is not None:
90-
self._client = to_copy._client
91-
self._index_pattern = to_copy._index_pattern
90+
self._client: "Elasticsearch" = to_copy._client
91+
self._index_pattern: Optional[str] = to_copy._index_pattern
9292
self._index: "Index" = Index(self, to_copy._index.es_index_field)
9393
self._operations: "Operations" = copy.deepcopy(to_copy._operations)
9494
self._mappings: FieldMappings = copy.deepcopy(to_copy._mappings)
95+
self._field_mapping_cache: Optional["FieldMappingCache"] = copy.deepcopy(
96+
to_copy._field_mapping_cache
97+
)
9598
else:
9699
self._client = ensure_es_client(client)
97100
self._index_pattern = index_pattern
@@ -104,6 +107,8 @@ def __init__(
104107
)
105108
self._index = Index(self, index_field)
106109
self._operations = Operations()
110+
# This should only be initialized when ETL is done
111+
self._field_mapping_cache = None
107112

108113
@property
109114
def index(self) -> Index:
@@ -239,7 +244,8 @@ def _es_results_to_pandas(
239244
# This is one of the most performance critical areas of eland, and it repeatedly calls
240245
# self._mappings.field_name_pd_dtype and self._mappings.date_field_format
241246
# therefore create a simple cache for this data
242-
field_mapping_cache = FieldMappingCache(self._mappings)
247+
if self._field_mapping_cache is None:
248+
self._field_mapping_cache = FieldMappingCache(self._mappings)
243249

244250
rows = []
245251
index = []
@@ -266,7 +272,7 @@ def _es_results_to_pandas(
266272
index.append(index_field)
267273

268274
# flatten row to map correctly to 2D DataFrame
269-
rows.append(self._flatten_dict(row, field_mapping_cache))
275+
rows.append(self._flatten_dict(row))
270276

271277
# Create pandas DataFrame
272278
df = pd.DataFrame(data=rows, index=index)
@@ -279,7 +285,7 @@ def _es_results_to_pandas(
279285
)
280286

281287
for missing in missing_field_names:
282-
pd_dtype = self._mappings.field_name_pd_dtype(missing)
288+
_, pd_dtype = self._field_mapping_cache.field_name_pd_dtype(missing)
283289
df[missing] = pd.Series(dtype=pd_dtype)
284290

285291
# Rename columns
@@ -291,7 +297,7 @@ def _es_results_to_pandas(
291297

292298
return df
293299

294-
def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
300+
def _flatten_dict(self, y):
295301
out = {}
296302

297303
def flatten(x, name=""):
@@ -301,12 +307,10 @@ def flatten(x, name=""):
301307
is_source_field = False
302308
pd_dtype = "object"
303309
else:
304-
try:
305-
pd_dtype = field_mapping_cache.field_name_pd_dtype(name[:-1])
306-
is_source_field = True
307-
except KeyError:
308-
is_source_field = False
309-
pd_dtype = "object"
310+
(
311+
is_source_field,
312+
pd_dtype,
313+
) = self._field_mapping_cache.field_name_pd_dtype(name[:-1])
310314

311315
if not is_source_field and isinstance(x, dict):
312316
for a in x:
@@ -321,7 +325,7 @@ def flatten(x, name=""):
321325
# Coerce types - for now just datetime
322326
if pd_dtype == "datetime64[ns]":
323327
x = elasticsearch_date_to_pandas_date(
324-
x, field_mapping_cache.date_field_format(field_name)
328+
x, self._field_mapping_cache.date_field_format(field_name)
325329
)
326330

327331
# Elasticsearch can have multiple values for a field. These are represented as lists, so
@@ -791,28 +795,21 @@ class FieldMappingCache:
791795

792796
def __init__(self, mappings: "FieldMappings") -> None:
793797
self._mappings = mappings
798+
# This returns all the es_field_names
799+
self._es_field_names: List[str] = mappings.get_field_names()
800+
# Cache these to re-use later
801+
self._field_name_pd_dtype: Dict[str, Tuple[bool, Optional[str]]] = {
802+
i: mappings.field_name_pd_dtype(i) for i in self._es_field_names
803+
}
804+
self._date_field_format: Dict[str, str] = {
805+
i: mappings.date_field_format(i) for i in self._es_field_names
806+
}
794807

795-
self._field_name_pd_dtype: Dict[str, str] = dict()
796-
self._date_field_format: Dict[str, str] = dict()
797-
798-
def field_name_pd_dtype(self, es_field_name: str) -> str:
799-
if es_field_name in self._field_name_pd_dtype:
808+
def field_name_pd_dtype(self, es_field_name: str) -> Tuple[bool, Optional[str]]:
809+
if es_field_name not in self._field_name_pd_dtype:
810+
return False, "object"
811+
else:
800812
return self._field_name_pd_dtype[es_field_name]
801813

802-
pd_dtype = self._mappings.field_name_pd_dtype(es_field_name)
803-
804-
# cache this
805-
self._field_name_pd_dtype[es_field_name] = pd_dtype
806-
807-
return pd_dtype
808-
809814
def date_field_format(self, es_field_name: str) -> str:
810-
if es_field_name in self._date_field_format:
811-
return self._date_field_format[es_field_name]
812-
813-
es_date_field_format = self._mappings.date_field_format(es_field_name)
814-
815-
# cache this
816-
self._date_field_format[es_field_name] = es_date_field_format
817-
818-
return es_date_field_format
815+
return self._date_field_format[es_field_name]

tests/field_mappings/test_field_name_pd_dtype_pytest.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
# under the License.
1717

1818
# File called _pytest for PyCharm compatability
19-
import pytest
2019
from pandas.testing import assert_series_equal
2120

2221
from eland.field_mappings import FieldMappings
@@ -35,7 +34,7 @@ def test_all_formats(self):
3534
assert_series_equal(pd_flights.dtypes, ed_field_mappings.dtypes())
3635

3736
for es_field_name in FLIGHTS_MAPPING["mappings"]["properties"].keys():
38-
pd_dtype = ed_field_mappings.field_name_pd_dtype(es_field_name)
37+
_, pd_dtype = ed_field_mappings.field_name_pd_dtype(es_field_name)
3938

4039
assert pd_flights[es_field_name].dtype == pd_dtype
4140

@@ -44,5 +43,4 @@ def test_non_existant(self):
4443
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
4544
)
4645

47-
with pytest.raises(KeyError):
48-
ed_field_mappings.field_name_pd_dtype("unknown")
46+
assert (False, "object") == ed_field_mappings.field_name_pd_dtype("unknown")

0 commit comments

Comments
 (0)