-
Notifications
You must be signed in to change notification settings - Fork 211
/
Copy path__init__.py
297 lines (242 loc) · 10.7 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import sys
import re
import copy
import django
from django.db import models, connections
from django.db.models.query import QuerySet
from django.db.models.sql.query import Query as SQLQuery
from django.db.models.query_utils import Q
from django_mongodb_engine.compiler import OPERATORS_MAP, NEGATED_OPERATORS_MAP
from djangotoolbox.fields import AbstractIterableField
if django.VERSION >= (1, 5):
from django.db.models.constants import LOOKUP_SEP
else:
from django.db.models.sql.constants import LOOKUP_SEP
# True when running under PyPy (needed by map_reduce's collection-drop hack).
ON_PYPY = hasattr(sys, 'pypy_version_info')
# Every lookup operator understood by the backend, negated or not.
# NOTE: the original built this as
#   dict(list(OPERATORS_MAP.items() + NEGATED_OPERATORS_MAP.items())).keys()
# which only works on Python 2 (dict_items cannot be concatenated on
# Python 3) and used the dict merely to deduplicate keys.  A set union is
# equivalent for the `in` / iteration uses in this module and works on
# both Python versions.
ALL_OPERATORS = set(OPERATORS_MAP) | set(NEGATED_OPERATORS_MAP)
# Internal field types whose lookups may traverse into sub-documents with
# MongoDB dot notation.
MONGO_DOT_FIELDS = ('DictField', 'ListField', 'SetField', 'EmbeddedModelField')
def _compiler_for_queryset(qs, which='SQLCompiler'):
    """Instantiate the compiler class named `which` for the queryset's database."""
    connection = connections[qs.db]
    compiler_cls = connection.ops.compiler(which)
    compiler = compiler_cls(qs.query, connection, connection.alias)
    return compiler
class RawQuery(SQLQuery):
    """SQL-layer Query subclass that carries a raw MongoDB filter document."""

    def __init__(self, model, raw_query):
        super(RawQuery, self).__init__(model)
        self.raw_query = raw_query

    def clone(self, *args, **kwargs):
        # Clones must keep the raw spec, otherwise chained queryset
        # operations would silently drop the MongoDB filter.
        cloned = super(RawQuery, self).clone(*args, **kwargs)
        cloned.raw_query = self.raw_query
        return cloned
class RawQueryMixin:
    """Manager mixin providing raw MongoDB query and update operations."""

    def get_raw_query_set(self, raw_query):
        """Return a QuerySet whose underlying query is the raw spec `raw_query`."""
        return QuerySet(self.model, RawQuery(self.model, raw_query), self._db)

    def raw_query(self, query=None):
        """
        Does a raw MongoDB query. The optional parameter `query` is the spec
        passed to PyMongo's :meth:`<Collection.find> pymongo.Collection.find`.
        """
        spec = query or {}
        return self.get_raw_query_set(spec)

    def raw_update(self, spec_or_q, update_dict, **kwargs):
        """
        Does a raw MongoDB update. `spec_or_q` is either a MongoDB
        filter dict or a :class:`~django.db.models.query_utils.Q`
        instance that selects the records to update. `update_dict` is
        a MongoDB style update document containing either a new
        document or atomic modifiers such as ``$inc``.

        Keyword arguments will be passed to :meth:`pymongo.Collection.update`.
        """
        if not isinstance(spec_or_q, dict):
            qs = self.filter(spec_or_q)
        else:
            qs = self.get_raw_query_set(spec_or_q)
        qs._for_write = True
        compiler = _compiler_for_queryset(qs, 'SQLUpdateCompiler')
        compiler.execute_update(update_dict, **kwargs)
    raw_update.alters_data = True
class MapReduceResult(object):
    """
    One item of a MapReduce result array.

    :param model: the model on that query the MapReduce was performed
    :param key: the *key* from the result item
    :param value: the *value* from the result item
    """

    def __init__(self, model, key, value):
        self.model = model
        self.key = key
        self.value = value

    @classmethod
    def from_entity(cls, model, entity):
        # MongoDB stores the emitted key under '_id' and the reduced
        # value under 'value'.
        return cls(model, entity['_id'], entity['value'])

    def __repr__(self):
        cls_name = self.__class__.__name__
        return '<%s model=%r key=%r value=%r>' % (
            cls_name, self.model.__name__, self.key, self.value)
