-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathbot.py
More file actions
2326 lines (2021 loc) · 121 KB
/
bot.py
File metadata and controls
2326 lines (2021 loc) · 121 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import os
import sys
import time
import concurrent.futures
import asyncio
import logging
import datetime
import re # For PIN detection
from pathlib import Path
import builtins
from unittest.mock import patch
import asyncio # Ensure asyncio is imported if not already globally
from functools import wraps
from typing import Optional, Dict, Any, Tuple, List
from dotenv import load_dotenv
import os
# --- DNS Resolver Patch ---
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/119.0"
try:
from pyrogram import Client, filters
from pyrogram.types import (
InlineKeyboardMarkup, InlineKeyboardButton, Message, CallbackQuery,
ForceReply
)
from pyrogram import enums
from pyrogram.errors import UserIsBlocked, FloodWait, InputUserDeactivated, UserDeactivated
except ImportError:
print("Error: Pyrofork not found. Install it: pip install pyrofork tgcrypto")
sys.exit(1)
try:
import motor.motor_asyncio
except ImportError:
print("Error: Motor not found. Install it: pip install motor")
sys.exit(1)
try:
from colorama import init, Fore, Style
init(autoreset=True) # Initialize Colorama for script logging if needed
except ImportError:
print("Colorama not found. Installing it is recommended: pip install colorama")
class DummyStyle:
def __getattr__(self, name): return ""
Fore = DummyStyle()
Style = DummyStyle()
# --- pyquotex Imports ---
try:
from quotexapi.stable_api import Quotex
from quotexapi.utils.processor import get_color # Optional
# Monkey patch target detection function (safer approach)
# --- Inside your script, replace the existing function ---
except ImportError:
print(f"{Fore.RED}Error: pyquotex library not found or import failed.")
print(f"{Fore.YELLOW}Please install it via pip:")
print(f"{Fore.CYAN}pip install git+https://github.com/cleitonleonel/pyquotex.git")
sys.exit(1)
except Exception as e:
print(f"{Fore.RED}Error during pyquotex import or patching setup: {e}")
sys.exit(1)
# --- Configuration (Load from environment variables or a config file) ---
# It's better practice to load these from environment or a separate config.py
# For simplicity in a single file as requested:
# Load environment variables from a .env file
load_dotenv()
API_ID = int(os.getenv("API_ID", 12345678)) # Replace with a default value if needed
API_HASH = os.getenv("API_HASH", "your_api_hash")
BOT_TOKEN = os.getenv("BOT_TOKEN", "your_bot_token")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
OWNER_ID = int(os.getenv("OWNER_ID", 987654321)) # Replace with a default value if needed
# Basic Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s')
logger = logging.getLogger(__name__)
logging.getLogger("pyrofork").setLevel(logging.WARNING) # Reduce pyrogram verbosity
try:
import dns.resolver
if hasattr(dns.resolver, 'get_default_resolver'):
dns.resolver.get_default_resolver().nameservers=['8.8.8.8', '1.1.1.1']
else:
dns.resolver.default_resolver=dns.resolver.Resolver(configure=False)
dns.resolver.default_resolver.nameservers=['8.8.8.8', '1.1.1.1']
logger.info("Applied DNS resolver patch.")
except ImportError:
logger.warning("dnspython not installed. Skipping DNS resolver patch.")
except Exception as e:
logger.error(f"Error applying DNS resolver patch: {e}")
# --- Third-party Libraries ---
# --- Global Variables & Bot Initialization ---
bot_instance: Optional[Client] = None
db = None # Database client
main_event_loop = None # <<< ADD THIS GLOBAL VARIABLE
users_db = None # Collection for users, roles, basic settings
quotex_accounts_db = None # Collection for Quotex credentials
trade_settings_db = None # Collection for trade settings per user/account
# Temporary storage for OTP requests: {user_id: {'qx_client': qx_client_instance, 'event': asyncio.Event()}}
active_otp_requests: Dict[int, Dict[str, Any]] = {}
# Temporary storage for ongoing user actions (e.g., waiting for broadcast message)
user_states: Dict[int, str] = {} # e.g., {user_id: "waiting_broadcast_message"}
# Default Quotex Settings (can be overridden from DB)
DEFAULT_TRADE_AMOUNT = 5
DEFAULT_TRADE_DURATION = 60 # For Timer/Time mode number
DEFAULT_TRADE_MODE = "TIMER" # 'TIMER' or 'TIME'
DEFAULT_CANDLE_SIZE = 60
DEFAULT_SERVICE_STATUS = False # Trading Off by default
MARTINGALE_MULTIPLIER = 2.0
MAX_CONSECUTIVE_LOSSES = 3
COOLDOWN_MINUTES = 3
# --- Database Setup ---
async def setup_database():
"""Initializes MongoDB connection and collections."""
global db, users_db, quotex_accounts_db, trade_settings_db
try:
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URI)
db = client['quotexTraderBot'] # Database name
users_db = db['users']
quotex_accounts_db = db['quotex_accounts']
trade_settings_db = db['trade_settings']
# Create indexes for faster lookups
await users_db.create_index("user_id", unique=True)
await quotex_accounts_db.create_index([("user_id", 1), ("email", 1)], unique=True)
await trade_settings_db.create_index("account_doc_id", unique=True) # Link to quotex account document
logger.info("Database connection successful and collections initialized.")
except Exception as e:
logger.error(f"Failed to connect to MongoDB: {e}", exc_info=True)
sys.exit(1)
# --- Database Helper Functions ---
async def add_user_if_not_exists(user_id: int):
"""Adds a user to the database if they aren't already there."""
if users_db is None:
logger.error("Users DB not initialized.")
return None
user_data = await users_db.find_one({"user_id": user_id})
if not user_data:
is_owner = (user_id == OWNER_ID)
new_user = {
"user_id": user_id,
"is_sudo": is_owner, # Owner is automatically sudo
"is_premium": is_owner, # Owner is automatically premium
"join_date": datetime.datetime.now(datetime.timezone.utc)
}
await users_db.insert_one(new_user)
logger.info(f"New user added to DB: {user_id}")
return new_user
return user_data
async def get_user(user_id: int):
"""Retrieves user data from DB."""
if users_db is None: return None
return await users_db.find_one({"user_id": user_id})
async def is_sudo_user(user_id: int) -> bool:
"""Checks if a user has sudo privileges."""
if user_id == OWNER_ID: return True
if users_db is None: return False
user_data = await users_db.find_one({"user_id": user_id})
return user_data and user_data.get("is_sudo", False)
async def is_premium_user(user_id: int) -> bool:
"""Checks if a user is premium."""
if user_id == OWNER_ID: return True # Owner always premium
if users_db is None: return False
user_data = await users_db.find_one({"user_id": user_id})
# Allow sudo users to access premium features too
return user_data and (user_data.get("is_premium", False) or user_data.get("is_sudo", False))
async def set_user_role(target_user_id: int, role: str, status: bool):
"""Sets 'is_sudo' or 'is_premium' status for a user."""
if users_db is None: return False
if role not in ["is_sudo", "is_premium"]: return False
result = await users_db.update_one(
{"user_id": target_user_id},
{"$set": {role: status}}
)
return result.modified_count > 0
async def get_all_user_ids() -> list[int]:
"""Gets a list of all user IDs from the DB."""
if users_db is None: return []
users_cursor = users_db.find({}, {"_id": 0, "user_id": 1})
return [user["user_id"] async for user in users_cursor]
async def get_role_user_ids(role: str) -> list[int]:
"""Gets user IDs for a specific role (is_sudo or is_premium)."""
if users_db is None or role not in ["is_sudo", "is_premium"]: return []
users_cursor = users_db.find({role: True}, {"_id": 0, "user_id": 1})
return [user["user_id"] async for user in users_cursor]
# --- Quotex Account DB Functions ---
async def add_quotex_account(user_id: int, email: str, password: str):
"""Adds a Quotex account credential for a user."""
if quotex_accounts_db is None: return False
try:
# Consider encrypting the password before storing!
await quotex_accounts_db.insert_one({
"user_id": user_id,
"email": email.lower(), # Store email in lowercase for consistency
"password": password, # WARNING: Storing plain text password!
"added_date": datetime.datetime.now(datetime.timezone.utc)
})
return True
except Exception as e: # Likely duplicate key error if email already exists for user
logger.warning(f"Failed to add Quotex account {email} for user {user_id}: {e}")
return False
async def get_user_quotex_accounts(user_id: int) -> List[Dict[str, Any]]:
"""Gets all Quotex accounts associated with a user."""
if quotex_accounts_db is None: return []
accounts_cursor = quotex_accounts_db.find({"user_id": user_id}, {"_id": 1, "email": 1}) # Fetch ID and email
return await accounts_cursor.to_list(length=None) # Get all accounts
async def get_quotex_account_details(account_doc_id: str) -> Optional[Dict[str, Any]]:
"""Gets full details of a specific Quotex account by its DB document ID."""
from bson import ObjectId
if quotex_accounts_db is None: return None
try:
return await quotex_accounts_db.find_one({"_id": ObjectId(account_doc_id)})
except Exception:
return None
async def delete_quotex_account(account_doc_id: str) -> bool:
"""Deletes a Quotex account and its associated settings."""
from bson import ObjectId
if quotex_accounts_db is None or trade_settings_db is None: return False
try:
# Delete account credentials
delete_acc_result = await quotex_accounts_db.delete_one({"_id": ObjectId(account_doc_id)})
# Delete associated trade settings
await trade_settings_db.delete_many({"account_doc_id": ObjectId(account_doc_id)}) # Use delete_many for safety
return delete_acc_result.deleted_count > 0
except Exception as e:
logger.error(f"Error deleting account {account_doc_id}: {e}", exc_info=True)
return False
# --- Trade Settings DB Functions ---
async def get_or_create_trade_settings(account_doc_id: str) -> Dict[str, Any]:
"""Gets trade settings for a Quotex account, creating defaults if none exist."""
from bson import ObjectId
if trade_settings_db is None:
raise ConnectionError("Trade settings DB not initialized.")
settings = await trade_settings_db.find_one({"account_doc_id": ObjectId(account_doc_id)})
if not settings:
settings = {
"account_doc_id": ObjectId(account_doc_id),
"account_mode": "PRACTICE", # PRACTICE / REAL
"trade_mode": DEFAULT_TRADE_MODE, # TIMER / TIME
"candle_size": DEFAULT_CANDLE_SIZE, # seconds
"service_status": DEFAULT_SERVICE_STATUS, # Trading on/off (boolean)
"assets": [], # List of dicts: {'name': str, 'amount': int, 'duration': int}
# Martingale state can also be stored here per asset if needed for persistence
"martingale_state": {}, # { asset_name: {'current_amount': float, 'consecutive_losses': int}}
"cooldown_until": 0.0, # Timestamp
"last_updated": datetime.datetime.now(datetime.timezone.utc)
}
await trade_settings_db.insert_one(settings)
logger.info(f"Created default trade settings for account_doc_id {account_doc_id}")
# Ensure all default keys exist in case new settings are added later
defaults = {
"account_mode": "PRACTICE", "trade_mode": DEFAULT_TRADE_MODE,
"candle_size": DEFAULT_CANDLE_SIZE, "service_status": DEFAULT_SERVICE_STATUS,
"assets": [], "martingale_state": {}, "cooldown_until": 0.0
}
updated = False
for key, default_value in defaults.items():
if key not in settings:
settings[key] = default_value
updated = True
if updated:
await update_trade_setting(account_doc_id, settings) # Save potentially added default keys
return settings
async def update_trade_setting(account_doc_id: str, update_data: dict):
"""Updates specific trade settings for a Quotex account."""
from bson import ObjectId
if trade_settings_db is None: return False
update_data["last_updated"] = datetime.datetime.now(datetime.timezone.utc)
result = await trade_settings_db.update_one(
{"account_doc_id": ObjectId(account_doc_id)},
{"$set": update_data},
upsert=True # Create if somehow missing, though get_or_create should handle it
)
return result.modified_count > 0 or result.upserted_id is not None
# --- Permission Decorators ---
def owner_only(func):
@wraps(func)
async def wrapper(client: Client, update: Message | CallbackQuery):
user_id = update.from_user.id
if user_id != OWNER_ID:
if isinstance(update, Message):
await update.reply_text("⛔️ Access Denied: Only the bot owner can use this command.")
elif isinstance(update, CallbackQuery):
await update.answer("⛔️ Access Denied: Owner only.", show_alert=True)
return None # Indicate failure or stop processing
return await func(client, update)
return wrapper
def sudo_only(func):
@wraps(func)
async def wrapper(client: Client, update: Message | CallbackQuery):
user_id = update.from_user.id
if not await is_sudo_user(user_id):
if isinstance(update, Message):
await update.reply_text("⛔️ Access Denied: You need Sudo privileges for this.")
elif isinstance(update, CallbackQuery):
await update.answer("⛔️ Access Denied: Sudo privileges required.", show_alert=True)
return None
return await func(client, update)
return wrapper
def premium_only(func):
@wraps(func)
async def wrapper(client: Client, update: Message | CallbackQuery):
user_id = update.from_user.id
if not await is_premium_user(user_id):
if isinstance(update, Message):
await update.reply_text("⛔️ Access Denied: This feature requires a Premium subscription or Sudo privileges.")
elif isinstance(update, CallbackQuery):
await update.answer("⛔️ Access Denied: Premium or Sudo required.", show_alert=True)
return None
return await func(client, update)
return wrapper
# --- Restore/Keep the ASYNC helper function ---
async def handle_potential_pin_input(prompt: str) -> Optional[str]:
"""
This ASYNC function is called by our patched input ONLY when
the specific PIN prompt is detected. It handles the bot interaction.
"""
target_prompt = "Insira o código PIN que acabamos de enviar para o seu e-mail:"
logger.debug(f"handle_potential_pin_input received prompt: '{prompt}'")
if target_prompt in prompt:
logger.critical(f"--- !!! BUILTIN INPUT PATCH TRIGGERED FOR PIN !!! Prompt: '{prompt}'")
global bot_instance, active_otp_requests
user_id = None
qx_client_instance = None
for uid, data in active_otp_requests.items():
# Assuming the current call belongs to the context we just added
user_id = uid
qx_client_instance = data.get('qx_client')
logger.info(f"Found potential user_id {user_id} and client {id(qx_client_instance)} from active_otp_requests.")
break
if not bot_instance:
logger.error("CRITICAL: Cannot ask for PIN via patched input - bot_instance is None!")
return None
if not user_id:
logger.error(f"CRITICAL: Cannot ask for PIN via patched input - no user_id found in active_otp_requests. State: {active_otp_requests}")
return None
if not qx_client_instance:
logger.error(f"CRITICAL: Cannot ask for PIN via patched input - no qx_client found for user {user_id}.")
return None
pin_code = None
try:
logger.info(f"Asking user {user_id} for PIN via bot.ask() [from patched input]. Timeout: 120s")
pin_message = await bot_instance.ask(
chat_id=user_id,
text=f"❗️ **QUOTEX 2FA REQUIRED** ❗️\n\n"
f"To log in to `{qx_client_instance.email}`, Quotex needs the PIN code sent to your email.\n\n"
f"**Prompt:**\n`{prompt}`\n\n"
f"➡️ Please reply to **this message** with the **PIN code only**.",
timeout=600, # 2 minutes timeout
)
if pin_message and pin_message.text:
pin_code = pin_message.text.strip()
if not pin_code.isdigit(): # Optional check
logger.warning(f"User {user_id} entered non-digit PIN '{pin_code}'. Using it anyway.")
logger.info(f"Received PIN '{pin_code}' from user {user_id} via patched input.")
await pin_message.delete() # Optional: delete the message after reading
return pin_code # Return the actual PIN
else:
logger.warning(f"User {user_id} did not provide a PIN response message [via patched input].")
await bot_instance.send_message(user_id, "❓ Did not receive a PIN response. Login failed.")
return "" # Return empty string maybe better than None for input()?
except asyncio.TimeoutError:
logger.error(f"Timeout waiting for PIN from user {user_id} [via patched input].")
try: await bot_instance.send_message(user_id, "⏳ PIN request timed out (2 minutes). Login failed.")
except Exception: pass
return "" # Return empty string on timeout
except Exception as e:
logger.error(f"Error occurred in bot.ask() while getting PIN [via patched input] from {user_id}: {e}", exc_info=True)
try: await bot_instance.send_message(user_id, f"❌ An error occurred while processing your PIN: {e}\nLogin failed.")
except Exception: pass
# Raise exception to clearly signal failure in wrapper
raise ConnectionError("Failed to get PIN via Telegram interaction.") from e
else:
# IMPORTANT: If prompt not recognised, call original input
logger.warning(f"Patched input called with UNEXPECTED prompt: '{prompt}'. Falling back to original input.")
return original_builtin_input(prompt) # This will likely hang bot
# --- Restore/Keep the SYNC input_wrapper function ---
# Store original input safely AT MODULE LEVEL
original_builtin_input = builtins.input
patch_state = {'expecting_pin': False} # Manage patch activation state
def input_wrapper(prompt=""):
"""Synchronous wrapper that replaces input. Calls async handler if needed."""
global patch_state # Access the global state flag
target_prompt_substr = "Insira o código PIN"
# Check if we are *expecting* the PIN and if the prompt matches
if target_prompt_substr in prompt and patch_state.get('expecting_pin', False):
logger.info(f"Input wrapper intercepting prompt: '{prompt}'")
if not main_event_loop:
logger.error("CRITICAL: Main event loop not available in input_wrapper!")
# Decide how to fail: raise error or return empty? Raising is cleaner.
raise RuntimeError("Cannot handle PIN input: Main event loop not set.")
if not main_event_loop.is_running():
logger.error("CRITICAL: Main event loop is not running in input_wrapper!")
raise RuntimeError("Cannot handle PIN input: Main event loop not running.")
# Prepare the coroutine to run
coro = handle_potential_pin_input(prompt) # Pass the prompt
# Schedule the coroutine on the main loop from this (likely worker) thread
# This returns a concurrent.futures.Future, NOT an asyncio.Future
future = asyncio.run_coroutine_threadsafe(coro, main_event_loop)
logger.debug(f"Scheduled async PIN handler on loop {id(main_event_loop)}. Waiting for result...")
try:
# Block *this thread* (waiting for input) until the coroutine completes
# Add a reasonable timeout (e.g., slightly longer than your bot.listen timeout)
pin_result = future.result(timeout=130) # Wait up to 130 seconds
logger.info(f"Async PIN handler returned: {type(pin_result)} '{pin_result}'")
# Return the pin (string) or empty string if it failed/timed out internally
return pin_result if pin_result is not None else ""
except concurrent.futures.TimeoutError:
logger.error("Timeout waiting for async PIN handler result in input_wrapper.")
# Optional: Try to cancel the coroutine if it's still running
# future.cancel() # May not work reliably depending on coro state
# Propagate the timeout or return empty/raise custom error
raise TimeoutError("Timed out waiting for PIN input via Telegram.") from None
except Exception as e:
# This catches exceptions raised *inside* handle_potential_pin_input OR
# errors during scheduling/retrieval.
logger.error(f"Exception occurred retrieving async PIN handler result in input_wrapper: {e}", exc_info=True)
# Propagate the exception so get_quotex_client knows it failed clearly
# Wrap it maybe?
raise ConnectionError("Failed to get PIN via Telegram interaction.") from e
finally:
# Optional: Log when the wait finishes
logger.debug("Exiting input_wrapper after waiting for future.")
else:
# If not expecting PIN or prompt mismatch, use original input
logger.warning(f"Input wrapper calling original input for prompt: '{prompt}' (expecting_pin={patch_state.get('expecting_pin')})")
return original_builtin_input(prompt)
active_quotex_clients: Dict[str, Quotex] = {}
# --- REPLACE the get_quotex_client function (using the AGGRESSIVE timing with CORRECT patch function) ---
async def get_quotex_client(user_id: int, account_doc_id: str, interaction_type: str = "info") -> Tuple[Optional[Quotex], str]:
"""
Gets or creates a connected Quotex client instance for an account.
Uses AGGRESSIVE patching on builtins.input + ASYNC handler for PIN prompts.
"""
global active_quotex_clients, active_otp_requests, bot_instance, patch_state # Ensure patch_state is global
# --- Cache check logic ---
if account_doc_id in active_quotex_clients:
# ... (Existing cache logic) ...
logger.info(f"Reusing cached client for {account_doc_id}")
return active_quotex_clients[account_doc_id], "Reused existing client (cache)."
# --- Get account details ---
logger.info(f"Fetching Quotex account details for Doc ID: {account_doc_id} (User: {user_id})")
account_details = await get_quotex_account_details(account_doc_id)
if not account_details: return None, " Quotex account details not found in DB."
email = account_details["email"]
password = account_details["password"]
# temp_session_path = f"session_{user_id}_{account_doc_id}"
# # Ensure the directory for session files exists
# session_dir = Path(temp_session_path).parent
# session_dir.mkdir(parents=True, exist_ok=True)
qx_client: Optional[Quotex] = None
connection_check = False
connection_reason = "Initialization error"
patcher = None
is_patched = False
try:
# --- Apply AGGRESSIVE patch BEFORE creating instance ---
logger.warning("Applying AGGRESSIVE builtins.input patch (using input_wrapper)...")
# *** USE THE CORRECT WRAPPER ***
patcher = patch('builtins.input', input_wrapper)
patcher.start()
is_patched = True
logger.info("Aggressive builtins.input patch STARTED.")
# Create instance UNDER the patch
logger.info(f"Creating new Quotex client instance for {email} UNDER PATCH")
qx_client = Quotex(email=email, password=password)
# Add context *before* connect call
logger.info(f"Adding user {user_id} to active_otp_requests BEFORE connect (under patch).")
if not bot_instance: raise ConnectionAbortedError("Bot instance not available for OTP context.")
active_otp_requests[user_id] = {'qx_client': qx_client, 'doc_id': account_doc_id}
# Activate patch state
patch_state['expecting_pin'] = True
logger.info("Patch state set to expect PIN.")
# Call connect UNDER the patch
logger.info(f"Attempting connection for {email} (aggressive patch active)...")
# Run the connection process in a separate thread to avoid blocking the main event loop
def connect_in_thread():
"""Wrapper to run the connect method in a thread."""
return asyncio.run(qx_client.connect())
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(connect_in_thread)
try:
connection_check, connection_reason = await asyncio.wait_for(
asyncio.wrap_future(future), # Wrap the thread future for asyncio compatibility
timeout=180.0
)
except asyncio.TimeoutError:
logger.error("Connection attempt timed out.")
connection_check, connection_reason = False, "Timeout during connection"
except Exception as e:
logger.error(f"Error during connection in thread: {e}", exc_info=True)
connection_check, connection_reason = False, str(e)
# Deactivate patch state immediately after
patch_state['expecting_pin'] = False
logger.info("Patch state set to NOT expect PIN.")
logger.info(f"Connect() call finished (aggressive patch active).")
logger.info(f"Connection attempt finished. Result Check: {connection_check}, Reason: '{connection_reason}'")
# --- Handle connection results ---
if connection_check:
# ... (Success logic: log, switch mode, add to cache) ...
logger.info(f"Quotex connection successful for {email}.")
settings = await get_or_create_trade_settings(account_doc_id)
account_mode = settings.get("account_mode", "PRACTICE")
try:
qx_client.change_account(account_mode)
logger.info(f"Switched Quotex account {email} to {account_mode} mode.")
except Exception as e_mode:
logger.error(f"Failed to switch account {email} to {account_mode}: {e_mode}", exc_info=True)
active_quotex_clients[account_doc_id] = qx_client
return qx_client, f"Connected but failed to switch to {account_mode} mode."
active_quotex_clients[account_doc_id] = qx_client
return qx_client, f"Connected successfully in {account_mode} mode."
else:
# ... (Failure logic: check reasons, handle Invalid credentials, Token rejected, PIN/Auth errors) ...
logger.error(f"Quotex connection explicitly failed for {email}. Reason: {connection_reason}")
reason_str = str(connection_reason) if connection_reason else "Unknown reason"
# (Include all failure checks from previous versions)
if "Invalid credentials" in reason_str and interaction_type == "login_attempt":
await delete_quotex_account(account_doc_id)
return None, "Connection Failed: Invalid Credentials. Removed entry."
elif "check your email" in reason_str.lower() or "verifique seu e-mail" in reason_str.lower() or "pin" in reason_str.lower():
return None, f"Connection Failed: Authentication error ({reason_str}). Check PIN/email or account status."
elif "Token rejected" in reason_str:
# ...(delete session file logic)...
return None, "Connection Failed: Token rejected. Session deleted."
else:
return None, f"Connection Failed: {reason_str}"
except asyncio.TimeoutError:
logger.error(f"Connection attempt for {email} timed out overall (aggressive patch).")
patch_state['expecting_pin'] = False
return None, "Connection Failed: Timed out during connection/authentication."
except ConnectionAbortedError as cae: # Bot context missing error
logger.error(f"Connection aborted for {email}: {cae}")
patch_state['expecting_pin'] = False
return None, f"Connection Failed: {cae}"
except ConnectionError as ce: # Error explicitly raised by PIN handling failure
logger.error(f"ConnectionError during PIN handling for {email}: {ce}")
patch_state['expecting_pin'] = False
# Provide the clearer error from the PIN handler
return None, f"Connection Failed: Error during PIN retrieval ({ce})"
except Exception as e:
logger.error(f"Unexpected error during Quotex connect/setup for {email} (aggressive patch): {e}", exc_info=True)
patch_state['expecting_pin'] = False
if qx_client:
try: await qx_client.close()
except: pass
return None, f"Connection Failed: An unexpected error occurred ({type(e).__name__}). Check logs."
finally:
# --- ALWAYS CLEANUP ---
patch_state['expecting_pin'] = False # Reset patch state
logger.debug(f"Running FINALLY block for get_quotex_client (aggressive patch w/ handler)")
# Stop the patch
if is_patched and patcher:
try:
patcher.stop()
logger.info("Aggressive builtins.input patch STOPPED.")
except Exception as stop_err:
logger.error(f"Error stopping aggressive patch: {stop_err}")
# Cleanup OTP context dict
if user_id in active_otp_requests:
if active_otp_requests[user_id].get('doc_id') == account_doc_id:
logger.info(f"Removing user {user_id} from active_otp_requests in finally.")
del active_otp_requests[user_id]
else:
logger.warning(f"Context mismatch during finally cleanup for {user_id}/{account_doc_id}.")
# ----------------------------------------------------
async def disconnect_quotex_client(account_doc_id: str):
"""Disconnects and removes a Quotex client instance."""
global active_quotex_clients
if account_doc_id in active_quotex_clients:
client = active_quotex_clients[account_doc_id]
logger.info(f"Disconnecting Quotex client for {account_doc_id}...")
try:
await client.close() # Assuming close is async
except Exception as e:
logger.warning(f"Error closing Quotex client for {account_doc_id}: {e}")
del active_quotex_clients[account_doc_id]
logger.info(f"Removed Quotex client instance for {account_doc_id}.")
# --- Bot UI Buttons ---
async def main_menu_keyboard(user_id: int) -> InlineKeyboardMarkup:
"""Generates the main menu keyboard."""
keyboard = [
[InlineKeyboardButton("➕ Add Quotex Account", callback_data="quotex_add")],
[InlineKeyboardButton("👤 My Quotex Accounts", callback_data="quotex_list")],
]
# Dynamic buttons based on role
# Add settings etc. later
keyboard.append([InlineKeyboardButton("⚙️ Settings", callback_data="settings_main")])
keyboard.append([InlineKeyboardButton("Trading Dashboard", callback_data="trade_dashboard")]) # Maybe
# Check role ASYNCHRONOUSLY
is_user_sudo = await is_sudo_user(user_id) # Correct use of await
if is_user_sudo:
keyboard.append([InlineKeyboardButton("👑 Admin Panel", callback_data="admin_panel")])
keyboard.append([InlineKeyboardButton("❓ Help", callback_data="help")])
return InlineKeyboardMarkup(keyboard)
def back_button(callback_data="main_menu"):
return [InlineKeyboardButton("⬅️ Back", callback_data=callback_data)]
def account_management_keyboard(account_doc_id: str, settings: Dict) -> InlineKeyboardMarkup:
"""Keyboard for managing a specific Quotex account."""
trading_status = "ON" if settings.get('service_status', False) else "OFF"
toggle_trading_text = f"🔴 Stop Trading" if trading_status == "ON" else f"🟢 Start Trading"
keyboard = [
[
InlineKeyboardButton("📊 Get Profile", callback_data=f"qx_profile:{account_doc_id}"),
InlineKeyboardButton("💰 Get Balance", callback_data=f"qx_balance:{account_doc_id}")
],
[
InlineKeyboardButton("💱 Manage Assets", callback_data=f"asset_manage:{account_doc_id}"),
InlineKeyboardButton(f"Mode: {settings.get('trade_mode', 'N/A')}", callback_data=f"set_tmode:{account_doc_id}"),
],
[
InlineKeyboardButton(f"Candle: {settings.get('candle_size', 'N/A')}s", callback_data=f"set_csize:{account_doc_id}"),
InlineKeyboardButton(f"Acct: {settings.get('account_mode', 'N/A')}", callback_data=f"set_amode:{account_doc_id}"),
],
[
InlineKeyboardButton(toggle_trading_text, callback_data=f"toggle_trade:{account_doc_id}"),
],
[InlineKeyboardButton("🗑 Delete Account", callback_data=f"qx_delete_confirm:{account_doc_id}")],
back_button("quotex_list") # Back to account list
]
return InlineKeyboardMarkup(keyboard)
def admin_panel_keyboard() -> InlineKeyboardMarkup:
keyboard = [
[
InlineKeyboardButton("📢 Broadcast", callback_data="admin_broadcast"),
InlineKeyboardButton("👥 List Users", callback_data="admin_list_users")
],
[
InlineKeyboardButton("⭐ Manage Sudo", callback_data="admin_manage_sudo"),
InlineKeyboardButton("💎 Manage Premium", callback_data="admin_manage_premium")
],
back_button("main_menu")
]
return InlineKeyboardMarkup(keyboard)
def manage_role_keyboard(role_name: str) -> InlineKeyboardMarkup: # role_name = "Sudo" or "Premium"
role_prefix = role_name.lower()
keyboard = [
[
InlineKeyboardButton(f"➕ Add {role_name}", callback_data=f"admin_add_{role_prefix}"),
InlineKeyboardButton(f"➖ Remove {role_name}", callback_data=f"admin_remove_{role_prefix}")
],
[
InlineKeyboardButton(f"📄 List {role_name} Users", callback_data=f"admin_list_{role_prefix}")
],
back_button("admin_panel")
]
return InlineKeyboardMarkup(keyboard)
# --- Command Handlers ---
@Client.on_message(filters.command("start") & filters.private)
async def start_command(client: Client, message: Message):
global bot_instance # Store the client instance
if not bot_instance: bot_instance = client
user_id = message.from_user.id
await add_user_if_not_exists(user_id)
logger.info(f"User {user_id} ({message.from_user.first_name}) started the bot.")
welcome_text = f"👋 Welcome, {message.from_user.mention}!\n\n" \
f"This bot helps you interact with your Quotex account(s).\n\n" \
f"Use the buttons below to navigate."
await message.reply_text(
welcome_text,
reply_markup=await main_menu_keyboard(user_id),
quote=True
)
@Client.on_message(filters.command("help") & filters.private)
async def help_command(client: Client, message: Message):
# Add more detailed help information here
help_text = """
**ℹ️ Bot Help & Information**
This bot allows you to:
- Add and manage multiple Quotex accounts.
- Check account profile and balance.
- Manage assets for trading.
- Configure trade settings (Mode, Candle Size, Account Type).
- Toggle automated trading (Premium feature, requires setup).
- (Admin) Manage users and broadcast messages.
**Key Features:**
- **➕ Add Quotex Account:** Securely add your credentials (Requires 2FA/PIN verification via bot).
- **👤 My Quotex Accounts:** View and manage your linked accounts and their specific settings.
- **⚙️ Settings:** Configure bot or global preferences (if applicable).
- **👑 Admin Panel:** (For Owner/Sudo) Access user management and broadcast tools.
**Important Notes:**
- **Security:** While we try to be secure, storing credentials always has risks. Be cautious.
- **Quotex API:** This bot uses the `pyquotex` library, which interacts with Quotex in ways that might be unofficial. Use at your own risk. API changes can break functionality.
- **Trading:** Automated trading involves significant financial risk. Ensure you understand the strategy and risks before enabling it.
Use the buttons or contact the owner if you need further assistance.
"""
await message.reply_text(
help_text,
reply_markup=InlineKeyboardMarkup([back_button("main_menu")]),
quote=True
)
@Client.on_message(filters.command("broadcast") & filters.private)
@owner_only # Or use @sudo_only if Sudo can also broadcast
async def broadcast_command_handler(client: Client, message: Message):
global user_states
user_id = message.from_user.id
user_states[user_id] = "waiting_broadcast_message"
await message.reply_text(
"Okay, send me the message you want to broadcast.\n"
"You can use text, photos, videos, documents, formatting, etc.\n"
"Send /cancel to abort.",
reply_markup=ForceReply(selective=True),
quote=True
)
# --- Callback Query Handler (Main Router) ---
@Client.on_callback_query()
async def callback_query_handler(client: Client, callback_query: CallbackQuery):
global bot_instance # Store the client instance
if not bot_instance: bot_instance = client
user_id = callback_query.from_user.id
data = callback_query.data
message = callback_query.message # The message where the button was clicked
# --- Acknowledge Callback ---
try:
await callback_query.answer() # Acknowledge the button press
except Exception as e:
logger.warning(f"Failed to answer callback query: {e}")
# --- Main Menu Navigation ---
if data == "main_menu":
await message.edit_text(
f"👋 Welcome back, {callback_query.from_user.mention}!\n\nChoose an option:",
reply_markup=await main_menu_keyboard(user_id)
)
elif data == "help":
await help_command(client, message) # Reuse help command logic on the message object
# We need to edit the original message, not send a new one if called from button
await message.edit_reply_markup(reply_markup=InlineKeyboardMarkup([back_button("main_menu")])) # Keep help text, add back btn
# --- Quotex Account Management ---
elif data == "quotex_add":
await callback_query.message.reply_text(
"Let's add a new Quotex account.\n"
"Please reply with the **Email Address** for the account.\n"
"Send /cancel to abort.",
reply_markup=ForceReply(selective=True) # Ask for reply
)
user_states[user_id] = "waiting_qx_email"
elif data == "quotex_list":
accounts = await get_user_quotex_accounts(user_id)
if not accounts:
await message.edit_text(
"You haven't added any Quotex accounts yet.",
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton("➕ Add Account Now", callback_data="quotex_add")],
back_button("main_menu")
])
)
else:
keyboard = []
for acc in accounts:
# Ensure '_id' is retrieved and is ObjectId, then convert to string
acc_id_str = str(acc['_id'])
keyboard.append([InlineKeyboardButton(f"👤 {acc['email']}", callback_data=f"qx_manage:{acc_id_str}")])
keyboard.append(back_button("main_menu"))
await message.edit_text(
"Select a Quotex account to manage:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
elif data.startswith("qx_manage:"):
account_doc_id = data.split(":")[1]
account_details = await get_quotex_account_details(account_doc_id)
if not account_details or account_details["user_id"] != user_id:
await message.edit_text("Error: Account not found or access denied.", reply_markup=InlineKeyboardMarkup([back_button("quotex_list")]))
return
# Fetch current settings for this account to display in buttons
settings = await get_or_create_trade_settings(account_doc_id)
await message.edit_text(
f"Managing account: **{account_details['email']}**\n"
f"Select an action:",
reply_markup=account_management_keyboard(account_doc_id, settings)
)
elif data.startswith("qx_profile:") or data.startswith("qx_balance:"):
action = "profile" if data.startswith("qx_profile:") else "balance"
account_doc_id = data.split(":")[1]
account_details = await get_quotex_account_details(account_doc_id)
if not account_details or account_details["user_id"] != user_id:
await message.edit_text("Error: Account not found or access denied.", reply_markup=InlineKeyboardMarkup([back_button(f"qx_manage:{account_doc_id}")]))
return
# Attempt to connect or get existing client
await callback_query.edit_message_text("🔄 Connecting to Quotex and fetching data...")
qx_client, status_msg = await get_quotex_client(user_id, account_doc_id, interaction_type=action)
text = f"Managing account: **{account_details['email']}**\nStatus: {status_msg}\n\n"
settings = await get_or_create_trade_settings(account_doc_id) # Needed for keyboard refresh
if qx_client:
try:
if action == "profile":
profile = await qx_client.get_profile() # Profile often includes both balances
if profile:
#current_mode = qx_client.account_type # Check instance mode
#balance_val = profile.live_balance if current_mode == 'REAL' else profile.demo_balance
text += f"**🆔 ID: `{profile.profile_id}`**\n"
text += f"**💰 Current Balance:**\n\n"
text += f" - 🪙 Demo: `{float(profile.demo_balance)}`\n"
text += f" - 💵 Real: `{float(profile.live_balance):.2f}`\n"
text += f"**👤 User Name: {profile.nick_name}**\n"
text += f"**🖼️ Avatar: {profile.avatar}**\n"
text += f"**🌍 Country: {profile.country_name}**\n"
else:
text += "❌ Failed to retrieve profile details."
elif action == "balance":
# Get current settings for account mode before fetching balance
#settings = await get_or_create_trade_settings(account_doc_id)
#current_mode = settings.get("account_mode", "PRACTICE")
# Balance from profile is often sufficient and quicker
#balance = await qx_client.get_balance() # Uses currently set mode
profile = await qx_client.get_profile() # Profile often includes both balances
if profile:
#current_mode = qx_client.account_type # Check instance mode
#balance_val = profile.live_balance if current_mode == 'REAL' else profile.demo_balance
text += f"**💰 Current Balance:**\n\n"
text += f" - Demo: `{profile.demo_balance}`\n"
text += f" - Real: `{profile.live_balance}`"
else:
text += "❌ Failed to retrieve balance information (could not get profile)."
# Option: Disconnect immediately after action? Or keep client alive?
# await disconnect_quotex_client(account_doc_id) # Disconnect now
except Exception as e:
text += f"❌ Error getting {action} data: {e}"
logger.error(f"Error during Quotex {action} for {account_doc_id}: {e}", exc_info=True)
# Consider disconnecting on error?
# await disconnect_quotex_client(account_doc_id)
else:
text += "\nCould not perform action." # Status message already added
# Edit the message with results and the management keyboard
await message.edit_text(text, reply_markup=account_management_keyboard(account_doc_id, settings))
# --- Asset Management ---
elif data.startswith("asset_manage:"):
# TODO: Implement Asset Management UI (Add, Remove, List)
account_doc_id = data.split(":")[1]
settings = await get_or_create_trade_settings(account_doc_id)
assets = settings.get("assets", [])
text = f"**💱 Asset Management for Account**\n"
if not assets:
text += "\nNo assets configured yet."
else:
text += "\nCurrent Assets:\n"
for i, asset in enumerate(assets):
text += f"{i+1}. `{asset['name']}` (Amt: {asset['amount']}, Dur: {asset['duration']}s)\n"
keyboard = [
[InlineKeyboardButton("➕ Add Asset", callback_data=f"asset_add:{account_doc_id}")],
# Add buttons to remove specific assets if list is not empty
]
if assets:
keyboard.append([InlineKeyboardButton("➖ Remove Asset", callback_data=f"asset_remove_select:{account_doc_id}")]) # Leads to selection
keyboard.append(back_button(f"qx_manage:{account_doc_id}"))
await message.edit_text(text, reply_markup=InlineKeyboardMarkup(keyboard))
elif data.startswith("asset_add:"):
account_doc_id = data.split(":")[1]
user_states[user_id] = f"waiting_asset_add:{account_doc_id}"
await message.reply_text(
"Enter the asset details to add.\n"
"Format: `ASSET_NAME,Amount,Duration`\n"
f"Example: `EURUSD_otc,{DEFAULT_TRADE_AMOUNT},{DEFAULT_TRADE_DURATION}`\n"
"(Amount and Duration are optional, defaults will be used if omitted)\n"
"Send /cancel to abort.",
reply_markup=ForceReply(selective=True),
parse_mode=enums.ParseMode.DEFAULT
)
elif data.startswith("asset_remove_select:"):
account_doc_id = data.split(":")[1]
settings = await get_or_create_trade_settings(account_doc_id)
assets = settings.get("assets", [])
if not assets:
await message.edit_text("No assets to remove.", reply_markup=InlineKeyboardMarkup([back_button(f"asset_manage:{account_doc_id}")]))
return
keyboard = []
for i, asset in enumerate(assets):
# Store index in callback data for removal
keyboard.append([InlineKeyboardButton(f"❌ Remove {asset['name']}", callback_data=f"asset_remove_confirm:{account_doc_id}:{i}")])
keyboard.append(back_button(f"asset_manage:{account_doc_id}"))
await message.edit_text("Select the asset to remove:", reply_markup=InlineKeyboardMarkup(keyboard))
elif data.startswith("asset_remove_confirm:"):
parts = data.split(":")
account_doc_id = parts[1]
asset_index_to_remove = int(parts[2])
settings = await get_or_create_trade_settings(account_doc_id)
assets = settings.get("assets", [])
if 0 <= asset_index_to_remove < len(assets):
removed_asset = assets.pop(asset_index_to_remove)
await update_trade_setting(account_doc_id, {"assets": assets})
await callback_query.answer(f"Removed asset: {removed_asset['name']}", show_alert=True)
# Go back to asset manage screen
await callback_query_handler(client, CallbackQuery(
id=callback_query.id, from_user=callback_query.from_user,
message=message, chat_instance="dummy",
data=f"asset_manage:{account_doc_id}", game_short_name=None, inline_message_id=None
)) # Simulate callback
else:
await callback_query.answer("Error: Invalid asset index for removal.", show_alert=True)
# --- Settings Management ---