From e8186dea571dbee6b1e3acb6ab077430a76fa1fe Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 9 Sep 2025 15:07:40 +0530 Subject: [PATCH 01/16] removal of scheduled_process entries from SQLite Signed-off-by: ashish-jabble --- scripts/plugins/storage/sqlite/init.sql | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index 53db93928..8c9003bad 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -837,12 +837,9 @@ INSERT INTO fledge.scheduled_processes (name, script) VALUES ('restore', '["task -- South, Notification, North Tasks -- INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'notification_c', '["services/notification_c"]', 30 ); -INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'dispatcher_c', '["services/dispatcher_c"]', 20 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'bucket_storage_c', '["services/bucket_storage_c"]', 10 ); +-INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'pipeline_c', '["services/pipeline_c"]', 90 ); +INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); -- Automation script tasks -- From 25fe98384edc8d502893813c26b2d913e2fbb839 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 9 Sep 2025 15:08:11 +0530 Subject: [PATCH 02/16] external services scripts deleted Signed-off-by: ashish-jabble --- scripts/services/bucket_storage_c | 14 -------------- scripts/services/dispatcher_c | 28 ---------------------------- scripts/services/notification_c | 28 ---------------------------- 3 files changed, 70 deletions(-) delete mode 100755 scripts/services/bucket_storage_c delete mode 100755 scripts/services/dispatcher_c delete mode 100755 scripts/services/notification_c diff --git a/scripts/services/bucket_storage_c b/scripts/services/bucket_storage_c deleted file mode 100755 index 94cb2301a..000000000 --- a/scripts/services/bucket_storage_c +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/sh -# Run a Fledge Bucket Storage service written in C/C++ -if [ "${FLEDGE_ROOT}" = "" ]; then - FLEDGE_ROOT=/usr/local/fledge -fi - -if [ ! -d "${FLEDGE_ROOT}" ]; then - logger "Fledge home directory missing or incorrectly set environment" - exit 1 -fi - -cd "${FLEDGE_ROOT}/services" - -./fledge.services.bucket "$@" diff --git a/scripts/services/dispatcher_c b/scripts/services/dispatcher_c deleted file mode 100755 index 550016087..000000000 --- a/scripts/services/dispatcher_c +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash -# Run a Fledge Dispatcher service written in C/C++ -if [ "${FLEDGE_ROOT}" = "" ]; then - FLEDGE_ROOT=/usr/local/fledge -fi - -if [ ! -d "${FLEDGE_ROOT}" ]; then - logger "Fledge home directory missing or incorrectly set environment" - exit 1 -fi - -# startup with delay -delay() { - for ARG in "$@"; - do - PARAM=$(echo $ARG | cut -f1 -d=) - if [ $PARAM = '--delay' ]; then - PARAM_LENGTH=${#PARAM} - VALUE="${ARG:$PARAM_LENGTH+1}" - sleep $VALUE - break - fi - done -} - -cd "${FLEDGE_ROOT}/services" -delay "$@" -./fledge.services.dispatcher "$@" diff --git a/scripts/services/notification_c b/scripts/services/notification_c deleted file mode 100755 index ae3023cb7..000000000 --- a/scripts/services/notification_c +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash -# Run a Fledge notification service written in C/C++ -if [ "${FLEDGE_ROOT}" = "" ]; then - FLEDGE_ROOT=/usr/local/fledge -fi - -if [ ! -d "${FLEDGE_ROOT}" ]; then - logger "Fledge home directory missing or incorrectly set environment" - exit 1 -fi - -# startup with delay -delay() { - for ARG in "$@"; - do - PARAM=$(echo $ARG | cut -f1 -d=) - if [ $PARAM = '--delay' ]; then - PARAM_LENGTH=${#PARAM} - VALUE="${ARG:$PARAM_LENGTH+1}" - sleep $VALUE - break - fi - done -} - -cd "${FLEDGE_ROOT}/services" -delay "$@" -./fledge.services.notification "$@" From d028fa741a6a8633b1d22fe6019ba30bb9ca0548 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 9 Sep 2025 15:18:06 +0530 Subject: [PATCH 03/16] get service installed API fixes for python based services Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 48 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 49c1e3e76..795b9dc51 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -329,16 +329,50 @@ def _get_service_info_from_path(service_path: str, is_python: bool = False, serv def get_service_installed() -> List: - paths = [_FLEDGE_ROOT + "/services", _FLEDGE_ROOT + "/python/fledge/services/management"] + paths = [ + os.path.join(_FLEDGE_ROOT, "services"), + os.path.join(_FLEDGE_ROOT, "python", "fledge", "services") + ] services = [] svc_prefix = 'fledge.services.' + ignored_items = {'common', 'core', 'south'} for _path in paths: - for root, dirs, files in os.walk(_path): - for _file in files: - if _file.startswith(svc_prefix): - services.append(_file.split(svc_prefix)[-1]) - elif _file == '__main__.py': - services.append('management') + if not os.path.exists(_path): + continue + if _path.endswith("/python/fledge/services"): + # Python-based services + try: + for item in os.listdir(_path): + if item in ignored_items or item.startswith('__'): + continue + + item_path = os.path.join(_path, item) + if not os.path.isdir(item_path): + continue + + init_file = os.path.join(item_path, '__init__.py') + main_file = os.path.join(item_path, '__main__.py') + + if os.path.isfile(init_file) and os.path.isfile(main_file): + services.append(item) + else: + missing = [] + if not os.path.isfile(init_file): + missing.append('__init__.py') + if not os.path.isfile(main_file): + missing.append('__main__.py') + _logger.error( + f"'{item}' service is not installed correctly — missing: {', '.join(missing)}" + ) + except Exception as ex: + _logger.warning(f"Failed to list Python services at {_path}: {ex}") + else: + # C-based services + for root, dirs, files in os.walk(_path): + for _file in files: + if _file.startswith(svc_prefix): + service_name = _file[len(svc_prefix):] + services.append(service_name) return services From e751877e8fc9136c9a887831eaa087eaffac9c11 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 9 Sep 2025 16:10:28 +0530 Subject: [PATCH 04/16] add service API updates as per service info Signed-off-by: ashish-jabble --- Makefile | 15 --- python/fledge/services/core/api/service.py | 103 +++++++-------------- scripts/plugins/storage/sqlite/init.sql | 2 +- 3 files changed, 36 insertions(+), 84 deletions(-) diff --git a/Makefile b/Makefile index d9fed2d7b..f8a15c5c4 100644 --- a/Makefile +++ b/Makefile @@ -140,9 +140,6 @@ STORAGE_SERVICE_SCRIPT_SRC := scripts/services/storage STORAGE_SCRIPT_SRC := scripts/storage NORTH_C_SCRIPT_SRC := scripts/tasks/north_c NORTH_SERVICE_C_SCRIPT_SRC := scripts/services/north_C -NOTIFICATION_C_SCRIPT_SRC := scripts/services/notification_c -DISPATCHER_C_SCRIPT_SRC := scripts/services/dispatcher_c -BUCKET_STORAGE_C_SCRIPT_SRC := scripts/services/bucket_storage_c PURGE_SCRIPT_SRC := scripts/tasks/purge PURGE_C_SCRIPT_SRC := scripts/tasks/purge_system CHECK_UPDATES_SCRIPT_SRC := scripts/tasks/check_updates @@ -366,9 +363,6 @@ scripts_install : $(SCRIPTS_INSTALL_DIR) \ install_storage_service_script \ install_north_c_script \ install_north_service_c_script \ - install_notification_c_script \ - install_dispatcher_c_script \ - install_bucket_storage_c_script \ install_purge_script \ install_check_updates_script \ install_statistics_script \ @@ -423,15 +417,6 @@ install_north_c_script : $(SCRIPT_TASKS_INSTALL_DIR) $(NORTH_C_SCRIPT_SRC) install_north_service_c_script : $(SCRIPT_SERVICES_INSTALL_DIR) $(NORTH_SERVICE_C_SCRIPT_SRC) $(CP) $(NORTH_SERVICE_C_SCRIPT_SRC) $(SCRIPT_SERVICES_INSTALL_DIR) -install_notification_c_script: $(SCRIPT_SERVICES_INSTALL_DIR) $(NOTIFICATION_C_SCRIPT_SRC) - $(CP) $(NOTIFICATION_C_SCRIPT_SRC) $(SCRIPT_SERVICES_INSTALL_DIR) - -install_dispatcher_c_script: $(SCRIPT_SERVICES_INSTALL_DIR) $(DISPATCHER_C_SCRIPT_SRC) - $(CP) $(DISPATCHER_C_SCRIPT_SRC) $(SCRIPT_SERVICES_INSTALL_DIR) - -install_bucket_storage_c_script: $(SCRIPT_SERVICES_INSTALL_DIR) $(BUCKET_STORAGE_C_SCRIPT_SRC) - $(CP) $(BUCKET_STORAGE_C_SCRIPT_SRC) $(SCRIPT_SERVICES_INSTALL_DIR) - install_purge_script : $(SCRIPT_TASKS_INSTALL_DIR) $(PURGE_SCRIPT_SRC) $(CP) $(PURGE_SCRIPT_SRC) $(SCRIPT_TASKS_INSTALL_DIR) $(CP) $(PURGE_C_SCRIPT_SRC) $(SCRIPT_TASKS_INSTALL_DIR) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 795b9dc51..a6ee20b85 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -699,10 +699,6 @@ async def add_service(request): raise web.HTTPBadRequest(reason='Missing type property in payload.') service_type = str(service_type).lower() - if service_type not in ['south', 'north', 'notification', 'management', 'dispatcher', - 'bucketstorage', 'pipeline']: - raise web.HTTPBadRequest(reason='Only south, north, notification, management, dispatcher, bucketstorage ' - 'and pipeline types are supported.') if plugin is None and service_type in ('south', 'north'): raise web.HTTPBadRequest(reason='Missing plugin property for type {} in payload.'.format(service_type)) if plugin and utils.check_reserved(plugin) is False: @@ -716,6 +712,9 @@ async def add_service(request): (type(enabled) is bool and enabled is True))) else False dryrun = not is_enabled + process_name = None + script = None + priority = None # Check if a valid plugin has been provided plugin_module_path, plugin_config, process_name, script = "", {}, "", "" @@ -724,8 +723,15 @@ async def add_service(request): # folder, within the plugin_module_path. # if multiple plugin with same name are found, then python plugin import will be tried first plugin_module_path = "{}/python/fledge/plugins/{}/{}".format(_FLEDGE_ROOT, service_type, plugin) - process_name = 'south_c' if service_type == 'south' else 'north_C' - script = '["services/south_c"]' if service_type == 'south' else '["services/north_C"]' + # FIXME: FOGL-10225 For south, north service type derived values from service info + if service_type == 'south': + process_name = 'south_c' + script = '["services/south_c"]' + priority = 100 + else: + process_name = 'north_C' + script = '["services/north_C"]' + priority = 200 try: plugin_info = common.load_and_fetch_python_plugin_info(plugin_module_path, plugin, service_type) plugin_config = plugin_info['config'] @@ -746,36 +752,19 @@ async def add_service(request): except Exception as ex: _logger.error(ex, "Failed to fetch plugin info config item.") raise web.HTTPInternalServerError(reason='Failed to fetch plugin configuration') - elif service_type == 'notification': - if not os.path.exists(_FLEDGE_ROOT + "/services/fledge.services.{}".format(service_type)): - msg = "{} service is not installed correctly.".format(service_type.capitalize()) - raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) - process_name = 'notification_c' - script = '["services/notification_c"]' - elif service_type == 'management': - file_names_list = ['{}/python/fledge/services/management/__main__.py'.format(_FLEDGE_ROOT), - '{}/scripts/services/management'.format(_FLEDGE_ROOT), - '{}/scripts/tasks/manage'.format(_FLEDGE_ROOT)] - if not all(list(map(os.path.exists, file_names_list))): - msg = "{} service is not installed correctly.".format(service_type.capitalize()) - raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) - process_name = 'management' - script = '["services/management"]' - elif service_type == 'dispatcher': - if not os.path.exists(_FLEDGE_ROOT + "/services/fledge.services.{}".format(service_type)): - msg = "{} service is not installed correctly.".format(service_type.capitalize()) - raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) - process_name = 'dispatcher_c' - script = '["services/dispatcher_c"]' - elif service_type == 'bucketstorage': - if not os.path.exists(_FLEDGE_ROOT + "/services/fledge.services.bucket"): - msg = "{} service is not installed correctly.".format(service_type.capitalize()) - raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) - process_name = 'bucket_storage_c' - script = '["services/bucket_storage_c"]' - elif service_type == 'pipeline': - process_name = 'pipeline_c' - script = '["services/pipeline_c"]' + else: + for service_name in get_service_installed(): + if service_name not in _PREBUILT_SERVICES: + service_info = await _fetch_service_info(service_name) + if service_info['type'] == service_type: + process_name = service_info['process'] + script = service_info['process_script'] + priority = service_info['startup_priority'] + break + if process_name is None or script is None or priority is None: + message = f"""Either the '{service_type}' service type is invalid or the '{service_type.capitalize()}' + service is not installed correctly.""" + raise web.HTTPNotFound(reason=message, body=json.dumps({"message": message})) storage = connect.get_storage_async() config_mgr = ConfigurationManager(storage) @@ -794,12 +783,11 @@ async def add_service(request): count = await check_scheduled_processes(storage, process_name, script) if count == 0: # Now first create the scheduled process entry for the new service - column_name = {"name": process_name, "script": script} - if service_type == 'management': - column_name["priority"] = 300 + column_name = {"name": process_name, "script": script, "priority": priority} payload = PayloadBuilder().INSERT(**column_name).payload() try: res = await storage.insert_into_tbl("scheduled_processes", payload) + await server.Server.scheduler._get_process_scripts() except StorageServerError as ex: _logger.exception("Failed to create scheduled process. %s", ex.error) raise web.HTTPInternalServerError(reason='Failed to create service.') @@ -807,35 +795,7 @@ async def add_service(request): _logger.error(ex, "Failed to create scheduled process.") raise web.HTTPInternalServerError(reason='Failed to create service.') - # check that notification service is not already registered, right now notification service LIMIT to 1 - if service_type == 'notification': - res = await check_schedule_entry(storage) - for ps in res['rows']: - if 'notification_c' in ps['process_name']: - msg = "A Notification service type schedule already exists." - raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) - # check that dispatcher service is not already registered, right now dispatcher service LIMIT to 1 - elif service_type == 'dispatcher': - res = await check_schedule_entry(storage) - for ps in res['rows']: - if 'dispatcher_c' in ps['process_name']: - msg = "A Dispatcher service type schedule already exists." - raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) - # check the schedule entry for BucketStorage service as LIMIT to 1 - elif service_type == 'bucketstorage': - res = await check_schedule_entry(storage) - for ps in res['rows']: - if 'bucket_storage_c' in ps['process_name']: - msg = "A BucketStorage service type schedule already exists." - raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) - # check that management service is not already registered, right now management service LIMIT to 1 - elif service_type == 'management': - res = await check_schedule_entry(storage) - for ps in res['rows']: - if 'management' in ps['process_name']: - msg = "A Management service type schedule already exists." - raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) - elif service_type == 'south' or service_type == 'north': + if service_type == 'south' or service_type == 'north': try: # Create a configuration category from the configuration defined in the plugin category_desc = plugin_config['plugin']['description'] @@ -859,6 +819,12 @@ async def add_service(request): msg = "Failed to create plugin configuration while adding service." _logger.error(ex, msg) raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) + else: + res = await check_schedule_entry(storage) + for ps in res['rows']: + if process_name in ps['process_name']: + msg = "A '{}' service schedule already exists.".format(name) + raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) # If all successful then lastly add a schedule to run the new service at startup try: @@ -1130,3 +1096,4 @@ async def issueOTPToken(request): startToken = ServiceRegistry.issueStartupToken(service_name) return web.json_response({"startupToken": startToken}) + diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index 8c9003bad..242ae088f 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -837,7 +837,7 @@ INSERT INTO fledge.scheduled_processes (name, script) VALUES ('restore', '["task -- South, Notification, North Tasks -- INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); --INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'pipeline_c', '["services/pipeline_c"]', 90 ); INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); From 468d9ede8f0aa78626473f59c0ca1b07d546311f Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 11 Sep 2025 15:50:48 +0530 Subject: [PATCH 05/16] fledge status script updates Signed-off-by: ashish-jabble --- scripts/fledge | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/scripts/fledge b/scripts/fledge index ad963deb1..ca7ad339f 100755 --- a/scripts/fledge +++ b/scripts/fledge @@ -542,13 +542,32 @@ fledge_status() { # Show Services fledge_log "info" "=== Fledge services:" "outonly" "pretty" fledge_log "info" "fledge.services.core" "outonly" "pretty" - ps -ef | grep "fledge.services.storage" | grep -v 'grep' | grep -v awk | awk '{print "fledge.services.storage " $9 " " $10}' || true - ps -ef | grep "fledge.services.south " |grep -v python3| grep -v 'grep' | grep -v awk | awk '{printf "fledge.services.south "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | sed -e 's/--token.*--name/--name/g' || true - ps -ef | grep "fledge.services.north " |grep -v python3| grep -v 'grep' | grep -v awk | awk '{printf "fledge.services.north "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | sed -e 's/--token.*--name/--name/g' || true - ps -ef | grep "fledge.services.notification" |grep -v python3| grep -v 'grep' | grep -v awk | awk '{printf "fledge.services.notification "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | sed -e 's/--token.*--name/--name/g' || true - ps -ef | grep "fledge.services.dispatcher" |grep -v python3| grep -v 'grep' | grep -v awk | awk '{printf "fledge.services.dispatcher "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | sed -e 's/--token.*--name/--name/g' || true - ps -ef | grep "fledge.services.bucket" |grep -v python3| grep -v 'grep' | grep -v awk | awk '{printf "fledge.services.bucket "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | sed -e 's/--token.*--name/--name/g' || true - ps -ef | grep "fledge.services.pipeline " |grep -v python3| grep -v 'grep' | grep -v awk | awk '{printf "fledge.services.pipeline "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | sed -e 's/--token.*--name/--name/g' || true + + show_service_processes() { + local service_name="$1" + local has_name_param="$2" + + if [ "$has_name_param" = "true" ]; then + # Services with parameter + ps -ef | grep "$service_name" | grep -v python3 | grep -v 'grep' | grep -v awk | \ + awk -v svc="$service_name" '{printf svc " "; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' | \ + sed -e 's/--token.*--name/--name/g' || true + else + # Services without parameter (like storage) + ps -ef | grep "$service_name" | grep -v 'grep' | grep -v awk | \ + awk -v svc="$service_name" '{print svc " " $9 " " $10}' || true + fi + } + + # Show Services in C code + for service_name in $(ls ${FLEDGE_ROOT}/services) + do + if [ "$service_name" = "fledge.services.storage" ]; then + show_service_processes "$service_name" "false" + else + show_service_processes "$service_name" "true" + fi + done # Show Python services (except core) ps -ef | grep -o 'python3 -m fledge.services.*' | grep -o 'fledge.services.*' | grep -v 'fledge.services.core' | grep -v 'fledge.services\.\*' | sed -e 's/--token.*--name/--name/g' || true @@ -557,7 +576,7 @@ fledge_status() { ps -ef | grep -v 'cpulimit*' | grep -o 'python3 -m fledge.tasks.*' | grep -o 'fledge.tasks.*' | grep -v 'fledge.tasks\.\*' || true # Show Tasks in C code - for task_name in `ls ${FLEDGE_ROOT}/tasks` + for task_name in $(ls ${FLEDGE_ROOT}/tasks) do ps -ef | grep "./tasks/$task_name" | grep -v python3 | grep -v grep | grep -v awk | awk '{printf "tasks/'$task_name' " ; for(i=9;i<=NF;++i) printf $i FS; printf "\n"}' || true done From 1082484002244eae45fd49af9dd8f478dd9a70db Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 11 Sep 2025 17:37:56 +0530 Subject: [PATCH 06/16] service registry to accept any Type Signed-off-by: ashish-jabble --- python/fledge/common/service_record.py | 3 ++- python/fledge/services/core/api/service.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/fledge/common/service_record.py b/python/fledge/common/service_record.py index b5279f43e..7036f7016 100644 --- a/python/fledge/common/service_record.py +++ b/python/fledge/common/service_record.py @@ -53,7 +53,8 @@ class InvalidServiceStatus(Exception): def __init__(self, s_id, s_name, s_type, s_protocol, s_address, s_port, m_port): self._id = s_id self._name = s_name - self._type = self.valid_type(s_type) # check with ServiceRecord.Type, if not a valid type raise error + # FIXME: We need to remove this once we have a valid type check in the ServiceRecord.Type + self._type = s_type #self.valid_type(s_type) # check with ServiceRecord.Type, if not a valid type raise error self._protocol = s_protocol self._address = s_address self._port = None diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index a6ee20b85..c19abfb58 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -698,7 +698,7 @@ async def add_service(request): if service_type is None: raise web.HTTPBadRequest(reason='Missing type property in payload.') - service_type = str(service_type).lower() + #service_type = str(service_type).lower() if plugin is None and service_type in ('south', 'north'): raise web.HTTPBadRequest(reason='Missing plugin property for type {} in payload.'.format(service_type)) if plugin and utils.check_reserved(plugin) is False: From 8a610f642ba1f1ecac057bd12268363493ba5402 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Fri, 12 Sep 2025 11:40:37 +0530 Subject: [PATCH 07/16] pipeline service type deletion from core Signed-off-by: ashish-jabble --- python/fledge/common/service_record.py | 1 - python/fledge/services/core/api/service.py | 1 - .../services/core/scheduler/scheduler.py | 8 ++--- scripts/plugins/storage/postgres/init.sql | 18 +++++------- scripts/plugins/storage/sqlite/init.sql | 13 ++++----- scripts/plugins/storage/sqlitelb/init.sql | 19 +++++------- scripts/services/pipeline_c | 29 ------------------- 7 files changed, 23 insertions(+), 66 deletions(-) delete mode 100755 scripts/services/pipeline_c diff --git a/python/fledge/common/service_record.py b/python/fledge/common/service_record.py index 7036f7016..86e5f57b9 100644 --- a/python/fledge/common/service_record.py +++ b/python/fledge/common/service_record.py @@ -29,7 +29,6 @@ class Type(IntEnum): Northbound = 6 Dispatcher = 7 BucketStorage = 8 - Pipeline = 9 class Status(IntEnum): """Enumeration for Service Status""" diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index c19abfb58..32911905e 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -603,7 +603,6 @@ async def add_service(request): curl -sX POST http://localhost:8081/fledge/service -d '{"name": "BucketServer", "type": "bucketstorage", "enabled": true}' | jq curl -X POST http://localhost:8081/fledge/service -d '{"name": "HTC", "plugin": "httpc", "type": "north", "enabled": true}' | jq curl -sX POST http://localhost:8081/fledge/service -d '{"name": "HT", "plugin": "http_north", "type": "north", "enabled": true, "config": {"verifySSL": {"value": "false"}}}' | jq - curl -sX POST http://localhost:8081/fledge/service -d '{"name": "Pipeline-Ingest", "type": "pipeline", "enabled": true}' | jq curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-notification"}' curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-dispatcher"}' diff --git a/python/fledge/services/core/scheduler/scheduler.py b/python/fledge/services/core/scheduler/scheduler.py index 7f30ac0a4..37dffe19f 100644 --- a/python/fledge/services/core/scheduler/scheduler.py +++ b/python/fledge/services/core/scheduler/scheduler.py @@ -300,14 +300,12 @@ def _get_delay_in_sec(pname): val = 3 elif pname == 'notification_c': val = 5 - elif pname == 'pipeline_c': - val = 7 elif pname == 'south_c': - val = 9 + val = 7 elif pname == 'north_C': - val = 11 + val = 9 else: - val = 14 + val = 12 return val # This check is necessary only if significant time can elapse between "await" and diff --git a/scripts/plugins/storage/postgres/init.sql b/scripts/plugins/storage/postgres/init.sql index 25679d84f..1f5c0c51a 100644 --- a/scripts/plugins/storage/postgres/init.sql +++ b/scripts/plugins/storage/postgres/init.sql @@ -1064,6 +1064,12 @@ INSERT INTO fledge.statistics ( key, description, value, previous_value ) -- Use this to create guids: https://www.uuidgenerator.net/version1 */ -- Weekly repeat for timed schedules: set schedule_interval to 168:00:00 +-- South/North Services +-- +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); + + -- Core Tasks -- INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'purge', '["tasks/purge"]' ); @@ -1078,15 +1084,9 @@ INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'update checker INSERT INTO fledge.scheduled_processes (name, script) VALUES ('backup', '["tasks/backup"]' ); INSERT INTO fledge.scheduled_processes (name, script) VALUES ('restore', '["tasks/restore"]' ); --- South, Notification, North Tasks +-- North Tasks -- -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'notification_c', '["services/notification_c"]', 30 ); INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'dispatcher_c', '["services/dispatcher_c"]', 20 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'bucket_storage_c', '["services/bucket_storage_c"]', 10 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'pipeline_c', '["services/pipeline_c"]', 90 ); -- Automation script tasks -- @@ -1099,10 +1099,6 @@ INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'automation_scr -- Weekly repeat for timed schedules: set schedule_interval to 168:00:00 -- - --- Core Tasks --- - -- Purge INSERT INTO fledge.schedules ( id, schedule_name, process_name, schedule_type, schedule_time, schedule_interval, exclusive, enabled ) diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index 242ae088f..865333b5e 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -820,6 +820,11 @@ INSERT INTO fledge.statistics ( key, description, value, previous_value ) -- Use this to create guids: https://www.uuidgenerator.net/version1 */ -- Weekly repeat for timed schedules: set schedule_interval to 168:00:00 +-- South/North Services +-- +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); + -- Core Tasks -- INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'purge', '["tasks/purge"]' ); @@ -834,11 +839,8 @@ INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'update checker INSERT INTO fledge.scheduled_processes (name, script) VALUES ('backup', '["tasks/backup"]' ); INSERT INTO fledge.scheduled_processes (name, script) VALUES ('restore', '["tasks/restore"]' ); --- South, Notification, North Tasks +-- North Tasks -- -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'pipeline_c', '["services/pipeline_c"]', 90 ); INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); -- Automation script tasks @@ -853,9 +855,6 @@ INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'automation_scr -- --- Core Tasks --- - -- Purge INSERT INTO fledge.schedules ( id, schedule_name, process_name, schedule_type, schedule_time, schedule_interval, exclusive, enabled ) diff --git a/scripts/plugins/storage/sqlitelb/init.sql b/scripts/plugins/storage/sqlitelb/init.sql index 9f9f147a9..c307a1589 100644 --- a/scripts/plugins/storage/sqlitelb/init.sql +++ b/scripts/plugins/storage/sqlitelb/init.sql @@ -820,6 +820,11 @@ INSERT INTO fledge.statistics ( key, description, value, previous_value ) -- Use this to create guids: https://www.uuidgenerator.net/version1 */ -- Weekly repeat for timed schedules: set schedule_interval to 168:00:00 +-- South/North Services +-- +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); +INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); + -- Core Tasks -- INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'purge', '["tasks/purge"]' ); @@ -834,15 +839,9 @@ INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'update checker INSERT INTO fledge.scheduled_processes (name, script) VALUES ('backup', '["tasks/backup"]' ); INSERT INTO fledge.scheduled_processes (name, script) VALUES ('restore', '["tasks/restore"]' ); --- South, Notification, North Tasks +-- North Tasks -- -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'south_c', '["services/south_c"]', 100 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'notification_c', '["services/notification_c"]', 30 ); -INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'north_C', '["services/north_C"]', 200 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'dispatcher_c', '["services/dispatcher_c"]', 20 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'bucket_storage_c', '["services/bucket_storage_c"]', 10 ); -INSERT INTO fledge.scheduled_processes (name, script, priority) VALUES ( 'pipeline_c', '["services/pipeline_c"]', 90 ); +INSERT INTO fledge.scheduled_processes (name, script) VALUES ( 'north_c', '["tasks/north_c"]' ); -- Automation script tasks @@ -856,10 +855,6 @@ INSERT INTO fledge.scheduled_processes ( name, script ) VALUES ( 'automation_scr -- Weekly repeat for timed schedules: set schedule_interval to 168:00:00 -- - --- Core Tasks --- - -- Purge INSERT INTO fledge.schedules ( id, schedule_name, process_name, schedule_type, schedule_time, schedule_interval, exclusive, enabled ) diff --git a/scripts/services/pipeline_c b/scripts/services/pipeline_c deleted file mode 100755 index 3af034d32..000000000 --- a/scripts/services/pipeline_c +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash -# Run a Fledge pipeline service written in C/C++ -if [ "${FLEDGE_ROOT}" = "" ]; then - FLEDGE_ROOT=/usr/local/fledge -fi - -if [ ! -d "${FLEDGE_ROOT}" ]; then - logger "Fledge home directory missing or incorrectly set environment." - exit 1 -fi - - -# startup with delay -delay() { - for ARG in "$@"; - do - PARAM=$(echo $ARG | cut -f1 -d=) - if [ $PARAM = '--delay' ]; then - PARAM_LENGTH=${#PARAM} - VALUE="${ARG:$PARAM_LENGTH+1}" - sleep $VALUE - break - fi - done -} - -cd "${FLEDGE_ROOT}/services" -delay "$@" -./fledge.services.pipeline "$@" From 8e8a68c5c8e488ad57de3289f9ff526664ca9a2a Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Fri, 12 Sep 2025 11:59:32 +0530 Subject: [PATCH 08/16] Service Record Type removal Signed-off-by: ashish-jabble --- python/fledge/common/service_record.py | 25 ++----------------- .../fledge/services/core/api/notification.py | 15 +++++++---- python/fledge/services/core/api/service.py | 5 ---- .../fledge/common/test_service_record.py | 15 ----------- .../fledge/services/core/api/test_service.py | 11 -------- 5 files changed, 12 insertions(+), 59 deletions(-) diff --git a/python/fledge/common/service_record.py b/python/fledge/common/service_record.py index 86e5f57b9..67fb5ad73 100644 --- a/python/fledge/common/service_record.py +++ b/python/fledge/common/service_record.py @@ -15,20 +15,7 @@ class ServiceRecord(object): - """Used to information regarding a registered microservice. - """ - - class Type(IntEnum): - """Enumeration for Service Types""" - - Storage = 1 - Core = 2 - Southbound = 3 - Notification = 4 - Management = 5 - Northbound = 6 - Dispatcher = 7 - BucketStorage = 8 + """Used to information regarding a registered microservice.""" class Status(IntEnum): """Enumeration for Service Status""" @@ -39,9 +26,6 @@ class Status(IntEnum): Unresponsive = 4 Restart = 5 - class InvalidServiceType(Exception): - # TODO: tell allowed service types? - pass class InvalidServiceStatus(Exception): # TODO: tell allowed service status? @@ -52,8 +36,7 @@ class InvalidServiceStatus(Exception): def __init__(self, s_id, s_name, s_type, s_protocol, s_address, s_port, m_port): self._id = s_id self._name = s_name - # FIXME: We need to remove this once we have a valid type check in the ServiceRecord.Type - self._type = s_type #self.valid_type(s_type) # check with ServiceRecord.Type, if not a valid type raise error + self._type = s_type self._protocol = s_protocol self._address = s_address self._port = None @@ -72,7 +55,3 @@ def __repr__(self): def __str__(self): return self.__repr__() - def valid_type(self, s_type): - if s_type not in ServiceRecord.Type.__members__: - raise ServiceRecord.InvalidServiceType - return s_type diff --git a/python/fledge/services/core/api/notification.py b/python/fledge/services/core/api/notification.py index 2b5392674..cafe98528 100644 --- a/python/fledge/services/core/api/notification.py +++ b/python/fledge/services/core/api/notification.py @@ -45,7 +45,8 @@ def __init__(self, message="Failed to fetch notification plugins."): async def fetch_plugins(): """ Fetch all rule and delivery plugins from notification service """ try: - notification_service = ServiceRegistry.get(s_type=ServiceRecord.Type.Notification.name) + # FIXME: Fetch type from Service Info + notification_service = ServiceRegistry.get(s_type="Notification") _address, _port = notification_service[0]._address, notification_service[0]._port except service_registry_exceptions.DoesNotExist: raise ValueError("No Notification service available.") @@ -197,7 +198,8 @@ async def post_notification(request): curl -X POST http://localhost:8081/fledge/notification -d '{"name": "Test Notification", "description":"Test Notification", "rule": "threshold", "channel": "email", "notification_type": "one shot", "enabled": false, "rule_config": {}, "delivery_config": {}}' """ try: - notification_service = ServiceRegistry.get(s_type=ServiceRecord.Type.Notification.name) + # FIXME: Fetch type from Service Info + notification_service = ServiceRegistry.get(s_type="Notification") _address, _port = notification_service[0]._address, notification_service[0]._port except service_registry_exceptions.DoesNotExist: raise web.HTTPNotFound(reason="No Notification service available.") @@ -331,7 +333,8 @@ async def put_notification(request): curl -X PUT http://localhost:8081/fledge/notification/ -d '{"description":"Test Notification", "rule": "threshold", "channel": "email", "notification_type": "one shot", "enabled": false, "rule_config": {}, "delivery_config": {}}' """ try: - notification_service = ServiceRegistry.get(s_type=ServiceRecord.Type.Notification.name) + # FIXME: Fetch type from Service Info + notification_service = ServiceRegistry.get(s_type="Notification") _address, _port = notification_service[0]._address, notification_service[0]._port except service_registry_exceptions.DoesNotExist: raise web.HTTPNotFound(reason="No Notification service available.") @@ -467,7 +470,8 @@ async def delete_notification(request): curl -X DELETE http://localhost:8081/fledge/notification/ """ try: - notification_service = ServiceRegistry.get(s_type=ServiceRecord.Type.Notification.name) + # FIXME: Fetch type from Service Info + notification_service = ServiceRegistry.get(s_type="Notification") _address, _port = notification_service[0]._address, notification_service[0]._port except service_registry_exceptions.DoesNotExist: raise web.HTTPNotFound(reason="No Notification service available.") @@ -756,7 +760,8 @@ async def delete_delivery_channel(request: web.Request) -> web.Response: """ try: - notification_service = ServiceRegistry.get(s_type=ServiceRecord.Type.Notification.name) + # FIXME: Fetch type from Service Info + notification_service = ServiceRegistry.get(s_type="Notification") _address, _port = notification_service[0]._address, notification_service[0]._port except service_registry_exceptions.DoesNotExist: diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 32911905e..23fe02a95 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -459,11 +459,6 @@ async def get_health(request): try: if 'type' in request.query and request.query['type'] != '': _type = request.query['type'] - svc_type_members = ServiceRecord.Type._member_names_ - is_type_exists = _type in svc_type_members - if not is_type_exists: - raise ValueError('{} is not a valid service type. Supported types are {}'.format(_type, - svc_type_members)) response = get_service_records(_type) else: response = get_service_records() diff --git a/tests/unit/python/fledge/common/test_service_record.py b/tests/unit/python/fledge/common/test_service_record.py index bb50f2684..7dc70c740 100644 --- a/tests/unit/python/fledge/common/test_service_record.py +++ b/tests/unit/python/fledge/common/test_service_record.py @@ -24,14 +24,6 @@ def test_slots(self): assert ['_id', '_name', '_type', '_protocol', '_address', '_port', '_management_port', '_status', '_debug' ] == slots - @pytest.mark.parametrize("name, value", [ - ('Storage', 1), ('Core', 2), ('Southbound', 3), ('Notification', 4), ('Management', 5), ('Northbound', 6), - ('Dispatcher', 7), ('BucketStorage', 8), ('Pipeline', 9) - ]) - def test_types(self, name, value): - assert 9 == len(ServiceRecord.Type) - assert name == ServiceRecord.Type(value).name - @pytest.mark.parametrize("name, value", [ ('Running', 1), ('Shutdown', 2), ('Failed', 3), ('Unresponsive', 4), ('Restart', 5) ]) @@ -68,10 +60,3 @@ def test_init_with_valid_type(self, s_type): assert "aName" == obj._name assert s_type == obj._type - @pytest.mark.parametrize("s_type", [ - "", None, 12, "southbound", "south", "South", "North", "Filter", "External" - ]) - def test_init_with_invalid_type(self, s_type): - with pytest.raises(Exception) as ex: - ServiceRecord("some id", "aName", s_type, "http", "127.0.0.1", None, 1234) - assert ex.type is ServiceRecord.InvalidServiceType diff --git a/tests/unit/python/fledge/services/core/api/test_service.py b/tests/unit/python/fledge/services/core/api/test_service.py index b851848dc..357bcbfda 100644 --- a/tests/unit/python/fledge/services/core/api/test_service.py +++ b/tests/unit/python/fledge/services/core/api/test_service.py @@ -178,17 +178,6 @@ async def test_get_health(self, mocker, client): } assert 11 == log_patch_info.call_count - @pytest.mark.parametrize("_type", ["blah", 1, "storage"]) - async def test_bad_get_service_with_type(self, client, _type): - svc_type_members = ServiceRecord.Type._member_names_ - expected_msg = "{} is not a valid service type. Supported types are {}".format(_type, svc_type_members) - resp = await client.get('/fledge/service?type={}'.format(_type)) - assert 400 == resp.status - assert expected_msg == resp.reason - result = await resp.text() - json_response = json.loads(result) - assert {"message": expected_msg} == json_response - async def test_get_service_with_type_not_found(self, client, _type="Notification"): expected_msg = "No record found for {} service type".format(_type) resp = await client.get('/fledge/service?type={}'.format(_type)) From 302c9eeb48c22bd6d29e565213efc52a51af8087 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Fri, 12 Sep 2025 15:23:23 +0530 Subject: [PATCH 09/16] dispatcher service handling in core startup Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 11 +- python/fledge/services/core/server.py | 111 ++++++++++++--------- 2 files changed, 67 insertions(+), 55 deletions(-) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 23fe02a95..eb85c6dff 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -593,18 +593,15 @@ async def add_service(request): :Example: curl -X POST http://localhost:8081/fledge/service -d '{"name": "DHT 11", "plugin": "dht11", "type": "south", "enabled": true}' curl -sX POST http://localhost:8081/fledge/service -d '{"name": "Sine", "plugin": "sinusoid", "type": "south", "enabled": true, "config": {"dataPointsPerSec": {"value": "10"}}}' | jq - curl -X POST http://localhost:8081/fledge/service -d '{"name": "NotificationServer", "type": "notification", "enabled": true}' | jq - curl -sX POST http://localhost:8081/fledge/service -d '{"name": "DispatcherServer", "type": "dispatcher", "enabled": true}' | jq - curl -sX POST http://localhost:8081/fledge/service -d '{"name": "BucketServer", "type": "bucketstorage", "enabled": true}' | jq + curl -X POST http://localhost:8081/fledge/service -d '{"name": "NotificationServer", "type": "Notification", "enabled": true}' | jq + curl -sX POST http://localhost:8081/fledge/service -d '{"name": "DispatcherServer", "type": "Dispatcher", "enabled": true}' | jq curl -X POST http://localhost:8081/fledge/service -d '{"name": "HTC", "plugin": "httpc", "type": "north", "enabled": true}' | jq curl -sX POST http://localhost:8081/fledge/service -d '{"name": "HT", "plugin": "http_north", "type": "north", "enabled": true, "config": {"verifySSL": {"value": "false"}}}' | jq curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-notification"}' curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-dispatcher"}' - curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-bucketStorage"}' - curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-notification", "version":"1.6.0"}' - curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-dispatcher", "version":"1.9.1"}' - curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-bucketStorage", "version":"1.9.2"}' + curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-notification", "version":"3.1.0"}' + curl -sX POST http://localhost:8081/fledge/service?action=install -d '{"format":"repository", "name": "fledge-service-dispatcher", "version":"3.1.0"}' """ try: diff --git a/python/fledge/services/core/server.py b/python/fledge/services/core/server.py index 754367683..216bef5e0 100755 --- a/python/fledge/services/core/server.py +++ b/python/fledge/services/core/server.py @@ -1174,15 +1174,9 @@ def _start_core(cls, loop=None): # Start Alert Manager loop.run_until_complete(cls._get_alerts()) - # If dispatcher installation: - # a) not found then add it as a StartUp service - # b) found then check the status of its schedule and take action - is_dispatcher = loop.run_until_complete(cls.is_dispatcher_running(cls._storage_client_async)) - if not is_dispatcher: - _logger.info("Dispatcher service installation found on the system, but not in running state. " - "Therefore, starting the service...") - loop.run_until_complete(cls.add_and_enable_dispatcher()) - _logger.info("Dispatcher service started.") + # Check dispatcher service installation and status + loop.run_until_complete(cls.check_dispatcher_service()) + # dryrun execution of all the tasks that are installed but have schedule type other than STARTUP schedule_list = loop.run_until_complete(cls.scheduler.get_schedules()) for sch in schedule_list: @@ -1578,6 +1572,7 @@ async def restart_service(cls, request): For BucketStorage type we have used proxy map for interfacing REST API endpoints to Microservice service API endpoints. Therefore we need to clear the proxy map on restart. """ + # FIXME: Fetch type from Service Info if services[0]._type == "BucketStorage": cls._API_PROXIES = {} except ValueError as err: @@ -2248,48 +2243,68 @@ async def refresh_token(cls, request): raise web.HTTPBadRequest(reason=msg, body=json.dumps({"error": msg})) @classmethod - async def is_dispatcher_running(cls, storage): + async def check_dispatcher_service(cls): + """ + Check dispatcher service installation and status. + Handles both cases: + a) Service schedule not found - add it as a StartUp service + b) Service schedule found but not enabled - enable it + """ from fledge.services.core.api import service as service_api - from fledge.common.storage_client.payload_builder import PayloadBuilder - - # Find the dispatcher service installation - get_svc = service_api.get_service_installed() - # if installation found: - if 'dispatcher' in get_svc: - payload = PayloadBuilder().SELECT("id", "schedule_name", "process_name", "enabled").payload() - res = await storage.query_tbl_with_payload('schedules', payload) - for sch in res['rows']: - if sch['process_name'] == 'dispatcher_c' and sch['enabled'] == 'f': - _logger.info("Dispatcher service found but not in enabled state. " - "Therefore, {} schedule name is enabled".format(sch['schedule_name'])) - # reset process_script priority for the service - cls.scheduler._process_scripts['dispatcher_c'] = ( - cls.scheduler._process_scripts['dispatcher_c'][0], 999) - await cls.scheduler.enable_schedule(uuid.UUID(sch["id"])) - return True - elif sch['process_name'] == 'dispatcher_c' and sch['enabled'] == 't': - # As such no action required for the case - return True - # If installation not found: - return False - return True - - @classmethod - async def add_and_enable_dispatcher(cls): - import datetime as dt from fledge.services.core.scheduler.entities import StartUpSchedule - name = "dispatcher" - process_name = 'dispatcher_c' - is_enabled = True - schedule = StartUpSchedule() - schedule.name = name - schedule.process_name = process_name - schedule.repeat = dt.timedelta(0) - schedule.exclusive = True - schedule.enabled = False - # Save schedule - await cls.scheduler.save_schedule(schedule, is_enabled) + try: + installed_service_name = 'dispatcher' + # Check if dispatcher service is installed + get_svc = service_api.get_service_installed() + if installed_service_name not in get_svc: + return + # Get dispatcher service info using the service API mechanism + service_info = await service_api._fetch_service_info(installed_service_name) + if not service_info or 'process' not in service_info: + _logger.warning(f"Could not fetch {installed_service_name} service information.") + return + process_name = service_info['process'] + # Check if dispatcher service is running/enabled + payload = payload_builder.PayloadBuilder().SELECT("id", "schedule_name", "process_name", "enabled").WHERE( + ["process_name", "=", process_name]).payload() + res = await cls._storage_client_async.query_tbl_with_payload('schedules', payload) + if res['rows']: + sch = res['rows'][0] + if sch['enabled'] == 'f': + _logger.info(f"Dispatcher service found but not in enabled state. " + f"Therefore, {sch['schedule_name']} service is being enabled.") + # Reset process_script priority for the service + if process_name in cls.scheduler._process_scripts: + cls.scheduler._process_scripts[process_name] = ( + cls.scheduler._process_scripts[process_name][0], 999) + await cls.scheduler.enable_schedule(uuid.UUID(sch["id"])) + _logger.info(f"{sch['schedule_name']} service enabled successfully.") + return + else: + return + # If no schedule found, create and enable dispatcher service + _logger.info("Dispatcher service installation found on the system, but not in running state. " + "Therefore, starting the service...") + # Create scheduled process entry for the service + column_name = {"name": process_name, "script": service_info['process_script']} + payload = payload_builder.PayloadBuilder().INSERT(**column_name).payload() + await cls._storage_client_async.insert_into_tbl("scheduled_processes", payload) + await cls.scheduler._get_process_scripts() + # Startup Schedule + name = service_info['name'] + is_enabled = True + schedule = StartUpSchedule() + schedule.name = name + schedule.process_name = process_name + schedule.repeat = timedelta(0) + schedule.exclusive = True + schedule.enabled = False + # Save schedule + await cls.scheduler.save_schedule(schedule, is_enabled) + _logger.info(f"{name} service started successfully.") + except Exception as ex: + _logger.error(f"Error checking dispatcher service: {str(ex)}") @classmethod def get_token_common(cls, request): From 8ec829d1726ece0a8a0b9c30e8bb19ea7cad78d3 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 16 Sep 2025 17:55:55 +0530 Subject: [PATCH 10/16] unit tests updated Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 7 +- .../microservice_management/test_instance.py | 5 - .../fledge/services/core/api/test_service.py | 489 +++++++----------- 3 files changed, 178 insertions(+), 323 deletions(-) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index eb85c6dff..1239d8aa3 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -688,8 +688,6 @@ async def add_service(request): raise web.HTTPBadRequest(reason="'{}' is reserved for Fledge and can not be used as service name!".format(name)) if service_type is None: raise web.HTTPBadRequest(reason='Missing type property in payload.') - - #service_type = str(service_type).lower() if plugin is None and service_type in ('south', 'north'): raise web.HTTPBadRequest(reason='Missing plugin property for type {} in payload.'.format(service_type)) if plugin and utils.check_reserved(plugin) is False: @@ -753,8 +751,7 @@ async def add_service(request): priority = service_info['startup_priority'] break if process_name is None or script is None or priority is None: - message = f"""Either the '{service_type}' service type is invalid or the '{service_type.capitalize()}' - service is not installed correctly.""" + message = f"The '{service_type}' service has not been installed correctly." raise web.HTTPNotFound(reason=message, body=json.dumps({"message": message})) storage = connect.get_storage_async() config_mgr = ConfigurationManager(storage) @@ -814,7 +811,7 @@ async def add_service(request): res = await check_schedule_entry(storage) for ps in res['rows']: if process_name in ps['process_name']: - msg = "A '{}' service schedule already exists.".format(name) + msg = f"A {process_name} service type schedule already exists." raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) # If all successful then lastly add a schedule to run the new service at startup diff --git a/tests/unit/python/fledge/services/common/microservice_management/test_instance.py b/tests/unit/python/fledge/services/common/microservice_management/test_instance.py index 939196c26..99925dd6a 100644 --- a/tests/unit/python/fledge/services/common/microservice_management/test_instance.py +++ b/tests/unit/python/fledge/services/common/microservice_management/test_instance.py @@ -78,11 +78,6 @@ async def test_duplicate_address_and_mgt_port_registration(self): Service.register("StorageService2", "Storage", "127.0.0.1", 9998, 1999) assert "AlreadyExistsWithTheSameAddressAndManagementPort" in str(excinfo) - async def test_register_wrong_type(self): - with pytest.raises(ServiceRecord.InvalidServiceType) as excinfo: - Service.register("StorageService1", "WrongType", "127.0.0.1", 9999, 1999) - assert "InvalidServiceType" in str(excinfo) - async def test_register_invalid_port(self): with pytest.raises(NonNumericPortError) as excinfo: Service.register("StorageService2", "Storage", "127.0.0.1", "808a", 1999) diff --git a/tests/unit/python/fledge/services/core/api/test_service.py b/tests/unit/python/fledge/services/core/api/test_service.py index 357bcbfda..f21c4cc9d 100644 --- a/tests/unit/python/fledge/services/core/api/test_service.py +++ b/tests/unit/python/fledge/services/core/api/test_service.py @@ -235,11 +235,10 @@ async def test_get_service_with_type(self, mocker, client, _type, svc_record_cou ('{"name": "test", "plugin": "dht11", "type": "south", "enabled": "0"}', 400, 'Only "true", "false", true, false are allowed for value of enabled.'), ('{"name": "test", "plugin": "dht11"}', 400, "Missing type property in payload."), - ('{"name": "test", "plugin": "dht11", "type": "blah"}', 400, - "Only south, north, notification, management, dispatcher, bucketstorage and pipeline types are supported."), ('{"name": "test", "type": "south"}', 400, "Missing plugin property for type south in payload.") ]) async def test_add_service_with_bad_params(self, client, code, payload, message): + data = json.loads(payload) resp = await client.post('/fledge/service', data=payload) assert code == resp.status assert message == resp.reason @@ -356,47 +355,35 @@ async def q_result(*arg): assert {"message": msg} == json_response patch_get_cat_info.assert_called_once_with(category_name=data['name']) - p1 = '{"name": "furnace4", "type": "south", "plugin": "dht11"}' - p2 = '{"name": "furnace4", "type": "south", "plugin": "dht11", "enabled": false}' - p3 = '{"name": "furnace4", "type": "south", "plugin": "dht11", "enabled": true}' - p4 = '{"name": "furnace4", "type": "south", "plugin": "dht11", "enabled": "true"}' - p5 = '{"name": "furnace4", "type": "south", "plugin": "dht11", "enabled": "false"}' - - @pytest.mark.parametrize("payload", [p1, p2, p3, p4, p5]) - async def test_add_service(self, client, payload): - data = json.loads(payload) - + @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority, enabled", [ + ("furnace4", "south", "south_c", "[\"services/south_c\"]", 100, None), + ("Sine Wave", "south", "south_c", "[\"services/south_c\"]", 100, "true"), + ("PI", "north", "north_C", "[\"services/north_C\"]", 200, "false"), + ("PI2PI", "north", "north_C", "[\"services/north_C\"]", 100, "true"), + ]) + async def test_add_service(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority, enabled): async def async_mock_get_schedule(): schedule = StartUpSchedule() schedule.schedule_id = '2129cc95-c841-441a-ad39-6469a87dbc8b' return schedule - async def q_result(*arg): - table = arg[0] - _payload = arg[1] - if table == 'scheduled_processes': - assert {'return': ['name'], 'where': {'column': 'name', 'condition': '=', 'value': 'south_c', - 'and': {'column': 'script', 'condition': '=', - 'value': '[\"services/south_c\"]'}} - } == json.loads(_payload) - return {'count': 0, 'rows': []} - if table == 'schedules': - assert {'return': ['schedule_name'], 'where': {'column': 'schedule_name', 'condition': '=', - 'value': 'furnace4'}} == json.loads(_payload) - return {'count': 0, 'rows': []} - + payload_dict = {"name": svc_name, "type": svc_type, "plugin": "plugin"} + if enabled is not None: + payload_dict["enabled"] = enabled + payload = json.dumps(payload_dict) + data = json.loads(payload) expected_insert_resp = {'rows_affected': 1, "response": "inserted"} mock_plugin_info = { - 'name': "furnace4", + 'name': "Plugin", 'version': "1.1", - 'type': "south", + 'type': svc_type, 'interface': "1.0", 'mode': "async", 'config': { 'plugin': { - 'description': "DHT11 plugin", + 'description': "Plugin description", 'type': 'string', - 'default': 'dht11' + 'default': "plugin" } } } @@ -407,41 +394,38 @@ async def q_result(*arg): _rv2 = await self.async_mock(expected_insert_resp) _rv3 = await self.async_mock("") _rv4 = await async_mock_get_schedule() + _rv5 = await self.async_mock(0) + _rv6 = await self.async_mock({'count': 1}) with patch.object(common, 'load_and_fetch_python_plugin_info', side_effect=[mock_plugin_info]): with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(c_mgr, 'get_category_all_items', return_value=_rv1) as patch_get_cat_info: - with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): - with patch.object(storage_client_mock, 'insert_into_tbl', return_value=_rv2) \ - as insert_table_patch: + with patch.object(service, 'check_schedules', return_value=_rv5) as patch_schedules: + with patch.object(service, 'check_scheduled_processes', return_value=_rv6) as patch_scheduled_processes: with patch.object(c_mgr, 'create_category', return_value=_rv2) as patch_create_cat: - with patch.object(c_mgr, 'create_child_category', return_value=_rv2) \ - as patch_create_child_cat: - with patch.object(server.Server.scheduler, 'save_schedule', - return_value=_rv3) as patch_save_schedule: - with patch.object(server.Server.scheduler, 'get_schedule_by_name', - return_value=_rv4) as patch_get_schedule: + with patch.object(c_mgr, 'create_child_category', return_value=_rv2) as patch_create_child_cat: + with patch.object(server.Server.scheduler, 'save_schedule', return_value=_rv3) as patch_save_schedule: + with patch.object(server.Server.scheduler, 'get_schedule_by_name', return_value=_rv4) as patch_get_schedule: resp = await client.post('/fledge/service', data=payload) server.Server.scheduler = None + print(resp.reason) assert 200 == resp.status result = await resp.text() json_response = json.loads(result) assert {'id': '2129cc95-c841-441a-ad39-6469a87dbc8b', - 'name': 'furnace4'} == json_response - patch_get_schedule.assert_called_once_with(data['name']) + 'name': svc_name} == json_response + patch_get_schedule.assert_called_once_with(svc_name) patch_save_schedule.assert_called_once() - patch_create_child_cat.assert_called_once_with('South', ['furnace4']) + patch_create_child_cat.assert_called_once_with(svc_type.capitalize(), [svc_name]) assert 2 == patch_create_cat.call_count - patch_create_cat.assert_called_with('South', {}, 'South microservices', True) - args, kwargs = insert_table_patch.call_args - assert 'scheduled_processes' == args[0] - p = json.loads(args[1]) - assert {'name': 'south_c', 'script': '["services/south_c"]'} == p - patch_get_cat_info.assert_called_once_with(category_name='furnace4') + patch_create_cat.assert_called_with(svc_type.capitalize(), {}, f"{svc_type.capitalize()} microservices", True) + patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_process, svc_script) + patch_schedules.assert_called_once_with(storage_client_mock, svc_name) + patch_get_cat_info.assert_called_once_with(category_name=svc_name) - p1 = '{"name": "DispatcherServer", "type": "dispatcher"}' - p2 = '{"name": "NotificationServer", "type": "notification"}' - p3 = '{"name": "ManagementServer", "type": "management"}' - p4 = '{"name": "BucketServer", "type": "bucketstorage"}' + p1 = '{"name": "DispatcherServer", "type": "Dispatcher"}' + p2 = '{"name": "NotificationServer", "type": "Notification"}' + p3 = '{"name": "ManagementServer", "type": "Management"}' + p4 = '{"name": "BucketServer", "type": "BucketStorage"}' @pytest.mark.parametrize("payload", [p1, p2, p3, p4]) async def test_bad_external_service(self, client, payload): @@ -449,143 +433,110 @@ async def test_bad_external_service(self, client, payload): with patch('os.path.exists', return_value=False): resp = await client.post('/fledge/service', data=payload) assert 404 == resp.status - msg = '{} service is not installed correctly.'.format(data['type'].capitalize()) + msg = f"The '{data['type']}' service has not been installed correctly." assert msg == resp.reason result = await resp.text() json_response = json.loads(result) assert {"message": msg} == json_response - p1 = '{"name": "NotificationServer", "type": "notification"}' - p2 = '{"name": "NotificationServer", "type": "notification", "enabled": false}' - p3 = '{"name": "NotificationServer", "type": "notification", "enabled": true}' - p4 = '{"name": "DispatcherServer", "type": "dispatcher"}' - p5 = '{"name": "DispatcherServer", "type": "dispatcher", "enabled": false}' - p6 = '{"name": "DispatcherServer", "type": "dispatcher", "enabled": true}' - p7 = '{"name": "DispatcherServer", "type": "bucketstorage"}' - p8 = '{"name": "DispatcherServer", "type": "bucketstorage", "enabled": false}' - p9 = '{"name": "DispatcherServer", "type": "bucketstorage", "enabled": true}' - - @pytest.mark.parametrize("payload", [p1, p2, p3, p4, p5, p6, p7, p8, p9]) - async def test_add_external_service(self, client, payload): - data = json.loads(payload) - sch_id = '45876056-e04c-4cde-8a82-1d8dbbbe6d72' - + @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority, enabled", [ + ("Mgt Server", "Management", "management", "[\"services/management\"]", 300, None), + ("NF Server", "Notification", "notification_c", "[\"services/notification_c\"]", 30, "true"), + ("DS Server", "Dispatcher", "dispatcher_c", "[\"services/dispatcher_c\"]", 20, "false"), + ("BS Server", "BucketStorage", "bucket_storage_c", "[\"services/bucket_storage_c\"]", 10, None) + ]) + async def test_add_external_service(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority, enabled): async def async_mock_get_schedule(): schedule = StartUpSchedule() schedule.schedule_id = sch_id return schedule - async def q_result(*arg): - table = arg[0] - _payload = json.loads(arg[1]) - if table == 'schedules': - if _payload['return'][0] == 'process_name': - assert {"return": ["process_name"]} == _payload - return {'rows': [{'process_name': 'purge'}, {'process_name': 'stats collector'}], 'count': 2} - else: - assert {"return": ["schedule_name"], "where": {"column": "schedule_name", "condition": "=", - "value": data['name']}} == _payload - - return {'count': 0, 'rows': []} - if table == 'scheduled_processes': - sch_ps = data['type'] if data['type'] != "bucketstorage" else "bucket_storage" - assert {"return": ["name"], "where": {"column": "name", "condition": "=", - "value": "{}_c".format(sch_ps), - "and": {"column": "script", "condition": "=", - "value": "[\"services/{}_c\"]".format( - sch_ps)}}} == _payload - return {'count': 0, 'rows': []} - - expected_insert_resp = {'rows_affected': 1, "response": "inserted"} + sch_id = '45876056-e04c-4cde-8a82-1d8dbbbe6d72' + payload_dict = {"name": svc_name, "type": svc_type} + if enabled is not None: + payload_dict["enabled"] = enabled + payload = json.dumps(payload_dict) + data = json.loads(payload) + svc_info = { + "name": svc_name, + "type": svc_type, + "process": svc_process, + "process_script": svc_script, + "startup_priority": svc_priority + } server.Server.scheduler = Scheduler(None, None) storage_client_mock = MagicMock(StorageClientAsync) c_mgr = ConfigurationManager(storage_client_mock) - _rv1 = await self.async_mock(None) - _rv2 = await self.async_mock(expected_insert_resp) - _rv3 = await self.async_mock("") - _rv4 = await async_mock_get_schedule() + _rv1 = await self.async_mock(svc_info) + _rv2 = await self.async_mock(None) + _rv3 = await self.async_mock(0) + _rv4 = await self.async_mock({'count': 1}) + _rv5 = await self.async_mock({'count': 1, 'rows': [{'process_name': "blah"}]}) + _rv6 = await async_mock_get_schedule() with patch('os.path.exists', return_value=True): - with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): - with patch.object(c_mgr, 'get_category_all_items', return_value=_rv1) as patch_get_cat_info: - with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): - with patch.object(storage_client_mock, 'insert_into_tbl', - return_value=_rv2) as insert_table_patch: - with patch.object(server.Server.scheduler, 'save_schedule', - return_value=_rv3) as patch_save_schedule: - with patch.object(server.Server.scheduler, 'get_schedule_by_name', - return_value=_rv4) as patch_get_schedule: - resp = await client.post('/fledge/service', data=payload) - server.Server.scheduler = None - assert 200 == resp.status - result = await resp.text() - json_response = json.loads(result) - assert {'id': sch_id, 'name': data['name']} == json_response - patch_get_schedule.assert_called_once_with(data['name']) - patch_save_schedule.assert_called_once() - args, kwargs = insert_table_patch.call_args - assert 'scheduled_processes' == args[0] - ps = data['type'] if data['type'] != "bucketstorage" else "bucket_storage" - assert {'name': '{}_c'.format(ps), 'script': '["services/{}_c"]'.format( - ps)} == json.loads(args[1]) - patch_get_cat_info.assert_called_once_with(category_name=data['name']) + with patch.object(service, '_fetch_service_info', return_value=_rv1): + with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): + with patch.object(c_mgr, 'get_category_all_items', return_value=_rv2) as patch_get_cat_info: + with patch.object(service, 'check_schedules', return_value=_rv3) as patch_schedules: + with patch.object(service, 'check_scheduled_processes', return_value=_rv4) as patch_scheduled_processes: + with patch.object(service, 'check_schedule_entry', return_value=_rv5) as patch_schedule_entry: + with patch.object(server.Server.scheduler, 'save_schedule', return_value=_rv2) as patch_save_schedule: + with patch.object(server.Server.scheduler, 'get_schedule_by_name', return_value=_rv6) as patch_schedule_by_name: + resp = await client.post('/fledge/service', data=payload) + server.Server.scheduler = None + assert 200 == resp.status + result = await resp.text() + json_response = json.loads(result) + assert {'id': sch_id, 'name': data['name']} == json_response + patch_schedule_by_name.assert_called_once_with(data['name']) + patch_save_schedule.assert_called_once() + patch_schedules.assert_called_once_with(storage_client_mock, data['name']) + patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_info['process'], svc_info['process_script']) + patch_get_cat_info.assert_called_once_with(category_name=data['name']) - @pytest.mark.parametrize("payload, svc_type", [ - ('{"name": "NotificationServer", "type": "notification"}', "notification"), - ('{"name": "DispatcherServer", "type": "dispatcher"}', "dispatcher"), - ('{"name": "BucketServer", "type": "bucketstorage"}', "bucketstorage") + + @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority", [ + ("Mgt Server", "Management", "management", "[\"services/management\"]", 300), + ("NF Server", "Notification", "notification_c", "[\"services/notification_c\"]", 30), + ("DS Server", "Dispatcher", "dispatcher_c", "[\"services/dispatcher_c\"]", 20), + ("BS Server", "BucketStorage", "bucket_storage_c", "[\"services/bucket_storage_c\"]", 10) ]) - async def test_dupe_external_service_schedule(self, client, payload, svc_type): + async def test_dupe_external_service_schedule(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority): + payload = json.dumps({"name": svc_name, "type": svc_type}) data = json.loads(payload) - - async def q_result(*arg): - table = arg[0] - _payload = json.loads(arg[1]) - sch_ps = svc_type if svc_type != "bucketstorage" else "bucket_storage" - if table == 'schedules': - if _payload['return'][0] == 'process_name': - assert {"return": ["process_name"]} == _payload - return {'rows': [{'process_name': 'stats collector'}, {'process_name': '{}_c'.format(sch_ps)}], - 'count': 2} - else: - assert {"return": ["schedule_name"], "where": {"column": "schedule_name", "condition": "=", - "value": data['name']}} == _payload - - return {'count': 0, 'rows': []} - if table == 'scheduled_processes': - assert {"return": ["name"], "where": {"column": "name", "condition": "=", - "value": "{}_c".format(sch_ps), - "and": {"column": "script", "condition": "=", - "value": "[\"services/{}_c\"]".format( - sch_ps)}}} == _payload - return {'count': 0, 'rows': []} - - expected_insert_resp = {'rows_affected': 1, "response": "inserted"} + svc_info = { + "name": svc_name, + "type": svc_type, + "process": svc_process, + "process_script": svc_script, + "startup_priority": svc_priority + } + msg = f"A {svc_info['process']} service type schedule already exists." server.Server.scheduler = Scheduler(None, None) storage_client_mock = MagicMock(StorageClientAsync) c_mgr = ConfigurationManager(storage_client_mock) - _rv1 = await self.async_mock(None) - _rv2 = await self.async_mock(expected_insert_resp) + _rv1 = await self.async_mock(svc_info) + _rv2 = await self.async_mock(None) + _rv3 = await self.async_mock(0) + _rv4 = await self.async_mock({'count': 1}) + _rv5 = await self.async_mock({'count': 1, 'rows': [{'process_name': svc_info['process']}]}) with patch('os.path.exists', return_value=True): - with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): - with patch.object(c_mgr, 'get_category_all_items', return_value=_rv1) as patch_get_cat_info: - with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): - with patch.object(storage_client_mock, 'insert_into_tbl', - return_value=_rv2) as insert_table_patch: - resp = await client.post('/fledge/service', data=payload) - server.Server.scheduler = None - assert 400 == resp.status - svc_record = svc_type.capitalize() if svc_type != "bucketstorage" else "BucketStorage" - msg = "A {} service type schedule already exists.".format(svc_record) - assert msg == resp.reason - result = await resp.text() - json_response = json.loads(result) - assert {"message": msg} == json_response - args, kwargs = insert_table_patch.call_args - assert 'scheduled_processes' == args[0] - p = json.loads(args[1]) - ps = svc_type if svc_type != "bucketstorage" else "bucket_storage" - assert {'name': '{}_c'.format(ps), 'script': '["services/{}_c"]'.format(ps)} == p - patch_get_cat_info.assert_called_once_with(category_name=data['name']) + with patch.object(service, '_fetch_service_info', return_value=_rv1): + with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): + with patch.object(c_mgr, 'get_category_all_items', return_value=_rv2) as patch_get_cat_info: + with patch.object(service, 'check_schedules', return_value=_rv3) as patch_schedules: + with patch.object(service, 'check_scheduled_processes', return_value=_rv4) as patch_scheduled_processes: + with patch.object(service, 'check_schedule_entry', return_value=_rv5) as patch_schedule_entry: + resp = await client.post('/fledge/service', data=payload) + server.Server.scheduler = None + assert 400 == resp.status + assert msg == resp.reason + result = await resp.text() + json_response = json.loads(result) + assert {"message": msg} == json_response + patch_schedules.assert_called_once_with(storage_client_mock, data['name']) + patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_info['process'], svc_info['process_script']) + patch_get_cat_info.assert_called_once_with(category_name=data['name']) async def test_add_service_with_config(self, client): payload = '{"name": "Sine", "type": "south", "plugin": "sinusoid", "enabled": "false",' \ @@ -597,21 +548,6 @@ async def async_mock_get_schedule(): schedule.schedule_id = '2129cc95-c841-441a-ad39-6469a87dbc8b' return schedule - async def q_result(*arg): - table = arg[0] - _payload = arg[1] - if table == 'scheduled_processes': - assert {'return': ['name'], 'where': {'column': 'name', 'condition': '=', 'value': 'south_c', - 'and': {'column': 'script', 'condition': '=', - 'value': '[\"services/south_c\"]'}} - } == json.loads(_payload) - return {'count': 0, 'rows': []} - if table == 'schedules': - assert {'return': ['schedule_name'], - 'where': {'column': 'schedule_name', 'condition': '=', - 'value': data['name']}} == json.loads(_payload) - return {'count': 0, 'rows': []} - expected_insert_resp = {'rows_affected': 1, "response": "inserted"} mock_plugin_info = { 'name': data['name'], @@ -640,12 +576,13 @@ async def q_result(*arg): _rv2 = await self.async_mock(expected_insert_resp) _rv3 = await self.async_mock("") _rv4 = await async_mock_get_schedule() + _rv5 = await self.async_mock(0) + _rv6 = await self.async_mock({'count': 1}) with patch.object(common, 'load_and_fetch_python_plugin_info', side_effect=[mock_plugin_info]): with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(c_mgr, 'get_category_all_items', return_value=_rv1) as patch_get_cat_info: - with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): - with patch.object(storage_client_mock, 'insert_into_tbl', - return_value=_rv2) as insert_table_patch: + with patch.object(service, 'check_schedules', return_value=_rv5) as patch_schedules: + with patch.object(service, 'check_scheduled_processes', return_value=_rv6) as patch_scheduled_processes: with patch.object(c_mgr, 'create_category', return_value=_rv1) as patch_create_cat: with patch.object(c_mgr, 'create_child_category', return_value=_rv1) as patch_create_child_cat: @@ -668,10 +605,8 @@ async def q_result(*arg): patch_create_child_cat.assert_called_once_with('South', ['Sine']) assert 2 == patch_create_cat.call_count patch_create_cat.assert_called_with('South', {}, 'South microservices', True) - args, kwargs = insert_table_patch.call_args - assert 'scheduled_processes' == args[0] - p = json.loads(args[1]) - assert {'name': 'south_c', 'script': '["services/south_c"]'} == p + patch_scheduled_processes.assert_called_once_with(storage_client_mock, "south_c", "[\"services/south_c\"]") + patch_schedules.assert_called_once_with(storage_client_mock, data['name']) patch_get_cat_info.assert_called_once_with(category_name=data['name']) async def test_delete_service(self, mocker, client): @@ -988,11 +923,11 @@ async def test_bad_get_service_available(self, client): ([(['/usr/local/fledge/services'], [], ['fledge.services.south', 'fledge.services.storage'])], [(['/usr/local/fledge/python/fledge/services/management'], [], [])], ["south", "storage"]), ([(['/usr/local/fledge/services'], [], ['fledge.services.south', 'fledge.services.storage'])], - [(['/usr/local/fledge/python/fledge/services/management'], [], ['__main__.py'])], + [(['/usr/local/fledge/python/fledge/services/management'], [], ['__init__.py', '__main__.py'])], ["south", "storage", "management"]), ([(['/usr/local/fledge/services'], [], ['fledge.services.south', 'fledge.services.storage', 'fledge.services.notification'])], - [(['/usr/local/fledge/python/fledge/services/management'], [], ['__main__.py'])], + [(['/usr/local/fledge/python/fledge/services/management'], [], ['__init__.py', '__main__.py'])], ["south", "storage", "notification", "management"]), ([(['/usr/local/fledge/services'], [], ['fledge.services.south', 'fledge.services.storage', 'fledge.services.north'])], [], @@ -1003,136 +938,64 @@ async def test_bad_get_service_available(self, client): ([(['/usr/local/fledge/services'], [], ['fledge.services.south', 'fledge.services.storage', 'fledge.services.north', 'fledge.services.notification', 'fledge.services.dispatcher', 'fledge.services.bucket'])], [], - ["south", "storage", "north", "notification", "dispatcher", "bucket"]) + ["south", "storage", "north", "notification", "dispatcher", "bucket"]), + ([(['/usr/local/fledge/services'], [], ['fledge.services.south', 'fledge.services.storage'])], + [(['/usr/local/fledge/python/fledge/services/management'], [], ['__init__.py', '__main__.py']), + (['/usr/local/fledge/python/fledge/services/test'], [], ['__init__.py', '__main__.py'])], + ["south", "storage", "management", "test"]) ]) async def test_get_service_installed(self, client, mock_value1, mock_value2, exp_result): - with patch('os.walk', side_effect=(mock_value1, mock_value2)) as mockwalk: + with patch('os.walk', side_effect=(mock_value1,)) as mockwalk, \ + patch('os.listdir') as mocklistdir, \ + patch('os.path.isfile') as mockisfile, \ + patch('os.path.isdir') as mockisdir, \ + patch('os.path.exists') as mockexists, \ + patch('fledge.services.core.api.service._logger') as mocklogger: + + # Mock os.path.exists to return True for both service paths + mockexists.return_value = True + + # Set up mocks for Python services based on mock_value2 + if mock_value2: # If there's Python service mock data + # Extract service names from the mock_value2 directory paths + python_services = [] + available_files = {} + + for walk_data in mock_value2: + root_list, dirs, files = walk_data + root = root_list[0] # Get the actual path string from the list + service_name = root.split('/')[-1] # Extract service name from path + python_services.append(service_name) + available_files[service_name] = files + + mocklistdir.return_value = python_services + mockisdir.return_value = True + + # Mock os.path.isfile based on the files available for each service + def mock_isfile_side_effect(path): + for service_name, files in available_files.items(): + if service_name in path: + filename = path.split('/')[-1] + return filename in files + return False + + mockisfile.side_effect = mock_isfile_side_effect + else: + mocklistdir.return_value = [] + mockisdir.return_value = False + mockisfile.return_value = False + resp = await client.get('/fledge/service/installed') assert 200 == resp.status result = await resp.text() json_response = json.loads(result) assert json_response == {'services': exp_result} - assert 2 == mockwalk.call_count - - p1 = '{"name": "FL Agent", "type": "management"}' - p2 = '{"name": "FL #1", "type": "management", "enabled": false}' - p3 = '{"name": "FL_MGT", "type": "management", "enabled": true}' - - @pytest.mark.parametrize("payload", [p1, p2, p3]) - async def test_add_management_service(self, client, payload): - data = json.loads(payload) - sch_id = '4624d3e4-c295-4bfd-848b-8a843cc90c3f' - - async def async_mock_get_schedule(): - schedule = StartUpSchedule() - schedule.schedule_id = sch_id - return schedule - - async def q_result(*arg): - table = arg[0] - _payload = json.loads(arg[1]) - if table == 'schedules': - if _payload['return'][0] == 'process_name': - assert {"return": ["process_name"]} == _payload - return {'rows': [{'process_name': 'purge'}, {'process_name': 'stats collector'}], 'count': 2} - else: - assert {"return": ["schedule_name"], "where": {"column": "schedule_name", "condition": "=", - "value": data['name']}} == _payload - - return {'count': 0, 'rows': []} - if table == 'scheduled_processes': - assert {"return": ["name"], "where": {"column": "name", "condition": "=", "value": "management", - "and": {"column": "script", "condition": "=", - "value": "[\"services/management\"]"}} - } == _payload - return {'count': 0, 'rows': []} - - expected_insert_resp = {'rows_affected': 1, "response": "inserted"} - server.Server.scheduler = Scheduler(None, None) - storage_client_mock = MagicMock(StorageClientAsync) - c_mgr = ConfigurationManager(storage_client_mock) - _rv1 = await self.async_mock(None) - _rv2 = await self.async_mock(expected_insert_resp) - _rv3 = await self.async_mock("") - _rv4 = await async_mock_get_schedule() - with patch('os.path.exists', return_value=True): - with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): - with patch.object(c_mgr, 'get_category_all_items', return_value=_rv1) as patch_get_cat_info: - with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): - with patch.object(storage_client_mock, 'insert_into_tbl', return_value=_rv2) as insert_table_patch: - with patch.object(server.Server.scheduler, 'save_schedule', return_value=_rv3) as patch_save_schedule: - with patch.object(server.Server.scheduler, 'get_schedule_by_name', return_value=_rv4) as patch_get_schedule: - resp = await client.post('/fledge/service', data=payload) - server.Server.scheduler = None - assert 200 == resp.status - result = await resp.text() - json_response = json.loads(result) - assert {'id': sch_id, 'name': data['name']} == json_response - patch_get_schedule.assert_called_once_with(data['name']) - patch_save_schedule.assert_called_once() - args, kwargs = insert_table_patch.call_args - assert 'scheduled_processes' == args[0] - p = json.loads(args[1]) - assert {'name': 'management', 'priority': 300, 'script': '["services/management"]'} == p - patch_get_cat_info.assert_called_once_with(category_name=data['name']) - - async def test_dupe_management_service_schedule(self, client): - payload = '{"name": "FL Agent", "type": "management"}' - data = json.loads(payload) - async def q_result(*arg): - table = arg[0] - _payload = json.loads(arg[1]) - if table == 'schedules': - if _payload['return'][0] == 'process_name': - assert {"return": ["process_name"]} == _payload - return {'rows': [{'process_name': 'stats collector'}, {'process_name': 'management'}], - 'count': 2} - else: - assert {"return": ["schedule_name"], "where": {"column": "schedule_name", "condition": "=", - "value": data['name']}} == _payload - - return {'count': 0, 'rows': []} - if table == 'scheduled_processes': - assert {"return": ["name"], "where": {"column": "name", "condition": "=", "value": "management", - "and": {"column": "script", "condition": "=", - "value": "[\"services/management\"]"}} - } == _payload - return {'count': 0, 'rows': []} - - expected_insert_resp = {'rows_affected': 1, "response": "inserted"} - msg = "A Management service type schedule already exists." - server.Server.scheduler = Scheduler(None, None) - storage_client_mock = MagicMock(StorageClientAsync) - c_mgr = ConfigurationManager(storage_client_mock) - _rv1 = await self.async_mock(None) - _rv2 = await self.async_mock(expected_insert_resp) - with patch('os.path.exists', return_value=True): - with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): - with patch.object(c_mgr, 'get_category_all_items', - return_value=_rv1) as patch_get_cat_info: - with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): - with patch.object(storage_client_mock, 'insert_into_tbl', - return_value=_rv2) as insert_table_patch: - resp = await client.post('/fledge/service', data=payload) - server.Server.scheduler = None - assert 400 == resp.status - assert msg == resp.reason - result = await resp.text() - json_response = json.loads(result) - assert {"message": msg} == json_response - args, kwargs = insert_table_patch.call_args - assert 'scheduled_processes' == args[0] - p = json.loads(args[1]) - assert {'name': 'management', 'priority': 300, 'script': '["services/management"]'} == p - patch_get_cat_info.assert_called_once_with(category_name=data['name']) + # Only assert walk was called if there's C-service data + if mock_value1: + mockwalk.assert_called_once() - @pytest.mark.parametrize("param", [ - "blah", - 1, - "storage" - "south" - ]) + @pytest.mark.parametrize("param", ["blah", 1, "storage", "south"]) async def test_bad_type_update_package(self, client, param): resp = await client.put('/fledge/service/{}/name/update'.format(param), data=None) assert 400 == resp.status From 6c0bd3f731415211c9ef95384183ee4e794455e1 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 16 Sep 2025 18:01:39 +0530 Subject: [PATCH 11/16] Schema version with 77 alongwith upgrade/downgrade scripts Signed-off-by: ashish-jabble --- VERSION | 2 +- scripts/plugins/storage/postgres/downgrade/76.sql | 1 + scripts/plugins/storage/postgres/upgrade/77.sql | 1 + scripts/plugins/storage/sqlite/downgrade/76.sql | 1 + scripts/plugins/storage/sqlite/upgrade/77.sql | 1 + scripts/plugins/storage/sqlitelb/downgrade/76.sql | 1 + scripts/plugins/storage/sqlitelb/upgrade/77.sql | 1 + 7 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 scripts/plugins/storage/postgres/downgrade/76.sql create mode 100644 scripts/plugins/storage/postgres/upgrade/77.sql create mode 100644 scripts/plugins/storage/sqlite/downgrade/76.sql create mode 100644 scripts/plugins/storage/sqlite/upgrade/77.sql create mode 100644 scripts/plugins/storage/sqlitelb/downgrade/76.sql create mode 100644 scripts/plugins/storage/sqlitelb/upgrade/77.sql diff --git a/VERSION b/VERSION index 47c14db6b..d40af75c6 100644 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ fledge_version=3.1.0 -fledge_schema=76 +fledge_schema=77 diff --git a/scripts/plugins/storage/postgres/downgrade/76.sql b/scripts/plugins/storage/postgres/downgrade/76.sql new file mode 100644 index 000000000..bd4968f86 --- /dev/null +++ b/scripts/plugins/storage/postgres/downgrade/76.sql @@ -0,0 +1 @@ +-- No downgrade is required, as the Pipeline service is no longer in use. \ No newline at end of file diff --git a/scripts/plugins/storage/postgres/upgrade/77.sql b/scripts/plugins/storage/postgres/upgrade/77.sql new file mode 100644 index 000000000..f792f6bef --- /dev/null +++ b/scripts/plugins/storage/postgres/upgrade/77.sql @@ -0,0 +1 @@ +DELETE FROM fledge.scheduled_processes WHERE name = 'pipeline_c'; \ No newline at end of file diff --git a/scripts/plugins/storage/sqlite/downgrade/76.sql b/scripts/plugins/storage/sqlite/downgrade/76.sql new file mode 100644 index 000000000..bd4968f86 --- /dev/null +++ b/scripts/plugins/storage/sqlite/downgrade/76.sql @@ -0,0 +1 @@ +-- No downgrade is required, as the Pipeline service is no longer in use. \ No newline at end of file diff --git a/scripts/plugins/storage/sqlite/upgrade/77.sql b/scripts/plugins/storage/sqlite/upgrade/77.sql new file mode 100644 index 000000000..f792f6bef --- /dev/null +++ b/scripts/plugins/storage/sqlite/upgrade/77.sql @@ -0,0 +1 @@ +DELETE FROM fledge.scheduled_processes WHERE name = 'pipeline_c'; \ No newline at end of file diff --git a/scripts/plugins/storage/sqlitelb/downgrade/76.sql b/scripts/plugins/storage/sqlitelb/downgrade/76.sql new file mode 100644 index 000000000..bd4968f86 --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/downgrade/76.sql @@ -0,0 +1 @@ +-- No downgrade is required, as the Pipeline service is no longer in use. \ No newline at end of file diff --git a/scripts/plugins/storage/sqlitelb/upgrade/77.sql b/scripts/plugins/storage/sqlitelb/upgrade/77.sql new file mode 100644 index 000000000..f792f6bef --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/upgrade/77.sql @@ -0,0 +1 @@ +DELETE FROM fledge.scheduled_processes WHERE name = 'pipeline_c'; \ No newline at end of file From 12ec6d1d23dd0864cbf31d1eb5752b3e2b5fde35 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 16 Sep 2025 19:04:38 +0530 Subject: [PATCH 12/16] service installed patch fixes in api tests Signed-off-by: ashish-jabble --- .../fledge/services/core/api/test_service.py | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/tests/unit/python/fledge/services/core/api/test_service.py b/tests/unit/python/fledge/services/core/api/test_service.py index f21c4cc9d..9f9253a71 100644 --- a/tests/unit/python/fledge/services/core/api/test_service.py +++ b/tests/unit/python/fledge/services/core/api/test_service.py @@ -439,13 +439,13 @@ async def test_bad_external_service(self, client, payload): json_response = json.loads(result) assert {"message": msg} == json_response - @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority, enabled", [ - ("Mgt Server", "Management", "management", "[\"services/management\"]", 300, None), - ("NF Server", "Notification", "notification_c", "[\"services/notification_c\"]", 30, "true"), - ("DS Server", "Dispatcher", "dispatcher_c", "[\"services/dispatcher_c\"]", 20, "false"), - ("BS Server", "BucketStorage", "bucket_storage_c", "[\"services/bucket_storage_c\"]", 10, None) + @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority, enabled, svc_installed", [ + ("Mgt Server", "Management", "management", "[\"services/management\"]", 300, None, ["management"]), + ("NF Server", "Notification", "notification_c", "[\"services/notification_c\"]", 30, "true", ["notification"]), + ("DS Server", "Dispatcher", "dispatcher_c", "[\"services/dispatcher_c\"]", 20, "false", ["dispatcher"]), + ("BS Server", "BucketStorage", "bucket_storage_c", "[\"services/bucket_storage_c\"]", 10, None, ["bucket"]) ]) - async def test_add_external_service(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority, enabled): + async def test_add_external_service(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority, enabled, svc_installed): async def async_mock_get_schedule(): schedule = StartUpSchedule() schedule.schedule_id = sch_id @@ -464,16 +464,18 @@ async def async_mock_get_schedule(): "process_script": svc_script, "startup_priority": svc_priority } + svc_installed.extend(["storage", "south", "north"]) server.Server.scheduler = Scheduler(None, None) storage_client_mock = MagicMock(StorageClientAsync) c_mgr = ConfigurationManager(storage_client_mock) + _rv0 = await self.async_mock(svc_installed) _rv1 = await self.async_mock(svc_info) _rv2 = await self.async_mock(None) _rv3 = await self.async_mock(0) _rv4 = await self.async_mock({'count': 1}) _rv5 = await self.async_mock({'count': 1, 'rows': [{'process_name': "blah"}]}) _rv6 = await async_mock_get_schedule() - with patch('os.path.exists', return_value=True): + with patch.object(service, 'get_service_installed', return_value=_rv0) as patch_svc_installed: with patch.object(service, '_fetch_service_info', return_value=_rv1): with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(c_mgr, 'get_category_all_items', return_value=_rv2) as patch_get_cat_info: @@ -484,6 +486,7 @@ async def async_mock_get_schedule(): with patch.object(server.Server.scheduler, 'get_schedule_by_name', return_value=_rv6) as patch_schedule_by_name: resp = await client.post('/fledge/service', data=payload) server.Server.scheduler = None + print(resp.reason) assert 200 == resp.status result = await resp.text() json_response = json.loads(result) @@ -493,15 +496,15 @@ async def async_mock_get_schedule(): patch_schedules.assert_called_once_with(storage_client_mock, data['name']) patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_info['process'], svc_info['process_script']) patch_get_cat_info.assert_called_once_with(category_name=data['name']) + assert 2 == patch_svc_installed.call_count - - @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority", [ - ("Mgt Server", "Management", "management", "[\"services/management\"]", 300), - ("NF Server", "Notification", "notification_c", "[\"services/notification_c\"]", 30), - ("DS Server", "Dispatcher", "dispatcher_c", "[\"services/dispatcher_c\"]", 20), - ("BS Server", "BucketStorage", "bucket_storage_c", "[\"services/bucket_storage_c\"]", 10) + @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority, svc_installed", [ + ("Mgt Server", "Management", "management", "[\"services/management\"]", 300, ["management"]), + ("NF Server", "Notification", "notification_c", "[\"services/notification_c\"]", 30, ["notification"]), + ("DS Server", "Dispatcher", "dispatcher_c", "[\"services/dispatcher_c\"]", 20, ["dispatcher"]), + ("BS Server", "BucketStorage", "bucket_storage_c", "[\"services/bucket_storage_c\"]", 10, ["bucket"]) ]) - async def test_dupe_external_service_schedule(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority): + async def test_dupe_external_service_schedule(self, client, svc_name, svc_type, svc_process, svc_script, svc_priority, svc_installed): payload = json.dumps({"name": svc_name, "type": svc_type}) data = json.loads(payload) svc_info = { @@ -515,12 +518,14 @@ async def test_dupe_external_service_schedule(self, client, svc_name, svc_type, server.Server.scheduler = Scheduler(None, None) storage_client_mock = MagicMock(StorageClientAsync) c_mgr = ConfigurationManager(storage_client_mock) + svc_installed.extend(["storage", "south", "north"]) + _rv0 = await self.async_mock(svc_installed) _rv1 = await self.async_mock(svc_info) _rv2 = await self.async_mock(None) _rv3 = await self.async_mock(0) _rv4 = await self.async_mock({'count': 1}) _rv5 = await self.async_mock({'count': 1, 'rows': [{'process_name': svc_info['process']}]}) - with patch('os.path.exists', return_value=True): + with patch.object(service, 'get_service_installed', return_value=_rv0) as patch_svc_installed: with patch.object(service, '_fetch_service_info', return_value=_rv1): with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(c_mgr, 'get_category_all_items', return_value=_rv2) as patch_get_cat_info: @@ -537,6 +542,7 @@ async def test_dupe_external_service_schedule(self, client, svc_name, svc_type, patch_schedules.assert_called_once_with(storage_client_mock, data['name']) patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_info['process'], svc_info['process_script']) patch_get_cat_info.assert_called_once_with(category_name=data['name']) + assert 2 == patch_svc_installed.call_count async def test_add_service_with_config(self, client): payload = '{"name": "Sine", "type": "south", "plugin": "sinusoid", "enabled": "false",' \ From be2c43130bbd66b55daacd6fe39ab3001a838f8a Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 16 Sep 2025 19:32:51 +0530 Subject: [PATCH 13/16] assertion fixes in service API tests Signed-off-by: ashish-jabble --- tests/unit/python/fledge/services/core/api/test_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/python/fledge/services/core/api/test_service.py b/tests/unit/python/fledge/services/core/api/test_service.py index 9f9253a71..e40d53120 100644 --- a/tests/unit/python/fledge/services/core/api/test_service.py +++ b/tests/unit/python/fledge/services/core/api/test_service.py @@ -496,7 +496,7 @@ async def async_mock_get_schedule(): patch_schedules.assert_called_once_with(storage_client_mock, data['name']) patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_info['process'], svc_info['process_script']) patch_get_cat_info.assert_called_once_with(category_name=data['name']) - assert 2 == patch_svc_installed.call_count + patch_svc_installed.assert_called_once_with() @pytest.mark.parametrize("svc_name, svc_type, svc_process, svc_script, svc_priority, svc_installed", [ ("Mgt Server", "Management", "management", "[\"services/management\"]", 300, ["management"]), @@ -542,7 +542,7 @@ async def test_dupe_external_service_schedule(self, client, svc_name, svc_type, patch_schedules.assert_called_once_with(storage_client_mock, data['name']) patch_scheduled_processes.assert_called_once_with(storage_client_mock, svc_info['process'], svc_info['process_script']) patch_get_cat_info.assert_called_once_with(category_name=data['name']) - assert 2 == patch_svc_installed.call_count + patch_svc_installed.assert_called_once_with() async def test_add_service_with_config(self, client): payload = '{"name": "Sine", "type": "south", "plugin": "sinusoid", "enabled": "false",' \ From aa7eba621db2b48f58f8a4fdf48f69576ee722bc Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Fri, 19 Sep 2025 15:37:51 +0530 Subject: [PATCH 14/16] service type normalization in case of south, north only for backward compatibility Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 28 ++++++++++---------- tests/system/python/api/test_notification.py | 16 +++++++---- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 1239d8aa3..e149efd81 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -614,7 +614,6 @@ async def add_service(request): service_type = data.get('type', None) enabled = data.get('enabled', None) config = data.get('config', None) - if name is None: raise web.HTTPBadRequest(reason='Missing name property in payload.') if 'action' in request.query and request.query['action'] != '': @@ -686,10 +685,11 @@ async def add_service(request): raise web.HTTPBadRequest(reason='Invalid name property in payload.') if utils.check_fledge_reserved(name) is False: raise web.HTTPBadRequest(reason="'{}' is reserved for Fledge and can not be used as service name!".format(name)) - if service_type is None: + normalized_type = str(service_type).lower() if str(service_type).lower() in ('south', 'north') else service_type + if normalized_type is None: raise web.HTTPBadRequest(reason='Missing type property in payload.') - if plugin is None and service_type in ('south', 'north'): - raise web.HTTPBadRequest(reason='Missing plugin property for type {} in payload.'.format(service_type)) + if plugin is None and normalized_type in ('south', 'north'): + raise web.HTTPBadRequest(reason='Missing plugin property for type {} in payload.'.format(normalized_type)) if plugin and utils.check_reserved(plugin) is False: raise web.HTTPBadRequest(reason='Invalid plugin property in payload.') @@ -707,13 +707,13 @@ async def add_service(request): # Check if a valid plugin has been provided plugin_module_path, plugin_config, process_name, script = "", {}, "", "" - if service_type == 'south' or service_type == 'north': + if normalized_type in ('south', 'north'): # "plugin_module_path" is fixed by design. It is MANDATORY to keep the plugin in the exactly similar named # folder, within the plugin_module_path. # if multiple plugin with same name are found, then python plugin import will be tried first - plugin_module_path = "{}/python/fledge/plugins/{}/{}".format(_FLEDGE_ROOT, service_type, plugin) + plugin_module_path = "{}/python/fledge/plugins/{}/{}".format(_FLEDGE_ROOT, normalized_type, plugin) # FIXME: FOGL-10225 For south, north service type derived values from service info - if service_type == 'south': + if normalized_type == 'south': process_name = 'south_c' script = '["services/south_c"]' priority = 100 @@ -722,7 +722,7 @@ async def add_service(request): script = '["services/north_C"]' priority = 200 try: - plugin_info = common.load_and_fetch_python_plugin_info(plugin_module_path, plugin, service_type) + plugin_info = common.load_and_fetch_python_plugin_info(plugin_module_path, plugin, normalized_type) plugin_config = plugin_info['config'] if not plugin_config: msg = "Plugin '{}' import problem from path '{}''.".format(plugin, plugin_module_path) @@ -730,8 +730,8 @@ async def add_service(request): raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) except FileNotFoundError as ex: # Checking for C-type plugins - plugin_config = load_c_plugin(plugin, service_type) - plugin_module_path = "{}/plugins/{}/{}".format(_FLEDGE_ROOT, service_type, plugin) + plugin_config = load_c_plugin(plugin, normalized_type) + plugin_module_path = "{}/plugins/{}/{}".format(_FLEDGE_ROOT, normalized_type, plugin) if not plugin_config: msg = "Plugin '{}' not found in path '{}'.".format(plugin, plugin_module_path) _logger.exception(ex, msg) @@ -745,13 +745,13 @@ async def add_service(request): for service_name in get_service_installed(): if service_name not in _PREBUILT_SERVICES: service_info = await _fetch_service_info(service_name) - if service_info['type'] == service_type: + if service_info['type'] == normalized_type: process_name = service_info['process'] script = service_info['process_script'] priority = service_info['startup_priority'] break if process_name is None or script is None or priority is None: - message = f"The '{service_type}' service has not been installed correctly." + message = f"The '{normalized_type}' service has not been installed correctly." raise web.HTTPNotFound(reason=message, body=json.dumps({"message": message})) storage = connect.get_storage_async() config_mgr = ConfigurationManager(storage) @@ -783,7 +783,7 @@ async def add_service(request): _logger.error(ex, "Failed to create scheduled process.") raise web.HTTPInternalServerError(reason='Failed to create service.') - if service_type == 'south' or service_type == 'north': + if normalized_type in ('south', 'north'): try: # Create a configuration category from the configuration defined in the plugin category_desc = plugin_config['plugin']['description'] @@ -792,7 +792,7 @@ async def add_service(request): category_value=plugin_config, keep_original_items=True) # Create the parent category for all South services - parent_cat_name = service_type.capitalize() + parent_cat_name = normalized_type.capitalize() await config_mgr.create_category(parent_cat_name, {}, "{} microservices".format(parent_cat_name), True) await config_mgr.create_child_category(parent_cat_name, [name]) diff --git a/tests/system/python/api/test_notification.py b/tests/system/python/api/test_notification.py index b72cf7225..82109860a 100644 --- a/tests/system/python/api/test_notification.py +++ b/tests/system/python/api/test_notification.py @@ -80,12 +80,18 @@ def test_notification_service_add(self, service_branch, fledge_url, wait_time, r finally: remove_directories("/tmp/fledge-service-{}".format(SERVICE)) - # Start service + # GET service type conn = http.client.HTTPConnection(fledge_url) - data = {"name": SERVICE_NAME, - "type": "notification", - "enabled": "true" - } + conn.request("GET", '/fledge/service/info/{}'.format(SERVICE)) + r = conn.getresponse() + assert 200 == r.status + r = r.read().decode() + jdoc = json.loads(r) + SERVICE_TYPE = jdoc['type'] + + # Add service + conn = http.client.HTTPConnection(fledge_url) + data = {"name": SERVICE_NAME, "type": SERVICE_TYPE, "enabled": "true"} conn.request("POST", '/fledge/service', json.dumps(data)) r = conn.getresponse() assert 200 == r.status From 0ea458fbd0cf8f6386271f076a9b5c08617e7fea Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Mon, 22 Sep 2025 18:38:36 +0530 Subject: [PATCH 15/16] notification tests updated Signed-off-by: ashish-jabble --- tests/system/python/api/test_notification.py | 7 ++----- .../test_e2e_notification_service_with_plugins.py | 13 ++++++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/system/python/api/test_notification.py b/tests/system/python/api/test_notification.py index 82109860a..83b2962ac 100644 --- a/tests/system/python/api/test_notification.py +++ b/tests/system/python/api/test_notification.py @@ -82,16 +82,13 @@ def test_notification_service_add(self, service_branch, fledge_url, wait_time, r # GET service type conn = http.client.HTTPConnection(fledge_url) - conn.request("GET", '/fledge/service/info/{}'.format(SERVICE)) + conn.request("GET", f'/fledge/service/info/{SERVICE}') r = conn.getresponse() assert 200 == r.status r = r.read().decode() jdoc = json.loads(r) - SERVICE_TYPE = jdoc['type'] - # Add service - conn = http.client.HTTPConnection(fledge_url) - data = {"name": SERVICE_NAME, "type": SERVICE_TYPE, "enabled": "true"} + data = {"name": SERVICE_NAME, "type": jdoc['type'], "enabled": "true"} conn.request("POST", '/fledge/service', json.dumps(data)) r = conn.getresponse() assert 200 == r.status diff --git a/tests/system/python/e2e/test_e2e_notification_service_with_plugins.py b/tests/system/python/e2e/test_e2e_notification_service_with_plugins.py index dc433ccd1..717853333 100644 --- a/tests/system/python/e2e/test_e2e_notification_service_with_plugins.py +++ b/tests/system/python/e2e/test_e2e_notification_service_with_plugins.py @@ -42,12 +42,15 @@ def _configure_and_start_service(service_branch, fledge_url, remove_directories) finally: remove_directories("/tmp/fledge-service-{}".format(SERVICE)) - # Start service + # GET service type conn = http.client.HTTPConnection(fledge_url) - data = {"name": SERVICE_NAME, - "type": "notification", - "enabled": "true" - } + conn.request("GET", f'/fledge/service/info/{SERVICE}') + r = conn.getresponse() + assert 200 == r.status + r = r.read().decode() + jdoc = json.loads(r) + # Start service + data = {"name": SERVICE_NAME, "type": jdoc['type'], "enabled": "true"} conn.request("POST", '/fledge/service', json.dumps(data)) r = conn.getresponse() assert 200 == r.status From 5b26f1144f6bf712d98f53905708cc690b7ee66e Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Mon, 22 Sep 2025 19:31:51 +0530 Subject: [PATCH 16/16] Updates as per FOGL-10225; removed pre inbuilt service core handling Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 88 ++++++---------------- 1 file changed, 23 insertions(+), 65 deletions(-) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index e149efd81..69ebf8d59 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -135,31 +135,6 @@ def size(self): # service info cache instance _service_info_cache = ServiceInfoCache() -# TODO: FOGL-1022 - This is a temporary solution to get the service info for the prebuilt services. -# Prebuilt services configuration -_PREBUILT_SERVICES = { - "south": { - "name": "south", - "description": "Service used to interact with device, API and generic sources of data", - "type": "south", - "process": "south", - "process_script": "south_c" - }, - "north": { - "name": "north", - "description": "Service used to interact with device, API and generic sources of data", - "type": "north", - "process": "north", - "process_script": "north_C" - }, - "storage": { - "name": "storage", - "description": "The storage service buffers data within a single Fledge instance", - "type": "storage", - "process": "storage", - "process_script": "storage" - } -} ################################# # Service @@ -204,12 +179,11 @@ async def _fetch_service_info(service_name: str) -> dict: Returns: Service info dictionary with service details, or empty response if all methods fail """ - # Check cache first for non-prebuilt services - if service_name not in _PREBUILT_SERVICES: - cached_info = _service_info_cache.get(service_name) - if cached_info is not None: - _logger.debug(f"Retrieved service info for {service_name} from cache") - return cached_info + # Check cache first + cached_info = _service_info_cache.get(service_name) + if cached_info is not None: + _logger.debug(f"Retrieved service info for {service_name} from cache") + return cached_info def _create_empty_service_response(name: str) -> dict: return { @@ -291,10 +265,9 @@ def _get_service_info_from_path(service_path: str, is_python: bool = False, serv try: service_info = _get_service_info_from_path(service_path, service_name=service_name) if service_info: - # Cache successful result for non-prebuilt services - if service_name not in _PREBUILT_SERVICES: - _service_info_cache.update(service_name, service_info) - _logger.debug(f"Cached service info for {service_name}") + # Cache successful result + _service_info_cache.update(service_name, service_info) + _logger.debug(f"Cached service info for {service_name}") return service_info else: # File found but unable to get info - return early @@ -310,10 +283,9 @@ def _get_service_info_from_path(service_path: str, is_python: bool = False, serv try: service_info = _get_service_info_from_path(python_service_path, is_python=True, service_name=service_name) if service_info: - # Cache successful result for non-prebuilt services - if service_name not in _PREBUILT_SERVICES: - _service_info_cache.update(service_name, service_info) - _logger.debug(f"Cached service info for {service_name}") + # Cache successful result + _service_info_cache.update(service_name, service_info) + _logger.debug(f"Cached service info for {service_name}") return service_info else: # File found but unable to get info - return early @@ -394,10 +366,6 @@ async def get_service_info(request): installed_services = get_service_installed() services = [] for service_name in installed_services: - # Check if it's a prebuilt service - if service_name in _PREBUILT_SERVICES: - services.append(_PREBUILT_SERVICES[service_name]) - continue service_info = await _fetch_service_info(service_name) services.append(service_info) response = {"services": services} @@ -432,12 +400,8 @@ async def get_service_info_by_name(request): msg = f"Service '{service_name}' not found in installed services." raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) - # Check if it's a prebuilt service - if service_name in _PREBUILT_SERVICES: - service_info = _PREBUILT_SERVICES[service_name] - else: - # Try to get service info from C or Python service - service_info = await _fetch_service_info(service_name) + # Get service info from C or Python service + service_info = await _fetch_service_info(service_name) return web.json_response(service_info) except Exception as ex: msg = str(ex) @@ -712,15 +676,10 @@ async def add_service(request): # folder, within the plugin_module_path. # if multiple plugin with same name are found, then python plugin import will be tried first plugin_module_path = "{}/python/fledge/plugins/{}/{}".format(_FLEDGE_ROOT, normalized_type, plugin) - # FIXME: FOGL-10225 For south, north service type derived values from service info - if normalized_type == 'south': - process_name = 'south_c' - script = '["services/south_c"]' - priority = 100 - else: - process_name = 'north_C' - script = '["services/north_C"]' - priority = 200 + service_info = await _fetch_service_info(normalized_type) + process_name = service_info['process'] + script = service_info['process_script'] + priority = service_info['startup_priority'] try: plugin_info = common.load_and_fetch_python_plugin_info(plugin_module_path, plugin, normalized_type) plugin_config = plugin_info['config'] @@ -743,13 +702,12 @@ async def add_service(request): raise web.HTTPInternalServerError(reason='Failed to fetch plugin configuration') else: for service_name in get_service_installed(): - if service_name not in _PREBUILT_SERVICES: - service_info = await _fetch_service_info(service_name) - if service_info['type'] == normalized_type: - process_name = service_info['process'] - script = service_info['process_script'] - priority = service_info['startup_priority'] - break + service_info = await _fetch_service_info(service_name) + if service_info['type'] == normalized_type: + process_name = service_info['process'] + script = service_info['process_script'] + priority = service_info['startup_priority'] + break if process_name is None or script is None or priority is None: message = f"The '{normalized_type}' service has not been installed correctly." raise web.HTTPNotFound(reason=message, body=json.dumps({"message": message}))