Ограничения SQL берут своё начало в декларативности языка – мы указываем SQL что мы хотим получить, а SQL извлекает нам это из указанной базы. Для простой обработки данных этого достаточно. Но что делать, если мы хотим большего? Приведённый ниже класс – наша основа для оптимизации сервера MS SQL, далее мы дополним его несколькими методами. Сторонний модуль pyodbc упрощает доступ к базам данных через программный интерфейс ODBC (Open Database Connectivity).
Чтобы подключиться к базе данных из Python с помощью этого класса, достаточно создать объект и передать имя базы данных, к примеру, sql = Sql('database123')
.
Давайте разберёмся, что происходит внутри класса. В метод инициализации __init__
мы передаём строку server="XXVIR00012,55000"
. Это строковое значение – имя нашего сервера, которое можно найти в диалоговом окне "Connect
или в верхней части окна в среде MS SQL в
to Server"Server Management Studio
:
Все трудности подключения берёт на себя модуль pyodbc. Нам лишь нужно передать строку подключения в функцию pyodbc.connect()
.
self.cnxn = pyodbc.connect("Driver={SQL Server Native Client 11.0};"
"Server="+self.server+";"
"Database="+self.database+";"
"Trusted_Connection=yes;")
Подробнее о передаваемых в ODBC-интерфейс значениях читайте в официальном хелпе.
В конце класса создаётся строка, обновляемая с каждым передаваемым запросом:
self.query = "-- {}nn-- Made in Python".format(datetime.now()
.strftime("%d/%m/%Y"))
Это позволяет нам собирать логи и создавать более читабельный вывод. Для записи времени мы используем стандартную библиотеку datetime
.
Компоненты
Есть несколько важных функций, направленных на передачу данных в базу данных или из неё. Для примера мы возьмём каталог, в котором имеется множество однотипных csv-файлов.
В текущем проекте мы хотим:
- Импортировать файлы в SQL-server.
- Объединить их в одну таблицу.
- Динамически создать несколько таблиц на основе категорий внутри столбца.
import sys
sys.path.insert(0, r'C:\Usermediumpysqlpluslib')
import os
import pandas as pd
from data import Sql
sql = Sql('database123')
directory = r'C:\Usermediumdata\' # место хранения сгенерированных данных
file_list = os.listdir(directory) # определить список всех файлов
for file in file_list:
df = pd.read_csv(directory+file)
sql.push_dataframe(df, file[:-4])
# конвертируем список имен из file_list в имена таблиц
table_names = [x[:-4] for x in file_list]
sql.union(table_names, 'generic_jan') # объединяем файлы в одну таблицу
sql.drop(table_names)
# определяем список категорий в colX, например ['hr', 'finance', 'tech', 'c_suite']
sets = list(sql.manual("SELECT colX AS 'category' FROM generic_jan GROUP BY colX", response=True)['category'])
for category in sets:
sql.manual("SELECT * INTO generic_jan_"+category+" FROM generic_jan WHERE colX = '"+category+"'")
Как мы видим, кроме инициализации в класс Sql нужно добавить методы push_dataframe
и manual
, union
и drop
. Опишем их.
Метод push_dataframe
Функция push_dataframe
позволит поместить в базу данных датафрейм Pandas.
def push_dataframe(self, data, table="raw_data", batchsize=500):
cursor = self.cnxn.cursor() # создаем курсор
cursor.fast_executemany = True # активируем быстрое выполнение
# создаём заготовку для создания таблицы (начало)
query = "CREATE TABLE [" + table + "] (n"
# итерируемся по столбцам
for i in range(len(list(data))):
query += "t[{}] varchar(255)".format(list(data)[i]) # add column (everything is varchar for now)
# добавляем корректное завершение
if i != len(list(data))-1:
query += ",n"
else:
query += "n);"
cursor.execute(query) # запуск создания таблицы
self.cnxn.commit() # коммит для изменений
# append query to our SQL code logger
self.query += ("nn-- create tablen" + query)
# вставляем данные в батчи
query = ("INSERT INTO [{}] ({})n".format(table,
'['+'], [' # берем столбцы
.join(list(data)) + ']') +
"VALUESn(?{})".format(", ?"*(len(list(data))-1)))
# вставляем данные в целевую таблицу
for i in range(0, len(data), batchsize):
if i+batchsize > len(data):
batch = data[i: len(data)].values.tolist()
else:
batch = data[i: i+batchsize].values.tolist()
# запускаем вставку батча
cursor.executemany(query, batch)
self.cnxn.commit()
Это полезно, когда нужно загрузить много файлов.
Метод manual
Метод manual
используется выше как отдельно, так и внутри функций union
и drop
. Она позволяет упростить выполнение SQL-кода.
def manual(self, query, response=False):
cursor = self.cnxn.cursor() # создаем курсор выполнения
if response:
return read_sql(query, self.cnxn)
try:
cursor.execute(query) # execute
except pyodbc.ProgrammingError as error:
print("Warning:n{}".format(error))
self.cnxn.commit()
return "Query complete."
Аргумент response
даёт возможность вставить в дата фрейм исходящую информацию нашего запроса. Извлечь все уникальные значения из colX
в таблице generic_jan
можно с помощью следующей строки:
sets = list(sql.manual("SELECT colX AS 'category' FROM generic_jan GROUP BY colX", response=True)['category'])
Метод union
Теперь на основе метода manual
создадим метод union
:
def union(self, table_list, name="union", join="UNION"):
query = "SELECT * INTO ["+name+"] FROM (n"
query += f'n{join}n'.join(
[f'SELECT [{x}].* FROM [{x}]' for x in table_list]
)
query += ") x"
self.manual(query, fast=True)
Это «объединяющий» запрос с перебором списка имён таблиц из table_list
.
Метод drop
Метод drop
выполняет удаление таблиц:
def drop(self, tables):
if isinstance(tables, str):
# если отдельная строка, переведем в список
tables = [tables]
for table in tables:
# проверяем наличие таблицы и удаляем, если существует
query = ("IF OBJECT_ID ('["+table+"]', 'U') IS NOT NULL "
"DROP TABLE ["+table+"]")
self.manual(query)
Функция drop
позволяет удалить одну или несколько таблицу, поместив строку в tables
, либо несколько таблиц, поместив туда же весь список.
Заключение
Сочетая описанные несложные методы мы значительно облегчили работу с большим количеством файлов в SQL Server. Если вас заинтересовала тема взаимодействия Python и SQL, почитайте пост «Как подружить Python и базы данных SQL. Подробное руководство». Успехов в развитии!