Skip to content

Commit

Permalink
feat: Implement Agent & Agent001 for signal generation package
Browse files Browse the repository at this point in the history
  • Loading branch information
seekersoftec committed Oct 10, 2024
1 parent 7f6c2cd commit f271d11
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 34 deletions.
1 change: 1 addition & 0 deletions packages/itbot/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .agent import *
92 changes: 92 additions & 0 deletions packages/itbot/agents/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional
from packages.itbot.itbot import Signal
import threading
import logging
from trade_flow.common.logging import Logger


class Agent(ABC):
"""
Abstract base class for agent interfaces responsible for loading trained models and generating trading signals.
The agent continuously waits for data and sends signals asynchronously as an event-driven program.
"""

def __init__(self, logger: Optional[Logger] = None):
# Set up logging
self.logger = logger or Logger(name="it_bot", log_level=logging.DEBUG, filename="ITBot.log")

self.model = None
self.signals_queue = asyncio.Queue()
self.loop = None

@abstractmethod
def load_model(self, model_path: str) -> None:
"""
Load the trained model from the specified path.
Args:
model_path (str): The file path of the trained model.
"""
pass

@abstractmethod
async def generate_signals(self, data: dict) -> List[Signal]:
"""
Asynchronously generate trading signals using the loaded model and input data.
Args:
data (dict): The data for the agent to make decisions.
Returns:
List[Signal]: A list of trading signals generated by the model.
"""
pass

async def add_data(self, data: dict) -> None:
"""
Add data to the agent to trigger signal generation. This acts as an event to push new data.
Args:
data (dict): The data for generating signals.
"""
await self.signals_queue.put(data)

async def send_signals(self, signals: List[Signal]) -> None:
"""
Asynchronously send the generated signals to ITBot.
Args:
signals (List[Signal]): The generated signals to be sent.
"""
for signal in signals:
self.logger.info(f"Sending signal: {signal}")
await asyncio.sleep(0.1)

async def _run_tasks(self) -> None:
"""
Asynchronously wait for data, generate signals, and send them. This keeps running in the event loop.
"""
while True:
data = await self.signals_queue.get()
signals = await self.generate_signals(data)
await self.send_signals(signals)

