Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/multitypes #2

Merged
merged 4 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions .github/workflows/python_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,42 +46,6 @@ jobs:
maturin-version: ${{ env.MATURIN_VERSION }}
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}

# Needed for Docker on Apple M1
# Builds and publishes aarch64 manylinux wheels, one per Python version in the matrix.
manylinux-aarch64:
  runs-on: ubuntu-latest
  environment: release
  strategy:
    # Let the remaining Python versions finish even if one of them fails.
    fail-fast: false
    matrix:
      python-version: [ '3.8', '3.9', '3.10', '3.11' ]
  steps:
    - uses: actions/checkout@v3
    - uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}

    # Needed to avoid out-of-memory error
    # NOTE(review): this action is referenced by the mutable `master` branch;
    # consider pinning it to a tag or commit SHA.
    - name: Set Swap Space
      uses: pierotofy/set-swap-space@master
      with:
        swap-size-gb: 10

    # Replaces py_maplib/README.md with a copy of the top-level README.
    - name: Fix README symlink
      run: |
        rm py_maplib/README.md
        cp README.md py_maplib/README.md

    - name: Publish wheel
      uses: PyO3/maturin-action@v1
      env:
        JEMALLOC_SYS_WITH_LG_PAGE: 16
      with:
        command: publish
        # The matrix key is `python-version`; `matrix.PYTHON_VERSION` does not exist
        # and would expand to an empty string, yielding a bare `-i python`.
        args: -m py_maplib/Cargo.toml --skip-existing --no-sdist -o wheels -i python${{ matrix.python-version }} -u magbak
        target: aarch64-unknown-linux-gnu
        maturin-version: ${{ env.MATURIN_VERSION }}
        rust-toolchain: ${{ env.RUST_TOOLCHAIN }}

