-
Notifications
You must be signed in to change notification settings - Fork 764
/
Copy pathfields.py
120 lines (99 loc) · 3.79 KB
/
fields.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from collections import OrderedDict
from functools import partial
from asgiref.sync import sync_to_async
from django.core.exceptions import ValidationError
from graphene.types.argument import to_arguments
from graphene.types.enum import EnumType
from graphene.utils.str_converters import to_snake_case
from ..fields import DjangoConnectionField
from .utils import get_filtering_args_from_filterset, get_filterset_class
def convert_enum(data):
"""
Check if the data is a enum option (or potentially nested list of enum option)
and convert it to its value.
This method is used to pre-process the data for the filters as they can take an
graphene.Enum as argument, but filters (from django_filters) expect a simple value.
"""
if isinstance(data, list):
return [convert_enum(item) for item in data]
if isinstance(type(data), EnumType):
return data.value
else:
return data
class DjangoFilterConnectionField(DjangoConnectionField):
def __init__(
self,
type_,
fields=None,
order_by=None,
extra_filter_meta=None,
filterset_class=None,
*args,
**kwargs,
):
self._fields = fields
self._provided_filterset_class = filterset_class
self._filterset_class = None
self._filtering_args = None
self._extra_filter_meta = extra_filter_meta
self._base_args = None
super().__init__(type_, *args, **kwargs)
@property
def args(self):
return to_arguments(self._base_args or OrderedDict(), self.filtering_args)
@args.setter
def args(self, args):
self._base_args = args
@property
def filterset_class(self):
if not self._filterset_class:
fields = self._fields or self.node_type._meta.filter_fields
meta = {"model": self.model, "fields": fields}
if self._extra_filter_meta:
meta.update(self._extra_filter_meta)
filterset_class = (
self._provided_filterset_class or self.node_type._meta.filterset_class
)
self._filterset_class = get_filterset_class(filterset_class, **meta)
return self._filterset_class
@property
def filtering_args(self):
if not self._filtering_args:
self._filtering_args = get_filtering_args_from_filterset(
self.filterset_class, self.node_type
)
return self._filtering_args
@classmethod
def resolve_queryset(
cls, connection, iterable, info, args, filtering_args, filterset_class
):
def filter_kwargs():
kwargs = {}
for k, v in args.items():
if k in filtering_args:
if k == "order_by" and v is not None:
v = to_snake_case(v)
kwargs[k] = convert_enum(v)
return kwargs
qs = super().resolve_queryset(connection, iterable, info, args)
if info.is_awaitable(qs):
async def filter_async(qs):
filterset = filterset_class(
data=filter_kwargs(), queryset=await qs, request=info.context
)
if await sync_to_async(filterset.is_valid)():
return filterset.qs
raise ValidationError(filterset.form.errors.as_json())
return filter_async(qs)
filterset = filterset_class(
data=filter_kwargs(), queryset=qs, request=info.context
)
if filterset.is_valid():
return filterset.qs
raise ValidationError(filterset.form.errors.as_json())
def get_queryset_resolver(self):
return partial(
self.resolve_queryset,
filterset_class=self.filterset_class,
filtering_args=self.filtering_args,
)