Skip to main content

vector/
nats.rs

1//! Shared helper functions for NATS source and sink.
2#![allow(missing_docs)]
3
4use nkeys::error::Error as NKeysError;
5use snafu::{ResultExt, Snafu};
6use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
7
8use crate::tls::TlsEnableableConfig;
9
10/// Errors that can occur during NATS configuration.
11#[derive(Debug, Snafu)]
12pub enum NatsConfigError {
13    #[snafu(display("NATS Auth Config Error: {}", source))]
14    AuthConfigError { source: NKeysError },
15    #[snafu(display("NATS TLS Config Error: missing key"))]
16    TlsMissingKey,
17    #[snafu(display("NATS TLS Config Error: missing cert"))]
18    TlsMissingCert,
19    #[snafu(display("NATS Credentials file error"))]
20    CredentialsFileError { source: std::io::Error },
21}
22
// Deserialized as an internally tagged enum: the `strategy` key selects the variant (spelled in
// snake_case), and that variant's settings live under a sibling key named after the variant's
// single field. The YAML fixtures in this file's tests show the resulting layout.
//
// NOTE(review): the `///` text and the `docs::` metadata on configurable components presumably
// feed generated configuration docs — confirm before rewording any of them.
/// Configuration of the authentication strategy when interacting with NATS.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to authenticate with the NATS server.

More information on NATS authentication, and the various authentication strategies, can be found in the
NATS [documentation][nats_auth_docs]. For TLS client certificate authentication specifically, see the
`tls` settings.

[nats_auth_docs]: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro"
))]
pub enum NatsAuthConfig {
    /// Username/password authentication.
    UserPassword {
        #[configurable(derived)]
        user_password: NatsAuthUserPassword,
    },

    /// Token authentication.
    Token {
        #[configurable(derived)]
        token: NatsAuthToken,
    },

    /// Credentials file authentication. (JWT-based)
    CredentialsFile {
        #[configurable(derived)]
        credentials_file: NatsAuthCredentialsFile,
    },

