-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrud.py
148 lines (119 loc) ยท 4.6 KB
/
crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import datetime
import json
from sqlalchemy import or_
import scheme
from sqlalchemy.orm import Session, joinedload
from model import Account, Question, InterviewQuestion, Inteverview, Answer
import random
# account
def find_account_by_email(db: Session, email):
    """Return the first ``Account`` whose ``email`` column equals *email*.

    Args:
        db: Session used to run the query.
        email: Value compared for equality against ``Account.email``.

    Returns:
        The first matching ``Account``, or ``None`` when no row matches.
    """
    # A Query object is always truthy, so the former ``if db_account:`` guard
    # never filtered anything, and iterating the query pulled every matching
    # row only to return the first. ``first()`` expresses the same intent and
    # lets the database stop after a single row.
    return db.query(Account).filter(email == Account.email).first()
# question
def find_all_question(db: Session):
    """Load every ``Question`` row and hand the rows back as a list."""
    return list(db.query(Question).all())
def find_question_by_id(db: Session, question_id: int):
    """Look up one ``Question`` through ``Query.get`` using *question_id*."""
    question_query = db.query(Question)
    return question_query.get(question_id)
def find_question_by_categories(db: Session, categories: list):
    """Return at most 10 questions, optionally restricted by category.

    When *categories* holds one, two or three entries, only questions whose
    ``category`` equals one of them are considered. For any other length
    (including an empty list) no category filter is applied, which matches
    the historical behaviour of this function.

    Args:
        db: Session used to run the query.
        categories: Category values to match against ``Question.category``.

    Returns:
        A list of ``Question`` rows: a random sample of 10 when at least 10
        rows match, otherwise every matching row.
    """
    sample_size = 10

    query = db.query(Question)
    if 1 <= len(categories) <= 3:
        # One OR-ed equality per requested category; replaces three nearly
        # identical hand-written branches.
        query = query.filter(
            or_(*(category == Question.category for category in categories))
        )

    # Execute the query exactly once. Previously it was run again for the
    # length check and again for the returned data, so the size test and the
    # sampled rows could come from different result sets.
    questions = list(query)

    if len(questions) >= sample_size:
        return random.sample(questions, sample_size)
    return questions
# interview
def create_interview(db: Session, categories: list, account: Account, questions: list):
    """Persist a new interview and one ``InterviewQuestion`` per question.

    Args:
        db: Session used for the inserts; it is committed twice (once for
            the interview, once for its questions).
        categories: Category values for the interview. Only the first three
            are stored; missing slots are stored as empty strings.
        account: Account whose ``id`` is recorded on the interview.
        questions: Objects exposing an ``id`` attribute, one per interview
            question to create.

    Returns:
        A tuple ``(interview, interview_questions)`` holding the refreshed
        interview row and the list of refreshed ``InterviewQuestion`` rows.
    """
    # Pad on a local copy: the previous implementation appended "" to the
    # caller's list, silently changing it for whoever used it afterwards.
    padded = list(categories) + [""] * (3 - len(categories))

    db_interview = Inteverview(
        account=account.id,
        created_at=datetime.datetime.now(),
        category_1=padded[0],
        category_2=padded[1],
        category_3=padded[2],
    )
    db.add(db_interview)
    # Commit + refresh first so db_interview.id is populated before it is
    # referenced by the interview questions below.
    db.commit()
    db.refresh(db_interview)

    db_interview_questions = [
        InterviewQuestion(interview=db_interview.id, question=question.id)
        for question in questions
    ]
    db.add_all(db_interview_questions)
    db.commit()
    for interview_question in db_interview_questions:
        db.refresh(interview_question)

    return db_interview, db_interview_questions
def find_interview_questions(db: Session, interview_questions: list):
    """Re-read each given interview question with its question eagerly joined.

    One query is issued per element of *interview_questions* (matched on the
    element's ``id``), and the results are returned in the same order.
    An ``IndexError`` is raised if an id matches no row.
    """

    def _reload(item):
        # Same query as before: joined-load the related question model and
        # take the first row matching this item's id.
        matches = db.query(InterviewQuestion).options(
            joinedload(InterviewQuestion.question_model)
        ).filter(InterviewQuestion.id == item.id)
        return list(matches)[0]

    return [_reload(item) for item in interview_questions]
def find_question_by_interview_question(db: Session, interview_question_id: int):
    """Return the ``title`` of the question linked to an interview question.

    The interview question is selected by *interview_question_id* with its
    ``question_model`` joined-loaded. An ``IndexError`` is raised when no row
    matches.
    """
    query = db.query(InterviewQuestion)
    query = query.options(joinedload(InterviewQuestion.question_model))
    query = query.filter(InterviewQuestion.id == interview_question_id)
    rows = list(query)
    return rows[0].question_model.title
