데이터베이스 (DB)/InfluxDB

InfluxDB와 Python3 연동하기 (For Windows & Ubuntu)

0. 들어가기 전에


지금까지 시계열 데이터베이스, InfluxDB에 대해 알아보았다.

 

이 시계열 데이터베이스는 주로 데이터를 분석이나 인공지능 분야에서 많이 사용이 될 텐데 이 계열에서 가장 많이 사용하는 언어가 Python이다.

 

고맙게도 InfluxDB에서 Python API를 지원한다. (Sphinx로 작성한듯하다.)

그래서 이번 시간에는 InfluxDB를 Python에서 사용하는 방법을 알아볼 것이다.

 

혹시나 아직 InfluxDB가 설치되어 있지 않다면 아래 링크를 참고하자.

foreverhappiness.tistory.com/59

 

InfluxDB 소개 및 설치 (For Windows & Ubuntu)

0. 들어가기 전에 시계열 데이터베이스(TSDB, Time-Series Database) 중에서 가장 많이 사용되는 InfluxDB를 다뤄볼 것이다. TSDB에 대해 아직 잘 모른다면 아래 링크를 참고하길 바란다. foreverhappiness.tistor..

foreverhappiness.tistory.com

 


1. InfluxDB 라이브러리 설치


설치하는 방법은 굉장히 쉽다.

 

먼저 명령 프롬프트를 열어야한다.

화면 왼쪽 아래 검색 버튼을 눌러 "cmd" 혹은 "명령 프롬프트"를 입력 후 실행시키자.

Ubuntu 사용자라면 Terminal을 열자

 

그리고 아래와 같이 명령을 실행시키면 된다.

pip3 install influxdb

 

너무 간단하게 끝났다.

 

혹시 pip3를 찾을 수 없다는 오류가 발생한다면 아래 게시글을 참고하자.

 

 

pip, pip3 설치하기 - 파이썬 3 (Python 3) - For Windows & Linux

1. pip이란 무엇인가? Python 언어를 설치하고 나면 pip 또는 pip3라는 명령어를 사용할 경우가 많다. (물론 파이썬 언어를 처음 배운다면 사용할 일이 거의 없을 것이다.) pip 명령어는 파이썬으로 작

foreverhappiness.tistory.com

 


2. Python3에서 InfluxDB 사용하기


Python에서 InfluxDB를 사용하기 전에 먼저 InfluxDB 서버를 시작시켜야 한다.

앞전 포스팅에서 로컬에 InfluxDB 서버를 구동시키는 "influxd.exe" 파일을 먼저 실행시켜야 한다.

 

 

이제 InfluxDB에 접근하여 데이터를 삽입하는 아래 예제 코드를 함께 살펴볼 것이다.

그대로 복사 붙여넣기 해도 정상적으로 실행될 것이니 하나하나 살펴보자.

 

from datetime import datetime, timedelta
import pprint
import time
from influxdb import InfluxDBClient
from copy import deepcopy


def get_ifdb(db, host='localhost', port=8086, user='root', passwd='root'):
    # Create an object include information for connect to the InfluxDB
    client = InfluxDBClient(host, port, user, passwd, db)

    try:
        # Try to Create database
        client.create_database(db)

        # If you can create database or have a database
        # there is no problem connecting to the InfluxDB
        print('Connection Successful')
        print('=======================')
        print('     Connection Info')
        print('=======================')
        print('host :', host)
        print('port :', port)
        print('username :', user)
        print('database :', db)
    except:
        # Generate error if you can't create database (can't connect to ifdb)
        print('Connection Failed')
        pass

    return client


def my_test(ifdb):
    # save points in the json_body
    json_body = []
    tablename = 'my_table'
    fieldname = 'my_field'
    point = {
        "measurement": tablename,
        "tags": {
            "host": "forever_happiness",
            "country": "South Korea",
            "region": "Busan"
        },
        "fields": {
            # Initialize data to zero
            fieldname: 0
        },
        "time": None,
    }

    # vals = [1, 2, ... 9, 10]
    vals = list(range(1, 11))

    for v in vals:
        # InfluxDB is based on UTC
        # so it should be timed with KCT
        dt = datetime.now() - timedelta(hours=-9)

        np = deepcopy(point)
        np['fields'][fieldname] = v
        np['time'] = dt
        json_body.append(np)

        # wait a second
        time.sleep(1)

    # Write the data for 10 seconds on the influxDB at once
    ifdb.write_points(json_body)

    result = ifdb.query('select * from %s' % tablename)
    pprint.pprint(result.raw)