    /// NKey authentication.
    Nkey {
        #[configurable(derived)]
        nkey: NatsAuthNKey,
    },
}
61
62impl std::fmt::Display for NatsAuthConfig {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        use NatsAuthConfig::*;
65        let word = match self {
66            UserPassword { .. } => "user_password",
67            Token { .. } => "token",
68            CredentialsFile { .. } => "credentials_file",
69            Nkey { .. } => "nkey",
70        };
71        write!(f, "{word}")
72    }
73}
74
// `deny_unknown_fields`: keys other than `user` and `password` are rejected when deserializing.
/// Username and password configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct NatsAuthUserPassword {
    /// Username.
    pub(crate) user: String,

    // Held as a `SensitiveString`; `to_nats_options` reads the raw value through `inner()`.
    /// Password.
    pub(crate) password: SensitiveString,
}
86
// `deny_unknown_fields`: `value` is the only accepted key.
/// Token configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct NatsAuthToken {
    // Held as a `SensitiveString`; `to_nats_options` reads the raw value through `inner()`.
    /// Token.
    pub(crate) value: SensitiveString,
}
95
// `deny_unknown_fields`: `path` is the only accepted key.
/// Credentials file configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct NatsAuthCredentialsFile {
    // The file is not touched at deserialization time; it is read from disk when
    // `to_nats_options` runs.
    /// Path to credentials file.
    #[configurable(metadata(docs::examples = "/etc/nats/nats.creds"))]
    pub(crate) path: String,
}
105
// `deny_unknown_fields`: keys other than `nkey` and `seed` are rejected when deserializing.
// Both fields are required, but only `seed` is passed on by `to_nats_options`; `nkey` is not
// read anywhere in this file.
/// NKeys configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct NatsAuthNKey {
    /// User.
    ///
    /// Conceptually, this is equivalent to a public key.
    pub(crate) nkey: String,

    // NOTE(review): `seed` is documented as private-key material yet is a plain `String`, while
    // the password and token fields use `SensitiveString` — confirm whether this is intentional.
    /// Seed.
    ///
    /// Conceptually, this is equivalent to a private key.
    pub(crate) seed: String,
}
121
122impl NatsAuthConfig {
123    pub(crate) fn to_nats_options(&self) -> Result<async_nats::ConnectOptions, NatsConfigError> {
124        match self {
125            NatsAuthConfig::UserPassword { user_password } => {
126                Ok(async_nats::ConnectOptions::with_user_and_password(
127                    user_password.user.clone(),
128                    user_password.password.inner().to_string(),
129                ))
130            }
131            NatsAuthConfig::CredentialsFile { credentials_file } => {
132                async_nats::ConnectOptions::with_credentials(
133                    &std::fs::read_to_string(credentials_file.path.clone())
134                        .context(CredentialsFileSnafu)?,
135                )
136                .context(CredentialsFileSnafu)
137            }
138            NatsAuthConfig::Nkey { nkey } => {
139                Ok(async_nats::ConnectOptions::with_nkey(nkey.seed.clone()))
140            }
141            NatsAuthConfig::Token { token } => Ok(async_nats::ConnectOptions::with_token(
142                token.value.inner().to_string(),
143            )),
144        }
145    }
146}
147
148pub(crate) fn from_tls_auth_config(
149    connection_name: &str,
150    auth_config: &Option<NatsAuthConfig>,
151    tls_config: &Option<TlsEnableableConfig>,
152) -> Result<async_nats::ConnectOptions, NatsConfigError> {
153    let nats_options = match &auth_config {
154        None => async_nats::ConnectOptions::new(),
155        Some(auth) => auth.to_nats_options()?,
156    };
157
158    let nats_options = nats_options.name(connection_name);
159
160    match tls_config {
161        None => Ok(nats_options),
162        Some(tls_config) => {
163            let tls_enabled = tls_config.enabled.unwrap_or(false);
164            let nats_options = nats_options.require_tls(tls_enabled);
165            if !tls_enabled {
166                return Ok(nats_options);
167            }
168
169            let nats_options = match &tls_config.options.ca_file {
170                None => nats_options,
171                Some(ca_file) => nats_options.add_root_certificates(ca_file.clone()),
172            };
173
174            let nats_options = match (&tls_config.options.crt_file, &tls_config.options.key_file) {
175                (None, None) => nats_options,
176                (Some(crt_file), Some(key_file)) => {
177                    nats_options.add_client_certificate(crt_file.clone(), key_file.clone())
178                }
179                (Some(_crt_file), None) => return Err(NatsConfigError::TlsMissingKey),
180                (None, Some(_key_file)) => return Err(NatsConfigError::TlsMissingCert),
181            };
182            Ok(nats_options)
183        }
184    }
185}
186
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Deserializes `s` as a `NatsAuthConfig` and converts it to connection options,
    /// surfacing a failure from either step.
    fn parse_auth(s: &str) -> Result<async_nats::ConnectOptions, crate::Error> {
        let config: NatsAuthConfig = serde_yaml::from_str(s)?;
        Ok(config.to_nats_options()?)
    }

    /// Panics (at the caller's location) unless `yaml` yields connection options.
    #[track_caller]
    fn assert_accepted(yaml: &str) {
        parse_auth(yaml).unwrap();
    }

    /// Panics (at the caller's location) unless `yaml` is rejected.
    #[track_caller]
    fn assert_rejected(yaml: &str) {
        parse_auth(yaml).unwrap_err();
    }

    #[test]
    fn auth_user_password_ok() {
        assert_accepted(indoc! {r#"
            strategy: user_password
            user_password:
              user: username
              password: password
        "#});
    }

    #[test]
    fn auth_user_password_missing_user() {
        assert_rejected(indoc! {r#"
            strategy: user_password
            user_password:
              password: password
        "#});
    }

    #[test]
    fn auth_user_password_missing_password() {
        assert_rejected(indoc! {r#"
            strategy: user_password
            user_password:
              user: username
        "#});
    }

    // The settings block belongs to a different strategy than the one selected.
    #[test]
    fn auth_user_password_missing_all() {
        assert_rejected(indoc! {r#"
            strategy: user_password
            token:
              value: foobar
        "#});
    }

    #[test]
    fn auth_token_ok() {
        assert_accepted(indoc! {r#"
            strategy: token
            token:
              value: token
        "#});
    }

    // The settings block belongs to a different strategy than the one selected.
    #[test]
    fn auth_token_missing() {
        assert_rejected(indoc! {r#"
            strategy: token
            user_password:
              user: foobar
        "#});
    }

    // Reads the fixture through a relative path, so this depends on the working directory
    // the test binary runs in.
    #[test]
    fn auth_credentials_file_ok() {
        assert_accepted(indoc! {r#"
            strategy: credentials_file
            credentials_file:
              path: tests/integration/nats/data/nats.creds
        "#});
    }

    // The settings block belongs to a different strategy than the one selected.
    #[test]
    fn auth_credentials_file_missing() {
        assert_rejected(indoc! {r#"
            strategy: credentials_file
            token:
              value: foobar
        "#});
    }

    #[test]
    fn auth_nkey_ok() {
        assert_accepted(indoc! {r#"
            strategy: nkey
            nkey:
              nkey: UC435ZYS52HF72E2VMQF4GO6CUJOCHDUUPEBU7XDXW5AQLIC6JZ46PO5
              seed: SUAAEZYNLTEA2MDTG7L5X7QODZXYHPOI2LT2KH5I4GD6YVP24SE766EGPA
        "#});
    }

    #[test]
    fn auth_nkey_missing_nkey() {
        assert_rejected(indoc! {r#"
            strategy: nkey
            nkey:
              seed: SUAAEZYNLTEA2MDTG7L5X7QODZXYHPOI2LT2KH5I4GD6YVP24SE766EGPA
        "#});
    }

    #[test]
    fn auth_nkey_missing_seed() {
        assert_rejected(indoc! {r#"
            strategy: nkey
            nkey:
              nkey: UC435ZYS52HF72E2VMQF4GO6CUJOCHDUUPEBU7XDXW5AQLIC6JZ46PO5
        "#});
    }

    // The settings block belongs to a different strategy than the one selected.
    #[test]
    fn auth_nkey_missing_both() {
        assert_rejected(indoc! {r#"
            strategy: nkey
            user_password:
              user: foobar
        "#});
    }
}