def _parse_gpt_additional(raw: str):
    """Decode the follow-up-question payload, tolerating single-quoted dicts."""
    import ast  # local import so this block does not depend on file-level imports

    # Computed up front so a non-string argument fails exactly as it did
    # before (on ``.replace``), and so the legacy fallback is ready if needed.
    legacy_text = raw.replace("'", "\"")

    # 1) Strict JSON as-is. Blindly swapping quotes first (the old behaviour)
    #    broke valid JSON whose text contained an apostrophe, e.g. "What's".
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        pass

    # 2) A Python-literal dict such as {'question_1': '...'}; literal_eval
    #    only evaluates literals, so it is safe on untrusted text.
    try:
        return ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError):
        pass

    # 3) Last resort: the historical quote rewrite. Raises JSONDecodeError
    #    when the payload cannot be decoded at all, as before.
    return json.loads(legacy_text)


def update_interview_question(db: Session, iq_id: int, answer: str, gpt_answer: str, gpt_additional: str):
    """Store the user's answer, the GPT answer and three follow-up questions.

    Args:
        db: Session used for the update; committed before returning.
        iq_id: ``id`` of the ``InterviewQuestion`` row to update.
        answer: Text written to the ``answer`` column.
        gpt_answer: Text written to the ``gpt_answer`` column.
        gpt_additional: Serialized mapping with the keys ``question_1``,
            ``question_2`` and ``question_3`` (JSON or a single-quoted
            dict-style string).

    Raises:
        json.JSONDecodeError: If *gpt_additional* cannot be decoded.
        KeyError: If one of the three expected keys is missing.
    """
    additional = _parse_gpt_additional(gpt_additional)
    db.query(InterviewQuestion).filter_by(id=iq_id).update({
        "answer": answer,
        "gpt_answer": gpt_answer,
        "additional_question_1": additional['question_1'],
        "additional_question_2": additional['question_2'],
        "additional_question_3": additional['question_3'],
    })
    db.commit()
def find_interview(db: Session, interview_id: int):
    """Build the query for all interview questions of one interview.

    The returned object is the query itself (it is not executed here); it
    selects ``InterviewQuestion`` rows whose ``interview`` equals
    *interview_id*, with ``question_model`` joined-loaded.
    """
    query = db.query(InterviewQuestion)
    query = query.options(joinedload(InterviewQuestion.question_model))
    return query.filter(InterviewQuestion.interview == interview_id)
def update_interview_question_additional_answer(db: Session, sequence: int, question_id: int, answer: str):
    """Store the answer to one of the three follow-up questions.

    Args:
        db: Session used for the update; committed before returning.
        sequence: Which follow-up the answer belongs to: 1, 2 or 3.
        question_id: ``id`` of the ``InterviewQuestion`` row to update
            (despite the name, it is matched against ``InterviewQuestion.id``).
        answer: Text written to the ``additional_answer_<sequence>`` column.

    Raises:
        ValueError: If *sequence* is not 1, 2 or 3.
    """
    columns = {
        1: "additional_answer_1",
        2: "additional_answer_2",
        3: "additional_answer_3",
    }
    target = columns.get(sequence)
    if target is None:
        # Previously an unknown sequence fell through with an empty column
        # name and the failure only surfaced inside the UPDATE statement.
        raise ValueError(f"sequence must be 1, 2 or 3, got {sequence!r}")

    db.query(InterviewQuestion).filter_by(id=question_id).update({
        target: answer
    })
    db.commit()
def update_question_gpt_answer(db: Session, db_question, gpt_answer):
    """Insert a new ``Answer`` of type ``"TYPE_GPT"`` for *db_question*.

    The row stores ``db_question.id`` as its question and *gpt_answer* as
    its content; it is added, committed and refreshed. Nothing is returned.
    """
    fields = {
        "question": db_question.id,
        "content": gpt_answer,
        "type": "TYPE_GPT",
    }
    gpt_row = Answer(**fields)
    db.add(gpt_row)
    db.commit()
    db.refresh(gpt_row)
def delete_gpt_answers(db: Session):
    """Delete every ``Answer`` whose ``type`` is ``"TYPE_GPT"`` and commit."""
    gpt_only = db.query(Answer).filter(Answer.type == "TYPE_GPT")
    gpt_only.delete()
    db.commit()
def find_interview_question_by_pk(db, iq_id: int):
    """Return the ``InterviewQuestion`` with id *iq_id*, question joined-loaded.

    Raises ``IndexError`` when no row has that id.
    """
    query = db.query(InterviewQuestion)
    query = query.options(joinedload(InterviewQuestion.question_model))
    matches = list(query.filter(InterviewQuestion.id == iq_id))
    return matches[0]