Skip to content

Make sure pgtap errors get caught #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 23, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pypgstac/pypgstac/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""PyPGStac Version."""
__version__ = "0.3.2"
__version__ = "0.3.3"
361 changes: 361 additions & 0 deletions pypgstac/pypgstac/migrations/pgstac.0.3.2-0.3.3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,361 @@
SET SEARCH_PATH to pgstac, public;
set check_function_bodies = off;

CREATE OR REPLACE FUNCTION pgstac.add_filters_to_cql(j jsonb)
RETURNS jsonb
LANGUAGE plpgsql
AS $function$
DECLARE
newprop jsonb;
newprops jsonb := '[]'::jsonb;
BEGIN
IF j ? 'ids' THEN
newprop := jsonb_build_object(
'in',
jsonb_build_array(
'{"property":"id"}'::jsonb,
j->'ids'
)
);
newprops := jsonb_insert(newprops, '{1}', newprop);
END IF;
IF j ? 'collections' THEN
newprop := jsonb_build_object(
'in',
jsonb_build_array(
'{"property":"collection"}'::jsonb,
j->'collections'
)
);
newprops := jsonb_insert(newprops, '{1}', newprop);
END IF;

IF j ? 'datetime' THEN
newprop := format(
'{"anyinteracts":[{"property":"datetime"}, %s]}',
j->'datetime'
);
newprops := jsonb_insert(newprops, '{1}', newprop);
END IF;

IF j ? 'bbox' THEN
newprop := format(
'{"intersects":[{"property":"geometry"}, %s]}',
j->'bbox'
);
newprops := jsonb_insert(newprops, '{1}', newprop);
END IF;

IF j ? 'intersects' THEN
newprop := format(
'{"intersects":[{"property":"geometry"}, %s]}',
j->'intersects'
);
newprops := jsonb_insert(newprops, '{1}', newprop);
END IF;

RAISE NOTICE 'newprops: %', newprops;

IF newprops IS NOT NULL AND jsonb_array_length(newprops) > 0 THEN
return jsonb_set(
j,
'{filter}',
cql_and_append(j, jsonb_build_object('and', newprops))
) - '{ids,collections,datetime,bbox,intersects}'::text[];
END IF;

return j;
END;
$function$
;

CREATE OR REPLACE FUNCTION pgstac.get_token_filter(_search jsonb DEFAULT '{}'::jsonb, token_rec jsonb DEFAULT NULL::jsonb)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
token_id text;
filters text[] := '{}'::text[];
prev boolean := TRUE;
field text;
dir text;
sort record;
orfilters text[] := '{}'::text[];
andfilters text[] := '{}'::text[];
output text;
token_where text;
BEGIN
-- If no token provided return NULL
IF token_rec IS NULL THEN
IF NOT (_search ? 'token' AND
(
(_search->>'token' ILIKE 'prev:%')
OR
(_search->>'token' ILIKE 'next:%')
)
) THEN
RETURN NULL;
END IF;
prev := (_search->>'token' ILIKE 'prev:%');
token_id := substr(_search->>'token', 6);
SELECT to_jsonb(items) INTO token_rec FROM items WHERE id=token_id;
END IF;
RAISE NOTICE 'TOKEN ID: %', token_rec->'id';

CREATE TEMP TABLE sorts (
_row int GENERATED ALWAYS AS IDENTITY NOT NULL,
_field text PRIMARY KEY,
_dir text NOT NULL,
_val text
) ON COMMIT DROP;

-- Make sure we only have distinct columns to sort with taking the first one we get
INSERT INTO sorts (_field, _dir)
SELECT
(items_path(value->>'field')).path,
get_sort_dir(value)
FROM
jsonb_array_elements(coalesce(_search->'sortby','[{"field":"datetime","direction":"desc"}]'))
ON CONFLICT DO NOTHING
;
RAISE NOTICE 'sorts 1: %', (SELECT jsonb_agg(to_json(sorts)) FROM sorts);
-- Get the first sort direction provided. As the id is a primary key, if there are any
-- sorts after id they won't do anything, so make sure that id is the last sort item.
SELECT _dir INTO dir FROM sorts ORDER BY _row ASC LIMIT 1;
IF EXISTS (SELECT 1 FROM sorts WHERE _field = 'id') THEN
DELETE FROM sorts WHERE _row > (SELECT _row FROM sorts WHERE _field = 'id' ORDER BY _row ASC);
ELSE
INSERT INTO sorts (_field, _dir) VALUES ('id', dir);
END IF;

