Skip to content

Commit

Permalink
[release-4.16] OCPBUGS-42441: Add O-RAN v3.0 Compliant API to PTP Eve…
Browse files Browse the repository at this point in the history
…nt Producer (#359)

* add api-version

Signed-off-by: Jack Ding <[email protected]>

* O-RAN V3 Rest Api: Event Subscription

Signed-off-by: Jack Ding <[email protected]>

* O-RAN V3 Rest Api: Status Notification

Signed-off-by: Jack Ding <[email protected]>

* Set Event Source according to O-RAN Spec

Signed-off-by: Jack Ding <[email protected]>

* implement /sync/sync-status/sync-state

Signed-off-by: Jack Ding <[email protected]>

* trigger overall sync-state events

trigger overall sync-state event when either ptp-state-change
or os-clock-sync-state-change events are triggered.

Signed-off-by: Jack Ding <[email protected]>

* fix subscription recovery from reboot

Signed-off-by: Jack Ding <[email protected]>

* v2 consumer app manifests cleanup

Signed-off-by: Jack Ding <[email protected]>

* Fixe both v1 and v2 cleanup: reset fail count when a event is successfully delivered.

Also added API support for deleting all subscriptions:
DELETE /api/ocloudNotifications/v2/subscriptions

Signed-off-by: Jack Ding <[email protected]>

* update sdk-go and rest-api to v1.21.0

Signed-off-by: Jack Ding <[email protected]>

* cleanup configmap when event apiVersion change

Signed-off-by: Jack Ding <[email protected]>

---------

Signed-off-by: Jack Ding <[email protected]>
  • Loading branch information
jzding authored Oct 17, 2024
1 parent e89fe49 commit 614576c
Show file tree
Hide file tree
Showing 53 changed files with 2,416 additions and 226 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,18 @@ deploy-consumer:kustomize
cd ./examples/manifests && $(KUSTOMIZE) edit set image cloud-event-sidecar=${IMG} && $(KUSTOMIZE) edit set image cloud-event-consumer=${CONSUMER_IMG}
$(KUSTOMIZE) build ./examples/manifests | kubectl apply -f -

# Deploy all in the configured Kubernetes cluster in ~/.kube/config
undeploy-consumer:kustomize
cd ./examples/manifests && $(KUSTOMIZE) edit set image cloud-event-sidecar=${IMG} && $(KUSTOMIZE) edit set image cloud-event-consumer=${CONSUMER_IMG}
$(KUSTOMIZE) build ./examples/manifests | kubectl delete -f -

deploy-consumer-v2:kustomize
cd ./examples/manifests/v2 && $(KUSTOMIZE) edit set image cloud-event-consumer=${CONSUMER_IMG}
$(KUSTOMIZE) build ./examples/manifests/v2 | kubectl apply -f -

undeploy-consumer-v2:kustomize
cd ./examples/manifests/v2 && $(KUSTOMIZE) edit set image cloud-event-consumer=${CONSUMER_IMG}
$(KUSTOMIZE) build ./examples/manifests/v2 | kubectl delete -f -

# For GitHub Actions CI
gha:
mkdir -p $(GOPATH)/src/github.com/redhat-cne/cloud-event-proxy
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func main(){
pubSubInstance = v1pubsub.GetAPIInstance(".")
endpointURL := &types.URI{URL: url.URL{Scheme: "http", Host: "localhost:9085", Path: fmt.Sprintf("%s%s", apiPath, "dummy")}}
// create publisher
pub, err := pubSubInstance.CreatePublisher(v1pubsub.NewPubSub(endpointURL, "test/test"))
pub, err := pubSubInstance.CreatePublisher(v1pubsub.NewPubSub(endpointURL, "test/test", "1.0"))
}
```
Expand Down Expand Up @@ -177,7 +177,7 @@ func main(){
pubSubInstance = v1pubsub.GetAPIInstance(".")
endpointURL := &types.URI{URL: url.URL{Scheme: "http", Host: "localhost:8089", Path: fmt.Sprintf("%s%s", apiPath, "dummy")}}
// create subscription
pub, err := pubSubInstance.CreateSubscription(v1pubsub.NewPubSub(endpointURL, "test/test"))
pub, err := pubSubInstance.CreateSubscription(v1pubsub.NewPubSub(endpointURL, "test/test", "1.0"))
}
Expand Down
120 changes: 101 additions & 19 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
"net/http"
"os"
"os/signal"
Expand All @@ -28,6 +30,8 @@ import (
"syscall"
"time"

"github.com/google/uuid"

"github.com/redhat-cne/sdk-go/pkg/subscriber"
v1pubs "github.com/redhat-cne/sdk-go/v1/pubsub"

Expand All @@ -52,22 +56,27 @@ import (
sdkMetrics "github.com/redhat-cne/sdk-go/pkg/localmetrics"
v1event "github.com/redhat-cne/sdk-go/v1/event"
v1http "github.com/redhat-cne/sdk-go/v1/http"
subscriberApi "github.com/redhat-cne/sdk-go/v1/subscriber"
)

var (
// defaults
storePath string
transportHost string
apiPort int
apiVersion string
channelBufferSize = 100
statusChannelBufferSize = 50
scConfig *common.SCConfiguration
metricsAddr string
apiPath = "/api/ocloudNotifications/v1/"
httpEventPublisher string
pluginHandler plugins.Handler
nodeName string
namespace string
// Event API Version 2 is O-RAN V3.0 Compliant
apiPathV2 = "/api/ocloudNotifications/v2/"
httpEventPublisher string
pluginHandler plugins.Handler
nodeName string
namespace string
isV1Api bool
)

func main() {
Expand All @@ -77,6 +86,7 @@ func main() {
flag.StringVar(&storePath, "store-path", ".", "The path to store publisher and subscription info.")
flag.StringVar(&transportHost, "transport-host", "http://ptp-event-publisher-service-NODE_NAME.openshift-ptp.svc.cluster.local:9043", "The transport bus hostname or service name.")
flag.IntVar(&apiPort, "api-port", 8089, "The address the rest api endpoint binds to.")
flag.StringVar(&apiVersion, "api-version", "1.0", "The address the rest api endpoint binds to.")
flag.StringVar(&httpEventPublisher, "http-event-publishers", "", "Comma separated address of the publishers available.")

flag.Parse()
Expand Down Expand Up @@ -110,8 +120,10 @@ func main() {
CloseCh: make(chan struct{}),
APIPort: apiPort,
APIPath: apiPath,
APIVersion: apiVersion,
StorePath: storePath,
PubSubAPI: v1pubs.GetAPIInstance(storePath),
SubscriberAPI: subscriberApi.GetAPIInstance(storePath),
BaseURL: nil,
TransportHost: parsedTransportHost,
StorageType: storageClient.EmptyDir,
Expand All @@ -127,7 +139,7 @@ func main() {
}
if namespace != "" && nodeName != "" && scConfig.TransportHost.Type == common.HTTP {
// if consumer doesn't pass namespace then this will default to empty dir
if e := client.InitConfigMap(scConfig.StorePath, nodeName, namespace); e != nil {
if e := client.InitConfigMap(scConfig.APIVersion, scConfig.StorePath, nodeName, namespace); e != nil {
log.Errorf("failed to initlialize configmap, subcription will be stored in empty dir %s", e.Error())
} else {
scConfig.StorageType = storageClient.ConfigMap
Expand All @@ -146,21 +158,33 @@ func main() {
}()

pluginHandler = plugins.Handler{Path: "./plugins"}
isV1Api = common.IsV1Api(scConfig.APIVersion)
// make PubSub REST API accessible by event service ptp-event-publisher-service-NODE_NAME
if !isV1Api {
// switch between Internal API port and PubSub port
tmpPort := scConfig.APIPort
scConfig.APIPort = scConfig.TransportHost.Port
scConfig.TransportHost.Port = tmpPort
scConfig.APIPath = apiPathV2
log.Infof("v2 rest api set scConfig.APIPort=%d, scConfig.APIPath=%s, scConfig.TransportHost.Port=%d", scConfig.APIPort, scConfig.APIPath, scConfig.TransportHost.Port)
}

var transportEnabled bool
if scConfig.TransportHost.Type == common.HTTP {
if scConfig.TransportHost.Type == common.HTTP && isV1Api {
transportEnabled = enableHTTPTransport(&wg, eventPublishers)
} else {
transportEnabled = false
}

// if all transport types failed then process internally
if !transportEnabled {
if !transportEnabled && isV1Api {
log.Errorf("No transport is enabled for sending events %s", scConfig.TransportHost.String())
wg.Add(1)
go ProcessInChannel(&wg, scConfig)
}

/* Enable pub/sub services */
_, err = common.StartPubSubService(scConfig)
err = common.StartPubSubService(scConfig)
if err != nil {
log.Fatal("pub/sub service API failed to start.")
}
Expand All @@ -179,6 +203,7 @@ func main() {
log.Fatalf("error loading mock plugin %v", err)
}
}

// process data that are coming from api server requests
ProcessOutChannel(&wg, scConfig)
}
Expand Down Expand Up @@ -210,7 +235,7 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
if pub.EndPointURI != nil {
log.Debugf("posting acknowledgment with status: %s to publisher: %s", status, pub.EndPointURI)
restClient := restclient.New()
_ = restClient.Post(pub.EndPointURI,
restClient.Post(pub.EndPointURI,
[]byte(fmt.Sprintf(`{eventId:"%s",status:"%s"}`, pub.ID, status)))
}
}
Expand Down Expand Up @@ -244,6 +269,7 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.FAILED)
}
} else if sub, ok := scConfig.PubSubAPI.HasSubscription(d.Address); ok {
// V1 only
if sub.EndPointURI != nil {
restClient := restclient.New()
event.ID = sub.ID // set ID to the subscriptionID
Expand All @@ -254,8 +280,60 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.SUCCESS)
}
} else {
log.Warnf("subscription not found, posting event %#v to log for address %s\n", event, d.Address)
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.FAILED)
// V2
eventSubscribers := scConfig.SubscriberAPI.GetClientIDAddressByResource(d.Address)
if len(eventSubscribers) != 0 {
restClient := restclient.New()
for clientID, endPointURI := range eventSubscribers {
if endPointURI != nil {
log.Infof("post events %s to subscriber %s", d.Address, endPointURI)
// make sure event ID is unique
event.ID = uuid.New().String()
var status, numSubDeleted int
status, err = restClient.PostCloudEvent(endPointURI, *d.Data)
if err != nil {
log.Errorf("error posting event at %s : %s", endPointURI, err)
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.FAILED)

// POST return DNSError if server is not reachable
var dnsError *net.DNSError
// Capture DNS error "lookup consumer-events-subscription-service.cloud-events.svc.cluster.local: no such host"
// or timeout error "context deadline exceeded (Client.Timeout exceeded while awaiting headers)"
// or connection refused error "dial tcp <ip>:9043: connect: connection refused"
if errors.As(err, &dnsError) || os.IsTimeout(err) || errors.Is(err, syscall.ECONNREFUSED) {
// has subscriber failed to connect for 10 times delete the subscribers
if scConfig.SubscriberAPI.IncFailCountToFail(clientID) {
log.Errorf("connection lost for address %s, proceed to clean up subscription", d.Address)
if numSubDeleted, err = scConfig.SubscriberAPI.DeleteAllSubscriptionsForClient(clientID); err != nil {
log.Errorf("failed to delete all subscriptions for client %s: %s", clientID.String(), err.Error())
} else {
cleanupConfigMap(d.ClientID)
}
apiMetrics.UpdateSubscriptionCount(apiMetrics.ACTIVE, -(numSubDeleted))
} else {
log.Errorf("client %s not responding, waiting %d times before marking to delete subscriber",
d.Address, scConfig.SubscriberAPI.FailCountThreshold()-scConfig.SubscriberAPI.GetFailCount(clientID))
}
}
} else {
scConfig.SubscriberAPI.ResetFailCount(clientID)
if status == http.StatusNoContent {
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.SUCCESS)
} else {
log.Errorf("posting event at %s returned invalid status %d", endPointURI, status)
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.FAILED)
}
}
} else {
// this should not happen
log.Errorf("endPointURI is nil for ResourceAddress %s clientID %s", d.Address, clientID)
continue
}
}
} else {
log.Warnf("subscription not found, posting event to log for address %s", d.Address)
localmetrics.UpdateEventReceivedCount(d.Address, localmetrics.FAILED)
}
}
} else if d.Status == channel.SUCCESS || d.Status == channel.FAILED { // event sent ,ack back to publisher
postProcessFn(d.Address, d.Status)
Expand Down Expand Up @@ -284,14 +362,7 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
log.Infof("subscriber saved in configmap %s", obj.String())
}
} else if d.Status == channel.DELETE {
var obj subscriber.Subscriber
obj.Action = channel.DELETE
obj.ClientID = d.ClientID
if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil {
log.Errorf("failed to delete subscription in configmap %s", err.Error())
} else {
log.Infof("deleted subscription %s ", obj.ClientID.String())
}
cleanupConfigMap(d.ClientID)
}
}
case <-scConfig.CloseCh:
Expand Down Expand Up @@ -421,3 +492,14 @@ func updateHTTPPublishers(nodeIP, nodeName string, addr ...string) (httpPublishe
}
return httpPublishers
}

func cleanupConfigMap(clientID uuid.UUID) {
var obj subscriber.Subscriber
obj.Action = channel.DELETE
obj.ClientID = clientID
if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil {
log.Errorf("failed to delete subscription in configmap %s", err.Error())
} else {
log.Infof("deleted subscription %s ", obj.ClientID.String())
}
}
6 changes: 3 additions & 3 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestSidecar_MainWithHTTP(t *testing.T) {
log.Infof("Configuration set to %#v", scConfig)

//start rest service
_, err := common.StartPubSubService(scConfig)
err := common.StartPubSubService(scConfig)
assert.Nil(t, err)

// imitate main process
Expand All @@ -78,7 +78,7 @@ func TestSidecar_MainWithHTTP(t *testing.T) {
//create publisher
// this is loopback on server itself. Since current pod does not create any server
endpointURL := fmt.Sprintf("%s%s", scConfig.BaseURL, "dummy")
createPub := v1pubsub.NewPubSub(types.ParseURI(endpointURL), resourceAddress)
createPub := v1pubsub.NewPubSub(types.ParseURI(endpointURL), resourceAddress, scConfig.APIVersion)
pub, err := common.CreatePublisher(scConfig, createPub)
assert.Nil(t, err)
assert.NotEmpty(t, pub.ID)
Expand All @@ -88,7 +88,7 @@ func TestSidecar_MainWithHTTP(t *testing.T) {
log.Infof("Publisher \n%s:", pub.String())

//Test subscription
createSub := v1pubsub.NewPubSub(types.ParseURI(endpointURL), resourceAddress)
createSub := v1pubsub.NewPubSub(types.ParseURI(endpointURL), resourceAddress, scConfig.APIVersion)
sub, err := common.CreateSubscription(scConfig, createSub)
assert.Nil(t, err)
assert.NotEmpty(t, sub.ID)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY . .

RUN hack/build-example-go.sh

FROM registry.ci.openshift.org/ocp/4.17:base-rhel9 AS bin
FROM --platform=linux/x86_64 registry.ci.openshift.org/ocp/4.17:base-rhel9 AS bin
COPY --from=builder /go/src/github.com/redhat-cne/cloud-event-proxy/build/cloud-event-consumer /

LABEL io.k8s.display-name="Cloud Event Proxy Sample Consumer" \
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ COPY . .

RUN hack/build-example-go.sh

FROM quay.io/redhat-cne/openshift-origin-release:base-rhel9-4.17 AS bin
FROM --platform=linux/x86_64 quay.io/redhat-cne/openshift-origin-release:base-rhel9-4.17 AS bin
COPY --from=builder /go/src/github.com/redhat-cne/cloud-event-proxy/build/cloud-event-consumer /

LABEL io.k8s.display-name="Cloud Event Proxy Sample Consumer" \
Expand Down
Loading

0 comments on commit 614576c

Please sign in to comment.