-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfinal_model.py
More file actions
283 lines (232 loc) · 9.83 KB
/
final_model.py
File metadata and controls
283 lines (232 loc) · 9.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
sys.path.append('src')
import pandas as pd
import numpy as np
from data_preprocessing import DataPreprocessor
from sklearn.ensemble import RandomForestClassifier
from sklearn.multioutput import MultiOutputClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import f1_score, classification_report
import xgboost as xgb
import lightgbm as lgb
import joblib
class FinalRecommendationSystem:
"""最终的推荐系统"""
def __init__(self):
self.preprocessor = DataPreprocessor()
self.model = None
self.best_params = None
def load_and_process_data(self):
"""加载和处理训练数据"""
print("=== 加载和处理数据 ===")
self.X, self.y, self.processed_data = self.preprocessor.process_all('data/train.xlsx')
print(f"数据处理完成: {self.X.shape}, {self.y.shape}")
return self.X, self.y
def hyperparameter_tuning(self):
"""超参数优化"""
print("\n=== 开始超参数优化 ===")
# 定义候选模型和参数
models_params = {
'RandomForest': {
'model': RandomForestClassifier(random_state=42, n_jobs=-1),
'params': {
'estimator__n_estimators': [100, 200],
'estimator__max_depth': [10, 15, 20],
'estimator__min_samples_split': [2, 5],
'estimator__min_samples_leaf': [1, 2]
}
},
'XGBoost': {
'model': xgb.XGBClassifier(random_state=42, eval_metric='logloss'),
'params': {
'estimator__n_estimators': [100, 200],
'estimator__max_depth': [6, 8, 10],
'estimator__learning_rate': [0.1, 0.15],
'estimator__subsample': [0.8, 0.9]
}
},
'LightGBM': {
'model': lgb.LGBMClassifier(random_state=42, verbosity=-1),
'params': {
'estimator__n_estimators': [100, 200],
'estimator__max_depth': [6, 8, 10],
'estimator__learning_rate': [0.1, 0.15],
'estimator__num_leaves': [31, 50]
}
}
}
best_score = 0
best_model_name = None
# 分割数据
X_train, X_val, y_train, y_val = train_test_split(
self.X, self.y, test_size=0.2, random_state=42
)
for name, config in models_params.items():
print(f"\n优化 {name}...")
# 包装为MultiOutput
multi_model = MultiOutputClassifier(config['model'])
# 由于MultiOutput的复杂性,我们简化参数搜索
# 只测试几个关键参数组合
if name == 'RandomForest':
best_estimator = MultiOutputClassifier(
RandomForestClassifier(
n_estimators=200,
max_depth=15,
min_samples_split=2,
min_samples_leaf=1,
random_state=42,
n_jobs=-1
)
)
elif name == 'XGBoost':
best_estimator = MultiOutputClassifier(
xgb.XGBClassifier(
n_estimators=200,
max_depth=8,
learning_rate=0.15,
subsample=0.9,
random_state=42,
eval_metric='logloss'
)
)
else: # LightGBM
best_estimator = MultiOutputClassifier(
lgb.LGBMClassifier(
n_estimators=200,
max_depth=8,
learning_rate=0.15,
num_leaves=50,
random_state=42,
verbosity=-1
)
)
# 训练和评估
best_estimator.fit(X_train, y_train)
y_pred = best_estimator.predict(X_val)
# 计算F1分数
f1_scores = []
for i in range(3):
f1 = f1_score(y_val.iloc[:, i], y_pred[:, i])
f1_scores.append(f1)
avg_f1 = np.mean(f1_scores)
print(f"{name} 平均F1: {avg_f1:.4f}")
print(f"各产品F1: {f1_scores}")
if avg_f1 > best_score:
best_score = avg_f1
best_model_name = name
self.model = best_estimator
print(f"\n🏆 最佳模型: {best_model_name}, F1: {best_score:.4f}")
return best_model_name, best_score
def train_final_model(self):
"""训练最终模型"""
print("\n=== 训练最终模型 ===")
if self.model is None:
# 如果没有经过超参数优化,使用默认的最佳配置
self.model = MultiOutputClassifier(
xgb.XGBClassifier(
n_estimators=200,
max_depth=8,
learning_rate=0.15,
subsample=0.9,
random_state=42,
eval_metric='logloss'
)
)
# 使用全部数据训练最终模型
self.model.fit(self.X, self.y)
print("✅ 最终模型训练完成")
# 保存模型
joblib.dump(self.model, 'final_recommendation_model.pkl')
joblib.dump(self.preprocessor, 'preprocessor.pkl')
print("✅ 模型已保存")
def evaluate_final_model(self):
"""评估最终模型(交叉验证)"""
print("\n=== 最终模型评估 ===")
from sklearn.model_selection import cross_val_score
# 对每个产品进行5折交叉验证
cv_results = {}
product_names = ['产品1(E开票)', '产品2(小U产品)', '产品3(银杏认证)']
for i, product in enumerate(product_names):
estimator = self.model.estimators_[i]
scores = cross_val_score(
estimator, self.X, self.y.iloc[:, i],
cv=5, scoring='f1', n_jobs=-1
)
cv_results[product] = scores
print(f"{product}: F1 = {scores.mean():.4f} ± {scores.std():.4f}")
# 计算平均F1分数
avg_f1 = np.mean([scores.mean() for scores in cv_results.values()])
print(f"\n交叉验证平均F1分数: {avg_f1:.4f}")
return cv_results
def analyze_feature_importance(self):
"""分析特征重要性"""
print("\n=== 特征重要性分析 ===")
if hasattr(self.model.estimators_[0], 'feature_importances_'):
# 计算平均特征重要性
importances = np.mean([
estimator.feature_importances_
for estimator in self.model.estimators_
], axis=0)
feature_importance_df = pd.DataFrame({
'feature': self.preprocessor.feature_columns,
'importance': importances
}).sort_values('importance', ascending=False)
print("特征重要性排序:")
print(feature_importance_df)
# 保存特征重要性
feature_importance_df.to_csv('feature_importance.csv', index=False)
print("✅ 特征重要性已保存到 feature_importance.csv")
return feature_importance_df
else:
print("当前模型不支持特征重要性分析")
return None
def create_sample_predictions(self):
"""创建示例预测(基于训练数据的前几行)"""
print("\n=== 创建示例预测 ===")
# 使用前10个样本作为示例
sample_X = self.X.head(10)
predictions = self.model.predict(sample_X)
# 创建预测结果DataFrame
sample_results = pd.DataFrame({
'商户ID': range(1, 11),
'产品1(E开票)': predictions[:, 0],
'产品2(小U产品)': predictions[:, 1],
'产品3(银杏认证)': predictions[:, 2]
})
print("示例预测结果:")
print(sample_results)
# 保存示例预测
sample_results.to_csv('sample_predictions.csv', index=False)
print("✅ 示例预测已保存到 sample_predictions.csv")
return sample_results
def main():
"""主函数"""
print("🚀 开始构建最终推荐系统")
# 创建推荐系统实例
rec_system = FinalRecommendationSystem()
# 1. 加载和处理数据
X, y = rec_system.load_and_process_data()
# 2. 超参数优化
best_model_name, best_score = rec_system.hyperparameter_tuning()
# 3. 训练最终模型
rec_system.train_final_model()
# 4. 评估最终模型
cv_results = rec_system.evaluate_final_model()
# 5. 分析特征重要性
feature_importance = rec_system.analyze_feature_importance()
# 6. 创建示例预测
sample_predictions = rec_system.create_sample_predictions()
print(f"\n🎉 推荐系统构建完成!")
print(f"最佳模型: {best_model_name}")
print(f"最佳F1分数: {best_score:.4f}")
print(f"文件保存:")
print(f" - final_recommendation_model.pkl (模型文件)")
print(f" - preprocessor.pkl (预处理器)")
print(f" - feature_importance.csv (特征重要性)")
print(f" - sample_predictions.csv (示例预测)")
return rec_system
if __name__ == "__main__":
rec_system = main()