Skip to content

Commit

Permalink
Add basic support for calculations (#1364)
Browse files Browse the repository at this point in the history
This also splits up the fetching and merging for the formulas (a follow-up
to #1362) and adds basic tests.

This does not yet support matching facets; that will be added in a
follow-up.

Currently, this independently calculates each missing variable/entity
pair, rather than combining the entities into one DerivedSeries request,
and then updates the merging to accept an array of responses. This was
to avoid computing the calculation if some of the entities already have
data. However, if it's preferred to only do one DerivedSeries per formula,
I can update this.
  • Loading branch information
n-h-diaz authored May 31, 2024
1 parent 1d224bd commit 85af98c
Show file tree
Hide file tree
Showing 7 changed files with 582 additions and 29 deletions.
18 changes: 18 additions & 0 deletions internal/merger/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,24 @@ func MergeObservation(main, aux *pbv2.ObservationResponse) *pbv2.ObservationResp
return main
}

// MergeMultipleObservations folds any number of V2 observation responses into
// a single response. The arguments are ranked in order of preference, with the
// most preferred response first. It returns nil when no responses are given
// and the sole response unchanged when exactly one is given.
func MergeMultipleObservations(obs ...*pbv2.ObservationResponse) *pbv2.ObservationResponse {
	switch len(obs) {
	case 0:
		// Covers both a nil and an empty argument list.
		return nil
	case 1:
		return obs[0]
	}
	// Collapse everything after the head first, then merge the head (the most
	// preferred response) with that combined tail.
	rest := MergeMultipleObservations(obs[1:]...)
	return MergeObservation(obs[0], rest)
}

// MergeObservationDates merges two V1 observation-dates responses.
func MergeObservationDates(
main, aux *pbv1.BulkObservationDatesLinkedResponse,
Expand Down
123 changes: 123 additions & 0 deletions internal/merger/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,129 @@ func TestMergeObservation(t *testing.T) {
}
}

// TestMergeMultipleObservations merges three responses that each carry one
// distinct facet for the same variable/entity pair and expects a single
// response listing those facets in argument order.
func TestMergeMultipleObservations(t *testing.T) {
	opts := cmp.Options{
		protocmp.Transform(),
	}

	// facet builds a FacetObservation holding exactly one data point.
	facet := func(id, date string, value float64) *pbv2.FacetObservation {
		return &pbv2.FacetObservation{
			FacetId: id,
			Observations: []*pb.PointStat{
				{
					Date:  date,
					Value: proto.Float64(value),
				},
			},
		}
	}
	// response places the given facets under var1 -> entity1.
	response := func(facets ...*pbv2.FacetObservation) *pbv2.ObservationResponse {
		return &pbv2.ObservationResponse{
			ByVariable: map[string]*pbv2.VariableObservation{
				"var1": {
					ByEntity: map[string]*pbv2.EntityObservation{
						"entity1": {
							OrderedFacets: facets,
						},
					},
				},
			},
		}
	}

	testCases := []struct {
		o1   *pbv2.ObservationResponse
		o2   *pbv2.ObservationResponse
		o3   *pbv2.ObservationResponse
		want *pbv2.ObservationResponse
	}{
		{
			o1: response(facet("facet1", "2021", 45.4)),
			o2: response(facet("facet2", "2022", 66.4)),
			o3: response(facet("facet3", "2023", 7.28)),
			want: response(
				facet("facet1", "2021", 45.4),
				facet("facet2", "2022", 66.4),
				facet("facet3", "2023", 7.28),
			),
		},
	}

	for _, tc := range testCases {
		got := MergeMultipleObservations(tc.o1, tc.o2, tc.o3)
		diff := cmp.Diff(got, tc.want, opts)
		if diff == "" {
			continue
		}
		t.Errorf("MergeMultipleObservations(%v, %v, %v) got diff: %s", tc.o1, tc.o2, tc.o3, diff)
	}
}

func TestMergeBulkVariableInfoResponse(t *testing.T) {
cmpOpts := cmp.Options{protocmp.Transform()}
for _, tc := range []struct {
Expand Down
7 changes: 6 additions & 1 deletion internal/server/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/datacommonsorg/mixer/internal/merger"
"github.com/datacommonsorg/mixer/internal/server/pagination"
v2observation "github.com/datacommonsorg/mixer/internal/server/v2/observation"
"github.com/datacommonsorg/mixer/internal/util"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -242,5 +243,9 @@ func (s *Server) V2Observation(
localResp, remoteResp := <-localRespChan, <-remoteRespChan
// The order of arguments matters: localResp is preferred and will be put first
// in the merged result.
return merger.MergeObservation(localResp, remoteResp), nil
mergedResp := merger.MergeObservation(localResp, remoteResp)
calculatedResps := v2observation.CalculateObservationResponses(ctx, s.store, mergedResp, s.cachedata.Load())
// mergedResp is preferred over any calculated response.
combinedResp := append([]*pbv2.ObservationResponse{mergedResp}, calculatedResps...)
return merger.MergeMultipleObservations(combinedResp...), nil
}
53 changes: 25 additions & 28 deletions internal/server/statvar/fetcher/formula_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"net/http"
"sort"

pb "github.com/datacommonsorg/mixer/internal/proto"
"github.com/datacommonsorg/mixer/internal/merger"
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
"github.com/datacommonsorg/mixer/internal/server/resource"
v1pv "github.com/datacommonsorg/mixer/internal/server/v1/propertyvalues"
Expand All @@ -29,14 +29,13 @@ import (
)

// FetchFormulas fetches StatisticalCalculations and returns a map of SV dcids to a list of inputPropertyExpressions.
// TODO: Split fetch and merge logic.
func FetchFormulas(
ctx context.Context,
store *store.Store,
metadata *resource.Metadata,
) (map[string][]string, error) {
errGroup, errCtx := errgroup.WithContext(ctx)
localRespChan := make(chan map[string]map[string]map[string][]*pb.EntityInfo, 1)
localRespChan := make(chan *pbv2.NodeResponse, 1)
remoteRespChan := make(chan *pbv2.NodeResponse, 1)
// Fetch for BT and SQL.
errGroup.Go(func() error {
Expand All @@ -61,7 +60,7 @@ func FetchFormulas(
if len(statCal) == 0 {
return nil
}
localResp, _, err := v1pv.Fetch(
resp, _, err := v1pv.Fetch(
errCtx,
store,
statCal,
Expand All @@ -73,6 +72,20 @@ func FetchFormulas(
if err != nil {
return err
}
// Wrap result in pbv2.NodeResponse
localResp := &pbv2.NodeResponse{Data: map[string]*pbv2.LinkedGraph{}}
for dcid, data := range resp {
localResp.Data[dcid] = &pbv2.LinkedGraph{
Arcs: map[string]*pbv2.Nodes{},
}
for prop, nodes := range data {
if len(nodes) > 0 {
localResp.Data[dcid].Arcs[prop] = &pbv2.Nodes{
Nodes: v1pv.MergeTypedNodes(nodes),
}
}
}
}
localRespChan <- localResp
return nil
})
Expand Down Expand Up @@ -116,31 +129,15 @@ func FetchFormulas(
close(localRespChan)
close(remoteRespChan)
localResp, remoteResp := <-localRespChan, <-remoteRespChan
result := map[string][]string{}
localResult := map[string]map[string]bool{}
for _, props := range localResp {
for _, outputProps := range props["outputProperty"] {
for _, outputNode := range outputProps {
for _, inputProps := range props["inputPropertyExpression"] {
for _, inputNode := range inputProps {
result[outputNode.Dcid] = append(result[outputNode.Dcid], inputNode.Value)
localResult[outputNode.Dcid] = map[string]bool{inputNode.Value: true}
}
}
}
}
mergedResp, err := merger.MergeNode(localResp, remoteResp)
if err != nil {
return nil, err
}
if remoteResp != nil {
for _, props := range remoteResp.Data {
for _, outputNode := range props.Arcs["outputProperty"].Nodes {
for _, inputNode := range props.Arcs["inputPropertyExpression"].Nodes {
// Don't duplicate local formulas.
if _, ok := localResult[outputNode.Dcid]; !ok {
result[outputNode.Dcid] = append(result[outputNode.Dcid], inputNode.Value)
} else if _, ok := localResult[outputNode.Dcid][inputNode.Value]; !ok {
result[outputNode.Dcid] = append(result[outputNode.Dcid], inputNode.Value)
}
}
result := map[string][]string{}
for _, props := range mergedResp.Data {
for _, outputNode := range props.Arcs["outputProperty"].Nodes {
for _, inputNode := range props.Arcs["inputPropertyExpression"].Nodes {
result[outputNode.Dcid] = append(result[outputNode.Dcid], inputNode.Value)
}
}
}
Expand Down
72 changes: 72 additions & 0 deletions internal/server/v2/observation/calculation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package observation is for V2 observation API
package observation

import (
"context"

pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
"github.com/datacommonsorg/mixer/internal/server/cache"
"github.com/datacommonsorg/mixer/internal/store"
)

// CalculateObservationResponses returns a list of ObservationResponses based
// on StatisticalCalculation formulas for the missing data in an input
// ObservationResponse.
//
// For every variable in inputResp that has formulas in cachedata.SVFormula(),
// each entity without any ordered facets is evaluated with DerivedSeries, one
// formula at a time. The first formula that yields at least one facet for the
// entity produces one response, in which the formula key is re-labeled as the
// variable; the remaining formulas for that variable/entity pair are not
// evaluated. Formulas whose evaluation returns an error are skipped.
//
// The returned slice is empty (never nil) when inputResp is nil or nothing
// could be calculated.
func CalculateObservationResponses(
	ctx context.Context,
	store *store.Store,
	inputResp *pbv2.ObservationResponse,
	cachedata *cache.Cache,
) []*pbv2.ObservationResponse {
	calculatedResponses := []*pbv2.ObservationResponse{}
	if inputResp == nil {
		return calculatedResponses
	}
	for variable, variableObservation := range inputResp.ByVariable {
		formulas, ok := cachedata.SVFormula()[variable]
		if !ok {
			continue
		}
		if variableObservation == nil {
			continue
		}
		for entity, entityObservation := range variableObservation.ByEntity {
			// Response already contains data.
			if entityObservation != nil && len(entityObservation.OrderedFacets) != 0 {
				continue
			}
			// Use first formula that returns data.
			for _, formula := range formulas {
				derivedSeries, err := DerivedSeries(
					ctx,
					store,
					formula,
					[]string{entity},
				)
				// Missing input data.
				if err != nil || derivedSeries == nil {
					continue
				}
				// Look up the result step by step: a missing map key yields a
				// nil pointer, and selecting a field through it would panic.
				bv := derivedSeries.ByVariable
				variableResult := bv[formula]
				if variableResult == nil {
					continue
				}
				entityResult := variableResult.ByEntity[entity]
				if entityResult == nil || len(entityResult.OrderedFacets) == 0 {
					continue
				}
				// Successful calculation: re-label formula response with the
				// outputProperty and delete formula. Skip the delete when both
				// keys coincide, otherwise the result would be discarded.
				if formula != variable {
					bv[variable] = variableResult
					delete(bv, formula)
				}
				calculatedResponses = append(calculatedResponses, derivedSeries)
				// Stop at the first formula that produced data so later
				// formulas do not add duplicate responses for this pair.
				break
			}
		}
	}
	return calculatedResponses
}
Loading

0 comments on commit 85af98c

Please sign in to comment.