Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"name": "Elevators",
"dockerFile": "../Dockerfile",
"settings": {
"terminal.integrated.shell.linux": "/bin/bash"
},
"extensions": [
"ms-python.python",
],
"postCreateCommand": "poetry install",
"workspaceFolder": "/workspace"
}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
*.py[cod]
33 changes: 33 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Dockerfile
FROM python:3.9

# Install system dependencies in a single layer.
# -y is required for a non-interactive image build (the original second
# `apt-get install sqlite3` had no -y and would abort at the confirmation
# prompt); sqlite3 is folded into the same install so it still sees the
# package lists before they are removed to keep the image small.
RUN apt-get update && apt-get install -y \
    git \
    curl \
    vim \
    sqlite3 \
    && rm -rf /var/lib/apt/lists/*

# Install Poetry using the official installation script
RUN curl -sSL https://install.python-poetry.org | python3 - && \
    mv /root/.local/bin/poetry /usr/local/bin/poetry

# Ensure Poetry is on the PATH
ENV PATH="/root/.local/bin:$PATH"

# Set the working directory
WORKDIR /workspace


# Copy dependency files first for caching, then install dependencies.
# If your project root contains additional files, adjust accordingly.
#COPY pyproject.toml poetry.lock* ./
#RUN poetry install --no-root

# Optionally, copy the rest of your project files.
COPY . /workspace

# Default command: open a bash shell with Poetry activated.
# (The previous exec form passed "uvicorn" and "sqlite3" as arguments to bash,
# making bash try to execute a script file named "uvicorn" and fail.)
CMD ["poetry", "run", "bash"]

Binary file added dev_containers.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added dev_containers_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added dev_containers_install.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions elevators/database_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- States table: one row per recorded snapshot of the elevator's status.
-- Mirrors the SQLAlchemy `State` model in elevators/main.py.
-- NOTE: "mooving" is a typo of "moving" kept deliberately — the ORM model
-- and the API payloads use the same spelling.
CREATE TABLE states(
id INTEGER PRIMARY KEY AUTOINCREMENT,
current_floor INTEGER NOT NULL,
state_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
vacant BOOLEAN NOT NULL,
mooving BOOLEAN NOT NULL
);

-- Demands table: one row per elevator call, recording the floor it was
-- made from and when. Mirrors the SQLAlchemy `Demand` model in
-- elevators/main.py.
CREATE TABLE demands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
demand_floor INTEGER NOT NULL,
demand_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Binary file added elevators/elevators.db
Binary file not shown.
182 changes: 182 additions & 0 deletions elevators/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import create_engine, Column, Boolean, Integer, DateTime, literal
from sqlalchemy.orm import declarative_base, sessionmaker, Session


DATABASE_URL = "sqlite:///./elevators.db"

# check_same_thread=False is required for SQLite here because FastAPI may
# serve a request on a different thread than the one that opened the
# connection.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()

# NOTE(review): Base.metadata.create_all(bind=engine) is never called in this
# module; the tables appear to be assumed to already exist in elevators.db —
# confirm before deploying against a fresh database file.
app = FastAPI()

# ORM models stored in the database

class State(Base):
    """
    ORM model for one snapshot of the elevator's status.

    Attributes:
        id (int): Auto-incrementing primary key of the state record.
        current_floor (int): Floor the elevator is at when the snapshot is taken.
        state_time (datetime): When the state was recorded (defaults to UTC now).
        vacant (bool): True if the elevator is empty.
        mooving (bool): True if the elevator is in motion.
            NOTE: "mooving" is a typo of "moving" kept for compatibility with
            the existing database schema and API payloads.
    """

    __tablename__ = "states"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    current_floor = Column(Integer, nullable=False)
    state_time = Column(DateTime, default=datetime.utcnow)
    vacant = Column(Boolean, nullable=False)
    mooving = Column(Boolean, nullable=False)

class Demand(Base):
    """
    ORM model for one elevator call (demand).

    Attributes:
        id (int): Auto-incrementing primary key of the demand record.
        demand_floor (int): Floor the call was made from.
        demand_time (datetime): When the demand was recorded (defaults to UTC now).
    """

    __tablename__ = "demands"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    demand_floor = Column(Integer, nullable=False)
    demand_time = Column(DateTime, default=datetime.utcnow)



#Pydantic Schemas

class StateBase(BaseModel):
    """Request/response schema for an elevator state snapshot."""
    current_floor: int
    state_time: datetime
    vacant: bool
    mooving: bool  # typo of "moving", kept to match the DB column / ORM field

class DemandBase(BaseModel):
    """Request/response schema for an elevator call (demand)."""
    demand_floor : int
    demand_time : datetime


# Dependency that provides one database session per request.
def get_db():
    """Yield a SQLAlchemy session and guarantee it is closed afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()


#Endpoints
@app.get("/state", summary = 'Get all elevator states', description = 'Get all situations related to the elavator')
def read_states(db: Session = Depends(get_db)):
"""
Retrieve all state records.
Args:
db (Session): The database session dependency.
Returns:
List[State]: A list of all state records.
"""

states = db.query(State).all()
return states

@app.post("/state", summary = 'Log a new elevator state',
description = 'Crate a situation of the elevator, whether it is moving, vacant, current floor and time', status_code=201)
def create_state(state: StateBase, db: Session = Depends(get_db)):
"""
Log a new state record.
Args:
state (StateBase): The state data to be logged.
db (Session): The database session dependency.
Returns:
State: The newly created state record.
"""
new_state = State(
current_floor=state.current_floor,
vacant=state.vacant,
mooving=state.mooving,
state_time=state.state_time
)
db.add(new_state)
db.commit()
db.refresh(new_state)
return new_state


@app.get("/demand", summary = 'Get all elevator demands', description = 'Get all demands related to the elavator')
def read_demands(db: Session = Depends(get_db)):
"""
Retrieve all demand records.
Args:
db (Session): The database session dependency.
Returns:
List[Demand]: A list of all demand records.
"""
demands = db.query(Demand).all()
return demands

@app.post("/demand", summary = 'Log a new elevator demand',
description = 'Crate a new demand for the elevator', status_code=201)

def create_demand(demand: DemandBase, db: Session = Depends(get_db)):
"""
Log a new demand record and automatically generate a new state.
Args:
demand (DemandBase): The demand data to be logged.
db (Session): The database session dependency.

Returns:
Demand: The newly created demand record.
"""

# Log the demand.
new_demand = Demand(
demand_floor= demand.demand_floor,
demand_time= demand.demand_time
)
db.add(new_demand)
db.commit()
db.refresh(new_demand)

return new_demand

@app.get("/dataset", summary='Get dataset for model training', description='Returns a dataset for training the prediction model')
def get_dataset(db: Session = Depends(get_db)):
"""
Retrieve a dataset suitable for training a prediction model.
Args:
db (Session): The database session dependency.
Returns:
List[dict]: A list of records where each record contains:
- event_type_is_resting (bool): True if the event is a resting state, False if it is a demand.
- floor (int): The elevator floor associated with the event.
- time (datetime): The timestamp of the event.
"""
# Query resting states as events
resting_query = db.query(
literal(True).label("event_type_is_resting"),
State.current_floor.label("floor"),
State.state_time.label("time")
).filter(
State.vacant == True,
State.mooving == False
)

# Query demand events
demand_query = db.query(
literal(False).label("event_type_is_resting"),
Demand.demand_floor.label("floor"),
Demand.demand_time.label("time")
)

# Combine the queries using union_all and order by the event time.
union_query = resting_query.union_all(demand_query).order_by("time")

results = union_query.all()

# Convert SQLAlchemy row objects to dictionaries.
dataset = [{"event_type_is_resting": row.event_type_is_resting, "floor": row.floor, "time": row.time} for row in results]
return dataset
138 changes: 138 additions & 0 deletions elevators/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import pytest
from datetime import datetime, timedelta
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from sqlalchemy.orm import sessionmaker
from main import app, Base, get_db

# Use an in-memory SQLite database with StaticPool for testing.
# StaticPool hands every checkout the same underlying connection, so all
# sessions share the single :memory: database instead of each getting an
# empty one.
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine_test = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},  # TestClient may use another thread
    poolclass=StaticPool
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine_test)

# Create all tables in the in-memory database.
Base.metadata.create_all(bind=engine_test)

# Session factory override so the app under test hits the in-memory database.
def override_get_db():
    """Yield a session bound to the test engine; always close it afterwards."""

    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

# Swap the real get_db dependency for the test override.
app.dependency_overrides[get_db] = override_get_db

# Shared TestClient instance used by every test below.
client = TestClient(app)

def test_create_and_read_state():
    """
    Create an elevator state via POST /state, then confirm the listing
    endpoint GET /state returns it.
    """

    payload = {
        "current_floor": 1,
        "state_time": datetime.utcnow().isoformat(),
        "vacant": True,
        "mooving": False
    }

    # POST the new state and check the echoed record.
    response = client.post("/state", json=payload)
    assert response.status_code == 201, f"Expected status 201, got {response.status_code}"
    body = response.json()
    assert body["current_floor"] == payload["current_floor"]
    assert body["vacant"] is True
    assert body["mooving"] is False

    # The listing must now contain at least the record just created.
    response = client.get("/state")
    assert response.status_code == 200, f"Expected status 200, got {response.status_code}"
    states = response.json()
    assert isinstance(states, list)
    assert len(states) > 0

def test_create_and_read_demand():
    """
    Create an elevator demand via POST /demand, then confirm the listing
    endpoint GET /demand returns it.
    """

    payload = {
        "demand_floor": 3,
        "demand_time": datetime.utcnow().isoformat()
    }

    # POST the new demand and check the echoed record.
    response = client.post("/demand", json=payload)
    assert response.status_code == 201, f"Expected status 201, got {response.status_code}"
    assert response.json()["demand_floor"] == payload["demand_floor"]

    # The listing must now contain at least the record just created.
    response = client.get("/demand")
    assert response.status_code == 200, f"Expected status 200, got {response.status_code}"
    demands = response.json()
    assert isinstance(demands, list)
    assert len(demands) > 0

def test_get_dataset():
    """
    Verify /dataset merges resting states and demands into a single
    time-ordered event list suitable for model training.

    Steps:
        1. POST a resting state stamped 10 minutes in the past.
        2. POST a demand stamped "now".
        3. GET /dataset and check ordering plus presence of both event types.
    """

    # Resting event: vacant and not moving, 10 minutes ago.
    resting_state_payload = {
        "current_floor": 2,
        "state_time": (datetime.utcnow() - timedelta(minutes=10)).isoformat(),
        "vacant": True,
        "mooving": False
    }
    response = client.post("/state", json=resting_state_payload)
    assert response.status_code == 201, f"Expected status 201, got {response.status_code}"

    # Demand event with a current timestamp.
    demand_payload = {
        "demand_floor": 4,
        "demand_time": datetime.utcnow().isoformat()
    }
    response = client.post("/demand", json=demand_payload)
    assert response.status_code == 201, f"Expected status 201, got {response.status_code}"

    # Fetch the combined dataset.
    response = client.get("/dataset")
    assert response.status_code == 200, f"Expected status 200, got {response.status_code}"
    dataset = response.json()

    # ISO-8601 timestamps sort lexicographically in chronological order, so a
    # plain sorted() comparison checks the time ordering of the events.
    times = [record["time"] for record in dataset]
    assert times == sorted(times), "Dataset events are not sorted by time"

    # Both event types must be represented at least once.
    assert any(rec["event_type_is_resting"] is True for rec in dataset), "No resting events found in dataset"
    assert any(rec["event_type_is_resting"] is False for rec in dataset), "No demand events found in dataset"
Loading
Loading