1
1
use crate :: node:: util:: initialize_default_node;
2
- use crate :: tcp:: inlet:: create:: CreateCommand as InletCreateCommand ;
2
+ use crate :: shared_args:: OptionalTimeoutArg ;
3
+ use crate :: tcp:: inlet:: create:: { tcp_inlet_default_from_addr, tcp_inlet_default_to_addr} ;
4
+ use crate :: tcp:: util:: alias_parser;
5
+ use crate :: util:: parsers:: duration_parser;
6
+ use crate :: util:: parsers:: hostname_parser;
7
+ use crate :: util:: { port_is_free_guard, print_warning_for_deprecated_flag_replaced} ;
3
8
use crate :: { Command , CommandGlobalOpts } ;
4
9
use async_trait:: async_trait;
10
+ use clap:: builder:: FalseyValueParser ;
5
11
use clap:: Args ;
6
12
use colorful:: Colorful ;
7
- use miette:: miette;
13
+ use miette:: { miette, IntoDiagnostic } ;
14
+ use ockam:: identity:: Identifier ;
15
+ use ockam:: transport:: HostnamePort ;
8
16
use ockam:: Context ;
17
+ use ockam_abac:: PolicyExpression ;
18
+ use ockam_api:: address:: extract_address_value;
19
+ use ockam_api:: cli_state:: random_name;
9
20
use ockam_api:: colors:: color_primary;
10
21
use ockam_api:: influxdb:: { InfluxDBPortals , LeaseUsage } ;
11
22
use ockam_api:: nodes:: models:: portal:: InletStatus ;
12
23
use ockam_api:: nodes:: BackgroundNodeClient ;
13
- use ockam_api:: { fmt_info, fmt_log, fmt_ok, fmt_warn, ConnectionStatus } ;
24
+ use ockam_api:: { fmt_info, fmt_log, fmt_ok, fmt_warn, CliState , ConnectionStatus } ;
14
25
use ockam_core:: api:: { Reply , Status } ;
15
- use ockam_multiaddr:: MultiAddr ;
26
+ use ockam_multiaddr:: { proto, MultiAddr , Protocol } ;
27
+ use ockam_node:: compat:: asynchronous:: resolve_peer;
28
+ use std:: str:: FromStr ;
29
+ use std:: time:: Duration ;
16
30
use tracing:: trace;
17
31
18
32
/// Create InfluxDB Inlets
19
33
#[ derive( Clone , Debug , Args ) ]
20
- pub struct InfluxDBCreateCommand {
34
+ pub struct CreateCommand {
35
+ /// Assign a name to this InfluxDB Inlet
36
+ #[ arg( id = "NAME" , value_parser = alias_parser) ]
37
+ pub name : Option < String > ,
38
+
39
+ /// Node on which to start the InfluxDB Inlet.
40
+ #[ arg( long, display_order = 900 , id = "NODE_NAME" , value_parser = extract_address_value) ]
41
+ pub at : Option < String > ,
42
+
43
+ /// Address on which to accept InfluxDB connections.
44
+ #[ arg( long, display_order = 900 , id = "SOCKET_ADDRESS" , hide_default_value = true , default_value_t = tcp_inlet_default_from_addr( ) , value_parser = hostname_parser) ]
45
+ pub from : HostnamePort ,
46
+
47
+ /// Route to a InfluxDB Outlet or the name of the InfluxDB Outlet service you want to connect to.
48
+ ///
49
+ /// If you are connecting to a local node, you can provide the route as `/node/n/service/outlet`.
50
+ ///
51
+ /// If you are connecting to a remote node through a relay in the Orchestrator you can either
52
+ /// provide the full route to the InfluxDB Outlet as `/project/myproject/service/forward_to_myrelay/secure/api/service/outlet`,
53
+ /// or just the name of the service as `outlet` or `/service/outlet`.
54
+ /// If you are passing just the service name, consider using `--via` to specify the
55
+ /// relay name (e.g. `ockam tcp-inlet create --to outlet --via myrelay`).
56
+ #[ arg( long, display_order = 900 , id = "ROUTE" , default_value_t = tcp_inlet_default_to_addr( ) ) ]
57
+ pub to : String ,
58
+
59
+ /// Name of the relay that this InfluxDB Inlet will use to connect to the InfluxDB Outlet.
60
+ ///
61
+ /// Use this flag when you are using `--to` to specify the service name of a InfluxDB Outlet
62
+ /// that is reachable through a relay in the Orchestrator.
63
+ /// If you don't provide it, the default relay name will be used, if necessary.
64
+ #[ arg( long, display_order = 900 , id = "RELAY_NAME" ) ]
65
+ pub via : Option < String > ,
66
+
67
+ /// Identity to be used to create the secure channel. If not set, the node's identity will be used.
68
+ #[ arg( long, value_name = "IDENTITY_NAME" , display_order = 900 ) ]
69
+ pub identity : Option < String > ,
70
+
71
+ /// Authorized identifier for secure channel connection
72
+ #[ arg( long, name = "AUTHORIZED" , display_order = 900 ) ]
73
+ pub authorized : Option < Identifier > ,
74
+
75
+ /// [DEPRECATED] Use the <NAME> positional argument instead
76
+ #[ arg( long, display_order = 900 , id = "ALIAS" , value_parser = alias_parser) ]
77
+ pub alias : Option < String > ,
78
+
79
+ /// Policy expression that will be used for access control to the InfluxDB Inlet.
80
+ /// If you don't provide it, the policy set for the "tcp-inlet" resource type will be used.
81
+ ///
82
+ /// You can check the fallback policy with `ockam policy show --resource-type tcp-inlet`.
83
+ #[ arg(
84
+ long,
85
+ visible_alias = "expression" ,
86
+ display_order = 900 ,
87
+ id = "POLICY_EXPRESSION"
88
+ ) ]
89
+ pub allow : Option < PolicyExpression > ,
90
+
91
+ /// Time to wait for the outlet to be available.
92
+ #[ arg( long, display_order = 900 , id = "WAIT" , default_value = "5s" , value_parser = duration_parser) ]
93
+ pub connection_wait : Duration ,
94
+
95
+ /// Time to wait before retrying to connect to the InfluxDB Outlet.
96
+ #[ arg( long, display_order = 900 , id = "RETRY" , default_value = "20s" , value_parser = duration_parser) ]
97
+ pub retry_wait : Duration ,
98
+
21
99
#[ command( flatten) ]
22
- pub tcp_inlet : InletCreateCommand ,
100
+ pub timeout : OptionalTimeoutArg ,
101
+
102
+ /// Create the InfluxDB Inlet without waiting for the InfluxDB Outlet to connect
103
+ #[ arg( long, default_value = "false" ) ]
104
+ pub no_connection_wait : bool ,
105
+
106
+ /// Enable UDP NAT puncture.
107
+ #[ arg(
108
+ long,
109
+ visible_alias = "enable-udp-puncture" ,
110
+ value_name = "BOOL" ,
111
+ default_value_t = false ,
112
+ hide = true
113
+ ) ]
114
+ pub udp : bool ,
115
+
116
+ /// Disable fallback to TCP.
117
+ /// TCP won't be used to transfer data between the Inlet and the Outlet.
118
+ #[ arg(
119
+ long,
120
+ visible_alias = "disable-tcp-fallback" ,
121
+ value_name = "BOOL" ,
122
+ default_value_t = false ,
123
+ hide = true
124
+ ) ]
125
+ pub no_tcp_fallback : bool ,
126
+
127
+ /// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
128
+ /// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`.
129
+ #[ arg( long, env = "OCKAM_PRIVILEGED" , value_parser = FalseyValueParser :: default ( ) , hide = true ) ]
130
+ pub privileged : bool ,
131
+
132
+ #[ arg( long, value_name = "BOOL" , default_value_t = false , hide = true ) ]
133
+ /// Enable TLS for the InfluxDB Inlet.
134
+ /// Uses the default project TLS certificate provider, `/project/default/service/tls_certificate_provider`.
135
+ /// To specify a different certificate provider, use `--tls-certificate-provider`.
136
+ /// Requires `ockam-tls-certificate` credential attribute.
137
+ pub tls : bool ,
138
+
139
+ #[ arg( long, value_name = "ROUTE" , hide = true ) ]
140
+ /// Enable TLS for the InfluxDB Inlet using the provided certificate provider.
141
+ /// Requires `ockam-tls-certificate` credential attribute.
142
+ pub tls_certificate_provider : Option < MultiAddr > ,
23
143
24
144
/// Share the leases among the clients or use a separate lease for each client
25
145
#[ arg( long, default_value = "per-client" ) ]
@@ -33,48 +153,44 @@ pub struct InfluxDBCreateCommand {
33
153
}
34
154
35
155
#[ async_trait]
36
- impl Command for InfluxDBCreateCommand {
156
+ impl Command for CreateCommand {
37
157
const NAME : & ' static str = "influxdb-inlet create" ;
38
158
39
159
async fn async_run ( mut self , ctx : & Context , opts : CommandGlobalOpts ) -> crate :: Result < ( ) > {
40
160
initialize_default_node ( ctx, & opts) . await ?;
41
- self = self . parse_args ( & opts) . await ?;
161
+ let cmd = self . parse_args ( & opts) . await ?;
42
162
43
- let mut node = BackgroundNodeClient :: create ( ctx, & opts. state , & self . tcp_inlet . at ) . await ?;
44
- self . tcp_inlet
45
- . timeout
46
- . timeout
47
- . map ( |t| node. set_timeout_mut ( t) ) ;
163
+ let mut node = BackgroundNodeClient :: create ( ctx, & opts. state , & cmd. at ) . await ?;
164
+ cmd. timeout . timeout . map ( |t| node. set_timeout_mut ( t) ) ;
48
165
49
166
let inlet_status = {
50
167
let pb = opts. terminal . progress_bar ( ) ;
51
168
if let Some ( pb) = pb. as_ref ( ) {
52
169
pb. set_message ( format ! (
53
170
"Creating a InfluxDB Inlet at {}...\n " ,
54
- color_primary( & self . tcp_inlet . from)
171
+ color_primary( & cmd . from)
55
172
) ) ;
56
173
}
57
174
58
175
loop {
59
176
let result: Reply < InletStatus > = node
60
177
. create_influxdb_inlet (
61
178
ctx,
62
- & self . tcp_inlet . from ,
63
- & self . tcp_inlet . to ( ) ,
64
- self . tcp_inlet . alias . as_ref ( ) . expect ( "The `alias` argument should be set to its default value if not provided" ) ,
65
- & self . tcp_inlet . authorized ,
66
- & self . tcp_inlet . allow ,
67
- self . tcp_inlet . connection_wait ,
68
- !self . tcp_inlet . no_connection_wait ,
69
- & self
70
- . tcp_inlet
179
+ & cmd. from ,
180
+ & cmd. to ( ) ,
181
+ cmd. name . as_ref ( ) . expect ( "The `name` argument should be set to its default value if not provided" ) ,
182
+ & cmd. authorized ,
183
+ & cmd. allow ,
184
+ cmd. connection_wait ,
185
+ !cmd. no_connection_wait ,
186
+ & cmd
71
187
. secure_channel_identifier ( & opts. state )
72
188
. await ?,
73
- self . tcp_inlet . udp ,
74
- self . tcp_inlet . no_tcp_fallback ,
75
- & self . tcp_inlet . tls_certificate_provider ,
76
- self . leased_token_strategy . clone ( ) ,
77
- self . lease_manager_route . clone ( ) ,
189
+ cmd . udp ,
190
+ cmd . no_tcp_fallback ,
191
+ & cmd . tls_certificate_provider ,
192
+ cmd . leased_token_strategy . clone ( ) ,
193
+ cmd . lease_manager_route . clone ( ) ,
78
194
)
79
195
. await ?;
80
196
@@ -90,49 +206,46 @@ impl Command for InfluxDBCreateCommand {
90
206
} ;
91
207
trace ! ( "the inlet creation returned a non-OK status: {s:?}" ) ;
92
208
93
- if self . tcp_inlet . retry_wait . as_millis ( ) == 0 {
94
- return Err ( miette ! ( "Failed to create TCP inlet" ) ) ?;
209
+ if cmd . retry_wait . as_millis ( ) == 0 {
210
+ return Err ( miette ! ( "Failed to create InfluxDB inlet" ) ) ?;
95
211
}
96
212
97
213
if let Some ( pb) = pb. as_ref ( ) {
98
214
pb. set_message ( format ! (
99
- "Waiting for TCP Inlet {} to be available... Retrying momentarily\n " ,
100
- color_primary( & self . tcp_inlet . to)
215
+ "Waiting for InfluxDB Inlet {} to be available... Retrying momentarily\n " ,
216
+ color_primary( & cmd . to)
101
217
) ) ;
102
218
}
103
- tokio:: time:: sleep ( self . tcp_inlet . retry_wait ) . await
219
+ tokio:: time:: sleep ( cmd . retry_wait ) . await
104
220
}
105
221
}
106
222
}
107
223
} ;
108
224
109
225
let node_name = node. node_name ( ) ;
110
- self . tcp_inlet
111
- . add_inlet_created_event ( & opts, & node_name, & inlet_status)
112
- . await ?;
113
226
114
227
let created_message = fmt_ok ! (
115
228
"Created a new InfluxDB Inlet in the Node {} bound to {}\n " ,
116
229
color_primary( & node_name) ,
117
- color_primary( & self . tcp_inlet . from)
230
+ color_primary( & cmd . from)
118
231
) ;
119
232
120
- let plain = if self . tcp_inlet . no_connection_wait {
121
- created_message + & fmt_log ! ( "It will automatically connect to the TCP Outlet at {} as soon as it is available" ,
122
- color_primary( & self . tcp_inlet . to)
233
+ let plain = if cmd . no_connection_wait {
234
+ created_message + & fmt_log ! ( "It will automatically connect to the InfluxDB Outlet at {} as soon as it is available" ,
235
+ color_primary( & cmd . to)
123
236
)
124
237
} else if inlet_status. status == ConnectionStatus :: Up {
125
238
created_message
126
239
+ & fmt_log ! (
127
- "sending traffic to the TCP Outlet at {}" ,
128
- color_primary( & self . tcp_inlet . to)
240
+ "sending traffic to the InfluxDB Outlet at {}" ,
241
+ color_primary( & cmd . to)
129
242
)
130
243
} else {
131
244
fmt_warn ! (
132
- "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n " ,
245
+ "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the InfluxDB Outlet at {}\n " ,
133
246
color_primary( & node_name) ,
134
- color_primary( self . tcp_inlet . from. to_string( ) ) ,
135
- color_primary( & self . tcp_inlet . to)
247
+ color_primary( cmd . from. to_string( ) ) ,
248
+ color_primary( & cmd . to)
136
249
) + & fmt_info ! ( "It will retry to connect automatically" )
137
250
} ;
138
251
@@ -147,9 +260,52 @@ impl Command for InfluxDBCreateCommand {
147
260
}
148
261
}
149
262
150
- impl InfluxDBCreateCommand {
263
+ impl CreateCommand {
151
264
async fn parse_args ( mut self , opts : & CommandGlobalOpts ) -> miette:: Result < Self > {
152
- self . tcp_inlet = self . tcp_inlet . parse_args ( opts) . await ?;
265
+ if let Some ( alias) = self . alias . as_ref ( ) {
266
+ print_warning_for_deprecated_flag_replaced (
267
+ opts,
268
+ "alias" ,
269
+ "the <NAME> positional argument" ,
270
+ ) ?;
271
+ if self . name . is_none ( ) {
272
+ self . name = Some ( alias. clone ( ) ) ;
273
+ } else {
274
+ return Err ( miette ! (
275
+ "--alias is deprecated and can't be used together with the <NAME> positional argument" ,
276
+ ) ) ;
277
+ }
278
+ } else {
279
+ self . name = self . name . or_else ( || Some ( random_name ( ) ) ) ;
280
+ }
281
+
282
+ let from = resolve_peer ( self . from . to_string ( ) )
283
+ . await
284
+ . into_diagnostic ( ) ?;
285
+ port_is_free_guard ( & from) ?;
286
+
287
+ self . to = crate :: tcp:: inlet:: create:: CreateCommand :: parse_arg_to (
288
+ & opts. state ,
289
+ self . to ,
290
+ self . via . as_ref ( ) ,
291
+ )
292
+ . await ?;
293
+ if self . to ( ) . matches ( 0 , & [ proto:: Project :: CODE . into ( ) ] ) && self . authorized . is_some ( ) {
294
+ return Err ( miette ! (
295
+ "--authorized can not be used with project addresses"
296
+ ) ) ?;
297
+ }
298
+
299
+ self . tls_certificate_provider = if let Some ( tls_certificate_provider) =
300
+ & self . tls_certificate_provider
301
+ {
302
+ Some ( tls_certificate_provider. clone ( ) )
303
+ } else if self . tls {
304
+ Some ( MultiAddr :: from_str ( "/project/default/service/tls_certificate_provider" ) . unwrap ( ) )
305
+ } else {
306
+ None
307
+ } ;
308
+
153
309
if self
154
310
. lease_manager_route
155
311
. as_ref ( )
@@ -159,6 +315,22 @@ impl InfluxDBCreateCommand {
159
315
"lease-manager-route argument requires leased-token-strategy=per-client"
160
316
) ) ?
161
317
} ;
318
+
162
319
Ok ( self )
163
320
}
321
+
322
+ pub fn to ( & self ) -> MultiAddr {
323
+ MultiAddr :: from_str ( & self . to ) . unwrap ( )
324
+ }
325
+
326
+ pub async fn secure_channel_identifier (
327
+ & self ,
328
+ state : & CliState ,
329
+ ) -> miette:: Result < Option < Identifier > > {
330
+ if let Some ( identity_name) = self . identity . as_ref ( ) {
331
+ Ok ( Some ( state. get_identifier_by_name ( identity_name) . await ?) )
332
+ } else {
333
+ Ok ( None )
334
+ }
335
+ }
164
336
}
0 commit comments