-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.py
72 lines (59 loc) · 2.73 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
from azure.appconfiguration import AzureAppConfigurationClient
from requests.exceptions import RequestException
from azure.core.exceptions import ResourceNotFoundError
import logging
from typing import Union, Optional
class AppConfig:
    """Wrapper around an Azure App Configuration client.

    The connection string is resolved from a Docker secret file first and
    from the ``AZURE_APP_CONFIG_CONNECTION_STRING`` environment variable
    second. Values are then fetched by key through :meth:`get_value`.
    """

    # Default directory where Docker mounts secrets in Linux containers.
    _DOCKER_SECRETS_DIR = "/run/secrets"

    def __init__(self) -> None:
        """
        Initialise the AppConfig class.

        Raises:
            ValueError: If no connection string is found in the Docker secret
                or in the AZURE_APP_CONFIG_CONNECTION_STRING environment
                variable.
        """
        # Try reading from Docker secret first.
        connection_string = self._read_from_secret('azure_app_config')

        # If not in Docker or the secret read failed, try environment variable.
        if not connection_string:
            connection_string = os.environ.get("AZURE_APP_CONFIG_CONNECTION_STRING")

        if not connection_string:
            raise ValueError("AZURE_APP_CONFIG_CONNECTION_STRING not found in Docker secrets or environment variables.")

        self.connection_string = connection_string
        self.config_client = AzureAppConfigurationClient.from_connection_string(connection_string)

    def _read_from_secret(self, secret_path: str) -> Optional[str]:
        """
        Read the connection string from the Docker secret.

        The path is tried exactly as given. When it is relative (for example
        a bare secret name) and nothing usable is found there, the same name
        is also looked up under the default Docker secrets directory.

        Args:
            secret_path (str): Path to, or name of, the Docker secret.

        Returns:
            Optional[str]: The stripped file content if a non-empty secret
            was read; otherwise, None. Read failures are logged, not raised.
        """
        candidates = [secret_path]
        if not os.path.isabs(secret_path):
            candidates.append(os.path.join(self._DOCKER_SECRETS_DIR, secret_path))

        for candidate in candidates:
            try:
                with open(candidate, 'r', encoding='utf-8') as file:
                    content = file.read().strip()
            except FileNotFoundError:
                # Expected whenever the process is not running with a Docker
                # secret mounted, so this is not reported as an error.
                logging.debug("Secret file %s not found.", candidate)
                continue
            except (OSError, UnicodeDecodeError) as e:
                # Only the path and the error are logged, never the content.
                logging.warning("Error reading from secret %s: %s", candidate, e)
                continue
            if content:
                return content

        # Nothing readable, or only empty files: let the caller fall back.
        return None

    def get_value(self, key: str) -> Union[str, None]:
        """
        Fetch a value from the Azure App Configuration using a given key.

        Args:
            key (str): The key for which the value is to be fetched.

        Returns:
            Union[str, None]: The corresponding value if found; otherwise, None.

        Note:
            If an error occurs during the fetch, it logs the error and
            returns None instead of raising.
        """
        try:
            fetched_key = self.config_client.get_configuration_setting(key=key)
            return fetched_key.value if fetched_key else None
        except ResourceNotFoundError:
            logging.error("Error fetching key %s from Azure App Configuration, the key does not exist.", key)
            return None
        except RequestException:
            logging.error("HTTP Request failed when fetching key %s from Azure App Configuration.", key)
            return None
        except Exception as ex:
            # Deliberate best-effort boundary: callers rely on None rather
            # than an exception when the lookup fails for any reason.
            logging.error("An error occurred while fetching key %s from Azure App Configuration: %s", key, str(ex))
            return None