Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*.log
*.db
**/__pycache__/
49 changes: 49 additions & 0 deletions READMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Package Measurement Conversion

## Overview
This application exposes a RESTful API endpoint that accepts a masurement input as string of characters, and returns a response of a list of total values of measured inflows.

## Features
- **Package Measurement Conversion:** Accepts a sequence of characters and returns a result list of measured inflows.
- **Clear and adaptable:** Easily modified and extended to include additional functionalities.

## Installation
- Clone the following url in your command prompt: https://github.com/ziyadabd/PackageMeasurementConversionAPI.git

### Prerequisites
- Python 3.x
- CherryPy

### Setup
1. Clone the repository (see the Installation section above).
2. Install CherryPy package:
```pip install CherryPy==18.9.0```

### Running the Program
- **Script**:

Default: ```python main_app.py```

Specific port: ```python main_app.py <enter_port>```

### Usage
- Call the API using the following URLs:

http://localhost:8080/?convert_measurements=<enter_string_here>


- Get history of all conversions:

Default: http://localhost:8080/get_history

## Contributing
Contributions to this project are prohibited due to the course restrictions.

## License
This project is licensed under the MIT License.

## Contact
For any queries, please contact [email protected]

## Acknowledgements
Project by Ziyad Al Hashar
27 changes: 27 additions & 0 deletions main_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import cherrypy
import logging
import argparse

from src.controllers.SequenceController import SequenceRecordsV1

def parse_command_line():
parser = argparse.ArgumentParser(description="CherryPy Server Configuration")
parser.add_argument('port', nargs='?', type=int, default=8080, help='Port on which the server will run')
return parser.parse_args()

if __name__ == '__main__':
# Parse command line arguments
args = parse_command_line()

# Configure LOGGING to file
logging.basicConfig(filename='error_log.log', level=logging.CRITICAL,
format='%(asctime)s:%(levelname)s:%(message)s')


cherrypy.config.update({'server.socket_host': '0.0.0.0', 'server.socket_port': args.port})
cherrypy.tree.mount(SequenceRecordsV1(), '/')


# Start the CherryPy server
cherrypy.engine.start()
cherrypy.engine.block()
Empty file added src/__init__.py
Empty file.
42 changes: 42 additions & 0 deletions src/controllers/SequenceController.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import cherrypy

from src.services.SequenceService import SequenceService
from src.services.SequenceHistory import SequenceHistory


class SequenceRecordsV1():

def __init__(self):
self.sequence_service = SequenceService()
self.sequence_history = SequenceHistory()

@cherrypy.expose()
@cherrypy.tools.json_out()
def convert_measurements(self, input: str = ""):
"""
Handles the GET request and return a JSON response. The API will convert measurement input
string from a sequence of characters into a result list of the total values of measured
inflows for each package.
"""

try:
self.sequence_service.get_sequence(input)
res_msg = {"status": "success", "err_msg": "", "result": self.sequence_service.process_sequence()}
self.sequence_history.insert_data("SUCCESS", input)
except: # If Invalid Sequence
res_msg = {"status": "fail", "err_msg": "invalid sequence", "result": []}
self.sequence_history.insert_data("FAILED", input)


return res_msg

@cherrypy.expose()
@cherrypy.tools.json_out()
def get_history(self):
"""
Handles the GET all records from Database and returns a JSON response.
"""

res_msg = {"status": "success", "err_msg": "", "result": self.sequence_history.fetch_data()}

return res_msg
Empty file added src/controllers/__init__.py
Empty file.
51 changes: 51 additions & 0 deletions src/models/Sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

class Sequence:

def __init__(self):
self.sequence = ""


def set_sequence(self, sequence):
self.sequence = sequence

def get_sequence_as_str(self):
return str(self.sequence)

# Takes character as input and returns it's numerical representation.
def encoder(self, character):
encoder = {"_": 0, "a": 1, "b": 2, "c": 3, "d": 4, "e": 5, "f": 6, "g": 7, "h": 8,
"i": 9, "j": 10, "k": 11, "l": 12, "m": 13, "n": 14, "o": 15, "p": 16, "q": 17,
"r": 18, "s": 19, "t": 20, "u": 21, "v": 22, "w": 23, "x": 24, "y": 25, "z": 26}

return encoder[character]

# Logic to check if input sequence is valid.
def is_valid(self):
sequence = self.sequence

steps = 0
index = 0
is_counting = False
while index < len(sequence):
if is_counting: # IF in the middle of cycle
steps = steps - 1
if sequence[index] == 'z':
steps = steps + 1
if steps == 0:
is_counting = False
else: # IF at the start of cycle
steps = self.encoder(sequence[index])
if sequence[index] == 'z':
steps = steps + self.encoder(sequence[index + 1])
if sequence[index + 1] == 'z':
steps = steps + self.encoder(sequence[index + 2])
index += 1
index += 1
is_counting = True

