@@ -127,15 +127,15 @@ def create_opensearch_policies(bedrock_role_arn: str, collection_name: str, poli
127
127
128
128
def _create_security_policy (name , policy_type , policy ):
129
129
try :
130
- aoss_client .create_security_policy (name = name , policy = json .dumps (policy ), type = policy_type )
130
+ aoss_client .conn . create_security_policy (name = name , policy = json .dumps (policy ), type = policy_type )
131
131
except ClientError as e :
132
132
if e .response ["Error" ]["Code" ] == "ConflictException" :
133
133
log .info ("OpenSearch security policy %s already exists." , name )
134
134
raise
135
135
136
136
def _create_access_policy (name , policy_type , policy ):
137
137
try :
138
- aoss_client .create_access_policy (name = name , policy = json .dumps (policy ), type = policy_type )
138
+ aoss_client .conn . create_access_policy (name = name , policy = json .dumps (policy ), type = policy_type )
139
139
except ClientError as e :
140
140
if e .response ["Error" ]["Code" ] == "ConflictException" :
141
141
log .info ("OpenSearch data access policy %s already exists." , name )
@@ -204,9 +204,9 @@ def create_collection(collection_name: str):
204
204
:param collection_name: The name of the Collection to create.
205
205
"""
206
206
log .info ("\n Creating collection: %s." , collection_name )
207
- return aoss_client .create_collection (name = collection_name , type = "VECTORSEARCH" )[ "createCollectionDetail" ] [
208
- "id "
209
- ]
207
+ return aoss_client .conn . create_collection (name = collection_name , type = "VECTORSEARCH" )[
208
+ "createCollectionDetail "
209
+ ][ "id" ]
210
210
211
211
212
212
@task
@@ -317,7 +317,7 @@ def get_collection_arn(collection_id: str):
317
317
"""
318
318
return next (
319
319
colxn ["arn" ]
320
- for colxn in aoss_client .list_collections ()["collectionSummaries" ]
320
+ for colxn in aoss_client .conn . list_collections ()["collectionSummaries" ]
321
321
if colxn ["id" ] == collection_id
322
322
)
323
323
@@ -336,7 +336,9 @@ def delete_data_source(knowledge_base_id: str, data_source_id: str):
336
336
:param data_source_id: The unique identifier of the data source to delete.
337
337
"""
338
338
log .info ("Deleting data source %s from Knowledge Base %s." , data_source_id , knowledge_base_id )
339
- bedrock_agent_client .delete_data_source (dataSourceId = data_source_id , knowledgeBaseId = knowledge_base_id )
339
+ bedrock_agent_client .conn .delete_data_source (
340
+ dataSourceId = data_source_id , knowledgeBaseId = knowledge_base_id
341
+ )
340
342
341
343
342
344
# [END howto_operator_bedrock_delete_data_source]
@@ -355,7 +357,7 @@ def delete_knowledge_base(knowledge_base_id: str):
355
357
:param knowledge_base_id: The unique identifier of the knowledge base to delete.
356
358
"""
357
359
log .info ("Deleting Knowledge Base %s." , knowledge_base_id )
358
- bedrock_agent_client .delete_knowledge_base (knowledgeBaseId = knowledge_base_id )
360
+ bedrock_agent_client .conn . delete_knowledge_base (knowledgeBaseId = knowledge_base_id )
359
361
360
362
361
363
# [END howto_operator_bedrock_delete_knowledge_base]
@@ -393,7 +395,7 @@ def delete_collection(collection_id: str):
393
395
:param collection_id: ID of the collection to be indexed.
394
396
"""
395
397
log .info ("Deleting collection %s." , collection_id )
396
- aoss_client .delete_collection (id = collection_id )
398
+ aoss_client .conn . delete_collection (id = collection_id )
397
399
398
400
399
401
@task (trigger_rule = TriggerRule .ALL_DONE )
@@ -404,26 +406,26 @@ def delete_opensearch_policies(collection_name: str):
404
406
:param collection_name: All policies in the given collection name will be deleted.
405
407
"""
406
408
407
- access_policies = aoss_client .list_access_policies (
409
+ access_policies = aoss_client .conn . list_access_policies (
408
410
type = "data" , resource = [f"collection/{ collection_name } " ]
409
411
)["accessPolicySummaries" ]
410
412
log .info ("Found access policies for %s: %s" , collection_name , access_policies )
411
413
if not access_policies :
412
414
raise Exception ("No access policies found?" )
413
415
for policy in access_policies :
414
416
log .info ("Deleting access policy for %s: %s" , collection_name , policy ["name" ])
415
- aoss_client .delete_access_policy (name = policy ["name" ], type = "data" )
417
+ aoss_client .conn . delete_access_policy (name = policy ["name" ], type = "data" )
416
418
417
419
for policy_type in ["encryption" , "network" ]:
418
- policies = aoss_client .list_security_policies (
420
+ policies = aoss_client .conn . list_security_policies (
419
421
type = policy_type , resource = [f"collection/{ collection_name } " ]
420
422
)["securityPolicySummaries" ]
421
423
if not policies :
422
424
raise Exception ("No security policies found?" )
423
425
log .info ("Found %s security policies for %s: %s" , policy_type , collection_name , policies )
424
426
for policy in policies :
425
427
log .info ("Deleting %s security policy for %s: %s" , policy_type , collection_name , policy ["name" ])
426
- aoss_client .delete_security_policy (name = policy ["name" ], type = policy_type )
428
+ aoss_client .conn . delete_security_policy (name = policy ["name" ], type = policy_type )
427
429
428
430
429
431
with DAG (
@@ -436,8 +438,8 @@ def delete_opensearch_policies(collection_name: str):
436
438
test_context = sys_test_context_task ()
437
439
env_id = test_context ["ENV_ID" ]
438
440
439
- aoss_client = OpenSearchServerlessHook (aws_conn_id = None ). conn
440
- bedrock_agent_client = BedrockAgentHook (aws_conn_id = None ). conn
441
+ aoss_client = OpenSearchServerlessHook (aws_conn_id = None )
442
+ bedrock_agent_client = BedrockAgentHook (aws_conn_id = None )
441
443
442
444
region_name = boto3 .session .Session ().region_name
443
445
0 commit comments