diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7f93ebf --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv +__pycache__ diff --git a/main.py b/main.py new file mode 100755 index 0000000..4caac51 --- /dev/null +++ b/main.py @@ -0,0 +1,134 @@ +from fastapi import FastAPI, Depends, HTTPException +from sqlmodel import SQLModel, Session, create_engine, select +from models import Elevator, Demand, Floor +import uvicorn + +app = FastAPI(title="Elevator Model") + +DATABASE_URL = "sqlite:///./elevator.db" +engine = create_engine(DATABASE_URL) + +def get_session(): + with Session(engine) as session: + yield session + +SQLModel.metadata.create_all(engine) + +@app.post("/elevator/") +def create_elevator(elevator: Elevator, session: Session = Depends(get_session)): + db_elevator = Elevator.model_validate(elevator) + session.add(db_elevator) + session.commit() + for floor in range(db_elevator.min_floor, db_elevator.max_floor+1): + db_floor = Floor(floor=floor, elevator_id=db_elevator.id) + session.add(db_floor) + session.commit() + session.refresh(db_elevator) + return db_elevator + +@app.get("/elevator/{elevator_id}") +def get_elevator_status(elevator_id: int, session: Session = Depends(get_session)): + elevator = session.get(Elevator, elevator_id) + if not elevator: + raise HTTPException(status_code=404, detail="Elevator not found") + return elevator + +@app.get("floors/{elevator_id}") +def get_floors_status(elevator_id: int, session: Session = Depends(get_session)): + statement = select(Floor) \ + .where(Floor.elevator_id == elevator_id) \ + .where(Floor.is_demanded == True) + floors = session.exec(statement) + if len(floors) == 0: + raise HTTPException(status_code=404, detail="No floors demanded.") + return floors + +def get_next_stop(elevator: Elevator, session: Session = Depends(get_session)): + if elevator.motion_status == "ascending": + next_up = session.exec(select(Floor) \ + .where(Floor.elevator_id == elevator.id) \ + .where(Floor.floor > elevator.current_floor) \ + .order_by(Floor.floor)).first() + if next_up: + return next_up.floor + else: + next_down = session.exec(select(Floor) \ + .where(Floor.elevator_id == elevator.id) \ + .where(Floor.floor < elevator.current_floor) \ + .order_by(Floor.floor)).last() + return next_down + elif elevator.motion_status == "ascending": + next_down = session.exec(select(Floor) \ + .where(Floor.elevator_id == elevator.id) \ + .where(Floor.floor < elevator.current_floor) \ + .order_by(Floor.floor)).last() + if next_down: + return next_down.floor + else: + next_up = session.exec(select(Floor) \ + .where(Floor.elevator_id == elevator.id) \ + .where(Floor.floor > elevator.current_floor) \ + .order_by(Floor.floor)).first() + return next_up.floor + else: + next = session.exec(select(Floor) \ + .where(Floor.elevator_id == elevator.id)).first() + return next.floor + +def change_demand_status(elevator_id: int, floor: int, new_status: bool, session: Session = Depends(get_session)): + statement = select(Floor) \ + .where(Floor.elevator_id == elevator_id)\ + .where(Floor.floor == floor) + floor = session.exec(statement).first() + if floor: + raise HTTPException(status_code=404, detail="Floor not found for the elevator.") + + floor.is_demanded = new_status + session.add(floor) + session.commit() + +@app.post("elevator/{elevator_id}") +def move_elevator(elevator_id: int, session: Session = Depends(get_session)): + # moves elevator to the next stop + elevator = session.get(Elevator, elevator_id) + if not elevator: + raise HTTPException(status_code=404, detail="Elevator not found") + if elevator.next_stop != None: + elevator.current_floor = elevator.next_stop + elevator.next_stop = get_next_stop(elevator) + change_demand_status(elevator.elevator_id, elevator.current_floor, False) + if elevator.next_stop: + if elevator.next_stop > elevator.current_floor: + elevator.motion_status = "ascending" + else: + elevator.motion_status = "descending" + elevator.motion_status = "still" + session.add(elevator) + session.commit() + session.refresh(elevator) + else: + raise HTTPException(status_code=400, detail="No floors demanded") + return elevator + +@app.post("/demand/") +def create_demand(demand: Demand, session: Session = Depends(get_session)): + db_demand = Demand.model_validate(demand) + db_floor = session.exec(select(Floor) + .where(Floor.elevator_id == db_demand.elevator_id) + .where(Floor.floor == db_demand.target_floor)).first() + db_floor.is_demanded = True + session.add(db_floor) + session.add(db_demand) + session.commit() + session.refresh(db_demand) + return db_demand + +@app.get("/demand/{demand_id}") +def get_demand(demand_id: int, session: Session = Depends(get_session)): + demand = session.get(Demand, demand_id) + if not demand: + raise HTTPException(status_code=404, detail="Demand not found") + return demand + +if __name__ == "__main__": + uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) diff --git a/models.py b/models.py new file mode 100755 index 0000000..801d5b4 --- /dev/null +++ b/models.py @@ -0,0 +1,33 @@ +from sqlmodel import Field, SQLModel +import datetime + + +class Elevator(SQLModel, table=True): + id: int | None = Field(default=None, primary_key=True) + max_floor: int = Field(default=10) + min_floor: int = Field(default=0) + current_floor: int = Field(default=0) + next_stop: int | None = Field(default=0) + motion_status: str = Field(default="still") + # min_floor <= current_floor <= max_floor + # motion_status = "still" | "ascending" | "descending" + +class Floor(SQLModel, table=True): + id: int | None = Field(default=None, primary_key=True) + floor: int = Field(default=None) + elevator_id: int = Field(default=None, foreign_key="elevator.id") + is_demanded: bool = Field(default=False) + +class Demand(SQLModel, table=True): + id: int | None = Field(default=None, primary_key=True) + elevator_id: int = Field(default=None, foreign_key="elevator.id") + created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + source: str = Field(default=None) + target_floor: int = Field(default=None) + + # source = "inside" | "outside" + # elevator_id must exist + # elevator.min_floor <= target_floor <= elevator.max_floor + + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..485bac5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,26 @@ +annotated-types==0.7.0 +anyio==4.9.0 +certifi==2025.1.31 +charset-normalizer==3.4.1 +click==8.1.8 +fastapi==0.115.12 +greenlet==3.1.1 +h11==0.14.0 +httpcore==1.0.7 +httpx==0.28.1 +idna==3.10 +iniconfig==2.1.0 +packaging==24.2 +pluggy==1.5.0 +pydantic==2.11.2 +pydantic_core==2.33.1 +pytest==8.3.5 +requests==2.32.3 +sniffio==1.3.1 +SQLAlchemy==2.0.40 +sqlmodel==0.0.24 +starlette==0.46.1 +typing-inspection==0.4.0 +typing_extensions==4.13.1 +urllib3==2.3.0 +uvicorn==0.34.0 diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..4a5da5a --- /dev/null +++ b/tests.py @@ -0,0 +1,116 @@ +import pytest +from fastapi.testclient import TestClient +from sqlmodel import Session, SQLModel, create_engine +from sqlalchemy.pool import StaticPool + +from main import app, get_session +from models import Elevator, Floor, Demand + +# Set up in-memory database for testing +@pytest.fixture +def client(): + engine = create_engine( + "sqlite://:memory:" + ) + SQLModel.metadata.create_all(engine) + + def get_session_override(): + with Session(engine) as session: + yield session + + app.dependency_overrides[get_session] = get_session_override + client = TestClient(app) + return client + +@pytest.fixture +def db_session(): + engine = create_engine( + "sqlite://:memory:" + ) + SQLModel.metadata.create_all(engine) + with Session(engine) as session: + yield session + +def test_create_elevator(client): + response = client.post( + "/elevator/", + json={"max_floor": 10, "min_floor": -2, "current_floor": 0} + ) + assert response.status_code == 200 + data = response.json() + print(response) + assert data["id"] is not None + assert data["max_floor"] == 10 + assert data["min_floor"] == -2 + assert data["current_floor"] == 0 + assert data["motion_status"] == "still" + +def test_get_existing_elevator(client): + create_response = client.post( + "/elevator/", + json={"max_floor": 10, "min_floor": 0, "current_floor": 0} + ) + elevator_id = create_response.json()["id"] + + response = client.get(f"/elevator/{elevator_id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == elevator_id + assert data["max_floor"] == 10 + assert data["min_floor"] == 0 + +def test_get_nonexistent_elevator(client): + response = client.get("/elevator/999") + assert response.status_code == 404 + assert response.json()["detail"] == "Elevator not found" + +def test_create_demand(client): + create_elevator_response = client.post( + "/elevator/", + json={"max_floor": 10, "min_floor": 0, "current_floor": 0} + ) + elevator_id = create_elevator_response.json()["id"] + + # Then create a demand + response = client.post( + "/demand/", + json={ + "elevator_id": elevator_id, + "source": "inside", + "target_floor": 5 + } + ) + assert response.status_code == 200 + data = response.json() + assert data["id"] is not None + assert data["elevator_id"] == elevator_id + assert data["source"] == "inside" + assert data["target_floor"] == 5 + +def test_get_demand(client): + create_elevator_response = client.post( + "/elevator/", + json={"max_floor": 10, "min_floor": 0, "current_floor": 0} + ) + elevator_id = create_elevator_response.json()["id"] + + create_demand_response = client.post( + "/demand/", + json={ + "elevator_id": elevator_id, + "source": "outside", + "target_floor": 3 + } + ) + demand_id = create_demand_response.json()["id"] + + response = client.get(f"/demand/{demand_id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == demand_id + assert data["elevator_id"] == elevator_id + assert data["target_floor"] == 3 + +def test_get_nonexistent_demand(client): + response = client.get("/demand/999") + assert response.status_code == 404 diff --git a/thinking-process.md b/thinking-process.md new file mode 100755 index 0000000..ef7b19d --- /dev/null +++ b/thinking-process.md @@ -0,0 +1,25 @@ +# Goal +Model elevator problem in order to store proper data to create machine learning models. + +# What are we optimizing for in a basic model? +- get the optimized resting floor? +- (time?, cost?) + +# Tasks +- [ ] Model the problem into a storage schema (SQL DB schema or whatever you prefer) +- [ ] CRUD some data +- [ ] Add some flair with a business rule or two +- [ ] Have the data in a suitable format to feed to a prediction training algorithm + + +### Marking +- storage (sqlite, docker container, ...) +- tests + +# Stack +- sqlite (fast prototype) +- fastapi +- sqlmodel (sqlalchemy + pydantic) + + +