-- Add value from looked up item to the sorts table
UPDATE sorts SET _val=quote_literal(token_rec->>_field);

-- Check if all sorts are the same direction and use row comparison
-- to filter
RAISE NOTICE 'sorts 2: %', (SELECT jsonb_agg(to_json(sorts)) FROM sorts);

IF (SELECT count(DISTINCT _dir) FROM sorts) = 1 THEN
SELECT format(
'(%s) %s (%s)',
concat_ws(', ', VARIADIC array_agg(quote_ident(_field))),
CASE WHEN (prev AND dir = 'ASC') OR (NOT prev AND dir = 'DESC') THEN '<' ELSE '>' END,
concat_ws(', ', VARIADIC array_agg(_val))
) INTO output FROM sorts
WHERE token_rec ? _field
;
ELSE
FOR sort IN SELECT * FROM sorts ORDER BY _row asc LOOP
RAISE NOTICE 'SORT: %', sort;
IF sort._row = 1 THEN
orfilters := orfilters || format('(%s %s %s)',
quote_ident(sort._field),
CASE WHEN (prev AND sort._dir = 'ASC') OR (NOT prev AND sort._dir = 'DESC') THEN '<' ELSE '>' END,
sort._val
);
ELSE
orfilters := orfilters || format('(%s AND %s %s %s)',
array_to_string(andfilters, ' AND '),
quote_ident(sort._field),
CASE WHEN (prev AND sort._dir = 'ASC') OR (NOT prev AND sort._dir = 'DESC') THEN '<' ELSE '>' END,
sort._val
);

END IF;
andfilters := andfilters || format('%s = %s',
quote_ident(sort._field),
sort._val
);
END LOOP;
output := array_to_string(orfilters, ' OR ');
END IF;
DROP TABLE IF EXISTS sorts;
token_where := concat('(',coalesce(output,'true'),')');
IF trim(token_where) = '' THEN
token_where := NULL;
END IF;
RAISE NOTICE 'TOKEN_WHERE: |%|',token_where;
RETURN token_where;
END;
$function$
;

CREATE OR REPLACE FUNCTION pgstac.search(_search jsonb DEFAULT '{}'::jsonb)
RETURNS jsonb
LANGUAGE plpgsql
SET jit TO 'off'
AS $function$
DECLARE
searches searches%ROWTYPE;
_where text;
token_where text;
full_where text;
orderby text;
query text;
token_type text := substr(_search->>'token',1,4);
_limit int := coalesce((_search->>'limit')::int, 10);
curs refcursor;
cntr int := 0;
iter_record items%ROWTYPE;
first_record items%ROWTYPE;
last_record items%ROWTYPE;
out_records jsonb := '[]'::jsonb;
prev_query text;
next text;
prev_id text;
has_next boolean := false;
has_prev boolean := false;
prev text;
total_count bigint;
context jsonb;
collection jsonb;
includes text[];
excludes text[];
exit_flag boolean := FALSE;
batches int := 0;
timer timestamptz := clock_timestamp();
BEGIN
searches := search_query(_search);
_where := searches._where;
orderby := searches.orderby;
total_count := coalesce(searches.total_count, searches.estimated_count);


IF token_type='prev' THEN
token_where := get_token_filter(_search, null::jsonb);
orderby := sort_sqlorderby(_search, TRUE);
END IF;
IF token_type='next' THEN
token_where := get_token_filter(_search, null::jsonb);
END IF;

full_where := concat_ws(' AND ', _where, token_where);
RAISE NOTICE 'FULL QUERY % %', full_where, clock_timestamp()-timer;
timer := clock_timestamp();

