Skip to content

Commit e8a524c

Browse files
committed
feat: REST API to update entity twin data
1 parent e509ae6 commit e8a524c

File tree

5 files changed

+299
-11
lines changed

5 files changed

+299
-11
lines changed

crates/core/tedge_agent/src/entity_manager/server.rs

+50
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use async_trait::async_trait;
2+
use serde::Deserialize;
3+
use serde::Serialize;
4+
use serde_json::Map;
25
use serde_json::Value;
36
use tedge_actors::LoggingSender;
47
use tedge_actors::MessageSink;
@@ -7,6 +10,7 @@ use tedge_actors::Server;
710
use tedge_api::entity::EntityMetadata;
811
use tedge_api::entity_store;
912
use tedge_api::entity_store::EntityRegistrationMessage;
13+
use tedge_api::entity_store::EntityTwinMessage;
1014
use tedge_api::entity_store::ListFilters;
1115
use tedge_api::mqtt_topics::Channel;
1216
use tedge_api::mqtt_topics::EntityTopicId;
@@ -22,6 +26,7 @@ use tracing::error;
2226
pub enum EntityStoreRequest {
2327
Get(EntityTopicId),
2428
Create(EntityRegistrationMessage),
29+
Patch(EntityTwinData),
2530
Delete(EntityTopicId),
2631
List(ListFilters),
2732
MqttMessage(MqttMessage),
@@ -31,11 +36,40 @@ pub enum EntityStoreRequest {
3136
pub enum EntityStoreResponse {
3237
Get(Option<EntityMetadata>),
3338
Create(Result<Vec<RegisteredEntityData>, entity_store::Error>),
39+
Patch(Result<(), entity_store::Error>),
3440
Delete(Vec<EntityMetadata>),
3541
List(Vec<EntityMetadata>),
3642
Ok,
3743
}
3844

45+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46+
pub struct EntityTwinData {
47+
pub topic_id: EntityTopicId,
48+
#[serde(flatten)]
49+
pub fragments: Map<String, Value>,
50+
}
51+
52+
impl EntityTwinData {
53+
pub fn try_new(
54+
topic_id: EntityTopicId,
55+
twin_data: Map<String, Value>,
56+
) -> Result<Self, InvalidTwinData> {
57+
for key in twin_data.keys() {
58+
if key.starts_with('@') {
59+
return Err(InvalidTwinData);
60+
}
61+
}
62+
Ok(Self {
63+
topic_id,
64+
fragments: twin_data,
65+
})
66+
}
67+
}
68+
69+
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
70+
#[error("Fragment keys starting with '@' are not allowed as twin data")]
71+
pub struct InvalidTwinData;
72+
3973
pub struct EntityStoreServer {
4074
entity_store: EntityStore,
4175
mqtt_schema: MqttSchema,
@@ -90,6 +124,10 @@ impl Server for EntityStoreServer {
90124
let res = self.register_entity(entity).await;
91125
EntityStoreResponse::Create(res)
92126
}
127+
EntityStoreRequest::Patch(twin_data) => {
128+
let res = self.patch_entity(twin_data).await;
129+
EntityStoreResponse::Patch(res)
130+
}
93131
EntityStoreRequest::Delete(topic_id) => {
94132
let deleted_entities = self.deregister_entity(topic_id).await;
95133
EntityStoreResponse::Delete(deleted_entities)
@@ -218,6 +256,18 @@ impl EntityStoreServer {
218256
Ok(registered)
219257
}
220258

259+
async fn patch_entity(&mut self, twin_data: EntityTwinData) -> Result<(), entity_store::Error> {
260+
for (fragment_key, fragment_value) in twin_data.fragments.into_iter() {
261+
self.entity_store.update_twin_data(EntityTwinMessage::new(
262+
twin_data.topic_id.clone(),
263+
fragment_key,
264+
fragment_value,
265+
))?;
266+
}
267+
268+
Ok(())
269+
}
270+
221271
async fn deregister_entity(&mut self, topic_id: EntityTopicId) -> Vec<EntityMetadata> {
222272
let deleted = self.entity_store.deregister_entity(&topic_id);
223273
for entity in deleted.iter() {

crates/core/tedge_agent/src/http_server/entity_store.rs

+157-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
use super::server::AgentState;
1212
use crate::entity_manager::server::EntityStoreRequest;
1313
use crate::entity_manager::server::EntityStoreResponse;
14+
use crate::entity_manager::server::EntityTwinData;
15+
use crate::entity_manager::server::InvalidTwinData;
1416
use axum::extract::Path;
1517
use axum::extract::Query;
1618
use axum::extract::State;
@@ -23,6 +25,8 @@ use axum::Router;
2325
use hyper::StatusCode;
2426
use serde::Deserialize;
2527
use serde_json::json;
28+
use serde_json::Map;
29+
use serde_json::Value;
2630
use std::str::FromStr;
2731
use tedge_api::entity::EntityMetadata;
2832
use tedge_api::entity::InvalidEntityType;
@@ -96,7 +100,7 @@ enum Error {
96100
#[error(transparent)]
97101
EntityStoreError(#[from] entity_store::Error),
98102

99-
#[error("Entity not found with topic id: {0}")]
103+
#[error("Entity with topic id: {0} not found")]
100104
EntityNotFound(EntityTopicId),
101105

102106
#[allow(clippy::enum_variant_names)]
@@ -108,6 +112,9 @@ enum Error {
108112

109113
#[error(transparent)]
110114
InvalidInput(#[from] InputValidationError),
115+
116+
#[error(transparent)]
117+
InvalidTwinData(#[from] InvalidTwinData),
111118
}
112119

113120
impl IntoResponse for Error {
@@ -123,6 +130,7 @@ impl IntoResponse for Error {
123130
Error::ChannelError(_) => StatusCode::INTERNAL_SERVER_ERROR,
124131
Error::InvalidEntityStoreResponse => StatusCode::INTERNAL_SERVER_ERROR,
125132
Error::InvalidInput(_) => StatusCode::BAD_REQUEST,
133+
Error::InvalidTwinData(_) => StatusCode::BAD_REQUEST,
126134
};
127135
let error_message = self.to_string();
128136

@@ -135,7 +143,9 @@ pub(crate) fn entity_store_router(state: AgentState) -> Router {
135143
.route("/v1/entities", post(register_entity).get(list_entities))
136144
.route(
137145
"/v1/entities/{*path}",
138-
get(get_entity).delete(deregister_entity),
146+
get(get_entity)
147+
.patch(patch_entity)
148+
.delete(deregister_entity),
139149
)
140150
.with_state(state)
141151
}
@@ -160,6 +170,29 @@ async fn register_entity(
160170
))
161171
}
162172

173+
async fn patch_entity(
174+
State(state): State<AgentState>,
175+
Path(path): Path<String>,
176+
Json(twin_fragments): Json<Map<String, Value>>,
177+
) -> impl IntoResponse {
178+
let topic_id = EntityTopicId::from_str(&path)?;
179+
let twin_data = EntityTwinData::try_new(topic_id, twin_fragments)?;
180+
181+
let response = state
182+
.entity_store_handle
183+
.clone()
184+
.await_response(EntityStoreRequest::Patch(twin_data))
185+
.await?;
186+
let EntityStoreResponse::Patch(res) = response else {
187+
return Err(Error::InvalidEntityStoreResponse);
188+
};
189+
res?;
190+
191+
let entity = get_entity(State(state), Path(path)).await?;
192+
193+
Ok(entity)
194+
}
195+
163196
async fn get_entity(
164197
State(state): State<AgentState>,
165198
Path(path): Path<String>,
@@ -328,7 +361,7 @@ mod tests {
328361
let entity: Value = serde_json::from_slice(&body).unwrap();
329362
assert_json_eq!(
330363
entity,
331-
json!( {"error":"Entity not found with topic id: device/test-child//"})
364+
json!({"error":"Entity with topic id: device/test-child// not found"})
332365
);
333366
}
334367

@@ -483,6 +516,127 @@ mod tests {
483516
);
484517
}
485518

519+
#[tokio::test]
520+
async fn entity_patch() {
521+
let TestHandle {
522+
mut app,
523+
mut entity_store_box,
524+
} = setup();
525+
526+
// Mock entity store actor response for patch
527+
tokio::spawn(async move {
528+
while let Some(mut req) = entity_store_box.recv().await {
529+
if let EntityStoreRequest::Patch(twin_data) = req.request {
530+
if twin_data.topic_id
531+
== EntityTopicId::default_child_device("test-child").unwrap()
532+
{
533+
req.reply_to
534+
.send(EntityStoreResponse::Patch(Ok(())))
535+
.await
536+
.unwrap();
537+
}
538+
} else if let EntityStoreRequest::Get(topic_id) = req.request {
539+
if topic_id == EntityTopicId::default_child_device("test-child").unwrap() {
540+
let mut entity =
541+
EntityMetadata::child_device("test-child".to_string()).unwrap();
542+
entity.twin_data.insert("foo".to_string(), json!("bar"));
543+
544+
req.reply_to
545+
.send(EntityStoreResponse::Get(Some(entity)))
546+
.await
547+
.unwrap();
548+
}
549+
}
550+
}
551+
});
552+
553+
let twin_payload = json!({"foo": "bar"}).to_string();
554+
555+
let req = Request::builder()
556+
.method(Method::PATCH)
557+
.uri("/v1/entities/device/test-child//")
558+
.header("Content-Type", "application/json")
559+
.body(Body::from(twin_payload))
560+
.expect("request builder");
561+
562+
let response = app.call(req).await.unwrap();
563+
assert_eq!(response.status(), StatusCode::OK);
564+
565+
let body = response.into_body().collect().await.unwrap().to_bytes();
566+
let entity: EntityMetadata = serde_json::from_slice(&body).unwrap();
567+
assert_eq!(entity.twin_data.get("foo"), Some(&json!("bar")));
568+
}
569+
570+
#[tokio::test]
571+
async fn entity_patch_invalid_key() {
572+
let TestHandle {
573+
mut app,
574+
entity_store_box: _, // Not used
575+
} = setup();
576+
577+
let req = Request::builder()
578+
.method(Method::PATCH)
579+
.uri("/v1/entities/device/test-child//")
580+
.header("Content-Type", "application/json")
581+
.body(Body::from(r#"{"@id": "new-id"}"#))
582+
.expect("request builder");
583+
584+
let response = app.call(req).await.unwrap();
585+
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
586+
587+
let body = response.into_body().collect().await.unwrap().to_bytes();
588+
let entity: Value = serde_json::from_slice(&body).unwrap();
589+
assert_json_eq!(
590+
entity,
591+
json!({"error":"Fragment keys starting with '@' are not allowed as twin data"})
592+
);
593+
}
594+
595+
#[tokio::test]
596+
async fn patch_unknown_entity() {
597+
let TestHandle {
598+
mut app,
599+
mut entity_store_box,
600+
} = setup();
601+
602+
// Mock entity store actor response
603+
tokio::spawn(async move {
604+
if let Some(mut req) = entity_store_box.recv().await {
605+
if let EntityStoreRequest::Patch(twin_data) = req.request {
606+
if twin_data.topic_id
607+
== EntityTopicId::default_child_device("test-child").unwrap()
608+
{
609+
req.reply_to
610+
.send(EntityStoreResponse::Patch(Err(
611+
entity_store::Error::UnknownEntity(
612+
"device/test-child//".to_string(),
613+
),
614+
)))
615+
.await
616+
.unwrap();
617+
}
618+
}
619+
}
620+
});
621+
622+
let req = Request::builder()
623+
.method(Method::PATCH)
624+
.uri("/v1/entities/device/test-child//")
625+
.header("Content-Type", "application/json")
626+
.body(Body::from(r#"{"foo": "bar"}"#))
627+
.expect("request builder");
628+
629+
let response = app.call(req).await.unwrap();
630+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
631+
632+
let body = response.into_body().collect().await.unwrap().to_bytes();
633+
let entity: Value = serde_json::from_slice(&body).unwrap();
634+
assert_json_eq!(
635+
entity,
636+
json!({"error":"The specified entity: device/test-child// does not exist in the store"})
637+
);
638+
}
639+
486640
#[tokio::test]
487641
async fn entity_delete() {
488642
let TestHandle {

crates/core/tedge_api/src/entity_store.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -733,7 +733,7 @@ pub enum Error {
733733
#[error("An entity with topic id: {0} is already registered")]
734734
EntityAlreadyRegistered(EntityTopicId),
735735

736-
#[error("The specified entity {0} does not exist in the store")]
736+
#[error("The specified entity: {0} does not exist in the store")]
737737
UnknownEntity(String),
738738

739739
#[error("Auto registration of the entity with topic id {0} failed as it does not match the default topic scheme: 'device/<device-id>/service/<service-id>'. Try explicit registration instead.")]

0 commit comments

Comments
 (0)