# Source code for bluemist.datasource.database


# Module metadata: author, license, release version and contact address
__author__ = "Shashank Agrawal"
__license__ = "MIT"
__version__ = "0.1.1"
__email__ = "dew@bluemist-ai.one"


import logging
import os
import urllib.parse
from logging import config
import pandas as pd

# Directory that holds ``logging.config``, read from the BLUEMIST_PATH environment variable.
# ``os.getenv`` returns None when the variable is unset, in which case the string
# concatenation below raises TypeError while this module is being imported.
BLUEMIST_PATH = os.getenv("BLUEMIST_PATH")

# Configure logging from the ``logging.config`` file and grab the shared "bluemist" logger
config.fileConfig(BLUEMIST_PATH + '/' + 'logging.config')
logger = logging.getLogger("bluemist")


# Maps every accepted ``db_type`` to (SQLAlchemy URL scheme, database name used in log messages)
_DB_TYPES = {
    'mariadb': ('mysql+pymysql', 'MariaDB'),
    'mssql': ('mssql+pymssql', 'MS SQL'),
    'mysql': ('mysql+pymysql', 'MySQL'),
    'aurora-mysql': ('mysql+pymysql', 'MySQL'),
    'oracle': ('oracle+cx_oracle', 'Oracle'),
    'postgres': ('postgresql', 'PostgreSQL'),
    'aurora-postgres': ('postgresql', 'PostgreSQL'),
    # Spelling that the docstring used to advertise; kept as an alias so it keeps working
    'aurora-postgre': ('postgresql', 'PostgreSQL'),
}


def get_data_from_database(db_type=None,
                           host=None,
                           database=None,
                           service=None,
                           oracle_instant_client_path=None,
                           username=None,
                           password=None,
                           query=None,
                           chunk_size=1000):
    """
        Extract data from database. Supported databases are MariaDB, Microsoft SQL, MySQL, Oracle, AWS Aurora and
        PostgreSQL

        db_type: {'mariadb', 'mssql', 'mysql', 'aurora-mysql', 'oracle', 'postgres', 'aurora-postgres'}
            Database type
        host: str
            Database host
        database: str
            Database name. Used for every ``db_type`` except ``oracle``
        service: str
            Oracle service name. Used if ``db_type`` is ``oracle``
        oracle_instant_client_path: str
            Filesystem path of Oracle instant client.
            NOTE(review): the parameter is accepted but this function does not use it at present
        username: str
            Database user. Inserted into the connection URL as-is, so any special character must already be
            URL-encoded by the caller
        password: str or None
            Database password. URL-encoded before it is placed in the connection URL. ``None`` leaves the password
            out of the URL altogether
        query: str
            Database query to be used to extract data
        chunk_size: int or None, default=1000
            Number of rows to return in each batch. Pass ``None`` to disable batch mode

        Returns
        -------
        The value returned by ``extract_data`` for ``query``

        Raises
        ------
        ValueError
            If ``db_type`` is not one of the supported values

        Examples
        ---------
        .. raw:: html
           :file: ../../code_samples/quickstarts/datasource/ds_database.html

    """

    # Fail loudly instead of silently returning None for a database we cannot talk to
    if db_type not in _DB_TYPES:
        raise ValueError('Unsupported db_type {!r}. Supported values are: {}'
                         .format(db_type, ', '.join(sorted(_DB_TYPES))))

    scheme, db_label = _DB_TYPES[db_type]
    is_oracle = db_type == 'oracle'

    # Imported lazily so that importing this module does not require the database stack
    from sqlalchemy import create_engine
    if is_oracle:
        import cx_Oracle  # noqa: F401 -- surfaces a missing Oracle driver before connecting

    logger.info('Pulling data from ' + db_label)

    credentials = username
    if password is not None:
        # Characters such as '@' or '/' in the password would otherwise break URL parsing
        credentials = credentials + ':' + urllib.parse.quote_plus(password)

    # Oracle is addressed by service name, every other database by database name
    if is_oracle:
        location = host + '/?service_name=' + service
    else:
        location = host + '/' + database

    connection_url = scheme + '://' + credentials + '@' + location

    engine = create_engine(connection_url)
    try:
        # extract_data materialises the whole result before returning, so the connection
        # can be released as soon as it is done
        with engine.connect() as conn:
            logger.info('Connection successful !!')
            return extract_data(conn, query, chunk_size)
    finally:
        # Release the pooled connections held by this single-use engine
        engine.dispose()
def extract_data(conn=None, query=None, chunk_size=None):
    """
        Run ``query`` on ``conn`` with ``pandas.read_sql_query`` and return the whole result as one DataFrame

        conn:
            Open database connection, handed to ``pandas.read_sql_query`` unchanged
        query: str
            Database query to be used to extract data
        chunk_size: int or None, default=None
            Number of rows to fetch in each batch. ``None`` reads the complete result in a single call

        Returns
        -------
        pandas.DataFrame
            All fetched rows with a fresh 0-based index. Every non-word character in the column names is replaced
            by ``_``. An empty DataFrame is returned when no batch was produced at all
    """

    result = pd.read_sql_query(sql=query, con=conn, chunksize=chunk_size)

    # Without a chunk size pandas hands back a single DataFrame rather than an iterator of
    # DataFrames; looping over a DataFrame would yield its column labels, so wrap it
    chunks = [result] if isinstance(result, pd.DataFrame) else result

    dfs = []
    record_count = 0
    for chunk in chunks:
        dfs.append(chunk)
        record_count = record_count + chunk.shape[0]
        logger.debug('Records pulled in this batch {}'.format(chunk.shape[0]))

    logger.info('Total records pulled {}'.format(record_count))

    # pd.concat raises on an empty list, which happens when the iterator yields no batch
    if not dfs:
        return pd.DataFrame()

    data = pd.concat(dfs, ignore_index=True)

    # Raw string plus regex=True so that \W is treated as a pattern on every pandas version
    data.columns = data.columns.str.replace(r'\W', '_', regex=True)  # TODO: Revisit this code

    return data