Python_package_Polars

Polars는 RUST 기반으로 개발된 데이터 분석 도구임.

Polars 는 rust 기반 프레임 워크로 문법이 spark 와 살짝 비슷하다.

lazy frame 에 넣으면 램 효율이 비약적으로 증가
이후 데이터를 불러오기전에 조건절들을 전부 기억해놨다가 해당 조건절중에서 가장 빠른 방법을 찾아 순서대로 조건절 실행해고 완료된 table을 반환한다 (이때 .collect() 함수를 통해 테이블을 호출할 수 있다. )

설치시 connectorx 라는 db 연결 라이브러리가 필히 설치가 되어있어야한다. (그렇지 않을경우 error 반환)

pip install connectorx
pip install polars 
# 오프라인 환경 패키지 설치. 
# pip install --no-index --find-links=\\apkrp-wfsr193\DA\GH\프로그램\polars polars
# !conda install --no-index --find-links=C:/Users/gukim00/Desktop/csv_file/polar connectorx

로컬 파일 접근은 pandas 와 비슷하게 사용하면 된다.
db접근시에는 sqlalchemy와는 다른 방법으로 진행해야한다.

conn = 'mssql+pyodbc://[host]:[port]/[dbname]?trusted_connection=true'
# mssql+pyodbc 말고 단일 mssql 을 사용해도 된다고 한다. (특히 trusted_connection의 경우)
import polars as pl # polars import 
import connectorx as cx # connectorx 는 polars 가 데이터 프레임에 접근하는 도구이며,
ip_address = '123.123.123.132'
port = '1000'
db = 'test_db' 
conn = f'mssql+pyodbc://{ip_address}:{port}/{db}?trusted_connection=true'  # 윈도우 인증 로그인의 경우 trusted_connection= true 를 준다.
## pandas pd.read_sql 처럼 포트번호를 지정해주고 engine 만 connectorx 로 지정해주면 된다. 
query = "select * from ProductRecommender where yearmonth = '202306' AND STATUS = 'ACTIVE'"
df = pl.read_database(query,  connection_uri=conn, engine = 'connectorx')
file_path = '파일경로입력'
df.write_csv(file_path)

lazy 프레임에 넣는 방법.


### sql query 에서 데이터를 불러오고 해당 데이터를 lazy_frame 에 담는다. 
df2 = pl.read_database('select * from db_table' , conn).lazy()

## 일단 sql을 통해 호출을하고 
### lazy frame의 경우 head 대신에 fetch()를 활용한다. 
df2.fetch(4)
1
## 필터로 10개만 보기. collect()실행을 한번했더니 램용량이 많이 올라감....
df2.filter((pl.col('customerInsuranceAge') > 70) & 
           (pl.col('productcode').str.contains('CAN'))
           ).fetch(2)
1
## apply함수의 경우에는 일반 pandas 와는 조금 다른 방법인것 같다.
## pandas 의 경우 직접적으로 컬럼에 apply 를 하게되면 series 형태로 반환을 했다면 polars 의 경우 검색을 해봐도 with_columns과 같이 새 컬럼을 생성해야 적용이 된다.
df = pl.DataFrame({'A':[1,2,3,4,5],
                   'B':[5,6,7,8,9]
                   })

def max_of_column(column): 
  if column == 1 : 
    return df['B']

result = df.with_columns(pl.col('A').apply(max_of_column).alias('applied_max'))

print(result)
shape: (5, 3)
┌─────┬─────┬─────────────┐
│ A   ┆ B   ┆ applied_max │
│ --- ┆ --- ┆ ---         │
│ i64 ┆ i64 ┆ list[i64]   │
╞═════╪═════╪═════════════╡
│ 1   ┆ 5   ┆ [5, 6, … 9] │
│ 2   ┆ 6   ┆ null        │
│ 3   ┆ 7   ┆ null        │
│ 4   ┆ 8   ┆ null        │
│ 5   ┆ 9   ┆ null        │
└─────┴─────┴─────────────┘
processed_segment = df.with_columns(pl.when(pl.col('A')> 2).then(pl.col('B')).otherwise(0).alias('131'))
processed_segment
shape: (5, 3)
AB131
i64i64i64
150
260
377
488
599

아래는 sqlalchemy를 이용한 쿼리 방법

import pymssql
import numpy as np
import pandas as pd
import datetime 
from datetime import timedelta
from dateutil.relativedelta import relativedelta
import sys, os 
import sqlalchemy as db
from sqlalchemy import select ,Table, func, text
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import sessionmaker


db_url = URL.create(
        # drivername="mssql+pymssql",
        drivername="mssql+pyodbc",
        host = "ip_address_here",
        port  = 'port_here',
        database = "db_here",
        query = {
                "driver" : "ODBC Driver 13 for SQL Server",
                "TrustServerCertificate" : "yes",
                "authentication" : "ActiveDirectoryIntegrated",
                "isolation_level" : "AUTOCOMMIT" ## autocommit 설정 (아래 엔진에서 설정해도 무관.)
        }
)

db_engine = create_engine(db_url,encoding = 'utf-8' ,isolation_level = 'AUTOCOMMIT')


metadata =  db.MetaData()


connection = db_engine.raw_connection()
cursor = connection.cursor()

#방법 1 단순 READ 
sql = "select top 1 * from db_table where yearmonth = '202306' AND STATUS = 'ACTIVE'"
reco_df = pd.read_sql(sql, connection)


# 방법 2 임시테이블 생성등등에 활용 EX(INSERT INTO)
cursor.execute(sql)
rows = cursor.fetchall() #fetchall 의경우 리스트의 형식으로 데이터를 담기때문에 컬럼 설정이 되지 않는다. 컬럼은 아래와 같이 일괄 가져올 수 있음.
columns = [column[0] for column in cursor.description]

df = pd.DataFrame.from_records(rows,columns=columns)
c:\Users\gukim00\Anaconda3\envs\py39_clone\lib\site-packages\pandas\io\sql.py:761: UserWarning: pandas only support SQLAlchemy connectable(engine/connection) ordatabase string URI or sqlite3 DBAPI2 connectionother DBAPI2 objects are not tested, please consider using SQLAlchemy
  warnings.warn(
df = pd.DataFrame.from_records(rows.fetchall(),columns=columns)
# 이유는 모르겠는데 polars CONNECTORX 실행후 SQLALCHEMY 실행시 호스트 연결이 끊긴다는 에러가 발생함. 
import pyodbc 
print(pyodbc.drivers())
['SQL Server', 'Microsoft Access Driver (*.mdb, *.accdb)', 'Microsoft Excel Driver (*.xls, *.xlsx, *.xlsm, *.xlsb)', 'Microsoft Access Text Driver (*.txt, *.csv)', 'SQL Server Native Client 11.0', 'ODBC Driver 13 for SQL Server', 'Microsoft Access dBASE Driver (*.dbf, *.ndx, *.mdx)']