FOR query IN SELECT partition_queries(full_where, orderby) LOOP
timer := clock_timestamp();
query := format('%s LIMIT %L', query, _limit + 1);
RAISE NOTICE 'Partition Query: %', query;
batches := batches + 1;
curs = create_cursor(query);
LOOP
FETCH curs into iter_record;
EXIT WHEN NOT FOUND;
cntr := cntr + 1;
last_record := iter_record;
IF cntr = 1 THEN
first_record := last_record;
END IF;
IF cntr <= _limit THEN
out_records := out_records || last_record.content;
ELSIF cntr > _limit THEN
has_next := true;
exit_flag := true;
EXIT;
END IF;
END LOOP;
RAISE NOTICE 'Query took %', clock_timestamp()-timer;
timer := clock_timestamp();
EXIT WHEN exit_flag;
END LOOP;
RAISE NOTICE 'Scanned through % partitions.', batches;


-- Flip things around if this was the result of a prev token query
IF token_type='prev' THEN
out_records := flip_jsonb_array(out_records);
first_record := last_record;
END IF;

-- If this query has a token, see if there is data before the first record
IF _search ? 'token' THEN
prev_query := format(
'SELECT 1 FROM items WHERE %s LIMIT 1',
concat_ws(
' AND ',
_where,
trim(get_token_filter(_search, to_jsonb(first_record)))
)
);
RAISE NOTICE 'Query to get previous record: % --- %', prev_query, first_record;
EXECUTE prev_query INTO has_prev;
IF FOUND and has_prev IS NOT NULL THEN
RAISE NOTICE 'Query results from prev query: %', has_prev;
has_prev := TRUE;
END IF;
END IF;
has_prev := COALESCE(has_prev, FALSE);

RAISE NOTICE 'token_type: %, has_next: %, has_prev: %', token_type, has_next, has_prev;
IF has_prev THEN
prev := out_records->0->>'id';
END IF;
IF has_next OR token_type='prev' THEN
next := out_records->-1->>'id';
END IF;


-- include/exclude any fields following fields extension
IF _search ? 'fields' THEN
IF _search->'fields' ? 'exclude' THEN
excludes=textarr(_search->'fields'->'exclude');
END IF;
IF _search->'fields' ? 'include' THEN
includes=textarr(_search->'fields'->'include');
IF array_length(includes, 1)>0 AND NOT 'id' = ANY (includes) THEN
includes = includes || '{id}';
END IF;
END IF;
SELECT jsonb_agg(filter_jsonb(row, includes, excludes)) INTO out_records FROM jsonb_array_elements(out_records) row;
END IF;

context := jsonb_strip_nulls(jsonb_build_object(
'limit', _limit,
'matched', total_count,
'returned', coalesce(jsonb_array_length(out_records), 0)
));

collection := jsonb_build_object(
'type', 'FeatureCollection',
'features', coalesce(out_records, '[]'::jsonb),
'next', next,
'prev', prev,
'context', context
);

RETURN collection;
END;
$function$
;

CREATE OR REPLACE FUNCTION pgstac.sort_sqlorderby(_search jsonb DEFAULT NULL::jsonb, reverse boolean DEFAULT false)
RETURNS text
LANGUAGE sql
AS $function$
WITH sortby AS (
SELECT coalesce(_search->'sortby','[{"field":"datetime", "direction":"desc"}]') as sort
), withid AS (
SELECT CASE
WHEN sort @? '$[*] ? (@.field == "id")' THEN sort
ELSE sort || '[{"field":"id", "direction":"desc"}]'::jsonb
END as sort
FROM sortby
), withid_rows AS (
SELECT jsonb_array_elements(sort) as value FROM withid
),sorts AS (
SELECT
(items_path(value->>'field')).path as key,
parse_sort_dir(value->>'direction', reverse) as dir
FROM withid_rows
)
SELECT array_to_string(
array_agg(concat(key, ' ', dir)),
', '
) FROM sorts;
$function$
;



INSERT INTO migrations (version) VALUES ('0.3.3');
Loading