def _start_event_loop(self) -> None:
"""
Start the event loop in the background using `asyncio.create_task`.
This allows the agent to run without blocking the main thread.
"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.create_task(self._run_tasks()) # Schedule _run_tasks in the background
self.loop.run_forever() # Keep the event loop running indefinitely

def run(self) -> None:
"""
Start the event loop in a separate thread, allowing the agent to process signals in the background.
"""
self.logger.debug("Running Agent")
thread = threading.Thread(target=self._start_event_loop, daemon=True)
thread.start()
86 changes: 86 additions & 0 deletions packages/itbot/agents/agent_001.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import asyncio
import os
from typing import List, Optional
from packages.itbot.itbot import Signal
from packages.itbot.agents.agent import Agent
from trade_flow.common.logging import Logger


class Agent001(Agent):
"""
Example agent that implements Agent, for loading a model, generating signals, and sending them to ITBot.
"""

def __init__(self, logger: Optional[Logger] = None):
super().__init__(logger)
self.model = None

def load_model(self, model_path: str) -> None:
"""
Load the trained model from the specified path.
"""
if os.path.exists(model_path):
# Dummy model loading for demonstration purposes
self.model = f"Loaded model from {model_path}"
self.logger.info(f"Model loaded from {model_path}")
else:
raise FileNotFoundError(f"Model file {model_path} not found.")

async def generate_signals(self, data: dict) -> List[Signal]:
"""
Asynchronously generate trading signals using the loaded model based on the data.
Args:
data: Data from the environment to generate signals.
Returns:
List[Signal]: A list of generated trading signals.
"""
if self.model is None:
raise ValueError("Model is not loaded. Please load the model first.")

# Simulate signal generation delay
await asyncio.sleep(1)

# Example signals based on input data
signals = [
Signal(
symbol="BTCUSD",
price=data["price"],
score=data["score"],
trend="↑",
zone="Buy Zone",
trade_type="Buy",
),
Signal(
symbol="ETHUSD",
price=data["price"] * 0.05,
score=data["score"] - 0.2,
trend="↓",
zone="Sell Zone",
trade_type="Sell",
),
]
return signals

async def send_signals(self) -> None:
"""
Asynchronously send the generated signals to ITBot for further processing and forwarding to MT5.
"""
while True:
# Wait for new data to be added
data = await self.signals_queue.get()

# Generate signals asynchronously based on new data
signals = await self.generate_signals(data)

for signal in signals:
# Forward the signal to ITBot's queue
self.trader.execute_trade(signal)
self.logger.info(f"Signal sent to ITBot: {signal}")

def run(self):
"""
Run the agent in the background, processing data and sending signals.
"""
super().run() # Starts the event loop and processes tasks
1 change: 1 addition & 0 deletions packages/itbot/itbot/interfaces/telegram_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def run(self) -> None:
Raises:
RuntimeError: If the client fails to run or disconnects unexpectedly.
"""
self.logger.debug("Listening for Signals...")
try:
with self.client:
self.logger.info("Telegram client is running and listening for messages...")
Expand Down
165 changes: 137 additions & 28 deletions packages/itbot/itbot/mt5_trader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import logging
import random
import time
import pandas as pd
from datetime import datetime
from typing import Dict, Optional, List, Tuple
from packages.itbot.itbot import Signal
from packages.itbot.itbot.MetaTrader5 import MetaTrader5
Expand Down Expand Up @@ -96,6 +99,7 @@ def __init__(
# Risk Manager Setup
self.risk_management = RiskManager(
initial_balance=self.initial_balance,
risk_percentage=0.1,
contract_size=contract_size,
logger=self.logger,
)
Expand All @@ -118,12 +122,142 @@ def _initialize_mt5(self) -> None:
try:
if not self.mt5.initialize():
raise RuntimeError("MetaTrader 5 initialization failed")
if not self.mt5.login(self.mt5_account_number, self.mt5_password, self.mt5_server):
raise RuntimeError("MetaTrader 5 login failed")

# self.logger.debug(
# self.mt5.login(self.mt5_account_number, self.mt5_password, self.mt5_server)
# )
# if not self.mt5.login(self.mt5_account_number, self.mt5_password, self.mt5_server):
# raise RuntimeError("MetaTrader 5 login failed")
except Exception as e:
self.logger.error(f"Error initializing MetaTrader 5: {e}")
raise MT5TraderInitializationError("Failed to initialize MetaTrader 5")

def _prepare_trade_request(self, symbol: str, trade_type: str, **kwargs) -> Dict:
"""Prepare a trade request based on the given symbol and trade type."""

def round_2_tick_size(price: float, trade_tick_size):
return round(price / trade_tick_size) * trade_tick_size

symbol_info = self.mt5.symbol_info(symbol)
point = symbol_info.point
trade_stops_level = symbol_info.trade_stops_level
trade_tick_size = symbol_info.trade_tick_size
price = (
self.mt5.symbol_info_tick(symbol).bid
if trade_type == "BUY"
else self.mt5.symbol_info_tick(symbol).ask
)

position_size = self.risk_management.calculate_position_size(**kwargs)

# Validate the position size before executing the trade
position_size = self.validate_position_size(symbol, position_size)

request = {
"action": self.mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": position_size,
"type": self.mt5.ORDER_TYPE_BUY if trade_type == "BUY" else self.mt5.ORDER_TYPE_SELL,
"price": price,
# "sl": price - 1000 * point,
# "tp": price + (100 * 2) * point, # RRR => 2:1
"deviation": 20,
"magic": random.randint(234000, 237000),
"comment": "ITBot",
# "type_time": self.mt5.ORDER_TIME_GTC,
# "type_filling": self.mt5.ORDER_FILLING_RETURN,
}
self.logger.debug(request)
return request

def validate_position_size(self, symbol: str, volume: float) -> bool:
"""
Validate the position size for the specified symbol.
Args:
symbol (str): The trading symbol.
volume (float): The proposed position size.
Returns:
bool: True if the position size is valid, raises ValueError otherwise.
"""
symbol_info = self.mt5.symbol_info(symbol)

if not symbol_info:
raise ValueError(f"Symbol {symbol} information is not available.")

self.logger.debug(symbol_info)

# Check minimum and maximum volume
min_volume = symbol_info.volume_min
max_volume = symbol_info.volume_max

if volume < min_volume:
self.logger.warning(
f"Volume {volume} is below the minimum volume of {min_volume} for {symbol}."
)
volume = min_volume
if volume > max_volume:
self.logger.warning(
f"Volume {volume} exceeds the maximum volume of {max_volume} for {symbol}."
)
volume = max_volume

self.logger.info(
f"Volume {volume} is valid for {symbol} (min: {min_volume}, max: {max_volume})."
)
return volume

def trail_sl(self, symbol, order_ticket, timeframe, stop_loss_dict):
while True:

bars = self.mt5.copy_rates_from_pos(symbol, timeframe, 0, 4)
bars_df = pd.DataFrame(bars)
bar1_high = bars_df["high"].iloc[0]
bar2_high = bars_df["high"].iloc[1]
bar3_high = bars_df["high"].iloc[2]
bar1_low = bars_df["low"].iloc[0]
bar2_low = bars_df["low"].iloc[1]
bar3_low = bars_df["low"].iloc[2]

if self.mt5.positions_get(ticket=order_ticket):
position_type = self.mt5.positions_get(ticket=order_ticket)[0].type
if position_type == self.mt5.ORDER_TYPE_SELL:
stop_loss_value = max(bar1_high, bar2_high, bar3_high) # Sell order S/L
else:
stop_loss_value = min(bar1_low, bar2_low, bar3_low) # Buy order S/L

tick_size = self.mt5.symbol_info(symbol).trade_tick_size
normalised_sl = round(stop_loss_value / tick_size) * tick_size

if normalised_sl != stop_loss_dict[symbol]:
current_sl = self.mt5.positions_get(ticket=order_ticket)[0].sl
if normalised_sl != current_sl:
request = {
"action": self.mt5.TRADE_ACTION_SLTP,
"position": order_ticket,
"symbol": symbol,
"magic": 24001,
"sl": normalised_sl,
}
result = self.mt5.order_send(request)
if result.retcode == self.mt5.TRADE_RETCODE_DONE:
print(
f"[{datetime.now()}] Trailing Stop Loss for Order {order_ticket} updated. New S/L: {normalised_sl}"
)
print()
stop_loss_dict[symbol] = normalised_sl
elif result.retcode == 10025: # Ignore error code 10025
pass
else:
print(
f"[{datetime.now()}] Failed to update Trailing Stop Loss for Order {order_ticket}: {result.comment}"
)
print(f"Error code: {result.retcode}")
print()

time.sleep(1) # Wait for 1 second before checking again

async def execute_trade(
self, signal: Signal, strategy_name: str = "fixed_percentage", **kwargs
) -> None:
Expand Down Expand Up @@ -165,36 +299,11 @@ async def execute_trade(
result = await asyncio.get_running_loop().run_in_executor(
None, self.mt5.order_send, request
)
# result = self.mt5.order_send(request)

if result.retcode != self.mt5.TRADE_RETCODE_DONE:
raise ValueError(f"Order send failed with retcode={result.retcode}")

self.logger.info(f"MT5 order sent successfully: Order ID = {result.order}")
except Exception as e:
self.logger.error(f"Error executing MT5 order: {e}")

def _prepare_trade_request(self, symbol: str, trade_type: str, **kwargs) -> Dict:
"""Prepare a trade request based on the given symbol and trade type."""
point = self.mt5.symbol_info(symbol).point
price = (
self.mt5.symbol_info_tick(symbol).ask
if trade_type == "BUY"
else self.mt5.symbol_info_tick(symbol).bid
)

position_size = self.risk_management.calculate_position_size(**kwargs)
request = {
"action": self.mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": position_size,
"type": self.mt5.ORDER_TYPE_BUY if trade_type == "BUY" else self.mt5.ORDER_TYPE_SELL,
"price": price,
"sl": price - 100 * point,
"tp": price + 100 * point,
"deviation": 20,
"magic": random.randint(234000, 237000),
"comment": "ITBot",
"type_time": self.mt5.ORDER_TIME_GTC,
"type_filling": self.mt5.ORDER_FILLING_RETURN,
}
return request
2 changes: 1 addition & 1 deletion packages/itbot/itbot/risk_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class RiskManager:
def __init__(
self,
initial_balance: float,
risk_percentage: float = 2.0,
risk_percentage: float = 1.0,
contract_size: float = 1.0,
drawdown_factor: float = 0.5, # Risk reduction during drawdown
profit_factor: float = 1.5, # Risk increment during profit
Expand Down
Loading

0 comments on commit f271d11

Please sign in to comment.