-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpredict_with_optimized_model.py
More file actions
209 lines (163 loc) · 8.68 KB
/
predict_with_optimized_model.py
File metadata and controls
209 lines (163 loc) · 8.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
import joblib
import os
def predict_test_set_optimized(test_file_path='data/test.xlsx', output_file='predict_1_optimized.csv'):
"""
使用优化的集成模型对测试集进行预测
Args:
test_file_path: 测试集文件路径
output_file: 输出文件名
"""
print("=== 使用优化集成模型进行测试集预测 ===")
try:
# 1. 加载优化的模型
print("1. 加载优化的集成模型...")
system = joblib.load('final_optimized_system.pkl')
print("✅ 优化模型加载成功")
# 2. 加载测试数据
print("2. 加载测试数据...")
if test_file_path.endswith('.xlsx'):
test_data = pd.read_excel(test_file_path)
else:
test_data = pd.read_csv(test_file_path)
print(f"测试数据形状: {test_data.shape}")
print(f"测试数据列: {test_data.columns.tolist()}")
# 3. 数据预处理(模拟训练时的处理)
print("3. 对测试数据进行预处理...")
# 处理列名差异(如果有的话)
expected_columns = ['商户名称', '行业大类', 'mcc', '行业小类', '所属省份', '收单收入', '交易金额']
missing_columns = [col for col in expected_columns if col not in test_data.columns]
if missing_columns:
print(f"⚠️ 缺失列: {missing_columns}")
# 为缺失列填充默认值
for col in missing_columns:
if col in ['收单收入', '交易金额']:
test_data[col] = 0.0
else:
test_data[col] = 'Unknown'
# 使用训练时的预处理器处理测试数据
preprocessor = system.preprocessor
# 编码分类特征
encoded_test_data = test_data.copy()
categorical_cols = ['行业大类', '行业小类', '所属省份']
for col in categorical_cols:
if col in test_data.columns and col in preprocessor.label_encoders:
le = preprocessor.label_encoders[col]
# 处理新类别
test_values = test_data[col].fillna('Unknown')
encoded_values = []
for value in test_values:
if value in le.classes_:
encoded_values.append(le.transform([value])[0])
else:
# 使用最频繁的类别
encoded_values.append(0)
print(f"警告: 在{col}中发现未知类别: {value}")
encoded_test_data[f'{col}_encoded'] = encoded_values
# 添加mcc如果不存在
if 'mcc' not in encoded_test_data.columns:
encoded_test_data['mcc'] = 0
# 创建基础特征(匹配训练时的特征)
basic_features = ['行业大类_encoded', '行业小类_encoded', '所属省份_encoded', 'mcc', '收单收入', '交易金额']
# 确保所有基础特征都存在
for feature in basic_features:
if feature not in encoded_test_data.columns:
if 'encoded' in feature:
encoded_test_data[feature] = 0
else:
encoded_test_data[feature] = 0.0
# 添加训练时创建的衍生特征
encoded_test_data['收入交易比'] = encoded_test_data['收单收入'] / (encoded_test_data['交易金额'] + 1e-6)
encoded_test_data['单笔平均收入'] = encoded_test_data['收单收入'] / (encoded_test_data['交易金额'] / 1000 + 1e-6)
encoded_test_data['收单收入_log'] = np.log1p(encoded_test_data['收单收入'])
encoded_test_data['交易金额_log'] = np.log1p(encoded_test_data['交易金额'])
# 添加行业相关特征(简化版)
encoded_test_data['行业收入均值'] = encoded_test_data['收单收入'].mean()
encoded_test_data['行业交易均值'] = encoded_test_data['交易金额'].mean()
encoded_test_data['收入_行业相对'] = encoded_test_data['收单收入'] / (encoded_test_data['行业收入均值'] + 1e-6)
encoded_test_data['交易_行业相对'] = encoded_test_data['交易金额'] / (encoded_test_data['行业交易均值'] + 1e-6)
# 选择基础特征进行预测
base_feature_cols = ['行业大类_encoded', '行业小类_encoded', '所属省份_encoded', 'mcc',
'收单收入', '交易金额', '收入交易比', '单笔平均收入',
'收单收入_log', '交易金额_log', '收入_行业相对', '交易_行业相对']
X_test = encoded_test_data[base_feature_cols]
# 标准化(使用训练时的scaler)
X_test_scaled = preprocessor.scaler.transform(X_test)
X_test_scaled = pd.DataFrame(X_test_scaled, columns=base_feature_cols, index=X_test.index)
print(f"测试特征矩阵形状: {X_test_scaled.shape}")
# 4. 进行预测
print("4. 使用优化集成模型进行预测...")
predictions = system.predict(X_test_scaled)
probabilities = system.predict_proba(X_test_scaled)
print(f"预测结果形状: {predictions.shape}")
# 5. 分析预测结果
print("5. 预测结果分析:")
product_names = ['产品1(E开票)', '产品2(小U产品)', '产品3(银杏认证)']
for i, product in enumerate(product_names):
positive_count = predictions[:, i].sum()
positive_rate = positive_count / len(predictions)
print(f" {product}: {positive_count} 个商户预测购买 ({positive_rate:.1%})")
# 6. 生成提交文件
print("6. 生成提交文件...")
# 创建比赛要求的格式
result_df = pd.DataFrame({
'商品名称': ['产品1', '产品2', '产品3']
})
# 添加每个商户的预测结果
for i in range(len(predictions)):
result_df[f'商户{i+1}'] = predictions[i]
# 保存结果
result_df.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"✅ 预测完成!结果已保存到 {output_file}")
# 显示结果预览
print(f"\n预测结果预览(前10个商户):")
print(result_df.iloc[:, :11].to_string())
# 保存详细的预测概率(用于分析)
prob_df = pd.DataFrame()
for i, product in enumerate(product_names):
prob_df[f'{product}_概率'] = probabilities[i][:, 1]
prob_df['商户ID'] = range(1, len(predictions) + 1)
prob_df.to_csv(f'prediction_probabilities_{output_file}', index=False)
print(f"✅ 预测概率已保存到 prediction_probabilities_{output_file}")
return result_df, predictions, probabilities
except Exception as e:
print(f"❌ 预测失败: {e}")
import traceback
traceback.print_exc()
return None, None, None
def main():
"""主函数"""
print("🎯 优化集成模型测试集预测工具")
# 检查模型文件
if not os.path.exists('final_optimized_system.pkl'):
print("❌ 未找到优化模型文件 'final_optimized_system.pkl'")
print("请先运行 'python final_optimized_model.py' 训练模型")
return
# 检查测试文件
test_files = ['data/test.xlsx', 'data/test.csv', 'test.xlsx', 'test.csv']
test_file = None
for file in test_files:
if os.path.exists(file):
test_file = file
break
if test_file:
print(f"找到测试文件: {test_file}")
result_df, predictions, probabilities = predict_test_set_optimized(test_file)
if result_df is not None:
print("\n🎉 预测成功完成!")
print(f"📊 使用了以下优化技术:")
print(f" ✅ 集成学习:XGBoost + LightGBM + RandomForest")
print(f" ✅ 阈值优化:产品1(0.84), 产品2(0.86), 产品3(0.16)")
print(f" ✅ 特征增强:25个优化特征")
print(f" ✅ 召回率平衡:三类产品召回率均衡")
print(f" ✅ 类别不平衡处理:产品3专门优化")
else:
print("❌ 未找到测试文件")
print("请将测试文件命名为以下之一并放在对应位置:")
for file in test_files:
print(f" - {file}")
if __name__ == "__main__":
main()