diff --git a/owslib/ogcapi/connectedsystems.py b/owslib/ogcapi/connectedsystems.py index fe996702..f3391ab6 100644 --- a/owslib/ogcapi/connectedsystems.py +++ b/owslib/ogcapi/connectedsystems.py @@ -146,7 +146,7 @@ def system_create(self, data: str) -> dict: @returns: `dict` of system metadata """ - path = 'systems/' + path = 'systems' return self._request(path=path, method='POST', data=data) @@ -813,51 +813,51 @@ def observations_delete(self, observation_id: str) -> dict: return self._request(path=path, method='DELETE') -class ControlChannels(ConnectedSystems): +class ControlStreams(ConnectedSystems): def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, auth: Authentication = None): __doc__ = Collections.__doc__ # noqa super().__init__(url, json_, timeout, headers, auth) - def controls(self, **kwargs) -> dict: + def controlstreams(self, **kwargs) -> dict: """ - implements /controls + implements /controlstreams @returns: `dict` of control channel objects """ - path = 'controls' + path = 'controlstreams' query_params = QueryArgs(**kwargs) p_list = ['id', 'q', 'issueTime', 'executionTime', 'system', 'foi', 'controlledProperty', 'limit'] return self._request(path=path, kwargs=query_params.check_params(p_list)) - def control(self, control_id: str) -> dict: + def controlstream(self, control_id: str) -> dict: """ - implements /controls/{control_id} + implements /controlstreams/{control_id} @type control_id: string @param control_id: id of control channel @returns: `dict` of control channels """ - path = f'controls/{control_id}' + path = f'controlstreams/{control_id}' return self._request(path=path) def controls_of_system(self, system_id: str, **kwargs) -> dict: """ - implements /systems/{system_id}/controls + implements /systems/{system_id}/controlstreams @type system_id: string @param system_id: id of system @returns: `dict` of control channels """ - path = f'systems/{system_id}/controls' + path = f'systems/{system_id}/controlstreams' query_params = QueryArgs(**kwargs) p_list = ['id', 'q', 'issueTime', 'executionTime', 'limit'] return self._request(path=path, kwargs=query_params.check_params(p_list)) def control_create_in_system(self, system_id: str, data: str) -> dict: """ - implements /controls + implements /controlstreams @type system_id: string @param system_id: id of system @type data: dict @@ -865,12 +865,12 @@ def control_create_in_system(self, system_id: str, data: str) -> dict: @returns: `dict` of control channels """ - path = f'systems/{system_id}/controls' + path = f'systems/{system_id}/controlstreams' return self._request(path=path, data=data, method='POST') def control_update(self, control_id: str, data: str) -> dict: """ - implements /controls/{control_id} + implements /controlstreams/{control_id} @type control_id: string @param control_id: id of control channel @type data: dict @@ -878,36 +878,36 @@ def control_update(self, control_id: str, data: str) -> dict: @returns: `dict` of control channels """ - path = f'controls/{control_id}' + path = f'controlstreams/{control_id}' return self._request(path=path, data=data, method='PUT') def control_delete(self, control_id: str) -> dict: """ - implements /controls/{control_id} + implements /controlstreams/{control_id} @type control_id: string @param control_id: id of control channel @returns: `dict` of control channels """ - path = f'controls/{control_id}' + path = f'controlstreams/{control_id}' return self._request(path=path, method='DELETE') def control_retrieve_schema(self, control_id: str, **kwargs) -> dict: """ - implements /controls/{control_id}/schema + implements /controlstreams/{control_id}/schema @type control_id: string @param control_id: id of control channel @returns: `dict` of control channels """ - path = f'controls/{control_id}/schema' + path = f'controlstreams/{control_id}/schema' query_params = QueryArgs(**kwargs) p_list = ['cmdFormat', 'type'] return self._request(path=path, kwargs=query_params.check_params(p_list)) def control_update_schema(self, control_id: str, data: str) -> dict: """ - implements /controls/{control_id}/schema + implements /controlstreams/{control_id}/schema @type control_id: string @param control_id: id of control channel @type data: dict @@ -915,7 +915,7 @@ def control_update_schema(self, control_id: str, data: str) -> dict: @returns: `dict` of control channels """ - path = f'controls/{control_id}/schema' + path = f'controlstreams/{control_id}/schema' return self._request(path=path, data=data, method='PUT') @@ -950,20 +950,20 @@ def command(self, command_id: str) -> dict: def commands_of_control_channel(self, control_id: str, **kwargs) -> dict: """ - implements /controls/{control_id}/commands + implements /controlstreams/{control_id}/commands @type control_id: string @param control_id: id of control channel @returns: `dict` of commands object """ - path = f'controls/{control_id}/commands' + path = f'controlstreams/{control_id}/commands' query_params = QueryArgs(**kwargs) p_list = ['id', 'issueTime', 'executionTime', 'foi', 'controlledProperty', 'limit'] return self._request(path=path, kwargs=query_params.check_params(p_list)) def commands_send_command_in_control_stream(self, control_id: str, data: str) -> dict: """ - implements /commands + implements /controlstreams/{control_id}/commands @type control_id: string @param control_id: id of control channel @type data: dict @@ -971,7 +971,7 @@ def commands_send_command_in_control_stream(self, control_id: str, data: str) -> @returns: `dict` of command metadata """ - path = f'controls/{control_id}/commands' + path = f'controlstreams/{control_id}/commands' return self._request(path=path, data=data, method='POST') def commands_delete_command(self, command_id: str) -> dict: diff --git a/tests/test_ogcapi_connectedsystems_osh.py b/tests/test_ogcapi_connectedsystems_osh.py index dde37340..0ae8a651 100644 --- a/tests/test_ogcapi_connectedsystems_osh.py +++ b/tests/test_ogcapi_connectedsystems_osh.py @@ -5,24 +5,28 @@ # # Contact email: ian@botts-inc.com # ============================================================================== - -from datetime import datetime +from collections import namedtuple +from datetime import datetime, timezone, timedelta import json -from tests.utils import service_ok - import pytest +from pytest_httpserver import HTTPServer -from owslib.ogcapi.connectedsystems import Commands, ControlChannels, Datastreams, Deployments, Observations, \ +from owslib.ogcapi.connectedsystems import Commands, ControlStreams, Datastreams, Deployments, Observations, \ Properties, SamplingFeatures, SystemEvents, SystemHistory, Systems from owslib.util import Authentication -# Directs to OSH (Open Sensor Hub) hosted test server -TEST_URL = 'http://34.67.197.57:8585/sensorhub/api/' @pytest.fixture(scope="session") def fixtures(): class OSHFixtures: + NODE_TEST_OK_URL = 'http://34.67.197.57:8585/sensorhub/test' + # Directs to OSH hosted test server + # TEST_URL = 'http://34.67.197.57:8585/sensorhub/api/' + SERVER = 'localhost' + SERVER_PORT = 8585 + SERVER_CSAPI_EP = '/sensorhub/api/' + TEST_URL = f'http://{SERVER}:{SERVER_PORT}{SERVER_CSAPI_EP}' auth = Authentication('auto_test', 'automated_tester24') sml_headers = {'Content-Type': 'application/sml+json'} json_headers = {'Content-Type': 'application/json'} @@ -172,8 +176,8 @@ class OSHFixtures: properties_api = Properties(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) datastream_api = Datastreams(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) observations_api = Observations(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) - control_channels_api = ControlChannels(TEST_URL, auth=auth, - headers={'Content-Type': 'application/json'}) + control_channels_api = ControlStreams(TEST_URL, auth=auth, + headers={'Content-Type': 'application/json'}) commands_api = Commands(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) system_events_api = SystemEvents(TEST_URL, auth=auth, headers=omjson_headers) system_history_api = SystemHistory(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) @@ -215,213 +219,523 @@ def delete_all_sampling_features(self): yield OSHFixtures() -@pytest.mark.skipif(not service_ok(TEST_URL), reason="OSH server is unreachable") -class TestSystems: - @pytest.mark.online - def test_system_readonly(self, fixtures): - # get all systems - res = fixtures.systems_api.systems() - assert len(res['items']) > 0 - check_ids = ["0s2lbn2n1bnc8", "94n1f19ld7tlc"] - assert [any(sys_id == item['id'] for item in res['items']) for sys_id in check_ids] +@pytest.fixture(scope="session") +def system_ids(): + yield ["0s2lbn2n1bnc8", "94n1f19ld7tlc"] - # get a single system - res = fixtures.systems_api.system(check_ids[0]) - assert res is not None - assert res['id'] == check_ids[0] - - @pytest.mark.skip(reason="Skip transactional test") - def test_system_functions(self, fixtures): - # insertion of systems - fixtures.systems_api.headers = fixtures.sml_headers - sys_create_res = fixtures.systems_api.system_create(json.dumps(fixtures.system_definitions)) - assert sys_create_res is not None - - # update of system and retrieval - sml_desc_copy = fixtures.sys_sml_to_update.copy() - sml_desc_copy['description'] = 'Updated Description' - sml_str = json.dumps(sml_desc_copy) - post_systems = fixtures.systems_api.system_update('blid74chqmses', sml_str) - - check_result = fixtures.systems_api.system('blid74chqmses') - assert check_result['properties']['description'] == 'Updated Description' - - # deletion of system - all_systems = fixtures.systems_api.systems() - - # clear datastreams - fixtures.delete_all_datastreams() - - for system in all_systems['items']: - res = fixtures.systems_api.system_delete(system['id']) - assert res == {} - - -@pytest.mark.skipif(not service_ok(TEST_URL), reason="OSH server is unreachable") -class TestDeployments: - @pytest.mark.skip(reason="Skip transactional test") - def test_deployment_create(self, fixtures): - res1 = fixtures.deployment_api.deployment_create(json.dumps(fixtures.deployment_definition)) - assert res1 - res2 = fixtures.deployment_api.deployments() - assert fixtures.deployment_expected_id in [x['id'] for x in res2['items']] - res3 = fixtures.deployment_api.deployment(fixtures.deployment_expected_id) - assert res3['properties']['name'] == 'Test Deployment 001' and res3[ - 'id'] == fixtures.deployment_expected_id - - @pytest.mark.skip(reason="Skip transactional test") - def test_deployment_update(self, fixtures): - fixtures.deployment_definition['properties']['description'] = 'Updated Description of Deployment 001' - res = fixtures.deployment_api.deployment_update(fixtures.deployment_expected_id, - json.dumps(fixtures.deployment_definition)) - assert res is not None - @pytest.mark.skip(reason="Skip transactional test") - def test_deployment_delete(self, fixtures): - res = fixtures.deployment_api.deployment_delete(fixtures.deployment_expected_id) - assert res is not None +@pytest.fixture(scope="session") +def system_definitions(system_ids): + yield { + system_ids[0]: { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsmlsensor:001", + "label": "Test SML Sensor", + "description": "A Sensor created from an SML document", + "definition": "http://www.w3.org/ns/ssn/Sensor" + }, + system_ids[1]: { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsmlsensor:002", + "label": "Test SML Sensor #2", + "description": "A Sensor created from an SML document", + "definition": "http://www.w3.org/ns/ssn/Sensor" + } + } -@pytest.mark.skipif(not service_ok(TEST_URL), reason="OSH server is unreachable") -class TestSamplingFeatures: - @pytest.mark.online - def test_sampling_features_readonly(self, fixtures): - all_features = fixtures.sampling_feature_api.sampling_features(use_fois=True) - assert len(all_features['items']) == 51 - - feature_id = "c4nce3peo8hvc" - feature = fixtures.sampling_feature_api.sampling_feature(feature_id, use_fois=True) - assert feature['id'] == feature_id - assert feature['properties']['name'] == 'Station WS013' - - @pytest.mark.skip(reason="Skip transactional test") - def test_sampling_features_all(self, fixtures): - # setup - fixtures.delete_all_systems() - system_id = fixtures.create_single_system() - - # create a sampling feature - fixtures.sampling_feature_api.headers = fixtures.geojson_headers - res = fixtures.sampling_feature_api.sampling_feature_create(system_id, - json.dumps(fixtures.feature_def), True) - assert fixtures.sampling_feature_api.response_headers['Location'] is not None - sampling_feature_id = fixtures.sampling_feature_api.response_headers['Location'].split('/')[-1] - - # get all sampling features - res = fixtures.sampling_feature_api.sampling_features(use_fois=True) - assert len(res['items']) > 0 - assert any(x['id'] == sampling_feature_id for x in res['items']) - - # get the sampling feature we created - res = fixtures.sampling_feature_api.sampling_feature(sampling_feature_id, use_fois=True) - assert res['properties']['name'] == 'Test Station 001' - assert res['properties']['featureType'] == 'http://www.w3.org/ns/sosa/Station' - - # get sampling features from a system - res = fixtures.sampling_feature_api.sampling_features_from_system(system_id, use_fois=True) - assert len(res['items']) > 0 - assert any(x['id'] == sampling_feature_id for x in res['items']) - - # delete the sampling feature - res = fixtures.sampling_feature_api.sampling_feature_delete(sampling_feature_id, use_fois=True) - res = fixtures.sampling_feature_api.sampling_features(use_fois=True) - assert res == {'items': []} - - -@pytest.mark.skipif(not service_ok(TEST_URL), reason="OSH server is unreachable") -class TestDatastreams: - @pytest.mark.online - def test_datastreams_readonly(self, fixtures): - ds_id = 'kjg2qrcm40rfk' - datastreams = fixtures.datastream_api.datastreams() - assert len(datastreams['items']) > 0 - assert any(x['id'] == ds_id for x in datastreams['items']) - - datastream = fixtures.datastream_api.datastream(ds_id) - assert datastream['id'] == ds_id - assert datastream['name'] == "Simulated Weather Station Network - weather" - - @pytest.mark.skip(reason="Skip transactional test") - def test_all_ds_functions(self, fixtures): - # preflight cleanup - fixtures.delete_all_systems() - # setup systems needed - fixtures.systems_api.headers = fixtures.sml_headers - # systems = fixtures.systems_api.system_create(json.dumps(fixtures.system_definitions)) - system = fixtures.create_single_system() +@pytest.fixture(scope="session") +def feature_id(): + yield "c4nce3peo8hvc" - # insert a datastream - ds_def_str = json.dumps(fixtures.ds_definition) - ds_api = Datastreams(fixtures.TEST_URL, auth=fixtures.auth, headers=fixtures.json_headers) - datastream_create = ds_api.datastream_create_in_system(system, ds_def_str) - - # get the datastream id from Location header - ds_id = ds_api.response_headers['Location'].split('/')[-1] - ds = ds_api.datastream(ds_id) - ds2 = ds_api.datastreams_of_system(system) - assert ds['id'] == ds_id - assert any(x['id'] == ds_id for x in ds2['items']) - - # update the datastream omitted due to server error - # update schema has a similar server side issue - - # retrieve the schema for the datastream - res = ds_api.datastream_retrieve_schema_for_format(ds_id) - assert res is not None and len(res) > 0 - - # delete the datastream - ds_delete = ds_api.datastream_delete(ds_id) - assert ds_delete == {} - - -@pytest.mark.skipif(not service_ok(TEST_URL), reason="OSH server is unreachable") -class TestObservations: - @pytest.mark.online - def test_observations_readonly(self, fixtures): - ds_id = 'kjg2qrcm40rfk' - observations = fixtures.observations_api.observations_of_datastream(ds_id) - assert len(observations['items']) > 0 - assert 'result' in observations['items'][0] - - observation_of_ds = fixtures.observations_api.observations_of_datastream(ds_id) - assert observation_of_ds['items'][0]['result']['stationID'] == "WS013" - keys = ['stationID', 'temperature', 'pressure', 'humidity', 'windSpeed', 'windDirection'] - assert [key in observation_of_ds['items'][0]['result'] for key in keys] - - @pytest.mark.skip(reason="Skip transactional test") - def test_observations(self, fixtures): - # setup - fixtures.delete_all_systems() - system = fixtures.create_single_system() - ds = fixtures.create_single_datastream(system) - the_time = datetime.utcnow().isoformat() + 'Z' - - observation = { - "phenomenonTime": the_time, - "resultTime": the_time, + +@pytest.fixture(scope="session") +def feature_definition(system_ids): + yield { + "geometry": { + "type": "Point", + "coordinates": [-80.0, 35.0] + }, + "type": "Feature", + "properties": { + "featureType": "http://www.w3.org/ns/sosa/Station", + "uid": "urn:osh:sensor:teststation:001", + "name": "Test Station 001", + "description": "A test station", + "parentSystem@link": {"href": f"http://localhost:8585/sensorhub/api/systems/{system_ids[0]}"}, + "sampledFeature@link": { + "href": "https://data.example.com/link/to/resource", + "rel": "alternate", + "type": "application/json", + "hreflang": "en-US", + "title": "Resource Name", + "uid": "urn:x-org:resourceType:0001", + "rt": "http://www.example.org/uri/of/concept", + "if": "http://www.opengis.net/spec/spec-id/version"} + } + } + + +@pytest.fixture(scope="session") +def datastream_ids(): + yield ["datastream001", "datastream002"] + + +@pytest.fixture(scope="session") +def datastream_definitions(datastream_ids): + yield { + datastream_ids[0]: { + "name": "Test Datastream", + "outputName": "Test Output #1", + "schema": { + "obsFormat": "application/swe+json", + "encoding": { + "type": "JSONEncoding", + "vectorAsArrays": False + }, + "recordSchema": { + "type": "DataRecord", + "label": "Test Datastream Record", + "updatable": False, + "optional": False, + "definition": "http://test.com/Record", + "fields": [ + { + "type": "Time", + "label": "Test Datastream Time", + "updatable": False, + "optional": False, + "definition": "http://test.com/Time", + "name": "timestamp", + "uom": { + "href": "http://test.com/TimeUOM" + } + }, + { + "type": "Boolean", + "label": "Test Datastream Boolean", + "updatable": False, + "optional": False, + "definition": "http://test.com/Boolean", + "name": "testboolean" + } + ] + } + } + } + } + + +@pytest.fixture(scope="session") +def observation_ids(): + yield ["obs0001", "obs0002"] + + +@pytest.fixture(scope="session") +def times(observation_ids): + TimeInfo = namedtuple('TimeInfo', ["iso_time", "timestamp"]) + iso_time = datetime.now(timezone.utc).isoformat() + 'Z' + timestamp = int(datetime.now(timezone.utc).timestamp() * 1000) + + yield TimeInfo(iso_time, timestamp) + + +def offset_time(timestamp, offset_seconds): + dt = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc) + offset_dt = dt + timedelta(seconds=offset_seconds) + TimeInfo = namedtuple('TimeInfo', ["iso_time", "timestamp"]) + return TimeInfo(offset_dt.isoformat() + 'Z', int(offset_dt.timestamp() * 1000)) + + +@pytest.fixture(scope="session") +def observations_definitions(observation_ids, times): + offset_ti = offset_time(times.timestamp, 1) + yield { + observation_ids[0]: { + "phenomenonTime": times.iso_time, + "resultTime": times.iso_time, "result": { - "timestamp": datetime.now().timestamp() * 1000, + "timestamp": times.timestamp, "testboolean": True } + }, + observation_ids[1]: { + "phenomenonTime": offset_ti.iso_time, + "resultTime": offset_ti.iso_time, + "result": { + "timestamp": offset_ti.timestamp, + "testboolean": False + } } - fixtures.observations_api.headers = {'Content-Type': 'application/om+json'} - res = fixtures.observations_api.observations_create_in_datastream(ds, json.dumps(observation)) - obs = fixtures.observations_api.observations_of_datastream(ds) - assert obs['items'][0]['phenomenonTime'] == the_time - obs_id = obs['items'][0]['id'] - res = fixtures.observations_api.observations_delete(obs_id) - obs = fixtures.observations_api.observations_of_datastream(ds) - assert obs['items'] == [] - fixtures.delete_all_systems() - - -@pytest.mark.skipif(not service_ok(TEST_URL), reason="OSH server is unreachable") -class TestSystemHistory: - @pytest.mark.online - def test_system_history(self, fixtures): - sys_id = '0s2lbn2n1bnc8' - res = fixtures.system_history_api.system_history(sys_id) - assert len(res['items']) > 0 - history_id = res['items'][0]['properties']['validTime'][0] - res = fixtures.system_history_api.system_history_by_id(system_id=sys_id, history_id=history_id) - assert res['id'] == sys_id + } + + +@pytest.fixture(scope="session") +def controlstream_ids(): + # yield ["ctrlchan123", "ctrlchan456"] + yield ["ctrlchan123"] + + +@pytest.fixture(scope="session") +def controlstream_definitions(controlstream_ids): + yield { + controlstream_ids[0]: { + "name": "Test Control Channel 001", + "description": "A test control channel", + "issueTime": None, + "executionTime": None, + "live": False, + "async": True, + "schema": { + "commandFormat": "application/json", + "parametersSchema": { + "type": "DataRecord", + "fields": [ + { + "type": "Text", + "name": "textCommand", + "definition": "http://test.com/Command", + "label": "Text Command" + } + ] + } + } + } + } + + +@pytest.fixture(scope="session") +def command_ids(): + yield ["cmd0001", "cmd0002"] + + +@pytest.fixture(scope="session") +def command_definitions(command_ids): + yield { + command_ids[0]: { + "parameters": { + "textCommand": "Start Measurement" + } + }, + command_ids[1]: { + "parameters": { + "textCommand": "Stop Measurement" + } + } + } + + +@pytest.fixture(scope="session") +def server_data(system_definitions, system_ids, feature_id, feature_definition, datastream_ids, datastream_definitions, + controlstream_definitions, observation_ids, observations_definitions, command_definitions): + class ServerData: + systems = { + "items": list(system_definitions.values()) + } + features = { + "items": [ + feature_definition + ] + } + datastreams = { + "items": list(datastream_definitions.values()) + } + controlstreams = { + "items": list(controlstream_definitions.values()) + } + observations = { + "items": list(observations_definitions.values()) + } + commands = { + "items": list(command_definitions.values()) + } + + yield ServerData + + +@pytest.mark.online +def test_systems_get(server_data, system_ids, system_definitions): + with HTTPServer(port=8585) as ts: + # This is not strictly what a server would respond with, but sufficient for testing + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request('/sensorhub/api/systems').respond_with_json( + (server_data.systems)) + ts.expect_request(f'/sensorhub/api/systems/{system_ids[0]}').respond_with_json( + system_definitions[system_ids[0]]) + ts.expect_request('/sensorhub/api/systems/94n1f19ld7tlc').respond_with_json( + system_definitions[system_ids[1]]) + ts.expect_request(f'/sensorhub/api/systems', method='POST').respond_with_json({}, status=201, headers={ + 'Location': 'http://localhost:8585/sensorhub/api/systems/insertedsystem001'}) + + systems_api = Systems('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # get all systems + res = systems_api.systems() + assert len(res['items']) == 2 + check_ids = system_ids + assert res['items'] == server_data.systems['items'] + + # get a single system + res = systems_api.system(check_ids[0]) + assert res is not None + assert res == system_definitions[check_ids[0]] + + +@pytest.mark.online +def test_datastreams_get(server_data, datastream_ids, datastream_definitions): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request('/sensorhub/api/datastreams').respond_with_json( + server_data.datastreams) + for ds_id in datastream_ids: + ts.expect_request(f'/sensorhub/api/datastreams/{ds_id}').respond_with_json( + datastream_definitions) + + datastream_api = Datastreams('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # get all datastreams + res = datastream_api.datastreams() + ds_ret = res['items'] + assert len(ds_ret) == 1 + + # get a single datastream + for ds_id in datastream_ids: + res = datastream_api.datastream(ds_id) + assert res is not None + assert res == datastream_definitions + + +@pytest.mark.online +def test_observations_get(server_data, observation_ids, observations_definitions): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request('/sensorhub/api/observations').respond_with_json( + server_data.observations) + for obs_id in observation_ids: + ts.expect_request(f'/sensorhub/api/observations/{obs_id}').respond_with_json( + observations_definitions[obs_id]) + + observations_api = Observations('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # get all observations + res = observations_api.observations() + obs_ret = res['items'] + assert len(obs_ret) == 2 + + # get a single observation + for obs_id in observation_ids: + res = observations_api.observation(obs_id) + assert res is not None + assert res == observations_definitions[obs_id] + + +@pytest.mark.online +def test_controlstreams_get(server_data, controlstream_ids, controlstream_definitions): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request('/sensorhub/api/controlstreams').respond_with_json( + server_data.controlstreams) + for cs_id in controlstream_ids: + ts.expect_request(f'/sensorhub/api/controlstreams/{cs_id}').respond_with_json( + controlstream_definitions[cs_id]) + + control_channels_api = ControlStreams('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # get all control channels + res = control_channels_api.controlstreams() + cs_ret = res['items'] + assert len(cs_ret) == 1 + + # get a single control channel + res = control_channels_api.controlstream(controlstream_ids[0]) + assert res is not None + assert res == controlstream_definitions.get(controlstream_ids[0]) + + +@pytest.mark.online +def test_commands_get(server_data, command_ids, command_definitions): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request('/sensorhub/api/commands').respond_with_json( + server_data.commands) + for cmd_id in command_ids: + ts.expect_request(f'/sensorhub/api/commands/{cmd_id}').respond_with_json( + command_definitions[cmd_id]) + + commands_api = Commands('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # get all commands + res = commands_api.commands() + cmd_ret = res['items'] + assert len(cmd_ret) == 2 + + # get a single command + for cmd_id in command_ids: + res = commands_api.command(cmd_id) + assert res is not None + assert res == command_definitions[cmd_id] + + +# Tests for resource creation don't work well against a mocked server since the design of the core OGC api request method does not make the headers available +@pytest.mark.online +def test_system_insert(server_data, system_definitions, system_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + # ts.expect_request('/sensorhub/api/systems').respond_with_json( + # (server_data.systems)) + ts.expect_request(f'/sensorhub/api/systems', method='POST').respond_with_json({}, status=201, headers={ + 'Location': f'http://localhost:8585/sensorhub/api/systems/{system_ids[0]}'}) + + systems_api = Systems('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # insert a system + res = systems_api.system_create(json.dumps(system_definitions.get(system_ids[0]))) + assert res == {} + + +@pytest.mark.online +def test_datastream_insert(server_data, datastream_definitions, datastream_ids, system_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/systems/{system_ids[0]}/datastreams', method='POST').respond_with_json({}, + status=201, + headers={ + 'Location': f'http://localhost:8585/sensorhub/api/datastreams/{datastream_ids[0]}'}) + + datastream_api = Datastreams('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # insert a datastream + res = datastream_api.datastream_create_in_system(system_ids[0], json.dumps( + datastream_definitions.get(datastream_ids[0]))) + assert res == {} + + +@pytest.mark.online +def test_observation_insert(server_data, observations_definitions, observation_ids, datastream_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/datastreams/{datastream_ids[0]}/observations', + method='POST').respond_with_json({}, status=201, headers={ + 'Location': f'http://localhost:8585/sensorhub/api/observations/{observation_ids[0]}'}) + + observations_api = Observations('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # insert an observation + res = observations_api.observations_create_in_datastream(datastream_ids[0], json.dumps( + observations_definitions.get(observation_ids[0]))) + assert res == {} + + +@pytest.mark.online +def test_controlstream_insert(server_data, controlstream_definitions, controlstream_ids, system_ids): + with (HTTPServer(port=8585) as ts): + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/systems/{system_ids[0]}/controlstreams', + method='POST' + ).respond_with_json({}, + status=201, + headers={ + 'Location': f'http://localhost:8585/sensorhub/api/controlstreams/{controlstream_ids[0]}'}) + + control_channels_api = ControlStreams('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # insert a control stream + res = control_channels_api.control_create_in_system(system_ids[0], json.dumps( + controlstream_definitions.get(controlstream_ids[0]))) + assert res == {} + + +@pytest.mark.online +def test_command_insert(server_data, command_definitions, command_ids, controlstream_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/controlstreams/{controlstream_ids[0]}/commands', + method='POST').respond_with_json({}, status=201, headers={ + 'Location': f'http://localhost:8585/sensorhub/api/commands/{command_ids[0]}'}) + + commands_api = Commands('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # insert a command + res = commands_api.commands_send_command_in_control_stream(controlstream_ids[0], json.dumps( + command_definitions.get(command_ids[0]))) + assert res == {} + +@pytest.mark.online +def test_system_delete(system_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/systems/{system_ids[0]}', method='DELETE').respond_with_data( + '', status=204) + + systems_api = Systems('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # delete a system + res = systems_api.system_delete(system_ids[0]) + assert res == {} + +@pytest.mark.online +def test_datastream_delete(datastream_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/datastreams/{datastream_ids[0]}', method='DELETE').respond_with_data( + '', status=204) + + datastream_api = Datastreams('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # delete a datastream + res = datastream_api.datastream_delete(datastream_ids[0]) + assert res == {} + +@pytest.mark.online +def test_observation_delete(observation_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/observations/{observation_ids[0]}', method='DELETE').respond_with_data( + '', status=204) + + observations_api = Observations('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # delete an observation + res = observations_api.observations_delete(observation_ids[0]) + assert res == {} + +@pytest.mark.online +def test_controlstream_delete(controlstream_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/controlstreams/{controlstream_ids[0]}', method='DELETE').respond_with_data( + '', status=204) + + control_channels_api = ControlStreams('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # delete a control stream + res = control_channels_api.control_delete(controlstream_ids[0]) + assert res == {} + +@pytest.mark.online +def test_command_delete(command_ids): + with HTTPServer(port=8585) as ts: + ts.expect_request('/sensorhub/api/').respond_with_json({"title": "SensorHub OGC API - Connected Systems"}) + ts.expect_request(f'/sensorhub/api/commands/{command_ids[0]}', method='DELETE').respond_with_data( + '', status=204) + + commands_api = Commands('http://localhost:8585/sensorhub/api/', auth=None, + headers={'Content-Type': 'application/json'}) + + # delete a command + res = commands_api.commands_delete_command(command_ids[0]) + assert res == {}