index += 1

if steps == 0:
return True
else:
return False
Empty file added src/models/__init__.py
Empty file.
57 changes: 57 additions & 0 deletions src/services/SequenceHistory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import sqlite3
import time
import datetime

# Manages Database Operations
class SequenceHistory:
def __init__(self):
self.db_name = "sequence_history.db"
self.conn = sqlite3.connect(self.db_name, check_same_thread=False)
self.cursor = self.conn.cursor()
self.create_table()


def create_table(self):
query = '''
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
input TEXT,
status TEXT
)
'''
try:
self.cursor.execute(query)
self.conn.commit()
except sqlite3.Error as e:
print(f"An error occurred: {e}")

def insert_data(self, status, input = "test"):
insert_query = 'INSERT INTO history (datetime, input, status) VALUES (?, ?, ?)'
data_to_insert = (str(time.time()), input, status)

try:
self.cursor.execute(insert_query, data_to_insert)
self.conn.commit()
except sqlite3.Error as e:
print(f"An error occurred: {e}")

def fetch_data(self):
select_query = 'SELECT * FROM history'

try:
# Fetch history from database and format it (epoch to human-readable time)
self.cursor.execute(select_query)
fetched_data = self.cursor.fetchall()
filtered_data = []
for entry in fetched_data:
# Convert the timestamp to a datetime object and format it
timestamp = float(entry[1])
readable_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
# Append the converted time and the rest of the data, excluding the ID
filtered_sublist = [readable_time, entry[2], entry[3]]
filtered_data.append(filtered_sublist)
return filtered_data
except sqlite3.Error as e:
print(f"An error occurred: {e}")
return None
67 changes: 67 additions & 0 deletions src/services/SequenceService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from src.models.Sequence import Sequence
from src.services.SequenceHistory import SequenceHistory


class SequenceService:

def __init__(self):
self.sequence_manager = Sequence()
self.sequence_history = SequenceHistory()
self.result = []

# Receives the input sequenece from the API endpoint and stores it as a Sequence object.
def get_sequence(self, str_representation):
self.sequence_manager.set_sequence(str_representation)

# Adds integer to result list (sum of each cycle) and returns it.
def append_num_to_list(self, number):
self.result.append(number)
return self.result

# Processes the Sequence object and returns the result list.
def process_sequence(self):
sequence = self.sequence_manager.get_sequence_as_str()

# Check if input is valid
if sequence[0] == "_": # Special Case for "_"
return [0]
elif self.sequence_manager.is_valid():
pass
else:
raise Exception("INVALID SEQUENCE")

self.result = []
index = 0
is_counting = False
while index < len(sequence):
if is_counting:
if steps != 0:
if sequence[index] == 'z':
sum = sum + self.sequence_manager.encoder(sequence[index])
else:
sum = sum + self.sequence_manager.encoder(sequence[index])
steps -= 1
if (index + 1) == len(sequence):
self.append_num_to_list(sum)
else:
if steps == 0:
index -= 1
self.append_num_to_list(sum)
is_counting = False
else:
sum = 0
steps = self.sequence_manager.encoder(sequence[index])

if steps == 0 and index + 1 == len(sequence):
self.append_num_to_list(sum)
else:
if sequence[index] == 'z':
steps = steps + self.sequence_manager.encoder(sequence[index + 1])
if sequence[index + 1] == 'z':
steps = steps + self.sequence_manager.encoder(sequence[index + 2])
index += 1
index += 1
is_counting = True
index += 1

return self.result
Empty file added src/services/__init__.py
Empty file.
Empty file added src/services/tests/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions src/services/tests/unitTests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import unittest

from ..SequenceService import SequenceService


CURRENT_VS_EXPECTED_VALUE = (
("aa", [1]),
("abbcc", [2, 6]),
("dz_a_aazzaaa", [28, 53, 1]),
("a_", [0]),
("abcdabcdab", [2, 7, 7]),
("abcdabcdab_", [2, 7, 7, 0]),
("zdaaaaaaaabaaaaaaaabaaaaaaaabbaa", [34]),
("zza_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_a_", [26]),
("za_a_a_a_a_a_a_a_a_a_a_a_a_azaaa", [40, 1]),
("_", [0]),
("_ad", [0]),
("a_", [0]),
("_zzzb", [0]),
("__", [0])
)


class TestSequence(unittest.TestCase):

def test_sequence(self):
for test_string, expected in CURRENT_VS_EXPECTED_VALUE:
sequence_service = SequenceService()
sequence_service.get_sequence(test_string)
result = sequence_service.process_sequence()
print(f"result {result} expected {expected}")
self.assertEqual(result, expected)


if __name__ == '__main__':
unittest.main()