diff --git a/.github/workflows/new-pycram-ci.yml b/.github/workflows/new-pycram-ci.yml index 7ad8b1486..0860f0dde 100644 --- a/.github/workflows/new-pycram-ci.yml +++ b/.github/workflows/new-pycram-ci.yml @@ -61,6 +61,7 @@ jobs: cd /opt/ros/overlay_ws/src/pycram pip3 install -r requirements.txt pip3 install -r requirements-resolver.txt + pip3 install -r requirements-ontology.txt - name: Install pytest & pyjpt run: | diff --git a/doc/source/examples.rst b/doc/source/examples.rst index f4cc7bc1d..055406048 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -66,3 +66,9 @@ ORM Examples notebooks/orm_example notebooks/migrate_neems notebooks/orm_querying_examples + +Ontology +========== + +.. nbgallery:: + notebooks/ontology \ No newline at end of file diff --git a/doc/source/installation.rst b/doc/source/installation.rst index 49498a283..2b1ebecc4 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -14,7 +14,7 @@ All dependencies are available via PyPi. PyCRAM is developed and tested currently with Python3.8, Ubuntu 20.04 and ROS Noetic. -This guide excpects you to have a GitHub account with an SSH key (you can read about adding a new ssh key +This guide expects you to have a GitHub account with an SSH key (you can read about adding a new ssh key `here `_). Installing ROS @@ -166,13 +166,16 @@ Install the requirements in your python interpreter. .. code-block:: shell + cd ~/workspace/ros/src/pycram/doc pip install -r requirements.txt Run pycram and build the docs. .. code-block:: shell + cd ~/workspace/ros roslaunch pycram ik_and_description.launch + cd src/pycram/doc make html Show the index. @@ -193,7 +196,8 @@ Install PyCharm Professional First, `install PyCharm Professional `_. -Create a JetBrains account and verify it for educational purpose. Now you can unlock the PyCharm Professional features in PyCharm. +Create a JetBrains account and verify it for educational purpose. Normally, a school email address would suffice, otherwise you would have to upload your student/employee id card. The verification process typically takes 1~2-week time, so until then please use Trial version. +Once your account is verified, you can unlock the PyCharm Professional features in PyCharm. The next step will set up the virtual Python environment, so it can be used as a project interpreter in PyCharm. @@ -207,9 +211,20 @@ The virtualenvwrapper allows to manage virtual Python environments, where additi .. code-block:: shell sudo pip3 install virtualenvwrapper + + +(Optional but recommended) Set virtualenvwrapper's `WORKON_HOME` env variable, of which the default value is `~/.virtualenvs` + +.. code-block:: shell + echo "export WORKON_HOME=~/envs" >> ~/.bashrc - echo "source /usr/local/bin/virtualenvwrapper.sh" >> ~/.bashrc mkdir -p $WORKON_HOME + +Activate virtualenvwrapper at terminal start + +.. code-block:: shell + + echo "source /usr/local/bin/virtualenvwrapper.sh" >> ~/.bashrc source ~/.bashrc Create a virtual env based on the workspaces libraries (see build-ws_) and add the `--system-site-packages` to get them properly. The env will be registered in `$WORKON_HOME`. @@ -264,6 +279,29 @@ folder as Tests and the resources as Resources. To verify that it works, you can execute any Testcase. +**Useful tips** + +- `Keyboard shortcuts `_ + - `Keymap `_ + +- `Python interpreter `_ + - `Python virtual environment `_ +- `Python packages `_ +- `Python console `_ + +- **View | Active Editor | Soft-wrap**: wrap text inside the editor view + +- **View | Tool Windows | Structure**: display structure window for easy content navigation + +- **F12**: Open terminal + +- **Double Shift**: Quick file search + +- **Ctrl F/R**: Find/Replace text in current file + +- **Ctrl Shift F/R**: Find/Replace text in the whole project, module, directory, scope + +- **Settings | Editor | Inspections | Code is compatible with specific Python versions**: Enable/Disable Python version-specific warnings Using IPython as REPL ===================== @@ -281,7 +319,8 @@ Enable autoreload To use changes made in the Python file while the Repl is running you need to enable the iPython extension ``autoreload``. This can be done using the iPython startup files, these are files which are always run if iPython is started. -The startup files are located in ``~/.ipython/profile_default/startup`` along with a README file which explains the usage +First run ``ipython profile create`` to create a `default profile `_. +Then you will find the startup files located in ``~/.ipython/profile_default/startup`` along with a README file which explains the usage of the startup files. In this directory create a file called ``00-autoreload.ipy`` and enter the following code to the file. @@ -297,7 +336,7 @@ code in the shell is executed. Run scripts ----------- -IPython allows to run Python files and enabled the access to created variables. This can be helpful +IPython allows to run Python files and enables the access to created variables. This can be helpful if you want to create a setup script which initializes things like the BulletWorld, Objects and imports relevant modules. diff --git a/doc/source/remarks.rst b/doc/source/remarks.rst index 9b7003f6d..902ae7b1b 100644 --- a/doc/source/remarks.rst +++ b/doc/source/remarks.rst @@ -16,9 +16,16 @@ To fix this issue one has to execute python -m ipykernel install --user --name --display-name "" +, eg. + +.. code-block:: shell + + python -m ipykernel install --user --name pycram --display-name "pycram" + in your terminal. --name is the name of your virtual environment and --display-name is the name -that will display in the drop down menu of jupyter. After that, select the correct kernel and +that will display in the drop down menu of jupyter. After that, select the correct Python interpreter kernel (``pycram``) and everything should work now. +Refer `here `_ for details. Adding Notebooks to the Documentation ===================================== diff --git a/examples/ontology.ipynb b/examples/ontology.ipynb new file mode 100644 index 000000000..0bb9ff549 --- /dev/null +++ b/examples/ontology.ipynb @@ -0,0 +1,1107 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "# Ontology interface\n", + "\n", + "This tutorial demonstrates basic usages of __owlready2__ API for ontology manipulation. Notably, new ontology concept triple classes (subject, predicate, object) will be dynamically created, with optional existing ontology parent classes that are loaded from an OWL ontology. Then through the interconnected relations specified in triples, designators and their corresponding ontology concepts can be double-way queried for input information in certain tasks, eg. making a robot motion plan. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.317484Z", + "start_time": "2024-04-12T11:49:12.022030Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Unknown attribute \"type\" in /robot[@name='pr2']/link[@name='base_laser_link']\n", + "Unknown attribute \"type\" in /robot[@name='pr2']/link[@name='wide_stereo_optical_frame']\n", + "Unknown attribute \"type\" in /robot[@name='pr2']/link[@name='narrow_stereo_optical_frame']\n", + "Unknown attribute \"type\" in /robot[@name='pr2']/link[@name='laser_tilt_link']\n", + "Failed to import Giskard messages, the real robot will not be available\n", + "Could not import RoboKudo messages, RoboKudo interface could not be initialized\n", + "pybullet build time: Nov 28 2023 23:51:11\n" + ] + } + ], + "source": [ + "from pathlib import Path\n", + "from typing import Type, TYPE_CHECKING\n", + "import pycram\n", + "from pycram.designator import ObjectDesignatorDescription" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "# Owlready2\n", + "\n", + "[Owlready2](https://owlready2.readthedocs.io/en/latest/intro.html) is a Python package providing a transparent access to OWL ontologies. It supports various manipulation operations, including but not limited to loading, modification, saving ontologies. Built-in supported reasoners include [HermiT](http://www.hermit-reasoner.com) and [Pellet](https://github.com/stardog-union/pellet)." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.333080Z", + "start_time": "2024-04-12T11:49:13.321031Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [], + "source": [ + "import logging\n", + "try:\n", + " import owlready2\n", + " from owlready2 import *\n", + "except ImportError:\n", + " owlready2 = None\n", + " logging.error(\"Could not import owlready2, Ontology Manager could not be initialized!\")\n", + "\n", + "logging.getLogger().setLevel(logging.INFO)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "# Ontology Manager\n", + "\n", + "`OntologyManager` is the singleton class acting as the main interface between PyCram with ontologies, whereby object instances in the former could query relevant information based on the semantic connection with their corresponding ontology concepts.\n", + "\n", + "Such connection, as represented by triples (subject-predicate-object), could be also created on the fly if not pre-existing in the loaded ontology.\n", + "\n", + "Also new and updated concepts with their properties defined in runtime could be stored into an [SQLite3 file database](https://owlready2.readthedocs.io/en/latest/world.html) for reuse.\n", + "\n", + "Here we will use [SOMA ontology](https://ease-crc.github.io/soma) as the baseline to utilize the generalized concepts provided by it." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.386628Z", + "start_time": "2024-04-12T11:49:13.333901Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] [1716911825.215667]: Ontology [http://www.ease-crc.org/ont/SOMA-HOME.owl#]'s name: SOMA-HOME has been loaded\n", + "[INFO] [1716911825.216314]: - main namespace: SOMA-HOME\n", + "[INFO] [1716911825.216700]: - loaded ontologies:\n", + "[INFO] [1716911825.216988]: http://www.ease-crc.org/ont/SOMA-HOME.owl#\n", + "[INFO] [1716911825.217239]: http://www.ease-crc.org/ont/DUL.owl#\n", + "[INFO] [1716911825.217476]: http://www.ease-crc.org/ont/SOMA.owl#\n" + ] + } + ], + "source": [ + "from pycram.ontology.ontology import OntologyManager, SOMA_HOME_ONTOLOGY_IRI\n", + "from pycram.ontology.ontology_common import OntologyConceptHolderStore, OntologyConceptHolder\n", + "\n", + "ontology_manager = OntologyManager(SOMA_HOME_ONTOLOGY_IRI)\n", + "main_ontology = ontology_manager.main_ontology\n", + "soma = ontology_manager.soma\n", + "dul = ontology_manager.dul" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Ontology Concept Holder\n", + "__OntologyConceptHolder__ class, encapsulating an __owlready2.Thing__ instance, is used primarily as the binding connection between the `owlready2.Thing` ontology concept to PyCram designators. We make it that way, instead of creating a custom concept class that inherits from `owlready2.Thing` for the reasons below:\n", + "\n", + "- `owlready2` API does not have very robust support for client classes to inherit from theirs with added (non-semantic) attributes, particularly in our case, where classes like `DesignatorDescription` have their `metaclass` as `ABCMeta`, while it is `EntityClass` that is the metaclass used for basically all concepts (classes, properties) in `owlready2`. Since those two metaclasses just bear no relationship, for the inheritance to work, the only way is to create a child metaclass with both of those as parents, however without full support by `owlready2`, plus the second reason below will point out it's not worth the effort.\n", + "\n", + "\n", + "- Essentially, we will have new ontology concept classes created dynamically, if their types inherit from `owlready2.Thing`, all custom non-semantic (of types known only by PyCram) attributes, which are defined by their own in child classes, will apparently be not savable into the ontology by `owlready2` api. Then the next time the ontology is loaded, those same dynamic classes will not be created anymore, thus without those attributes either, causing running error.\n", + "\n", + "As such, in short, an ontology concept class, either newly created on the fly or loaded from ontologies, has to be `owlready2.Thing` or its pure derived class (without non-semantic attributes), so to make itself reusable upon reloading.\n", + "\n", + "Notable attributes:\n", + "\n", + "- `ontology_concept`: An ontology concept of `owlready2.Thing` type or its pure child class (without custom non-semantic attributes), either dynamically created, or loaded from an ontology\n", + "\n", + "- `designators`: a list of `DesignatorDescription` instances associated with `ontology_concept`\n", + "\n", + "- `resolve`: a `Callable` typically returning a list of `DesignatorDescription` as specific designators, like `designators` or its subset, inferred from the ontology concept. In fact, it can be resolved to anything else relevant, up to the caller." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Query ontology classes and their properties\n", + "\n", + "Classes in the loaded ontology can be queried based on their exact names, or part of them, or by namespace.\n", + "Here, we can see essential info (ancestors, super/sub-classes, properties, direct instances, etc.) of the found ontology class." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.472991Z", + "start_time": "2024-04-12T11:49:13.387739Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] [1716911825.339363]: -------------------\n", + "[INFO] [1716911825.340001]: SOMA.DesignedContainer \n", + "[INFO] [1716911825.340401]: Super classes: [DUL.DesignedArtifact, DUL.DesignedArtifact, SOMA.hasDisposition.some(SOMA.Containment), SOMA.hasDisposition.some(SOMA.Containment)]\n", + "[INFO] [1716911825.340892]: Ancestors: {SOMA.DesignedContainer, DUL.DesignedArtifact, owl.Thing, DUL.PhysicalArtifact, DUL.Object, DUL.Entity, DUL.PhysicalObject}\n", + "[INFO] [1716911825.341236]: Subclasses: [SOMA.Bottle, SOMA.Crockery, SOMA.Box, SOMA.Building, SOMA.Carafe, SOMA.Cupboard, SOMA.Dishwasher, SOMA.Dispenser, SOMA.Drawer, SOMA.Jar, SOMA.Pack, SOMA.Oven, SOMA.Shaker, SOMA.Refrigerator, SOMA.TrashContainer, SOMA-HOME.CustomContainerConcept, SOMA-HOME.AnotherCustomContainerConcept, SOMA-HOME.OntologyPlaceHolderObject, SOMA-HOME.OntologyLiquidHolderObject]\n", + "[INFO] [1716911825.341683]: Properties: [rdf-schema.isDefinedBy, rdf-schema.comment, SOMA.hasDisposition, rdf-schema.label]\n", + "[INFO] [1716911825.400233]: Instances: [SOMA-HOME.ontology_custom_container_concept, SOMA-HOME.another_custom_container_concept, SOMA-HOME.table_concept, SOMA-HOME.stool_concept, SOMA-HOME.shelf_concept, SOMA-HOME.egg_tray_concept, SOMA-HOME.cup_concept, SOMA-HOME.bowl_concept, SOMA-HOME.pitcher_concept]\n", + "[INFO] [1716911825.401077]: Direct Instances: []\n", + "[INFO] [1716911825.401662]: Inverse Restrictions: []\n", + "DUL.PhysicalObject\n", + "[SOMA.Affordance, SOMA.Disposition]\n" + ] + } + ], + "source": [ + "ontology_designed_container_class = ontology_manager.get_ontology_class('DesignedContainer')\n", + "ontology_manager.print_ontology_class(ontology_designed_container_class)\n", + "classes = ontology_manager.get_ontology_classes_by_subname('PhysicalObject'); print(classes[0])\n", + "classes = ontology_manager.get_ontology_classes_by_namespace('SOMA'); print(classes[:2])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "__Descendants__ of an ontology class can be also queried by" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.490775Z", + "start_time": "2024-04-12T11:49:13.473745Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "data": { + "text/plain": [ + "[SOMA.Bottle, SOMA.DesignedContainer, SOMA.Bowl, SOMA.Crockery, SOMA.Box]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ontology_manager.get_ontology_descendant_classes(ontology_designed_container_class)[:5]" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Create a new ontology concept class and its individual\n", + "\n", + "A new ontology class can be created dynamically as inheriting from an existing class in the loaded ontology.\n", + "Here we create the class and its instance, also known as [__individual__](https://owlready2.readthedocs.io/en/latest/class.html#creating-equivalent-classes) in ontology terms, which is then wrapped inside an `OntologyConceptHolder`." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.496929Z", + "start_time": "2024-04-12T11:49:13.491199Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [], + "source": [ + "ontology_custom_container_class = ontology_manager.create_ontology_concept_class('CustomContainerConcept',\n", + " ontology_designed_container_class)\n", + "custom_container_concept_holder = OntologyConceptHolder(ontology_custom_container_class(name='ontology_custom_container_concept',\n", + " namespace=main_ontology))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Access ontology concept classes and individuals\n", + "All ontology classes created on the fly purely inherit (without added non-semantic attributes) from `owlready2.Thing`, and so share the same namespace with the loaded ontology instance, `main_ontology`. They can then be accessible through that namespace by __main_ontology.__.\n", + "The same applies for individuals of those classes, accessible by __main_ontology.__" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.510281Z", + "start_time": "2024-04-12T11:49:13.497378Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] [1716911825.469569]: -------------------\n", + "[INFO] [1716911825.470353]: SOMA-HOME.CustomContainerConcept \n", + "[INFO] [1716911825.470808]: Super classes: [SOMA.DesignedContainer, owl.Thing]\n", + "[INFO] [1716911825.471233]: Ancestors: {SOMA-HOME.CustomContainerConcept, SOMA.DesignedContainer, DUL.DesignedArtifact, owl.Thing, DUL.PhysicalArtifact, DUL.Object, DUL.Entity, DUL.PhysicalObject}\n", + "[INFO] [1716911825.471560]: Subclasses: []\n", + "[INFO] [1716911825.471912]: Properties: []\n", + "[INFO] [1716911825.472469]: Instances: [SOMA-HOME.ontology_custom_container_concept]\n", + "[INFO] [1716911825.472774]: Direct Instances: [SOMA-HOME.ontology_custom_container_concept]\n", + "[INFO] [1716911825.473048]: Inverse Restrictions: []\n", + "custom_container_concept is SOMA-HOME.ontology_custom_container_concept: True\n" + ] + } + ], + "source": [ + "ontology_manager.print_ontology_class(main_ontology.CustomContainerConcept)\n", + "print(f\"custom_container_concept is {main_ontology.ontology_custom_container_concept}: {custom_container_concept_holder.ontology_concept is main_ontology.ontology_custom_container_concept}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "For ones already existing in the ontology, they can only be accessed through their corresponding ontology, eg: `soma` as follows" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.530908Z", + "start_time": "2024-04-12T11:49:13.510723Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] [1716911825.485656]: -------------------\n", + "[INFO] [1716911825.486269]: SOMA.Cup \n", + "[INFO] [1716911825.486616]: Super classes: [SOMA.Crockery, SOMA.hasPhysicalComponent.some(SOMA.DesignedHandle)]\n", + "[INFO] [1716911825.486935]: Ancestors: {SOMA.Crockery, SOMA.DesignedContainer, SOMA.Tableware, DUL.DesignedArtifact, SOMA.Cup, owl.Thing, DUL.PhysicalArtifact, SOMA.DesignedTool, DUL.Object, DUL.Entity, DUL.PhysicalObject}\n", + "[INFO] [1716911825.487221]: Subclasses: []\n", + "[INFO] [1716911825.487572]: Properties: [rdf-schema.isDefinedBy, rdf-schema.comment, SOMA.hasPhysicalComponent]\n", + "[INFO] [1716911825.488186]: Instances: []\n", + "[INFO] [1716911825.488503]: Direct Instances: []\n", + "[INFO] [1716911825.488776]: Inverse Restrictions: []\n" + ] + } + ], + "source": [ + "ontology_manager.print_ontology_class(soma.Cup)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Connect ontology class individuals with designators\n", + "After creating `custom_container_concept_holder` (wrapping `custom_container_concept` as an `owlready2.Thing`), we connect it to a designator (say `obj_designator`) by:\n", + "\n", + "- Appending to `obj_designator.ontology_concept_holders` with `custom_container_concept_holder`\n", + "\n", + "- Appending to `custom_container_concept_holder.designators` with `obj_designator`" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.544458Z", + "start_time": "2024-04-12T11:49:13.531362Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [], + "source": [ + "custom_container_designator = ObjectDesignatorDescription(names=[\"obj\"])\n", + "custom_container_designator.ontology_concept_holders.append(custom_container_concept_holder)\n", + "custom_container_concept_holder.designators.append(custom_container_designator)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "We can also automatize all the above setup with a single function call" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.555369Z", + "start_time": "2024-04-12T11:49:13.545449Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Ontology concept: another_custom_container_concept of class SOMA-HOME.AnotherCustomContainerConcept\n", + "Designator: another_custom_container of type \n" + ] + } + ], + "source": [ + "another_custom_container_designator = ontology_manager.create_ontology_linked_designator(object_name=\"another_custom_container\",\n", + " designator_class=ObjectDesignatorDescription,\n", + " ontology_concept_name=\"AnotherCustomContainerConcept\",\n", + " ontology_parent_class=ontology_designed_container_class)\n", + "another_custom_container_concept = another_custom_container_designator.ontology_concept_holders[0].ontology_concept \n", + "print(f\"Ontology concept: {another_custom_container_concept.name} of class {type(another_custom_container_concept)}\")\n", + "another_custom_container_designator = OntologyConceptHolderStore().get_ontology_concept_holder_by_name(main_ontology.AnotherCustomContainerConcept.instances()[0].name).get_default_designator()\n", + "print(f\"Designator: {another_custom_container_designator.names[0]} of type {type(another_custom_container_designator)}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Create new ontology triple classes\n", + "\n", + "Concept classes of a triple, aka [__subject, predicate, object__], can be created dynamically. Here we will make an example creating ones for [__handheld objects__] and [__placeholder objects__], with a pair of predicate and inverse predicate signifying their mutual relation." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.565635Z", + "start_time": "2024-04-12T11:49:13.555823Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [], + "source": [ + "PLACEABLE_ON_PREDICATE_NAME = \"placeable_on\"\n", + "HOLD_OBJ_PREDICATE_NAME = \"hold_obj\"\n", + "ontology_manager.create_ontology_triple_classes(ontology_subject_parent_class=soma.DesignedContainer,\n", + " subject_class_name=\"OntologyPlaceHolderObject\",\n", + " ontology_object_parent_class=soma.Shape,\n", + " object_class_name=\"OntologyHandheldObject\",\n", + " predicate_name=PLACEABLE_ON_PREDICATE_NAME,\n", + " inverse_predicate_name=HOLD_OBJ_PREDICATE_NAME,\n", + " ontology_property_parent_class=soma.affordsBearer,\n", + " ontology_inverse_property_parent_class=soma.isBearerAffordedBy)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "There, we use `soma.DesignedContainer` & `soma.Shape`, existing concept in SOMA ontology, as the parent classes for the subject & object concepts respectively.\n", + "There is also a note that those classes will have the same namespace with `main_ontology`, so later on to be accessible through it.\n", + "\n", + "Then now we define some instances of the newly created triple classes, and link them to object designators, again using `ontology_manager.create_ontology_linked_designator()`" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.586549Z", + "start_time": "2024-04-12T11:49:13.566094Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [], + "source": [ + "def create_ontology_handheld_object_designator(object_name: str, ontology_parent_class: Type[owlready2.Thing]):\n", + " return ontology_manager.create_ontology_linked_designator(object_name=object_name,\n", + " designator_class=ObjectDesignatorDescription,\n", + " ontology_concept_name=f\"Onto{object_name}\",\n", + " ontology_parent_class=ontology_parent_class)\n", + "# Holdable Objects\n", + "cookie_box = create_ontology_handheld_object_designator(\"cookie_box\", main_ontology.OntologyHandheldObject)\n", + "egg = create_ontology_handheld_object_designator(\"egg\", main_ontology.OntologyHandheldObject)\n", + " \n", + "# Placeholder objects\n", + "placeholders = [create_ontology_handheld_object_designator(object_name, main_ontology.OntologyPlaceHolderObject)\n", + " for object_name in ['table', 'stool', 'shelf']]\n", + "\n", + "egg_tray = create_ontology_handheld_object_designator(\"egg_tray\", main_ontology.OntologyPlaceHolderObject)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "### Create ontology relations\n", + "\n", + "Now we will create ontology relations or predicates between __placeholder objects__ and __handheld objects__ with `ontology_manager.set_ontology_relation()`" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.596771Z", + "start_time": "2024-04-12T11:49:13.587021Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "for place_holder in placeholders:\n", + " ontology_manager.set_ontology_relation(subject_designator=cookie_box, object_designator=place_holder,\n", + " predicate_name=PLACEABLE_ON_PREDICATE_NAME)\n", + "\n", + "ontology_manager.set_ontology_relation(subject_designator=egg_tray, object_designator=egg,\n", + " predicate_name=HOLD_OBJ_PREDICATE_NAME)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Query designators based on their ontology-concept relations\n", + "\n", + "Now we can make queries for designators from designators, based on the relation among their corresponding ontology concepts setup above" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.611368Z", + "start_time": "2024-04-12T11:49:13.597218Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['cookie_box']'s placeholder candidates: [['table'], ['stool'], ['shelf']]\n", + "['egg']'s placeholder candidates: [['egg_tray']]\n", + "['table'] can hold: [['cookie_box']]\n", + "['stool'] can hold: [['cookie_box']]\n", + "['shelf'] can hold: [['cookie_box']]\n", + "['egg_tray'] can hold: [['egg']]\n" + ] + } + ], + "source": [ + "print(f\"{cookie_box.names}'s placeholder candidates:\",\n", + " f\"\"\"{[placeholder.names for placeholder in\n", + " ontology_manager.get_designators_by_subject_predicate(subject=cookie_box,\n", + " predicate_name=PLACEABLE_ON_PREDICATE_NAME)]}\"\"\")\n", + "\n", + "print(f\"{egg.names}'s placeholder candidates:\",\n", + " f\"\"\"{[placeholder.names for placeholder in\n", + " ontology_manager.get_designators_by_subject_predicate(subject=egg,\n", + " predicate_name=PLACEABLE_ON_PREDICATE_NAME)]}\"\"\")\n", + "\n", + "for place_holder in placeholders:\n", + " print(f\"{place_holder.names} can hold:\",\n", + " f\"\"\"{[placeholder.names for placeholder in\n", + " ontology_manager.get_designators_by_subject_predicate(subject=place_holder,\n", + " predicate_name=HOLD_OBJ_PREDICATE_NAME)]}\"\"\")\n", + "\n", + "print(f\"{egg_tray.names} can hold:\",\n", + " f\"\"\"{[placeholder.names for placeholder in\n", + " ontology_manager.get_designators_by_subject_predicate(subject=egg_tray,\n", + " predicate_name=HOLD_OBJ_PREDICATE_NAME)]}\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "# Practical examples\n", + "\n", + "## Example 1\n", + "How about creating ontology concept classes encapsulating `pycram.datastructures.enums.ObjectType`? We can do it by:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:13.621797Z", + "start_time": "2024-04-12T11:49:13.611864Z" + }, + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "GenericEdible object types:\n", + "SOMA-HOME.milk_concept [['milk']]\n", + "SOMA-HOME.breakfast_cereal_concept [['breakfast_cereal']]\n" + ] + } + ], + "source": [ + "from pycram.datastructures.enums import ObjectType\n", + "\n", + "# Create a generic ontology concept class for edible objects\n", + "generic_edible_class = ontology_manager.create_ontology_concept_class('GenericEdible')\n", + "\n", + "# Create a list of object designators sharing the same concept class as [generic_edible_class]\n", + "edible_obj_types = [ObjectType.MILK, ObjectType.BREAKFAST_CEREAL]\n", + "for object_type in ObjectType:\n", + " if object_type in edible_obj_types:\n", + " # Create a designator for the edible object\n", + " ontology_manager.create_ontology_object_designator_from_type(object_type, generic_edible_class)\n", + "\n", + "print(f'{generic_edible_class.name} object types:')\n", + "for edible_ontology_concept in generic_edible_class.direct_instances():\n", + " print(edible_ontology_concept,\n", + " [des.types for des in OntologyConceptHolderStore().get_ontology_concept_holder_by_name(edible_ontology_concept.name).designators])\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "## Example 2\n", + "We could also make use of relations between ontology concepts that designators are associated with, to enable more abstract inputs in robot motion plans.\n", + "\n", + "In a similar style to the scenario of __placeholder objects__ and __handheld objects__ above, but with a bit difference, we will ask the robot to query which content holders (eg. cup, pitcher, bowl) whereby a milk box could be pourable into.\n", + "\n", + "Basically, we will provide an ontology-based implementation for the query:\n", + " \n", + "`abstract_ontology_concept -> specific_objects_in_world?`\n", + "\n", + "To achieve it, we will create triple classes and configure a customized `resolve()` for the abstract concept, which returns its associated specific designators.\n", + "These designators are then used to again resolve for the target objects of interest, which become the inputs to a robot motion plan.\n", + "\n", + "### Setup simulated environment" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:14.414260Z", + "start_time": "2024-04-12T11:49:13.622258Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Scalar element defined multiple times: limit\n", + "Scalar element defined multiple times: limit\n" + ] + } + ], + "source": [ + "from pycram.worlds.bullet_world import BulletWorld, Object\n", + "from pycram.datastructures.pose import Pose\n", + "\n", + "from pycram.process_module import simulated_robot\n", + "from pycram.designators.action_designator import *\n", + "from pycram.designators.location_designator import *\n", + "\n", + "world = BulletWorld()\n", + "kitchen = Object(\"kitchen\", ObjectType.ENVIRONMENT, \"kitchen.urdf\")\n", + "pr2 = Object(\"pr2\", ObjectType.ROBOT, \"pr2.urdf\")\n", + "kitchen_designator = ObjectDesignatorDescription(names=[\"kitchen\"])\n", + "robot_designator = ObjectDesignatorDescription(names=[\"pr2\"]).resolve()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create PourableObject-LiquidHolder triple ontology classes" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:14.501124Z", + "start_time": "2024-04-12T11:49:14.415241Z" + } + }, + "outputs": [], + "source": [ + "POURABLE_INTO_PREDICATE_NAME = \"pourable_into\"\n", + "HOLD_LIQUID_PREDICATE_NAME = \"hold_liquid\"\n", + "ontology_manager.create_ontology_triple_classes(ontology_subject_parent_class=soma.DesignedContainer,\n", + " subject_class_name=\"OntologyLiquidHolderObject\",\n", + " ontology_object_parent_class=soma.Shape,\n", + " object_class_name=\"OntologyPourableObject\",\n", + " predicate_name=POURABLE_INTO_PREDICATE_NAME,\n", + " inverse_predicate_name=HOLD_LIQUID_PREDICATE_NAME,\n", + " ontology_property_parent_class=soma.affordsBearer,\n", + " ontology_inverse_property_parent_class=soma.isBearerAffordedBy)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Spawn a pourable object & liquid holders into the world and Create their designators" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:14.583818Z", + "start_time": "2024-04-12T11:49:14.501719Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Unknown tag \"rgba_color\" in /robot[@name='milk_object']/link[@name='milk_main']/visual[1]/material[@name='white']\n", + "Unknown tag \"rgba_color\" in /robot[@name='milk_object']/link[@name='milk_main']/visual[1]/material[@name='white']\n", + "Unknown tag \"rgba_color\" in /robot[@name='cup_object']/link[@name='cup_main']/visual[1]/material[@name='white']\n", + "Unknown tag \"rgba_color\" in /robot[@name='bowl_object']/link[@name='bowl_main']/visual[1]/material[@name='white']\n", + "Unknown tag \"rgba_color\" in /robot[@name='cup_object']/link[@name='cup_main']/visual[1]/material[@name='white']\n", + "Unknown tag \"rgba_color\" in /robot[@name='bowl_object']/link[@name='bowl_main']/visual[1]/material[@name='white']\n", + "Unknown tag \"rgba_color\" in /robot[@name='pitcher_object']/link[@name='pitcher_main']/visual[1]/material[@name='white']\n" + ] + } + ], + "source": [ + "# Holdable obj\n", + "milk_box = Object(\"milk_box\", ObjectType.MILK, \"milk.stl\")\n", + "milk_box_designator = create_ontology_handheld_object_designator(milk_box.name, main_ontology.OntologyPourableObject)\n", + "\n", + "# Liquid-holders\n", + "cup = Object(\"cup\", ObjectType.JEROEN_CUP, \"jeroen_cup.stl\", pose=Pose([1.4, 1, 0.9]))\n", + "bowl = Object(\"bowl\", ObjectType.BOWL, \"bowl.stl\", pose=Pose([1.4, 0.5, 0.9]))\n", + "pitcher = Object(\"pitcher\", ObjectType.GENERIC_OBJECT, \"Static_MilkPitcher.stl\", pose=Pose([1.4, 0, 0.9]))\n", + "milk_holders = [cup, bowl, pitcher]\n", + "milk_holder_designators = [create_ontology_handheld_object_designator(obj.name, main_ontology.OntologyLiquidHolderObject)\n", + " for obj in milk_holders]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create an ontology relation between the designators of the pourable object & its liquid holders" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:14.602526Z", + "start_time": "2024-04-12T11:49:14.584461Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Unknown tag \"rgba_color\" in /robot[@name='pitcher_object']/link[@name='pitcher_main']/visual[1]/material[@name='white']\n" + ] + } + ], + "source": [ + "for milk_holder_desig in milk_holder_designators:\n", + " ontology_manager.set_ontology_relation(subject_designator=milk_box_designator, object_designator=milk_holder_desig,\n", + " predicate_name=POURABLE_INTO_PREDICATE_NAME)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set up `resolve` for the ontology concept of the pourable object" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:14.612923Z", + "start_time": "2024-04-12T11:49:14.603173Z" + } + }, + "outputs": [], + "source": [ + "milk_box_concept_holder = milk_box_designator.ontology_concept_holders[0]\n", + "def milk_box_concept_resolve(): \n", + " object_designator = ontology_manager.get_designators_by_subject_predicate(subject=milk_box_designator, predicate_name=POURABLE_INTO_PREDICATE_NAME)[0]\n", + " return object_designator, object_designator.resolve()\n", + "\n", + "milk_box_concept_holder.resolve = milk_box_concept_resolve" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here, for demonstration purpose only, we specify the resolving result by `milk_box_concept_holder` as `cup`, the first-registered (default) pourable-into target milk holder, utilizing the ontology relation setup above.\n", + "\n", + "Now, we can query the milk box's target liquid holder by resolving `milk_box_concept_holder`" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:14.627982Z", + "start_time": "2024-04-12T11:49:14.613382Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Pickup target object: cup, a content holder for ['milk_box'] as in relation `pourable_into`\n" + ] + } + ], + "source": [ + "target_milk_holder_designator, target_milk_holder = milk_box_concept_holder.resolve()\n", + "print(f\"Pickup target object: {target_milk_holder.name}, a content holder for {milk_box_designator.names} as in relation `{POURABLE_INTO_PREDICATE_NAME}`\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Robot picks up the target liquid holder" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:23.283909Z", + "start_time": "2024-04-12T11:49:14.628600Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] [1716911828.077699]: Waiting for IK service: /pr2_left_arm_kinematics/get_ik\n", + "CostmapLocation.Location(pose=header: \n", + " seq: 0\n", + " stamp: \n", + " secs: 1716911828\n", + " nsecs: 58246850\n", + " frame_id: \"map\"\n", + "pose: \n", + " position: \n", + " x: 0.6399999999999999\n", + " y: 1.24\n", + " z: 0.0\n", + " orientation: \n", + " x: -0.0\n", + " y: 0.0\n", + " z: 0.15234391170286138\n", + " w: -0.988327543159185, reachable_arms=['left', 'right']) left\n" + ] + } + ], + "source": [ + "with simulated_robot:\n", + " ParkArmsAction([Arms.BOTH]).resolve().perform()\n", + "\n", + " MoveTorsoAction([0.3]).resolve().perform()\n", + "\n", + " pickup_pose = CostmapLocation(target=target_milk_holder, reachable_for=robot_designator).resolve()\n", + " pickup_arm = pickup_pose.reachable_arms[0]\n", + "\n", + " print(pickup_pose, pickup_arm)\n", + "\n", + " NavigateAction(target_locations=[pickup_pose.pose]).resolve().perform()\n", + "\n", + " PickUpAction(object_designator_description=target_milk_holder_designator, arms=[pickup_arm], grasps=[\"front\"]).resolve().perform()\n", + "\n", + " ParkArmsAction([Arms.BOTH]).resolve().perform()\n", + "\n", + " place_island = SemanticCostmapLocation(\"kitchen_island_surface\", kitchen_designator.resolve(), target_milk_holder_designator.resolve()).resolve()\n", + "\n", + " place_stand = CostmapLocation(place_island.pose, reachable_for=robot_designator, reachable_arm=pickup_arm).resolve()\n", + "\n", + " NavigateAction(target_locations=[place_stand.pose]).resolve().perform()\n", + "\n", + " PlaceAction(target_milk_holder_designator, target_locations=[place_island.pose], arms=[pickup_arm]).resolve().perform()\n", + "\n", + " ParkArmsAction([Arms.BOTH]).resolve().perform()\n", + "world.exit()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Save ontologies to an OWL file\n", + "After all the above operations on our ontologies, we now can save them to an OWL file on disk" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T11:49:23.337691Z", + "start_time": "2024-04-12T11:49:23.286322Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] [1716911835.388276]: Ontologies have been saved to /home/ducthan/ontologies/NewSOMA-HOME.owl\n" + ] + }, + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ontology_manager.save(f\"{Path.home()}/ontologies/New{main_ontology.name}.owl\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Optimize ontology loading with SQLite3\n", + "Upon the initial ontology loading from OWL, an SQLite3 file is automatically created, acting as the quadstore cache for the loaded ontologies. This allows them to be __selectively__ reusable the next time being loaded.\n", + "More info can be referenced [here](https://owlready2.readthedocs.io/en/latest/world.html). " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pycram", + "language": "python", + "name": "pycram" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/requirements-ontology.txt b/requirements-ontology.txt new file mode 100644 index 000000000..f151f0854 --- /dev/null +++ b/requirements-ontology.txt @@ -0,0 +1,2 @@ +-r requirements.txt +owlready2>=0.45 diff --git a/src/pycram/designator.py b/src/pycram/designator.py index a08dd17bd..dcc8b7820 100644 --- a/src/pycram/designator.py +++ b/src/pycram/designator.py @@ -5,15 +5,21 @@ from abc import ABC, abstractmethod from inspect import isgenerator, isgeneratorfunction -from sqlalchemy.orm.session import Session import rospy +try: + import owlready2 +except ImportError: + owlready2 = None + rospy.logwarn("owlready2 is not installed!") + +from sqlalchemy.orm.session import Session from .datastructures.world import World from .world_concepts.world_object import Object as WorldObject from .utils import GeneratorList, bcolors from threading import Lock from time import time -from typing_extensions import List, Dict, Any, Optional, Union, Callable, Iterable +from typing_extensions import Type, List, Dict, Any, Optional, Union, get_type_hints, Callable, Iterable, TYPE_CHECKING from .local_transformer import LocalTransformer from .language import Language @@ -29,6 +35,9 @@ from .orm.base import RobotState, ProcessMetaData from .tasktree import with_tree +if TYPE_CHECKING: + from .ontology.ontology_common import OntologyConceptHolder + class DesignatorError(Exception): """Implementation of designator errors.""" @@ -319,15 +328,17 @@ class DesignatorDescription(ABC): :ivar resolve: The specialized_designators function to use for this designator, defaults to self.ground """ - def __init__(self, resolver: Optional[Callable] = None): + def __init__(self, resolver: Optional[Callable] = None, ontology_concept_holders: Optional[List[OntologyConceptHolder]] = None): """ Create a Designator description. :param resolver: The grounding method used for the description. The grounding method creates a location instance that matches the description. + :param ontology_concept_holders: A list of holders of ontology concepts that the designator is categorized as or associated with """ if resolver is None: self.resolve = self.ground + self.ontology_concept_holders = [] if ontology_concept_holders is None else ontology_concept_holders def make_dictionary(self, properties: List[str]): """ @@ -362,6 +373,11 @@ def get_slots(self) -> List[str]: def copy(self) -> DesignatorDescription: return self + def get_default_ontology_concept(self) -> owlready2.Thing | None: + """ + Returns the first element of ontology_concept_holders if there is, else None + """ + return self.ontology_concept_holders[0].ontology_concept if self.ontology_concept_holders else None class ActionDesignatorDescription(DesignatorDescription, Language): """ @@ -437,14 +453,37 @@ def insert(self, session: Session, *args, **kwargs) -> ORMAction: return action - def __init__(self, resolver=None): - super().__init__(resolver) + def __init__(self, resolver=None, ontology_concept_holders: Optional[List[OntologyConceptHolder]] = None): + """ + Base of all action designator descriptions. + + :param resolver: An alternative resolver that returns an action designator + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with + """ + super().__init__(resolver, ontology_concept_holders) Language.__init__(self) + from .ontology.ontology import OntologyManager + self.soma = OntologyManager().soma def ground(self) -> Action: """Fill all missing parameters and chose plan to execute. """ raise NotImplementedError(f"{type(self)}.ground() is not implemented.") + def init_ontology_concepts(self, ontology_concept_classes: Dict[str, Type[owlready2.Thing]]): + """ + Initialize the ontology concept holders for this action designator + + :param ontology_concept_classes: The ontology concept classes that the action is categorized as or associated with + :param ontology_concept_name: The name of the ontology concept instance to be created + """ + from .ontology.ontology_common import OntologyConceptHolderStore, OntologyConceptHolder + if not self.ontology_concept_holders: + for concept_name, concept_class in ontology_concept_classes.items(): + if concept_class: + existing_holders = OntologyConceptHolderStore().get_ontology_concept_holders_by_class(concept_class) + self.ontology_concept_holders.extend(existing_holders if existing_holders \ + else [OntologyConceptHolder(concept_class(concept_name))]) + def __iter__(self): """ Iterate through all possible performables fitting this description @@ -470,8 +509,8 @@ class Location: The resolved pose of the location designator. Pose is inherited by all location designator. """ - def __init__(self, resolver=None): - super().__init__(resolver) + def __init__(self, resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): + super().__init__(resolver, ontology_concept_holders) def ground(self) -> Location: """ @@ -617,15 +656,16 @@ def special_knowledge_adjustment_pose(self, grasp: str, pose: Pose) -> Pose: return pose def __init__(self, names: Optional[List[str]] = None, types: Optional[List[ObjectType]] = None, - resolver: Optional[Callable] = None): + resolver: Optional[Callable] = None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Base of all object designator descriptions. Every object designator has the name and type of the object. :param names: A list of names that could describe the object :param types: A list of types that could represent the object :param resolver: An alternative specialized_designators that returns an object designator for the list of names and types + :param ontology_concept_holders: A list of ontology concepts that the object is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.types: Optional[List[ObjectType]] = types self.names: Optional[List[str]] = names diff --git a/src/pycram/designators/action_designator.py b/src/pycram/designators/action_designator.py index b9ee04fa2..371d7f212 100644 --- a/src/pycram/designators/action_designator.py +++ b/src/pycram/designators/action_designator.py @@ -1,6 +1,12 @@ import itertools -from typing_extensions import List, Union, Callable -from typing_extensions import Any, Union +from typing_extensions import Any, List, Union, Callable, Optional + +import rospy +try: + from owlready2 import * +except ImportError: + owlready2 = None + rospy.logwarn("owlready2 is not installed!") from .object_designator import ObjectDesignatorDescription, BelieveObject, ObjectPart from ..datastructures.enums import Arms @@ -13,6 +19,7 @@ CloseActionPerformable, GraspingActionPerformable, ReleaseActionPerformable) from ..datastructures.pose import Pose +from ..ontology.ontology import OntologyConceptHolder class MoveTorsoAction(ActionDesignatorDescription): @@ -20,16 +27,21 @@ class MoveTorsoAction(ActionDesignatorDescription): Action Designator for Moving the torso of the robot up and down """ - def __init__(self, positions: List[float], resolver=None): + def __init__(self, positions: List[float], resolver=None, + ontology_concept_holders: Optional[List[OntologyConceptHolder]] = None): """ Create a designator description to move the torso of the robot up and down. :param positions: List of possible positions of the robots torso, possible position is a float of height in metres :param resolver: An optional specialized_designators that returns a performable designator for a designator description. + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.positions: List[float] = positions + if self.soma: + self.init_ontology_concepts({"move_torso": self.soma.MoveTorso}) + def ground(self) -> MoveTorsoActionPerformable: """ Creates a performable action designator with the first element from the list of possible torso heights. @@ -53,18 +65,23 @@ class SetGripperAction(ActionDesignatorDescription): Set the gripper state of the robot """ - def __init__(self, grippers: List[str], motions: List[str], resolver=None): + def __init__(self, grippers: List[str], motions: List[str], resolver=None, + ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Sets the gripper state, the desired state is given with the motion. Motion can either be 'open' or 'close'. :param grippers: A list of possible grippers :param motions: A list of possible motions :param resolver: An alternative specialized_designators that returns a performable designator for a designator description + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.grippers: List[str] = grippers self.motions: List[str] = motions + if self.soma: + self.init_ontology_concepts({"setting_gripper": self.soma.SettingGripper}) + def ground(self) -> SetGripperActionPerformable: """ Default specialized_designators that returns a performable designator with the first element in the grippers and motions list. @@ -91,11 +108,14 @@ class ReleaseAction(ActionDesignatorDescription): """ def __init__(self, grippers: List[str], object_designator_description: ObjectDesignatorDescription, - resolver=None): - super().__init__(resolver) + resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): + super().__init__(resolver, ontology_concept_holders) self.grippers: List[str] = grippers self.object_designator_description = object_designator_description + if self.soma: + self.init_ontology_concepts({"releasing": self.soma.Releasing}) + def ground(self) -> ReleaseActionPerformable: return ReleaseActionPerformable(self.grippers[0], self.object_designator_description.ground()) @@ -112,31 +132,37 @@ class GripAction(ActionDesignatorDescription): """ def __init__(self, grippers: List[str], object_designator_description: ObjectDesignatorDescription, - efforts: List[float], resolver=None): - super().__init__(resolver) + efforts: List[float], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): + super().__init__(resolver, ontology_concept_holders) self.grippers: List[str] = grippers self.object_designator_description: ObjectDesignatorDescription = object_designator_description self.efforts: List[float] = efforts + if self.soma: + self.init_ontology_concepts({"holding": self.soma.Holding}) + def ground(self) -> GripActionPerformable: return GripActionPerformable(self.grippers[0], self.object_designator_description.ground(), self.efforts[0]) - class ParkArmsAction(ActionDesignatorDescription): """ Park the arms of the robot. """ - def __init__(self, arms: List[Arms], resolver=None): + def __init__(self, arms: List[Arms], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Moves the arms in the pre-defined parking position. Arms are taken from pycram.enum.Arms :param arms: A list of possible arms, that could be used :param resolver: An optional specialized_designators that returns a performable designator from the designator description + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.arms: List[Arms] = arms + if self.soma: + self.init_ontology_concepts({"parking_arms": self.soma.ParkingArms}) + def ground(self) -> ParkArmsActionPerformable: """ Default specialized_designators that returns a performable designator with the first element of the list of possible arms @@ -152,7 +178,7 @@ class PickUpAction(ActionDesignatorDescription): """ def __init__(self, object_designator_description: Union[ObjectDesignatorDescription, ObjectDesignatorDescription.Object], - arms: List[str], grasps: List[str], resolver=None): + arms: List[str], grasps: List[str], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Lets the robot pick up an object. The description needs an object designator describing the object that should be picked up, an arm that should be used as well as the grasp from which side the object should be picked up. @@ -161,13 +187,17 @@ def __init__(self, object_designator_description: Union[ObjectDesignatorDescrip :param arms: List of possible arms that could be used :param grasps: List of possible grasps for the object :param resolver: An optional specialized_designators that returns a performable designator with elements from the lists of possible paramter + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.object_designator_description: Union[ ObjectDesignatorDescription, ObjectDesignatorDescription.Object] = object_designator_description self.arms: List[str] = arms self.grasps: List[str] = grasps + if self.soma: + self.init_ontology_concepts({"picking_up": self.soma.PickingUp}) + def ground(self) -> PickUpActionPerformable: """ Default specialized_designators, returns a performable designator with the first entries from the lists of possible parameter. @@ -190,7 +220,7 @@ class PlaceAction(ActionDesignatorDescription): def __init__(self, object_designator_description: Union[ObjectDesignatorDescription, ObjectDesignatorDescription.Object], target_locations: List[Pose], - arms: List[str], resolver=None): + arms: List[str], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Create an Action Description to place an object @@ -198,13 +228,17 @@ def __init__(self, :param target_locations: List of possible positions/orientations to place the object :param arms: List of possible arms to use :param resolver: Grounding method to resolve this designator + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.object_designator_description: Union[ ObjectDesignatorDescription, ObjectDesignatorDescription.Object] = object_designator_description self.target_locations: List[Pose] = target_locations self.arms: List[str] = arms + if self.soma: + self.init_ontology_concepts({"placing": self.soma.Placing}) + def ground(self) -> PlaceActionPerformable: """ Default specialized_designators that returns a performable designator with the first entries from the list of possible entries. @@ -222,16 +256,20 @@ class NavigateAction(ActionDesignatorDescription): Navigates the Robot to a position. """ - def __init__(self, target_locations: List[Pose], resolver=None): + def __init__(self, target_locations: List[Pose], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Navigates the robot to a location. :param target_locations: A list of possible target locations for the navigation. :param resolver: An alternative specialized_designators that creates a performable designator from the list of possible parameter + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.target_locations: List[Pose] = target_locations + if self.soma: + self.init_ontology_concepts({"navigating": self.soma.Navigating}) + def ground(self) -> NavigateActionPerformable: """ Default specialized_designators that returns a performable designator with the first entry of possible target locations. @@ -249,7 +287,7 @@ class TransportAction(ActionDesignatorDescription): def __init__(self, object_designator_description: Union[ObjectDesignatorDescription, ObjectDesignatorDescription.Object], arms: List[str], - target_locations: List[Pose], resolver=None): + target_locations: List[Pose], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Designator representing a pick and place plan. @@ -257,13 +295,17 @@ def __init__(self, :param arms: A List of possible arms that could be used for transporting :param target_locations: A list of possible target locations for the object to be placed :param resolver: An alternative specialized_designators that returns a performable designator for the list of possible parameter + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.object_designator_description: Union[ ObjectDesignatorDescription, ObjectDesignatorDescription.Object] = object_designator_description self.arms: List[str] = arms self.target_locations: List[Pose] = target_locations + if self.soma: + self.init_ontology_concepts({"transporting": self.soma.Transporting}) + def ground(self) -> TransportActionPerformable: """ Default specialized_designators that returns a performable designator with the first entries from the lists of possible parameter. @@ -282,16 +324,20 @@ class LookAtAction(ActionDesignatorDescription): Lets the robot look at a position. """ - def __init__(self, targets: List[Pose], resolver=None): + def __init__(self, targets: List[Pose], resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Moves the head of the robot such that it points towards the given target location. :param targets: A list of possible locations to look at :param resolver: An alternative specialized_designators that returns a performable designator for a list of possible target locations + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.targets: List[Pose] = targets + if self.soma: + self.init_ontology_concepts({"looking_at": self.soma.LookingAt}) + def ground(self) -> LookAtActionPerformable: """ Default specialized_designators that returns a performable designator with the first entry in the list of possible targets @@ -306,16 +352,22 @@ class DetectAction(ActionDesignatorDescription): Detects an object that fits the object description and returns an object designator describing the object. """ - def __init__(self, object_designator_description: ObjectDesignatorDescription, resolver=None): + def __init__(self, object_designator_description: ObjectDesignatorDescription, resolver=None, + ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Tries to detect an object in the field of view (FOV) of the robot. :param object_designator_description: Object designator describing the object :param resolver: An alternative specialized_designators + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.object_designator_description: ObjectDesignatorDescription = object_designator_description + if self.soma: + self.init_ontology_concepts({"looking_for": self.soma.LookingFor, + "checking_object_presence": self.soma.CheckingObjectPresence}) + def ground(self) -> DetectActionPerformable: """ Default specialized_designators that returns a performable designator with the resolved object description. @@ -332,18 +384,23 @@ class OpenAction(ActionDesignatorDescription): Can currently not be used """ - def __init__(self, object_designator_description: ObjectPart, arms: List[str], resolver=None): + def __init__(self, object_designator_description: ObjectPart, arms: List[str], resolver=None, + ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Moves the arm of the robot to open a container. :param object_designator_description: Object designator describing the handle that should be used to open :param arms: A list of possible arms that should be used :param resolver: A alternative specialized_designators that returns a performable designator for the lists of possible parameter. + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.object_designator_description: ObjectPart = object_designator_description self.arms: List[str] = arms + if self.soma: + self.init_ontology_concepts({"opening": self.soma.Opening}) + def ground(self) -> OpenActionPerformable: """ Default specialized_designators that returns a performable designator with the resolved object description and the first entries @@ -362,18 +419,22 @@ class CloseAction(ActionDesignatorDescription): """ def __init__(self, object_designator_description: ObjectPart, arms: List[str], - resolver=None): + resolver=None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Attempts to close an open container :param object_designator_description: Object designator description of the handle that should be used :param arms: A list of possible arms to use :param resolver: An alternative specialized_designators that returns a performable designator for the list of possible parameter + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.object_designator_description: ObjectPart = object_designator_description self.arms: List[str] = arms + if self.soma: + self.init_ontology_concepts({"closing": self.soma.Closing}) + def ground(self) -> CloseActionPerformable: """ Default specialized_designators that returns a performable designator with the resolved object designator and the first entry from @@ -390,7 +451,7 @@ class GraspingAction(ActionDesignatorDescription): """ def __init__(self, arms: List[str], object_description: Union[ObjectDesignatorDescription, ObjectPart], - resolver: Callable = None): + resolver: Callable = None, ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Will try to grasp the object described by the given description. Grasping is done by moving into a pre grasp position 10 cm before the object, opening the gripper, moving to the object and then closing the gripper. @@ -398,11 +459,15 @@ def __init__(self, arms: List[str], object_description: Union[ObjectDesignatorDe :param arms: List of Arms that should be used for grasping :param object_description: Description of the object that should be grasped :param resolver: An alternative specialized_designators to get a specified designator from the designator description + :param ontology_concept_holders: A list of ontology concepts that the action is categorized as or associated with """ - super().__init__(resolver) + super().__init__(resolver, ontology_concept_holders) self.arms: List[str] = arms self.object_description: ObjectDesignatorDescription = object_description + if self.soma: + self.init_ontology_concepts({"grasping": self.soma.Grasping}) + def ground(self) -> GraspingActionPerformable: """ Default specialized_designators that takes the first element from the list of arms and the first solution for the object diff --git a/src/pycram/designators/object_designator.py b/src/pycram/designators/object_designator.py index 7c0114cc9..85d090499 100644 --- a/src/pycram/designators/object_designator.py +++ b/src/pycram/designators/object_designator.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import dataclasses -from typing_extensions import List, Optional, Callable +from typing_extensions import List, Optional, Callable, TYPE_CHECKING import sqlalchemy.orm from ..datastructures.world import World from ..world_concepts.world_object import Object as WorldObject @@ -9,6 +11,8 @@ from ..datastructures.pose import Pose from ..external_interfaces.robokudo import query +if TYPE_CHECKING: + import owlready2 class BelieveObject(ObjectDesignatorDescription): """ @@ -68,6 +72,7 @@ def __init__(self, names: List[str], :param part_of: Parent object of which the part should be described :param type: Type of the part :param resolver: An alternative specialized_designators to resolve the input parameter to an object designator + :param ontology_concept_holders: A list of ontology concepts that the object part is categorized as or associated with """ super().__init__(names, type, resolver) @@ -116,7 +121,8 @@ class Object(ObjectDesignatorDescription.Object): """ def __init__(self, names: List[str], types: List[str], - reference_frames: List[str], timestamps: List[float], resolver: Optional[Callable] = None): + reference_frames: List[str], timestamps: List[float], resolver: Optional[Callable] = None, + ontology_concept_holders: Optional[List[owlready2.Thing]] = None): """ Describing an object resolved through knowrob. @@ -125,8 +131,9 @@ def __init__(self, names: List[str], types: List[str], :param reference_frames: Frame of reference in which the object position should be :param timestamps: Timestamps for which positions should be returned :param resolver: An alternative specialized_designators that resolves the input parameter to an object designator. + :param ontology_concept_holders: A list of ontology concepts that the object is categorized as """ - super(LocatedObject, self).__init__(names, types, resolver) + super(LocatedObject, self).__init__(names, types, resolver, ontology_concept_holders) self.reference_frames: List[str] = reference_frames self.timestamps: List[float] = timestamps diff --git a/src/pycram/helper.py b/src/pycram/helper.py index e69de29bb..73cf77dbc 100644 --- a/src/pycram/helper.py +++ b/src/pycram/helper.py @@ -0,0 +1,19 @@ +"""Implementation of helper functions and classes for internal usage only. + +Classes: +Singleton -- implementation of singleton metaclass +""" +class Singleton(type): + """ + Metaclass for singletons + """ + + _instances = {} + """ + Dictionary of singleton child classes inheriting from this metaclass, keyed by child class objects. + """ + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] \ No newline at end of file diff --git a/src/pycram/ontology/__init__.py b/src/pycram/ontology/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/pycram/ontology/ontology.py b/src/pycram/ontology/ontology.py new file mode 100644 index 000000000..2fe6d8c4a --- /dev/null +++ b/src/pycram/ontology/ontology.py @@ -0,0 +1,506 @@ +from __future__ import annotations + +import inspect +import logging +from pathlib import Path +from typing import Callable, Dict, List, Optional, Type + +import rospy + +try: + from owlready2 import * +except ImportError: + owlready2 = None + rospy.logwarn("Could not import owlready2, OntologyManager could not be initialized!") + +from ..datastructures.enums import ObjectType +from ..helper import Singleton +from ..designator import DesignatorDescription, ObjectDesignatorDescription + +from ..ontology.ontology_common import OntologyConceptHolderStore, OntologyConceptHolder + +SOMA_HOME_ONTOLOGY_IRI = "http://www.ease-crc.org/ont/SOMA-HOME.owl" +SOMA_ONTOLOGY_IRI = "http://www.ease-crc.org/ont/SOMA.owl" +SOMA_ONTOLOGY_NAMESPACE = "SOMA" +DUL_ONTOLOGY_NAMESPACE = "DUL" + + +class OntologyManager(object, metaclass=Singleton): + """ + Singleton class as the adapter accessing data of an OWL ontology, largely based on owlready2. + """ + + def __init__(self, main_ontology_iri: str = "", ontology_search_path: str = ""): + """ + Create the singleton object of OntologyManager class + + :param main_ontology_iri: Ontology IRI (Internationalized Resource Identifier), either a URL to a remote OWL file or the full name path of a local one + :param ontology_search_path: directory path from which a possibly existing ontology is searched. This is appended to `owlready2.onto_path`, a global variable containing a list of directories for searching local copies of ontologies (similarly to python `sys.path` for modules/packages). If not specified, the path is "$HOME/ontologies" + """ + if owlready2: + if not ontology_search_path: + ontology_search_path = f"{Path.home()}/ontologies" + Path(ontology_search_path).mkdir(parents=True, exist_ok=True) + owlready2.onto_path.append(ontology_search_path) + else: + return + + #: A dictionary of OWL ontologies, keyed by ontology name (same as its namespace name), eg. 'SOMA' + self.ontologies: Dict[str, owlready2.Ontology] = {} + + #: The main ontology instance as the result of an ontology loading operation + self.main_ontology: Optional[owlready2.Ontology] = None + + #: The SOMA ontology instance, referencing :attr:`ontology` in case of ontology loading from `SOMA.owl`. Ref: http://www.ease-crc.org/ont/SOMA.owl + self.soma: Optional[owlready2.Ontology] = None + + #: The DUL ontology instance, referencing :attr:`ontology` in case of ontology loading from `DUL.owl`. Ref: http://www.ease-crc.org/ont/DUL.owl + self.dul: Optional[owlready2.Ontology] = None + + #: Ontology world, the placeholder of triples stored by owlready2. Ref: https://owlready2.readthedocs.io/en/latest/world.html + self.ontology_world: Optional[owlready2.World] = None + + #: Ontology IRI (Internationalized Resource Identifier), either a URL to a remote OWL file or the full name path of a local one + self.main_ontology_iri: str = main_ontology_iri + + #: Namespace of the main ontology + self.main_ontology_namespace: Optional[owlready2.Namespace] = None + + # Create an ontology world with parallelized file parsing enabled + self.ontology_world = World(filename=f"{ontology_search_path}/{Path(main_ontology_iri).stem}.sqlite3", + exclusive=False, enable_thread_parallelism=True) + + self.main_ontology, self.main_ontology_namespace = self.load_ontology(main_ontology_iri) + if self.main_ontology.loaded: + self.soma = self.ontologies.get(SOMA_ONTOLOGY_NAMESPACE) + self.dul = self.ontologies.get(DUL_ONTOLOGY_NAMESPACE) + + @staticmethod + def print_ontology_class(ontology_class: Type[owlready2.Thing]): + """ + Print information (ancestors, super classes, subclasses, properties, etc.) of an ontology class + + :param ontology_class: An ontology class + """ + if ontology_class is None: + return + rospy.loginfo("-------------------") + rospy.loginfo(f"{ontology_class} {type(ontology_class)}") + rospy.loginfo(f"Super classes: {ontology_class.is_a}") + rospy.loginfo(f"Ancestors: {ontology_class.ancestors()}") + rospy.loginfo(f"Subclasses: {list(ontology_class.subclasses())}") + rospy.loginfo(f"Properties: {list(ontology_class.get_class_properties())}") + rospy.loginfo(f"Instances: {list(ontology_class.instances())}") + rospy.loginfo(f"Direct Instances: {list(ontology_class.direct_instances())}") + rospy.loginfo(f"Inverse Restrictions: {list(ontology_class.inverse_restrictions())}") + + def load_ontology(self, ontology_iri: str) -> tuple[owlready2.Ontology, owlready2.Namespace]: + """ + Load an ontology from an IRI + + :param ontology_iri: An ontology IRI + :return: A tuple including an ontology instance & its namespace + """ + ontology = self.ontology_world.get_ontology(ontology_iri).load(reload_if_newer=True) + ontology_namespace = owlready2.get_namespace(ontology_iri) + if ontology.loaded: + rospy.loginfo( + f'Ontology [{ontology.base_iri}]\'s name: {ontology.name} has been loaded') + rospy.loginfo(f'- main namespace: {ontology_namespace.name}') + rospy.loginfo(f'- loaded ontologies:') + + def fetch_ontology(ontology__): + self.ontologies[ontology__.name] = ontology__ + rospy.loginfo(ontology__.base_iri) + + self.browse_ontologies(ontology, condition=None, func=lambda ontology__: fetch_ontology(ontology__)) + else: + rospy.logerr(f"Ontology [{ontology.base_iri}]\'s name: {ontology.name} failed being loaded") + return ontology, ontology_namespace + + def initialized(self) -> bool: + """ + Check if the main ontology has been loaded + + :return: True if loaded, otherwise False + """ + return hasattr(self, "main_ontology") and self.main_ontology.loaded + + @staticmethod + def browse_ontologies(ontology: owlready2.Ontology, + condition: Optional[Callable] = None, func: Optional[Callable] = None, **kwargs) -> None: + """ + Browse the loaded ontologies (including the main and imported ones), doing operations based on a condition. + + :param ontology: An ontology instance as the result of ontology loading + :param condition: a Callable condition that if not None needs to be passed before doing operations, otherwise just always carry the operations + :param func: a Callable specifying the operations to perform on all the loaded ontologies if condition is None, otherwise only the first ontology which meets the condition + """ + if ontology is None: + rospy.logerr(f"Ontology {ontology=} is None!") + return + elif not ontology.loaded: + rospy.logerr(f"Ontology {ontology} was not loaded!") + return + + will_do_func = func is not None + # No condition: Do func for all ontologies + if condition is None: + if will_do_func: + func(ontology, **kwargs) + for sub_onto in ontology.get_imported_ontologies(): + func(sub_onto, **kwargs) + # Else: Only do func for the first ontology which meets the condition + elif condition(ontology, **kwargs): + if will_do_func: func(ontology, **kwargs) + else: + for sub_onto in ontology.get_imported_ontologies(): + if condition(sub_onto, **kwargs) and will_do_func: + func(sub_onto, **kwargs) + break + + def save(self, target_filename: str = "", overwrite: bool = False) -> bool: + """ + Save the current ontology to disk + + :param target_filename: full name path of a file which the ontologies are saved into. + :param overwrite: overwrite an existing file if it exists. If empty, they are saved to the same original OWL file from which the main ontology was loaded, or a file at the same folder with ontology search path specified at constructor if it was loaded from a remote IRI. + :return: True if the ontology was successfully saved, False otherwise + """ + + # Commit the whole graph data of the current ontology world, saving it into SQLite3, to be reused the next time + # the ontologies are loaded + self.ontology_world.save() + + # Save ontologies to OWL + is_current_ontology_local = Path(self.main_ontology_iri).exists() + current_ontology_filename = self.main_ontology_iri if is_current_ontology_local \ + else f"{Path(self.ontology_world.filename).parent.absolute()}/{Path(self.main_ontology_iri).stem}.owl" + save_to_same_file = is_current_ontology_local and (target_filename == current_ontology_filename) + if save_to_same_file and not overwrite: + rospy.logerr(f"Ontologies cannot be saved to the originally loaded [{target_filename}] if not by overwriting") + return False + else: + save_filename = target_filename if target_filename else current_ontology_filename + self.main_ontology.save(save_filename) + if save_to_same_file and overwrite: + rospy.logwarn(f"Ontologies have been overwritten to {save_filename}") + else: + rospy.loginfo(f"Ontologies have been saved to {save_filename}") + return True + + def create_ontology_concept_class(self, class_name: str, + ontology_parent_concept_class: Optional[owlready2.Thing] = None) \ + -> Type[owlready2.Thing]: + """ + Create a new concept class in ontology + + :param class_name: A given name to the new class + :param ontology_parent_concept_class: An optional parent ontology class of the new class + :return: The created ontology class + """ + ontology_concept_class = self.get_ontology_class_by_ontology(self.main_ontology, class_name) + if ontology_concept_class: + return ontology_concept_class + + with self.main_ontology: + return types.new_class(class_name, (owlready2.Thing, ontology_parent_concept_class,) + if inspect.isclass(ontology_parent_concept_class) else (owlready2.Thing,)) + + @staticmethod + def create_ontology_property_class(class_name: str, + ontology_parent_property_class: Optional[Type[owlready2.Property]] = None) \ + -> Optional[Type[owlready2.Property]]: + """ + Create a new property class in ontology + + :param class_name: A given name to the new class + :param ontology_parent_property_class: An optional parent ontology property class of the new class + :return: The created ontology class + """ + parent_class = ontology_parent_property_class if (ontology_parent_property_class and + issubclass(ontology_parent_property_class, + owlready2.Property)) \ + else None + return types.new_class(class_name, (parent_class,) if parent_class else (owlready2.Property,)) + + def get_ontology_classes_by_condition(self, condition: Callable, first_match_only=False, **kwargs) \ + -> List[Type[owlready2.Thing]]: + """ + Get an ontology class by a given condition + + :param condition: condition of searching + :param first_match_only: whether to only fetch the first class matching the given condition + :return: The ontology class satisfying the given condition if found else None + """ + out_classes = [] + for ontology_class in list(self.main_ontology.classes()): + if condition(ontology_class, **kwargs): + out_classes.append(ontology_class) + if first_match_only: + return out_classes + + for sub_onto in self.main_ontology.get_imported_ontologies(): + for sub_ontology_class in list(sub_onto.classes()): + if condition(sub_ontology_class, **kwargs): + out_classes.append(sub_ontology_class) + if first_match_only: + return out_classes + + if not out_classes: + rospy.loginfo(f"No class with {kwargs} is found in the ontology {self.main_ontology}") + return out_classes + + @staticmethod + def get_ontology_class_by_ontology(ontology: owlready2.Ontology, class_name: str) -> Optional[Type[owlready2.Thing]]: + """ + Get an ontology class if it exists in a given ontology + + :param ontology: an ontology instance + :return: The ontology class if it exists under the namespace of the given ontology, None otherwise + """ + return getattr(ontology, class_name) if ontology and hasattr(ontology, class_name) else None + + def get_ontology_class(self, class_name: str) -> Optional[Type[owlready2.Thing]]: + """ + Get an ontology class by name + + :param class_name: name of the searched-for ontology class + :return: The ontology class of the given name if existing else None + """ + + def is_matching_class_name(ontology_class: Type[owlready2.Thing], ontology_class_name: str): + return ontology_class.name == ontology_class_name + + found_classes = self.get_ontology_classes_by_condition(condition=is_matching_class_name, + ontology_class_name=class_name, + first_match_only=True) + return found_classes[0] if len(found_classes) > 0 else None + + def get_ontology_classes_by_namespace(self, ontology_namespace: str) -> List[ + Type[owlready2.Thing]]: + """ + Get all ontologies classes by namespace + + :param ontology_namespace: namespace of the searched-for ontology classes + :return: A list of the ontology classes under the given namespace + """ + + def is_matching_ontology_namespace(ontology_class: Type[owlready2.Thing], ontology_namespace_: str): + return ontology_class.namespace.name == ontology_namespace_ + + return self.get_ontology_classes_by_condition(condition=is_matching_ontology_namespace, + ontology_namespace_=ontology_namespace) + + def get_ontology_classes_by_subname(self, class_subname: str) -> List[Type[owlready2.Thing]]: + """ + Get all ontologies classes by subname + + :param class_subname: a string as part of the full names of the searched-for ontology classes + :return: A list of the ontology classes of which the name contains the given subname + """ + + def is_matching_class_subname(ontology_class: Type[owlready2.Thing], ontology_class_subname: str): + return ontology_class_subname.lower() in ontology_class.name.lower() + + return self.get_ontology_classes_by_condition(condition=is_matching_class_subname, + ontology_class_subname=class_subname) + + def get_ontology_descendant_classes(self, ancestor_class: Type[owlready2.Thing], class_subname: str = "") \ + -> List[Type[owlready2.Thing]]: + """ + Get ontology descendant classes of an ancestor class given descendant class subname + + :param class_subname: a string as part of the ancestor class full name + :return: A list of the ontology descendant classes + """ + return [ontology_class for ontology_class in self.main_ontology.classes() + if (class_subname.lower() in ontology_class.name.lower()) and + (ancestor_class in ontology_class.ancestors())] + + def create_ontology_triple_classes(self, subject_class_name: str, object_class_name: str, + predicate_name: str, inverse_predicate_name: str, + ontology_subject_parent_class: Optional[Type[owlready2.Thing]] = None, + ontology_object_parent_class: Optional[Type[owlready2.Thing]] = None, + ontology_property_parent_class: Optional[Type[ + owlready2.Property]] = owlready2.ObjectProperty if owlready2 else None, + ontology_inverse_property_parent_class: Optional[Type[ + owlready2.Property]] = owlready2.ObjectProperty if owlready2 else None) -> None: + """ + Dynamically create ontology triple classes under same namespace with the main ontology, + as known as {subject, predicate, object}, with the relations among them + + :param subject_class_name: name of the subject class + :param object_class_name: name of the object class + :param predicate_name: name of predicate class, also used as a Python attribute of the subject class to query object instances + :param inverse_predicate_name: name of inverse predicate + :param ontology_subject_parent_class: a parent class of the subject class + :param ontology_object_parent_class: a parent class of the object class + :param ontology_property_parent_class: a parent ontology property class, default: owlready2.ObjectProperty + :param ontology_inverse_property_parent_class: a parent ontology inverse property class, default: owlready2.ObjectProperty + """ + + # This context manager ensures all classes created here-in share the same namepsace with `self.main_ontology` + with self.main_ontology: + # Subject + ontology_subject_class = self.create_ontology_concept_class(subject_class_name, + ontology_subject_parent_class) + + # Object + ontology_object_class = self.create_ontology_concept_class(object_class_name, ontology_object_parent_class) + + # Predicate + ontology_predicate_class = self.create_ontology_property_class("OntologyPredicate", + ontology_property_parent_class) + ontology_predicate_class.domain = [ontology_subject_class] + ontology_predicate_class.range = [ontology_object_class] + ontology_predicate_class.python_name = predicate_name + + # Inverse Predicate + ontology_inverse_predicate = self.create_ontology_property_class("OntologyInversePredicate", + ontology_inverse_property_parent_class) + ontology_inverse_predicate.inverse_property = ontology_predicate_class + ontology_inverse_predicate.python_name = inverse_predicate_name + + def create_ontology_linked_designator(self, designator_class: Type[DesignatorDescription], + ontology_concept_name: str, + object_name: Optional[str] = "", + ontology_parent_class: Optional[Type[owlready2.Thing]] = None) \ + -> Optional[DesignatorDescription]: + """ + Create a designator linked to a given ontology concept + + :param designator_class: A given designator class + :param ontology_concept_name: Ontology concept name + :param object_name: Name of object in case of the designator to be created is an Object Designator + :param ontology_parent_class: Parent ontology class from which the class of designator inherits + :return: A designator associated with an ontology concept + """ + ontology_concept_class = self.create_ontology_concept_class(ontology_concept_name, ontology_parent_class) + return self.create_ontology_linked_designator_by_concept(designator_class=designator_class, + ontology_concept_class=ontology_concept_class, + object_name=object_name) + + def create_ontology_linked_designator_by_concept(self, designator_class: Type[DesignatorDescription], + ontology_concept_class: Type[owlready2.Thing], + object_name: Optional[str] = "") \ + -> Optional[DesignatorDescription]: + """ + Create a designator that belongs to a given ontology concept class + + :param designator_class: A given designator class + :param ontology_concept_class: An ontology concept class with which the output designator is associated + :param object_name: Name of object in case of the designator to be created is an Object Designator + :return: An object designator associated with the given ontology concept class if created successfully (not already exists), None otherwise + """ + ontology_concept_name = f'{object_name}_concept' + if len(OntologyConceptHolderStore().get_designators_of_ontology_concept(ontology_concept_name)) > 0: + rospy.logerr(f"A designator named [{object_name}] is already created for ontology concept [{ontology_concept_name}]") + return None + + # Create a designator of `designator_class` + is_object_designator = issubclass(designator_class, ObjectDesignatorDescription) + if is_object_designator: + if not object_name: + rospy.logerr( + f"An empty object name was given as creating its Object designator for ontology concept class [{ontology_concept_class.name}]") + return None + designator = designator_class(names=[object_name]) + else: + designator = designator_class() + + # Link designator with an ontology concept of `ontology_concept_class` + ontology_concept_holder = OntologyConceptHolderStore().get_ontology_concept_holder_by_name(ontology_concept_name) + if ontology_concept_holder is None: + ontology_concept_holder = OntologyConceptHolder(ontology_concept_class(name=ontology_concept_name, + namespace=self.main_ontology)) + self.set_ontology_concept_designator_connection(designator, ontology_concept_holder) + return designator + + @staticmethod + def set_ontology_concept_designator_connection(designator: DesignatorDescription, + ontology_concept_holder: OntologyConceptHolder) -> None: + """ + Set two-way connection between a designator and an ontology concept + + :param designator: Designator + :param ontology_concept_holder: Ontology concept holder + """ + if ontology_concept_holder not in designator.ontology_concept_holders: + designator.ontology_concept_holders.append(ontology_concept_holder) + + if not ontology_concept_holder.has_designator(designator): + ontology_concept_holder.designators.append(designator) + + @staticmethod + def set_ontology_relation(subject_designator: DesignatorDescription, + object_designator: DesignatorDescription, + predicate_name: str) -> bool: + """ + Set ontology relation between subject and object designators + + :param subject_designator: An object designator as the ontology subject + :param object_designator: An object designator as the ontology object + :param predicate_name: Name of the predicate + :return: True if the relation is set, False otherwise + """ + for subject_concept_holder in subject_designator.ontology_concept_holders: + subject_concept = subject_concept_holder.ontology_concept + if hasattr(subject_concept, predicate_name): + object_concepts_list = getattr(subject_concept, predicate_name) + object_concepts_names = [concept.name for concept in object_concepts_list] + for holder in object_designator.ontology_concept_holders: + if holder.ontology_concept.name not in object_concepts_names: + object_concepts_list.append(holder.ontology_concept) + return True + else: + rospy.logerr(f"Ontology concept [{subject_concept.name}] has no predicate named [{predicate_name}]") + return False + + @staticmethod + def get_designators_by_subject_predicate(subject: DesignatorDescription, + predicate_name: str) -> List[DesignatorDescription]: + """ + Get list of designators of an ontology-object concept given a subject designator and predicate + + :param subject: The ontology-subject designator + :param predicate_name: The ontology-predicate name of the relation + :return: List of object designators + """ + return list(itertools.chain( + *[OntologyConceptHolderStore().get_designators_of_ontology_concept(object_concept.name) + for subject_concept_holder in subject.ontology_concept_holders + for object_concept in getattr(subject_concept_holder.ontology_concept, predicate_name) + if hasattr(subject_concept_holder.ontology_concept, predicate_name)])) + + def create_ontology_object_designator_from_type(self, object_type: ObjectType, + ontology_concept_class: Type[owlready2.Thing]) \ + -> Optional[ObjectDesignatorDescription]: + """ + Create an object designator associated with an ontology concept class from a given object type + + :param object_type: An enumerated type of object + :param ontology_concept_class: An ontology concept class + :return: An object designator if created successfully (if not already existing), otherwise None + """ + object_type_name = object_type.name.lower() + object_designator = \ + self.create_ontology_linked_designator_by_concept(designator_class=ObjectDesignatorDescription, + ontology_concept_class=ontology_concept_class, + object_name=object_type_name) + object_designator.types = [object_type_name] + return object_designator + + @staticmethod + def destroy_ontology_class(ontology_class, destroy_instances: bool = True): + """ + Destroy all classes of an ontology + + :param ontology_class: The ontology class to be destroyed + :param destroy_instances: Whether to destroy instances of those ontology classes + """ + if destroy_instances: + for ontology_individual in ontology_class.instances(): + destroy_entity(ontology_individual) + OntologyConceptHolderStore().remove_ontology_concept(ontology_class.name) + destroy_entity(ontology_class) diff --git a/src/pycram/ontology/ontology_common.py b/src/pycram/ontology/ontology_common.py new file mode 100644 index 000000000..ebba6499b --- /dev/null +++ b/src/pycram/ontology/ontology_common.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +from typing import Callable, Dict, List, Optional, Type, TYPE_CHECKING +import rospy + +from ..helper import Singleton +if TYPE_CHECKING: + from ..designator import DesignatorDescription + +try: + from owlready2 import * +except ImportError: + owlready2 = None + rospy.logwarn("Could not import owlready2, OntologyConceptHolder will not be available!") + + +class OntologyConceptHolderStore(object, metaclass=Singleton): + """ + Singleton class storing all instances of `OntologyConceptHolder` + """ + + def __init__(self): + """ + Initialize the OntologyConceptHolderStore + """ + if owlready2 is None: + return + #: Dictionary of all ontology concept holders, keyed by concept names + self.__all_ontology_concept_holders: Dict[str, OntologyConceptHolder] = {} + + def add_ontology_concept_holder(self, ontology_concept_name: str, ontology_concept_holder: OntologyConceptHolder)\ + -> bool: + """ + Add an ontology concept to the store + + :param ontology_concept_name: Name of the ontology concept to be removed + :return: True if the ontology concept can be added into the concept store (if not already existing), otherwise False + """ + if ontology_concept_name in self.__all_ontology_concept_holders: + rospy.logerr(f"OntologyConceptHolder for `{ontology_concept_name}` was already created!") + return False + else: + self.__all_ontology_concept_holders.setdefault(ontology_concept_name, ontology_concept_holder) + return True + + def remove_ontology_concept(self, ontology_concept_name: str) -> bool: + """ + Remove an ontology concept from the store + + :param ontology_concept_name: Name of the ontology concept to be removed + :return: True if the ontology concept can be removed from the concept store (if existing), otherwise False + """ + if ontology_concept_name in self.__all_ontology_concept_holders: + del self.__all_ontology_concept_holders[ontology_concept_name] + return True + return False + + def get_ontology_concepts_by_class(self, ontology_concept_class: Type[owlready2.Thing]) -> List[owlready2.Thing]: + """ + Get a list of ontology concepts for a given class + + :param ontology_concept_class: An ontology concept class + :return: A list of ontology concepts of which the type is either the given class or its subclass + """ + return list(itertools.chain( + *[concept_holder.ontology_concept + for concept_holder in self.__all_ontology_concept_holders.values() + if owlready2.issubclass(concept_holder.ontology_concept, ontology_concept_class)])) + + def get_ontology_concept_by_name(self, ontology_concept_name: str) -> Optional[owlready2.Thing]: + """ + Get the ontology concept of a given name if exists, otherwise None + + :param ontology_concept_name: Name of an ontology concept + :return: The ontology concept of a given name if exists or None otherwise + """ + concept_holder = self.__all_ontology_concept_holders.get(ontology_concept_name) + return concept_holder.ontology_concept if concept_holder else None + + def get_ontology_concept_holders_by_class(self, ontology_concept_class: Type[owlready2.Thing]) \ + -> List[OntologyConceptHolder]: + """ + Get a list of ontology concept holders for a given ontology concept class + + :param ontology_concept_class: An ontology concept class + :return: A list of ontology concept holders as instances of a given ontology concept class + """ + return [concept_holder for concept_holder in self.__all_ontology_concept_holders.values() + if isinstance(concept_holder.ontology_concept, ontology_concept_class)] + + def get_ontology_concept_holder_by_name(self, ontology_concept_name: str) -> Optional[OntologyConceptHolder]: + """ + Get the ontology concept holder for one of a given name if exists, otherwise None + + :param ontology_concept_name: Name of an ontology concept + :return: The ontology concept holder for one of a given name if exists, otherwise None + """ + return self.__all_ontology_concept_holders.get(ontology_concept_name) + + @staticmethod + def get_ontology_concepts_of_designator(designator: DesignatorDescription) -> List[owlready2.Thing]: + """ + Get the corresponding ontology concepts for a given designator + + :param designator: A designator associated with an ontology concept + :return: A list of ontology concepts corresponding with a given designator + """ + return [concept_holder.ontology_concept for concept_holder in designator.ontology_concept_holders] + + def get_designators_of_ontology_concept(self, ontology_concept_name: str) -> List[DesignatorDescription]: + """ + Get the corresponding designators associated with a given ontology concept + + :param ontology_concept_name: An ontology concept name + :return: A list of designators corresponding to a given ontology concept + """ + return self.__all_ontology_concept_holders[ontology_concept_name].designators \ + if ontology_concept_name in self.__all_ontology_concept_holders else [] + + +class OntologyConceptHolder(object): + """ + Wrapper of an ontology concept that is either dynamically created or loaded from an ontology. + NOTE: Since an ontology concept class, after being saved into an ontology file, must be reusable in the next time + the ontology is loaded, there must be no other attributes of it that should be created aside from ones inherited from `owlready2.Thing`! + + :ivar ontology_concept: An ontology concept, either dynamically created, or loaded from an ontology + """ + + def __init__(self, ontology_concept: owlready2.Thing): + """ + Initialize a holder of a given ontology concept instance + + :param ontology_concept: An ontology concept instance + """ + if owlready2 is None: + return + + #: An ontology concept, either dynamically created, or loaded from an ontology + self.ontology_concept: owlready2.Thing = ontology_concept + #: List of designators associated with this ontology concept + self.designators: List[DesignatorDescription] = [] + #: A callable used to resolve the designators to whatever of interest, like designators or their resolving results + self.resolve: Optional[Callable] = None + + #: The store for all OntologyConceptHolder instances + self.concept_holder_store: OntologyConceptHolderStore = OntologyConceptHolderStore() + self.concept_holder_store.add_ontology_concept_holder(ontology_concept.name, self) + + @property + def name(self) -> str: + """ + Get name of the ontology concept owned by this holder + + :return: Ontology concept name + """ + return self.ontology_concept.name if self.ontology_concept else "" + + def get_default_designator(self) -> Optional[DesignatorDescription]: + """ + Get the first element of designators if there is, else None + + :return: The first designator associated with the ontology concept held by this holder if exists or None + """ + return self.designators[0] if len(self.designators) > 0 else None + + def has_designator(self, designator) -> bool: + """ + Check whether this ontology concept holder has a given designator registered with its ontology concept + + :return: True if a given designator was registered by this ontology concept holder, either by itself or under another of the same name + """ + if designator in self.designators: + return True + if not hasattr(designator, "name"): + return False + for our_designator in self.designators: + if hasattr(our_designator, "name") and (getattr(our_designator, "name") == getattr(designator, "name")): + return True + return False + + def __eq__(self, other: OntologyConceptHolder) -> bool: + """ + Equality check based on name of the ontology concept + + :param other: Other ontology concept instance to check against + :return: True if the ontology concept of the other holder has the same name with the current one, otherwise False + """ + return ((self.ontology_concept == other.ontology_concept) or + (self.ontology_concept.name == other.ontology_concept.name)) diff --git a/test/bullet_world_testcase.py b/test/bullet_world_testcase.py index 5c25f9a67..cdea4a082 100644 --- a/test/bullet_world_testcase.py +++ b/test/bullet_world_testcase.py @@ -10,7 +10,7 @@ from pycram.datastructures.enums import ObjectType, WorldMode from pycram.object_descriptors.urdf import ObjectDescription from pycram.ros.viz_marker_publisher import VizMarkerPublisher - +from pycram.ontology.ontology import OntologyManager, SOMA_ONTOLOGY_IRI class BulletWorldTestCase(unittest.TestCase): @@ -29,6 +29,7 @@ def setUpClass(cls): ObjectDescription, pose=Pose([1.3, 0.7, 0.95])) ProcessModule.execution_delay = False cls.viz_marker_publisher = VizMarkerPublisher() + OntologyManager(SOMA_ONTOLOGY_IRI) def setUp(self): self.world.reset_world() diff --git a/test/test_action_designator.py b/test/test_action_designator.py index 0329ead33..26054c4d4 100644 --- a/test/test_action_designator.py +++ b/test/test_action_designator.py @@ -16,6 +16,8 @@ class TestActionDesignatorGrounding(BulletWorldTestCase): def test_move_torso(self): description = action_designator.MoveTorsoAction([0.3]) + # SOMA ontology seems not provide a corresponding concept yet for MoveTorso + #self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().position, 0.3) with simulated_robot: description.resolve().perform() @@ -23,6 +25,7 @@ def test_move_torso(self): def test_set_gripper(self): description = action_designator.SetGripperAction(["left"], ["open", "close"]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().gripper, "left") self.assertEqual(description.ground().motion, "open") self.assertEqual(len(list(iter(description))), 2) @@ -34,18 +37,21 @@ def test_set_gripper(self): def test_release(self): object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.ReleaseAction(["left"], object_description) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().gripper, "left") self.assertEqual(description.ground().object_designator.name, "milk") def test_grip(self): object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.GripAction(["left"], object_description, [0.5]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().gripper, "left") self.assertEqual(description.ground().object_designator.name, "milk") def test_park_arms(self): description = action_designator.ParkArmsAction([Arms.BOTH]) self.assertEqual(description.ground().arm, Arms.BOTH) + self.assertTrue(description.ontology_concept_holders) with simulated_robot: description.resolve().perform() for joint, pose in robot_description.get_static_joint_chain("right", "park").items(): @@ -57,10 +63,12 @@ def test_park_arms(self): def test_navigate(self): description = action_designator.NavigateAction([Pose([1, 0, 0], [0, 0, 0, 1])]) self.assertEqual(description.ground().target_location, Pose([1, 0, 0], [0, 0, 0, 1])) + self.assertTrue(description.ontology_concept_holders) def test_pick_up(self): object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.PickUpAction(object_description, ["left"], ["front"]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().object_designator.name, "milk") with simulated_robot: NavigateActionPerformable(Pose([0.6, 0.4, 0], [0, 0, 0, 1])).perform() @@ -71,6 +79,7 @@ def test_pick_up(self): def test_place(self): object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.PlaceAction(object_description, [Pose([1.3, 1, 0.9], [0, 0, 0, 1])], ["left"]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().object_designator.name, "milk") with simulated_robot: NavigateActionPerformable(Pose([0.6, 0.4, 0], [0, 0, 0, 1])).perform() @@ -81,6 +90,7 @@ def test_place(self): def test_look_at(self): description = action_designator.LookAtAction([Pose([1, 0, 1])]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().target, Pose([1, 0, 1])) with simulated_robot: description.resolve().perform() @@ -91,6 +101,7 @@ def test_detect(self): self.milk.set_pose(Pose([1.5, 0, 1.2])) object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.DetectAction(object_description) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().object_designator.name, "milk") with simulated_robot: detected_object = description.resolve().perform() @@ -103,12 +114,14 @@ def test_detect(self): def test_open(self): object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.OpenAction(object_description, ["left"], [1]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().object_designator.name, "milk") @unittest.skip def test_close(self): object_description = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.CloseAction(object_description, ["left"]) + self.assertTrue(description.ontology_concept_holders) self.assertEqual(description.ground().object_designator.name, "milk") def test_transport(self): @@ -117,6 +130,7 @@ def test_transport(self): ["left"], [Pose([-1.35, 0.78, 0.95], [0.0, 0.0, 0.16439898301071468, 0.9863939245479175])]) + self.assertTrue(description.ontology_concept_holders) with simulated_robot: action_designator.MoveTorsoAction([0.2]).resolve().perform() description.resolve().perform() @@ -130,6 +144,7 @@ def test_grasping(self): self.robot.set_pose(Pose([-2.14, 1.06, 0])) milk_desig = object_designator.ObjectDesignatorDescription(names=["milk"]) description = action_designator.GraspingAction(["right"], milk_desig) + self.assertTrue(description.ontology_concept_holders) with simulated_robot: description.resolve().perform() dist = np.linalg.norm( diff --git a/test/test_ontology.py b/test/test_ontology.py new file mode 100644 index 000000000..130b67568 --- /dev/null +++ b/test/test_ontology.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import unittest +import logging +from pathlib import Path +from typing import Type, Optional + +from pycram.designator import ObjectDesignatorDescription + +import rospy +try: + from owlready2 import * +except ImportError: + owlready2 = None + rospy.logwarn("Could not import owlready2, Ontology unit-tests could not run!") + +from pycram.ontology.ontology import OntologyManager, SOMA_ONTOLOGY_IRI +from pycram.ontology.ontology_common import OntologyConceptHolderStore, OntologyConceptHolder + + +class TestOntologyManager(unittest.TestCase): + ontology_manager: OntologyManager + main_ontology: Optional[owlready2.Ontology] + soma: Optional[owlready2.Ontology] + dul: Optional[owlready2.Ontology] + + @classmethod + def setUpClass(cls): + cls.ontology_manager = OntologyManager(SOMA_ONTOLOGY_IRI) + if cls.ontology_manager.initialized(): + cls.main_ontology = cls.ontology_manager.main_ontology + cls.soma = cls.ontology_manager.soma + cls.dul = cls.ontology_manager.dul + else: + cls.main_ontology = None + cls.soma = None + cls.dul = None + + @classmethod + def tearDownClass(cls): + if cls.ontology_manager.initialized(): + save_dir = Path(f"{Path.home()}/ontologies") + save_dir.mkdir(parents=True, exist_ok=True) + cls.ontology_manager.save(f"{save_dir}/{Path(cls.ontology_manager.main_ontology_iri).stem}.owl") + + def test_ontology_manager(self): + if self.ontology_manager.initialized(): + self.assertIs(self.ontology_manager, OntologyManager()) + + def test_ontology_concept_holder(self): + if not self.ontology_manager.initialized(): + return + dynamic_ontology_concept_class = self.ontology_manager.create_ontology_concept_class('DynamicOntologyConcept') + dynamic_ontology_concept_holder = OntologyConceptHolder(dynamic_ontology_concept_class(name='dynamic_ontology_concept1', + namespace=self.main_ontology)) + self.assertTrue(owlready2.isinstance_python(dynamic_ontology_concept_holder.ontology_concept, owlready2.Thing)) + + def test_loaded_ontologies(self): + if not self.ontology_manager.initialized(): + return + self.assertIsNotNone(self.main_ontology) + self.assertTrue(self.main_ontology.loaded) + self.assertIsNotNone(self.soma) + self.assertTrue(self.soma.loaded) + self.assertIsNotNone(self.dul) + self.assertTrue(self.dul.loaded) + + def test_ontology_concept_class_dynamic_creation(self): + if not self.ontology_manager.initialized(): + return + dynamic_ontology_concept_class = self.ontology_manager.create_ontology_concept_class('DynamicOntologyConcept') + self.assertIsNotNone(dynamic_ontology_concept_class) + self.assertEqual(dynamic_ontology_concept_class.namespace, self.main_ontology) + self.assertIs(dynamic_ontology_concept_class, self.main_ontology.DynamicOntologyConcept) + self.assertIs(issubclass(dynamic_ontology_concept_class, owlready2.Thing), True) + dynamic_ontology_concept = dynamic_ontology_concept_class(name='dynamic_ontology_concept2', + namespace=self.main_ontology) + self.assertTrue(owlready2.isinstance_python(dynamic_ontology_concept, owlready2.Thing)) + + def test_ontology_triple_classes_dynamic_creation(self): + if not self.ontology_manager.initialized(): + return + # Test dynamic triple classes creation without inheritance from existing parent ontology classes + self.ontology_manager.create_ontology_triple_classes(subject_class_name="OntologySubject", + object_class_name="OntologyObject", + predicate_name="predicate", + inverse_predicate_name="inverse_predicate") + + subject_class = self.main_ontology.OntologySubject + self.assertIsNotNone(subject_class) + subject_individual = subject_class("subject") + self.assertIsNotNone(subject_individual.predicate) + + object_class = self.main_ontology.OntologyObject + self.assertIsNotNone(object_class) + object_individual = object_class("object") + self.assertIsNotNone(object_individual.inverse_predicate) + + # Test dynamic triple classes creation as inheriting from existing parent ontology classes + PLACEABLE_ON_PREDICATE_NAME = "placeable_on" + HOLD_OBJ_PREDICATE_NAME = "hold_obj" + self.ontology_manager.create_ontology_triple_classes(ontology_subject_parent_class=self.soma.Container, + subject_class_name="OntologyPlaceHolderObject", + ontology_object_parent_class=self.dul.PhysicalObject, + object_class_name="OntologyHandheldObject", + predicate_name=PLACEABLE_ON_PREDICATE_NAME, + inverse_predicate_name=HOLD_OBJ_PREDICATE_NAME, + ontology_property_parent_class=self.soma.affordsBearer, + ontology_inverse_property_parent_class=self.soma.isBearerAffordedBy) + + def create_ontology_handheld_object_designator(object_name: str, ontology_parent_class: Type[owlready2.Thing]): + return self.ontology_manager.create_ontology_linked_designator(object_name=object_name, + designator_class=ObjectDesignatorDescription, + ontology_concept_name=f"Onto{object_name}", + ontology_parent_class=ontology_parent_class) + + # Holdable object + egg = create_ontology_handheld_object_designator("egg", self.main_ontology.OntologyHandheldObject) + # Placeholder object + egg_tray = create_ontology_handheld_object_designator("egg_tray", self.main_ontology.OntologyPlaceHolderObject) + + # Create ontology relation between [Place-holder] and [Holdable obj] + self.ontology_manager.set_ontology_relation(subject_designator=egg, object_designator=egg_tray, + predicate_name=PLACEABLE_ON_PREDICATE_NAME) + + self.ontology_manager.set_ontology_relation(subject_designator=egg_tray, object_designator=egg, + predicate_name=HOLD_OBJ_PREDICATE_NAME) + + # Query potential designator candidates based on above-set ontology relations among them + egg_placeholders = [placeholder.names for placeholder in \ + self.ontology_manager.get_designators_by_subject_predicate(subject=egg, + predicate_name=PLACEABLE_ON_PREDICATE_NAME)] + self.assertTrue(len(egg_placeholders) == 1) + self.assertEqual(egg_placeholders[0], ["egg_tray"]) + + egg_tray_holdables = [placeholder.names for placeholder in \ + self.ontology_manager.get_designators_by_subject_predicate(subject=egg_tray, + predicate_name=HOLD_OBJ_PREDICATE_NAME)] + self.assertTrue(len(egg_tray_holdables) == 1) + self.assertEqual(egg_tray_holdables[0], ["egg"]) + + def test_ontology_class_destruction(self): + if not self.ontology_manager.initialized(): + return + concept_class_name = 'DynamicOntologyConcept' + dynamic_ontology_concept_class = self.ontology_manager.create_ontology_concept_class(concept_class_name) + OntologyConceptHolder(dynamic_ontology_concept_class(name='dynamic_ontology_concept3', + namespace=self.main_ontology)) + + self.ontology_manager.destroy_ontology_class(dynamic_ontology_concept_class) + self.assertIsNone(self.ontology_manager.get_ontology_class(concept_class_name)) + self.assertFalse(OntologyConceptHolderStore().get_ontology_concepts_by_class(dynamic_ontology_concept_class)) + + +if __name__ == '__main__': + unittest.main()