-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathlog_context.py
144 lines (114 loc) · 4.3 KB
/
log_context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import contextvars
import logging
from dataclasses import asdict, dataclass, field, replace
import sentry_sdk
from sentry_sdk import get_current_span
from shared.config import get_config
from database.models.core import Owner, Repository
# Module-level logger, registered under the fixed name "log_context".
log = logging.getLogger("log_context")
@dataclass
class LogContext:
"""
Class containing all the information we may want to add in logs and metrics.
"""
task_name: str = "???"
task_id: str = "???"
_populated_from_db = False
owner_username: str | None = None
owner_service: str | None = None
owner_plan: str | None = None
owner_id: int | None = None
repo_name: str | None = None
repo_id: int | None = None
commit_sha: str | None = None
# TODO: begin populating this again or remove it
# we can populate this again if we reduce query load by passing the Commit,
# Owner, and Repository models we use to populate the log context into the
# various task implementations so they don't fetch the same models again
commit_id: int | None = None
checkpoints_data: dict = field(default_factory=lambda: {})
@property
def sentry_trace_id(self):
if span := get_current_span():
return span.trace_id
return None
def as_dict(self):
d = asdict(self)
d.pop("_populated_from_db", None)
d["sentry_trace_id"] = self.sentry_trace_id
return d
def populate_from_sqlalchemy(self, dbsession):
"""
Attempt to use the information we have to fill in other context fields. For
example, if we have `self.repo_id` but not `self.owner_id`, we can look up
the latter in the database.
Ignore exceptions; no need to fail a task for a missing context field.
"""
if self._populated_from_db or not get_config(
"setup", "log_context", "populate", default=True
):
return
try:
# - commit_id (or commit_sha + repo_id) is enough to get everything else
# - however, this slams the commit table and we rarely really need the
# DB PK for a commit, so we don't do this.
# - repo_id is enough to get repo and owner
# - owner_id is just enough to get owner
if self.repo_id:
query = (
dbsession.query(
Repository.name,
Owner.ownerid,
Owner.username,
Owner.service,
Owner.plan,
)
.join(Repository.owner)
.filter(Repository.repoid == self.repo_id)
)
(
self.repo_name,
self.owner_id,
self.owner_username,
self.owner_service,
self.owner_plan,
) = query.first()
elif self.owner_id:
query = dbsession.query(
Owner.username, Owner.service, Owner.plan
).filter(Owner.ownerid == self.owner_id)
(self.owner_username, self.owner_service, self.owner_plan) = (
query.first()
)
except Exception:
log.exception("Failed to populate log context")
self._populated_from_db = True
def add_to_log_record(self, log_record: dict):
d = self.as_dict()
d.pop("checkpoints_data", None)
log_record["context"] = d
def add_to_sentry(self):
d = self.as_dict()
d.pop("sentry_trace_id", None)
d.pop("checkpoints_data", None)
sentry_sdk.set_tags(d)
# Context variable holding the active LogContext. The default is a single
# LogContext instance built once, when this statement runs, so every lookup
# made before any `set()` call returns that same object.
_log_context = contextvars.ContextVar("log_context", default=LogContext())
def set_log_context(context: LogContext) -> None:
    """
    Install `context` as the current log context, replacing whatever was
    there before, then push it to the Sentry SDK as tags via
    `context.add_to_sentry()`.
    """
    _log_context.set(context)
    context.add_to_sentry()
def update_log_context(context: dict) -> None:
    """
    Layer the given field values on top of the current log context.

    A copy of the current context is made with the entries of `context`
    applied as field overrides; fields not mentioned keep their current
    values. The copy is then installed through `set_log_context()`.
    """
    merged = replace(_log_context.get(), **context)
    set_log_context(merged)
def get_log_context() -> LogContext:
    """
    Return the log context currently stored in the context variable.
    """
    active = _log_context.get()
    return active