class MongoDBQuerySet(QuerySet):
    # QuerySet that rewrites lookups into Mongo container fields
    # (DictField/ListField/SetField/EmbeddedModelField) to use dot
    # notation, and adds Map/Reduce and distinct() helpers backed by
    # PyMongo.
    def _filter_or_exclude(self, negate, *args, **kwargs):
        # Same shape as Django's implementation, except that lookup keys
        # are first rewritten by _process_arg_filters so dotted paths into
        # Mongo container fields resolve.
        if args or kwargs:
            assert self.query.can_filter(), \
                'Cannot filter a query once a slice has been taken.'
        clone = self._clone()
        # Mutates `kwargs` and any Q objects in `args` in place before
        # they are turned into Q nodes below.
        clone._process_arg_filters(args, kwargs)
        if negate:
            clone.query.add_q(~Q(*args, **kwargs))
        else:
            clone.query.add_q(Q(*args, **kwargs))
        return clone
    def _get_mongo_field_names(self):
        # Lazily compute, and cache on this instance, the names of the
        # model's top-level fields whose internal type is one of
        # MONGO_DOT_FIELDS (i.e. fields that support dot traversal).
        if not hasattr(self, '_mongo_field_names'):
            self._mongo_field_names = []
            for name in self.model._meta.get_all_field_names():
                field = self.model._meta.get_field_by_name(name)[0]
                # Skip synthetic dotted names added by _maybe_add_dot_field.
                if '.' not in name and field.get_internal_type() in MONGO_DOT_FIELDS:
                    self._mongo_field_names.append(name)
        return self._mongo_field_names
    def _process_arg_filters(self, args, kwargs):
        # Rewrite every keyword lookup key in place.
        # NOTE(review): deleting from `kwargs` while iterating
        # kwargs.items() is only safe on Python 2, where items() returns
        # a list snapshot; on Python 3 this would raise RuntimeError.
        for key, val in kwargs.items():
            del kwargs[key]
            key = self._maybe_add_dot_field(key)
            kwargs[key] = val
        # Q objects carry their own (lookup, value) leaves; rewrite those
        # recursively, also in place.
        for a in args:
            if isinstance(a, Q):
                self._process_q_filters(a)
    def _process_q_filters(self, q):
        # Walk the Q tree: children are either nested Q nodes or
        # (lookup, value) leaf tuples.
        for c in range(len(q.children)):
            child = q.children[c]
            if isinstance(child, Q):
                self._process_q_filters(child)
            elif isinstance(child, tuple):
                key, val = child
                key = self._maybe_add_dot_field(key)
                # Tuples are immutable, so replace the child wholesale.
                q.children[c] = (key, val)
    def _maybe_add_dot_field(self, name):
        # Rewrite a lookup whose first segment is a Mongo container field:
        #   'dictfield__a__b__gte'  ->  'dictfield.a.b__gte'
        # and, for dotted paths not yet known to the model, synthesize a
        # field on the model so Django can resolve the lookup.
        if LOOKUP_SEP in name and name.split(LOOKUP_SEP)[0] in self._get_mongo_field_names():
            # Protect a trailing operator ('__gte' etc.) with a '#'
            # placeholder so the LOOKUP_SEP -> '.' replacement below
            # doesn't swallow it.
            for op in ALL_OPERATORS:
                if name.endswith(LOOKUP_SEP + op):
                    name = re.sub(LOOKUP_SEP + op + '$', '#' + op, name)
                    break
            name = name.replace(LOOKUP_SEP, '.').replace('#', LOOKUP_SEP)
        parts1 = name.split(LOOKUP_SEP)
        # If the (now dotted) head isn't a known field, walk the dotted
        # path through the model's fields and register a synthetic field
        # whose db_column is the Mongo dot path.
        if '.' in parts1[0] and parts1[0] not in self.model._meta.get_all_field_names():
            parts2 = parts1[0].split('.')        # remaining path segments
            parts3 = []                          # db-column path segments
            parts4 = []                          # trailing non-dot segments
            model = self.model
            while len(parts2) > 0:
                part = parts2.pop(0)
                field = model._meta.get_field_by_name(part)[0]
                field_type = field.get_internal_type()
                # Honor explicit db_column overrides when building the path.
                column = field.db_column
                if column:
                    part = column
                parts3.append(part)
                # A ListField of embedded models traverses like the
                # embedded model itself.
                if field_type == 'ListField':
                    list_type = field.item_field.get_internal_type()
                    if list_type == 'EmbeddedModelField':
                        field = field.item_field
                        field_type = list_type
                if field_type == 'EmbeddedModelField':
                    # Descend into the embedded model and keep walking.
                    model = field.embedded_model()
                else:
                    # Terminal field: consume the rest of the path.  Dot
                    # fields keep extending the db column; anything else
                    # becomes a normal lookup suffix.
                    while len(parts2) > 0:
                        part = parts2.pop(0)
                        if field_type in MONGO_DOT_FIELDS:
                            parts3.append(part)
                        else:
                            parts4.append(part)
            db_column = '.'.join(parts3)
            if field_type in MONGO_DOT_FIELDS:
                # Synthesize a generic iterable field targeting the dot path.
                field = AbstractIterableField(
                    db_column=db_column,
                    blank=True,
                    null=True,
                    editable=False,
                )
            else:
                # Clone the concrete leaf field and retarget it at the dot
                # path; clear caches/metadata that belong to the original.
                field = copy.deepcopy(field)
                field.name = None
                field.db_column = db_column
                field.blank = True
                field.null = True
                field.editable = False
                if hasattr(field, '_related_fields'):
                    delattr(field, '_related_fields')
            # Register the synthetic field under the dotted name so future
            # lookups (and _get_mongo_field_names' '.' check) find it.
            parts5 = parts1[0].split('.')[0:len(parts3)]
            name = '.'.join(parts5)
            self.model.add_to_class(name, field)
            name = LOOKUP_SEP.join([name] + parts4 + parts1[1:])
        return name
    def map_reduce(self, *args, **kwargs):
        """
        Performs a Map/Reduce operation on all documents matching the query,
        yielding a :class:`MapReduceResult` object for each result entity.

        If the optional keyword argument `drop_collection` is ``True``, the
        result collection will be dropped after fetching all results.

        Any other arguments are passed to :meth:`Collection.map_reduce
        <pymongo.collection.Collection.map_reduce>`.
        """
        # TODO: Field name substitution (e.g. id -> _id).
        drop_collection = kwargs.pop('drop_collection', False)
        query = self._get_query()
        # Restrict the Map/Reduce to the queryset's filter unless the
        # caller supplied an explicit 'query' spec.
        kwargs.setdefault('query', query.mongo_query)
        result_collection = query.collection.map_reduce(*args, **kwargs)
        # TODO: Get rid of this.
        # PyPy has no guaranteed garbage collection so we can't rely on
        # the 'finally' suite of a generator (_map_reduce_cpython) to
        # be executed in time (in fact, it isn't guaranteed to be
        # executed *at all*). On the other hand, we *must* drop the
        # collection if `drop_collection` is True so we can't use a
        # generator in this case.
        if drop_collection and ON_PYPY:
            return self._map_reduce_pypy_drop_collection_hack(
                result_collection)
        else:
            return self._map_reduce_cpython(result_collection,
                                            drop_collection)
    def _map_reduce_cpython(self, result_collection, drop_collection):
        # Generator variant: lazily yields results; relies on the
        # 'finally' suite running when the generator is exhausted or
        # collected (safe on CPython's refcounting GC).
        try:
            for entity in result_collection.find():
                yield MapReduceResult.from_entity(self.model, entity)
        finally:
            if drop_collection:
                result_collection.drop()
    def _map_reduce_pypy_drop_collection_hack(self, result_collection):
        # Eager variant for PyPy: materialize all results first so the
        # collection can be dropped deterministically.
        try:
            return iter([MapReduceResult.from_entity(self.model, entity)
                         for entity in result_collection.find()])
        finally:
            result_collection.drop()
    def inline_map_reduce(self, *args, **kwargs):
        """
        Similar to :meth:`map_reduce` but runs the Map/Reduce in memory,
        returning a list of :class:`MapReduceResults <MapReduceResult>`.

        Does not take the `drop_collection` keyword argument since no result
        collection is involved for in-memory Map/Reduce operations.
        """
        query = self._get_query()
        kwargs.setdefault('query', query.mongo_query)
        return [MapReduceResult.from_entity(self.model, entity) for entity in
                query.collection.inline_map_reduce(*args, **kwargs)]
    def _get_query(self):
        # Build the backend query (collection + mongo_query spec) for this
        # queryset via its SQLCompiler.
        return _compiler_for_queryset(self).build_query()
    def distinct(self, *args, **kwargs):
        # Bypasses Django's DISTINCT machinery and delegates straight to
        # PyMongo's cursor.distinct().
        query = self._get_query()
        return query.get_cursor().distinct(*args, **kwargs)
class MongoDBManager(models.Manager, RawQueryMixin):
    """
    Lets you use Map/Reduce and raw query/update with your models::

        class FooModel(models.Model):
            ...
            objects = MongoDBManager()
    """

    def get_query_set(self):
        # All custom behavior lives on MongoDBQuerySet; the manager
        # merely hands out instances of it.
        return MongoDBQuerySet(self.model, using=self._db)

    def map_reduce(self, *args, **kwargs):
        """Delegate to :meth:`MongoDBQuerySet.map_reduce`."""
        return self.get_query_set().map_reduce(*args, **kwargs)

    def inline_map_reduce(self, *args, **kwargs):
        """Delegate to :meth:`MongoDBQuerySet.inline_map_reduce`."""
        return self.get_query_set().inline_map_reduce(*args, **kwargs)

    def distinct(self, *args, **kwargs):
        """
        Runs a :meth:`~pymongo.Collection.distinct` query against the
        database.
        """
        return self.get_query_set().distinct(*args, **kwargs)