win-macos:
runs-on: ${{ matrix.os }}
strategy:
Expand Down
19 changes: 11 additions & 8 deletions maplib/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,18 +339,17 @@ impl Mapping {
new_blank_node_counter: usize,
) -> Result<(), MappingError> {
let now = Instant::now();
let triples: Vec<
Result<(DataFrame, RDFNodeType, Option<String>, Option<String>, bool), MappingError>,
> = result_vec.par_drain(..).map(create_triples).collect();
let triples: Vec<_> = result_vec.par_drain(..).map(create_triples).collect();
let mut ok_triples = vec![];
for t in triples {
ok_triples.push(t?);
}
let mut all_triples_to_add = vec![];
for (df, rdf_node_type, language_tag, verb, has_unique_subset) in ok_triples {
for (df, subj_rdf_node_type, obj_rdf_node_type , language_tag, verb, has_unique_subset) in ok_triples {
all_triples_to_add.push(TriplesToAdd {
df,
object_type: rdf_node_type,
subject_type: subj_rdf_node_type,
object_type: obj_rdf_node_type,
language_tag,
static_verb_column: verb,
has_unique_subset,
Expand Down Expand Up @@ -383,7 +382,7 @@ fn get_variable_names(i: &Instance) -> Vec<&String> {

fn create_triples(
i: OTTRTripleInstance,
) -> Result<(DataFrame, RDFNodeType, Option<String>, Option<String>, bool), MappingError> {
) -> Result<(DataFrame, RDFNodeType, RDFNodeType, Option<String>, Option<String>, bool), MappingError> {
let OTTRTripleInstance {
mut df,
mut dynamic_columns,
Expand Down Expand Up @@ -430,10 +429,14 @@ fn create_triples(
lf = lf.select(keep_cols.as_slice());
let df = lf.collect().expect("Collect problem");
let PrimitiveColumn {
rdf_node_type,
rdf_node_type:subj_rdf_node_type,
language_tag:_,
} = dynamic_columns.remove("subject").unwrap();
let PrimitiveColumn {
rdf_node_type: obj_rdf_node_type,
language_tag,
} = dynamic_columns.remove("object").unwrap();
Ok((df, rdf_node_type, language_tag, verb, has_unique_subset))
Ok((df, subj_rdf_node_type, obj_rdf_node_type, language_tag, verb, has_unique_subset))
}

fn create_dynamic_expression_from_static(
Expand Down
4 changes: 2 additions & 2 deletions py_maplib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "py_maplib"
version = "0.4.27"
version = "0.4.28"
edition = "2021"

[workspace]
Expand All @@ -13,7 +13,7 @@ triplestore = {path="../triplestore"}
oxrdf = "0.1.0"
arrow_python_utils = {path="../arrow_python_utils"}
thiserror="1.0.31"
polars-core = {version="0.32.1", features=["dtype-array", "dtype-categorical", "dtype-date", "dtype-datetime",
polars-core = {version="0.32.1", features=["object", "dtype-array", "dtype-categorical", "dtype-date", "dtype-datetime",
"dtype-decimal", "dtype-duration", "dtype-i8", "dtype-i16", "dtype-struct", "dtype-time", "dtype-u8", "dtype-u16"]}
log ="0.4.19"

Expand Down
36 changes: 34 additions & 2 deletions py_maplib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use triplestore::sparql::QueryResult;
// SOFTWARE.
#[cfg(target_os = "linux")]
use jemallocator::Jemalloc;
use polars_core::frame::DataFrame;
use polars_core::prelude::{DataType, NamedFrom, Series};
use triplestore::sparql::multitype::{MULTI_TYPE_NAME, MultiType};

#[cfg(not(target_os = "linux"))]
use mimalloc::MiMalloc;
Expand Down Expand Up @@ -329,9 +332,12 @@ impl Mapping {
.query(&query)
.map_err(PyMaplibError::from)?;
match res {
QueryResult::Select(df) => df_to_py_df(df, py),
QueryResult::Select(mut df, datatypes) => {
df = fix_multicolumns(df);
df_to_py_df(df, py)
},
QueryResult::Construct(dfs) => {
let dfs = dfs.into_iter().map(|(df, _)| df).collect();
let dfs = dfs.into_iter().map(|(df, subj_type, obj_type)| fix_multicolumns(df)).collect();
Ok(df_vec_to_py_df_list(dfs, py)?.into())
}
}
Expand Down Expand Up @@ -498,3 +504,29 @@ fn _maplib(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
/// Returns `true` when `s` begins with the blank-node prefix `_:`.
fn is_blank_node(s: &str) -> bool {
    s.strip_prefix("_:").is_some()
}

/// Replaces every multi-type object column of `df` with a string column.
///
/// A column is converted when its dtype equals `DataType::Object(MULTI_TYPE_NAME)`.
/// Each value that can be read as a [`MultiType`] is rendered with `to_string()`;
/// a missing value, or one that does not downcast to [`MultiType`], becomes `None`.
/// All other columns are left untouched.
///
/// # Panics
///
/// Panics if a column that was just listed cannot be looked up again, or if
/// putting the converted series back into the frame fails.
fn fix_multicolumns(mut df: DataFrame) -> DataFrame {
    let multi_dtype = DataType::Object(MULTI_TYPE_NAME);
    // Copy the names out first so the frame can be mutated while looping.
    let names: Vec<String> = df
        .get_column_names()
        .iter()
        .map(|name| name.to_string())
        .collect();
    for name in names {
        let series = df.column(&name).unwrap();
        if series.dtype() != &multi_dtype {
            continue;
        }
        let rendered: Vec<Option<String>> = (0..series.len())
            .map(|row| {
                series
                    .get_object(row)
                    .and_then(|obj| obj.as_any().downcast_ref::<MultiType>())
                    .map(|multi| multi.to_string())
            })
            .collect();
        // Same column name, so the object column is replaced by the string one.
        df.with_column(Series::new(&name, rendered)).unwrap();
    }
    df
}
3 changes: 2 additions & 1 deletion py_maplib/tests/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ testdata/gtfs
test_opcua_mapping.py
testdata/*.parquet
bench200_000.nt
tmp
tmp
out.nt
52 changes: 49 additions & 3 deletions py_maplib/tests/test_blank_nodes.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import polars as pl
import pytest
import rdflib
from polars.testing import assert_frame_equal

import pathlib
from maplib import Mapping

pl.Config.set_fmt_str_lengths(300)


PATH_HERE = pathlib.Path(__file__).parent
TESTDATA_PATH = PATH_HERE / "testdata"

@pytest.fixture(scope="function")
def blank_person_mapping():
# The following example comes from https://primer.ottr.xyz/01-basics.html
Expand All @@ -22,7 +26,7 @@ def blank_person_mapping():
@prefix ottr: <http://ns.ottr.xyz/0.4/> .
@prefix ax: <http://tpl.ottr.xyz/owl/axiom/0.1/> .
@prefix rstr: <http://tpl.ottr.xyz/owl/restriction/0.1/> .
ex:Person[ ?firstName, ?lastName, ?email ] :: {
ex:Person[ ?firstName, ?lastName, xsd:anyURI ?email ] :: {
ottr:Triple(_:person, rdf:type, foaf:Person ),
ottr:Triple(_:person, foaf:firstName, ?firstName ),
ottr:Triple(_:person, foaf:lastName, ?lastName ),
Expand All @@ -48,7 +52,49 @@ def test_simple_query_no_error(blank_person_mapping):
} ORDER BY ?firstName ?lastName
""")
expected_df = pl.DataFrame({"firstName": ["Ann", "Bob"],
"lastName": ["Strong", "Brite"]})
"lastName": ["Strong", "Brite"]})

assert_frame_equal(df, expected_df)


def test_simple_query_blank_node_output_no_error(blank_person_mapping):
    """Write the mapping as N-Triples, reload it with rdflib and query it.

    The exported file must parse as N-Triples and the person query must
    yield exactly two rows.
    """
    ntriples_file = "out.nt"
    person_query = """
PREFIX foaf:<http://xmlns.com/foaf/0.1/>

SELECT ?firstName ?lastName WHERE {
?p a foaf:Person .
?p foaf:lastName ?lastName .
?p foaf:firstName ?firstName .
} ORDER BY ?firstName ?lastName
"""
    blank_person_mapping.write_ntriples(ntriples_file)

    graph = rdflib.Graph()
    graph.parse(ntriples_file, format="ntriples")
    rows = graph.query(person_query)

    assert len(rows) == 2


def test_multi_datatype_query_no_error(blank_person_mapping):
    """Select every triple and compare the sorted result with the stored CSV."""
    sort_columns = ["s","v","o"]
    all_triples_query = """
PREFIX foaf:<http://xmlns.com/foaf/0.1/>

SELECT ?s ?v ?o WHERE {
?s ?v ?o .
}
"""
    actual_df = blank_person_mapping.query(all_triples_query).sort(by=sort_columns)

    expected_csv = TESTDATA_PATH / "multi_datatype_query.csv"
    # To regenerate the expected data, write actual_df to expected_csv with write_csv.
    expected_df = pl.scan_csv(expected_csv).sort(sort_columns).collect()

    assert_frame_equal(actual_df, expected_df)


@pytest.mark.skip()
def test_multi_datatype_query_sorting_no_error(blank_person_mapping):
    """Run the select-all query with ORDER BY over all three variables.

    Currently skipped; the test only runs the query and asserts nothing.
    """
    ordered_query = """
PREFIX foaf:<http://xmlns.com/foaf/0.1/>

SELECT ?s ?v ?o WHERE {
?s ?v ?o .
} ORDER BY ?s ?v ?o
"""
    df = blank_person_mapping.query(ordered_query)
9 changes: 9 additions & 0 deletions py_maplib/tests/testdata/multi_datatype_query.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
s,v,o
_:person_l0_p0_r0,http://www.w3.org/1999/02/22-rdf-syntax-ns#type,<http://xmlns.com/foaf/0.1/Person>
_:person_l0_p0_r0,http://xmlns.com/foaf/0.1/firstName,"""Ann"""
_:person_l0_p0_r0,http://xmlns.com/foaf/0.1/lastName,"""Strong"""
_:person_l0_p0_r0,http://xmlns.com/foaf/0.1/mbox,<mailto:ann.strong@gmail.com>
_:person_l0_p0_r1,http://www.w3.org/1999/02/22-rdf-syntax-ns#type,<http://xmlns.com/foaf/0.1/Person>
_:person_l0_p0_r1,http://xmlns.com/foaf/0.1/firstName,"""Bob"""
_:person_l0_p0_r1,http://xmlns.com/foaf/0.1/lastName,"""Brite"""
_:person_l0_p0_r1,http://xmlns.com/foaf/0.1/mbox,<mailto:bob.brite@gmail.com>
11 changes: 11 additions & 0 deletions representation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@ pub enum RDFNodeType {
BlankNode,
Literal(NamedNode),
None,
MultiType
}

impl RDFNodeType {

/// Combines this node type with `other`.
///
/// Returns a clone of `self` when the two types are equal, and
/// [`RDFNodeType::MultiType`] when they differ.
pub fn union(&self, other: &RDFNodeType) -> RDFNodeType {
    if self != other {
        return RDFNodeType::MultiType;
    }
    self.clone()
}

pub fn is_lit_type(&self, nnref: NamedNodeRef) -> bool {
if let RDFNodeType::Literal(l) = self {
if l.as_ref() == nnref {
Expand Down Expand Up @@ -70,6 +80,7 @@ impl RDFNodeType {
}
},
RDFNodeType::None => DataType::Null,
RDFNodeType::MultiType => todo!()
}
}
}
2 changes: 1 addition & 1 deletion triplestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rayon = "1.6.0"
sprs = {version="0.11.0", features=["rayon"]}
spargebra = "0.2.2"
oxrdf = "0.1.0"
polars = {version="0.32.1", features=["performant", "semi_anti_join","abs", "round_series", "lazy", "concat_str", "is_in", "dtype-full", "strings", "horizontal_concat", "rows", "timezones", "polars-time", "temporal", "list_eval", "partition_by", "parquet", "diagonal_concat", "cross_join", "cum_agg"] }
polars = {version="0.32.1", features=["object", "performant", "semi_anti_join","abs", "round_series", "lazy", "concat_str", "is_in", "dtype-full", "strings", "horizontal_concat", "rows", "timezones", "polars-time", "temporal", "list_eval", "partition_by", "parquet", "diagonal_concat", "cross_join", "cum_agg"] }
log="0.4.19"
rio_turtle = "0.8.4"
rio_api = "0.8.4"
Expand Down
14 changes: 7 additions & 7 deletions triplestore/src/export_triples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ impl Triplestore {
F: Fn(&str, &str, &str) -> T,
{
for (verb, map) in &mut self.df_map {
for (k, v) in map {
if k.find_triple_type() == TripleType::ObjectProperty {
for ((k1, k2), v) in map {
if k2.find_triple_type() == TripleType::ObjectProperty {
for i in 0..v.len() {
let df = v.get_df(i)?;
if df.height() == 0 {
Expand Down Expand Up @@ -48,8 +48,8 @@ impl Triplestore {
{
//subject, verb, lexical_form, language_tag, datatype
for (verb, map) in &mut self.df_map {
for (k, v) in map {
if k.find_triple_type() == TripleType::StringProperty {
for ((k1, k2), v) in map {
if k2.find_triple_type() == TripleType::StringProperty {
for i in 0..v.len() {
let df = v.get_df(i)?;
if df.height() == 0 {
Expand Down Expand Up @@ -88,9 +88,9 @@ impl Triplestore {
{
//subject, verb, lexical_form, datatype
for (verb, map) in &mut self.df_map {
for (k, v) in map {
if k.find_triple_type() == TripleType::NonStringProperty {
let object_type = if let RDFNodeType::Literal(l) = k {
for ((k1, k2), v) in map {
if k2.find_triple_type() == TripleType::NonStringProperty {
let object_type = if let RDFNodeType::Literal(l) = k2 {
l
} else {
panic!("Should never happen")
Expand Down
Loading