# -*- coding: utf-8 -*-
"""
Safe database credential loader.
"""
import os
import json
import string
import sqlalchemy as sa
from sqlalchemy.engine import Engine
[docs]class EngineCreator: # pragma: no cover
"""
Tired of looking up docs on https://docs.sqlalchemy.org/en/latest/core/engines.html?
``EngineCreator`` creates sqlalchemy engine in one line:
Example::
from sqlalchemy_mate import EngineCreator
# sqlite in memory
engine = EngineCreator.create_sqlite()
# connect to postgresql, credential stored at ``~/.db.json``
# content of ``.db.json``
{
"mydb": {
"host": "example.com",
"port": 1234,
"database": "test",
"username": "admin",
"password": "admin"
},
...
}
engine = EngineCreator.from_home_db_json("mydb").create_postgresql()
"""
def __init__(
self,
host=None,
port=None,
database=None,
username=None,
password=None,
):
self.host = host
self.port = port
self.database = database
self.username = username
self.password = password
uri_template = "{username}{has_password}{password}@{host}{has_port}{port}/{database}"
path_db_json = os.path.join(os.path.expanduser("~"), ".db.json")
local_home = os.path.basename(os.path.expanduser("~"))
def __repr__(self):
return "{classname}(host='{host}', port={port}, database='{database}', username={username}, password='xxxxxxxxxxxx')".format(
classname=self.__class__.__name__,
host=self.host, port=self.port,
database=self.database, username=self.username,
)
@property
def uri(self) -> str:
"""
Return sqlalchemy connect string URI.
"""
return self.uri_template.format(
host=self.host,
port="" if self.port is None else self.port,
database=self.database,
username=self.username,
password="" if self.password is None else self.password,
has_password="" if self.password is None else ":",
has_port="" if self.port is None else ":",
)
@classmethod
def _validate_key_mapping(cls, key_mapping):
if key_mapping is not None:
keys = list(key_mapping)
keys.sort()
if keys != ["database", "host", "password", "port", "username"]:
msg = ("`key_mapping` is the credential field mapping from `Credential` to custom json! "
"it has to be a dictionary with 5 keys: "
"host, port, password, port, username!")
raise ValueError(msg)
@classmethod
def _transform(cls, data, key_mapping):
if key_mapping is None:
return data
else:
return {actual: data[custom] for actual, custom in key_mapping.items()}
@classmethod
def _from_json_data(cls, data, json_path=None, key_mapping=None):
if json_path is not None:
for p in json_path.split("."):
data = data[p]
return cls(**cls._transform(data, key_mapping))
[docs] @classmethod
def from_json(
cls,
json_file: str,
json_path: str = None,
key_mapping: dict = None,
) -> 'EngineCreator':
"""
Load connection credential from json file.
:param json_file: str, path to json file
:param json_path: str, dot notation of the path to the credential dict.
:param key_mapping: dict, map 'host', 'port', 'database', 'username', 'password'
to custom alias, for example ``{'host': 'h', 'port': 'p', 'database': 'db', 'username': 'user', 'password': 'pwd'}``. This params are used to adapt any json data.
:rtype:
:return:
Example:
Your json file::
{
"credentials": {
"db1": {
"h": "example.com",
"p": 1234,
"db": "test",
"user": "admin",
"pwd": "admin",
},
"db2": {
...
}
}
}
Usage::
cred = Credential.from_json(
"path-to-json-file", "credentials.db1",
dict(host="h", port="p", database="db", username="user", password="pwd")
)
"""
cls._validate_key_mapping(key_mapping)
with open(json_file, "rb") as f:
data = json.loads(f.read().decode("utf-8"))
return cls._from_json_data(data, json_path, key_mapping)
[docs] @classmethod
def from_home_db_json(
cls,
identifier: str,
key_mapping: dict = None,
) -> 'EngineCreator': # pragma: no cover
"""
Read credential from $HOME/.db.json file.
:type identifier: str
:param identifier: str, database identifier.
:type key_mapping: Dict[str, str]
:param key_mapping: dict
``.db.json````::
{
"identifier1": {
"host": "example.com",
"port": 1234,
"database": "test",
"username": "admin",
"password": "admin",
},
"identifier2": {
...
}
}
"""
return cls.from_json(
json_file=cls.path_db_json, json_path=identifier, key_mapping=key_mapping)
[docs] @classmethod
def from_s3_json(
cls,
bucket_name: str,
key: str,
json_path: str = None,
key_mapping: dict = None,
aws_profile: str = None,
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
region_name: str = None,
) -> 'EngineCreator': # pragma: no cover
"""
Load database credential from json on s3.
:param bucket_name: str
:param key: str
:param aws_profile: if None, assume that you are using this from
AWS cloud. (service on the same cloud doesn't need profile name)
:param aws_access_key_id: str, not recommend to use
:param aws_secret_access_key: str, not recommend to use
:param region_name: str
"""
import boto3
ses = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
profile_name=aws_profile,
)
s3 = ses.resource("s3")
bucket = s3.Bucket(bucket_name)
object = bucket.Object(key)
data = json.loads(object.get()["Body"].read().decode("utf-8"))
return cls._from_json_data(data, json_path, key_mapping)
[docs] @classmethod
def from_env(
cls,
prefix: str,
kms_decrypt: bool = False,
aws_profile: str = None,
) -> 'EngineCreator':
"""
Load database credential from env variable.
- host: ENV.{PREFIX}_HOST
- port: ENV.{PREFIX}_PORT
- database: ENV.{PREFIX}_DATABASE
- username: ENV.{PREFIX}_USERNAME
- password: ENV.{PREFIX}_PASSWORD
:param prefix: str
:param kms_decrypt: bool
:param aws_profile: str
"""
if len(prefix) < 1:
raise ValueError("prefix can't be empty")
if len(set(prefix).difference(set(string.ascii_uppercase + "_"))):
raise ValueError("prefix can only use [A-Z] and '_'!")
if not prefix.endswith("_"):
prefix = prefix + "_"
data = dict(
host=os.getenv(prefix + "HOST"),
port=os.getenv(prefix + "PORT"),
database=os.getenv(prefix + "DATABASE"),
username=os.getenv(prefix + "USERNAME"),
password=os.getenv(prefix + "PASSWORD"),
)
if kms_decrypt is True: # pragma: no cover
import boto3
from base64 import b64decode
if aws_profile is not None:
kms = boto3.client("kms")
else:
ses = boto3.Session(profile_name=aws_profile)
kms = ses.client("kms")
def decrypt(kms, text):
return kms.decrypt(
CiphertextBlob=b64decode(text.encode("utf-8"))
)["Plaintext"].decode("utf-8")
data = {
key: value if value is None else decrypt(kms, str(value))
for key, value in data.items()
}
return cls(**data)
[docs] def to_dict(self):
"""
Convert credentials into a dict.
"""
return dict(
host=self.host,
port=self.port,
database=self.database,
username=self.username,
password=self.password,
)
# --- engine creator logic
def create_connect_str(self, dialect_and_driver) -> str:
return "{}://{}".format(dialect_and_driver, self.uri)
_ccs = create_connect_str
[docs] def create_engine(self, conn_str, **kwargs) -> Engine:
"""
:rtype: Engine
"""
return sa.create_engine(conn_str, **kwargs)
_ce = create_engine
[docs] @classmethod
def create_sqlite(cls, path=":memory:", **kwargs):
"""
Create sqlite engine.
"""
return sa.create_engine("sqlite:///{path}".format(path=path), **kwargs)
[docs] class DialectAndDriver(object):
"""
DB dialect and DB driver mapping.
"""
psql = "postgresql"
psql_psycopg2 = "postgresql+psycopg2"
psql_pg8000 = "postgresql+pg8000"
psql_pygresql = "postgresql+pygresql"
psql_psycopg2cffi = "postgresql+psycopg2cffi"
psql_pypostgresql = "postgresql+pypostgresql"
mysql = "mysql"
mysql_mysqldb = "mysql+mysqldb"
mysql_mysqlconnector = "mysql+mysqlconnector"
mysql_oursql = "mysql+oursql"
mysql_pymysql = "mysql+pymysql"
mysql_cymysql = "mysql+cymysql"
oracle = "oracle"
oracle_cx_oracle = "oracle+cx_oracle"
mssql_pyodbc = "mssql+pyodbc"
mssql_pymssql = "mssql+pymssql"
redshift_psycopg2 = "redshift+psycopg2"
# postgresql
[docs] def create_postgresql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.psql), **kwargs
)
[docs] def create_postgresql_psycopg2(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.psql_psycopg2), **kwargs
)
[docs] def create_postgresql_pg8000(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.psql_pg8000), **kwargs
)
def _create_postgresql_pygresql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.psql_pygresql), **kwargs
)
[docs] def create_postgresql_psycopg2cffi(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.psql_psycopg2cffi), **kwargs
)
[docs] def create_postgresql_pypostgresql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.psql_pypostgresql), **kwargs
)
# mysql
[docs] def create_mysql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mysql), **kwargs
)
[docs] def create_mysql_mysqldb(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mysql_mysqldb), **kwargs
)
[docs] def create_mysql_mysqlconnector(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mysql_mysqlconnector), **kwargs
)
[docs] def create_mysql_oursql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mysql_oursql), **kwargs
)
[docs] def create_mysql_pymysql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mysql_pymysql), **kwargs
)
[docs] def create_mysql_cymysql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mysql_cymysql), **kwargs
)
# oracle
[docs] def create_oracle(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.oracle), **kwargs
)
[docs] def create_oracle_cx_oracle(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.oracle_cx_oracle), **kwargs
)
# mssql
[docs] def create_mssql_pyodbc(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mssql_pyodbc), **kwargs
)
[docs] def create_mssql_pymssql(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.mssql_pymssql), **kwargs
)
# redshift
[docs] def create_redshift(self, **kwargs):
"""
:rtype: Engine
"""
return self._ce(
self._ccs(self.DialectAndDriver.redshift_psycopg2), **kwargs
)
if __name__ == "__main__":
import boto3
from base64 import b64decode
cred = Credential.from_s3_json(
"sanhe-credential", "db/elephant-dupe-remove.json",
aws_profile="sanhe",
)
print(cred)