def do_test():
    # Connect to InfluxDB
    mydb = get_ifdb(db='myDB')

    # write data to mydb
    my_test(mydb)


if __name__ == '__main__':
    do_test()

 

실행 결과가 아래와 같이 나온다면 잘 실행된 것이다.

 

 

조금 길지만 차근차근 보다 보면 어렵지 않게 분석이 가능할 것이다.

 

코드 실행과 동시에 가장 먼저 do_test 함수가 실행될 것이고 do_test 내부에는 influxDB에 연결하는 함수(get_ifdb)와 연결된 데이터베이스에 데이터를 입력하는 함수(my_test)가 있다.

 

get_ifdb

혹시 API를 보는 연습이 되어있지 않다면 이번 기회에 API를 보는 연습을 해보는 것도 좋다.

get_ifdb 함수 내부에서 InfluxDBClient 객체를 만들어 client 변수에 저장하는데 이때 이 객체에 InfluxDB로 연결할 정보들을 저장한다.

 

API를 보면 아래와 같이 나온다.

host는 이미 'localhost'로, port는 8086으로, username과 password는 'root'로 이미 초기화가 되어있다.

따라서 database만 잘 입력해주면 되는데 만약 로컬이 아닌 다른 IP에 있는 InfluxDB로 접근을 한다면 예제 코드 상에서 host 부분을 변경해야 할 것이다.

 

실제로 코드를 뜯어보면 아래와 같이 초기화가 되어있다.

 

즉, InfluxDBClient 객체는 그냥 객체일 뿐 별도의 연결 함수를 사용하지 않았으므로 그냥 연결에 대한 정보만 가지고 있는  것이다.

 

그럼 연결은 어디서 하는가?

바로 아래의 try문에서 create_database 함수를 통해 데이터베이스를 생성할 수 있는지 없는지에 따라 연결 가능 유무를 판별한다.

만약 "influxd.exe" 파일을 실행시키지 않고 데이터베이스 생성을 시도한다면 create_database 함수에서 에러를 발생시켜 except문으롤 빠질 것이다.

 

이미 생성된 데이터베이스가 있다면 새로운 데이터베이스를 만들지 않으며 에러도 발생시키지 않으니 걱정하지 않아도 된다.

 

my_test

my_test 함수에서 본격적으로 데이터를 삽입할 건데 예전 포스팅에서 InfluxDB는 Point라는 틀을 가지고 있다고 얘기한 적이 있었다.

(4번 데이터 입력하기 참고) foreverhappiness.tistory.com/60

 

point는 dictionary로 구성되어 있고 measurement와 tag key, field key, time key를 포함하고 있다.

데이터를 입력할 때는 point를 deepcopy를 사용하여 데이터를 적용한 후 json_body에 append 시켜준다.

 

최종적으로 write_points 함수를 통해 influxDB에 데이터가 입력되는데 현재 코드에서는 10초에 한 번씩 10초만큼의 데이터를 한꺼번에 삽입하는 것이다.

 

마지막으로 query 함수를 통해서 직접적으로 쿼리를 입력할 수도 있는데 대부분의 데이터베이스의 적인 SQL Injection 공격에 취약할 수 있기 현업에서 사용한다면 때문에 조심해서 사용해야 한다.

 

 

 

 

InfluxDB에서 공식적으로 제공하고 있는 예제도 있으니 참고하면 좋을 것 같다.

influxdb-python.readthedocs.io/en/latest/examples.html

 

InfluxDB Python Examples — InfluxDB 5.3.1 documentation

© Copyright 2013-2014 Errplane Inc. Revision 7b036730.

influxdb-python.readthedocs.io

 


3. 마무리


InfluxDB 자체에 대한 전반적인 내용들은 마무리가 되었다.

 

다음 포스팅에서는 InfluxDB에 저장된 데이터들을 시각화해주는 Grafana에 대해 알아보자.