InfluxDB 사용하기

1. InfluxDB 설치

우선 pod를 생성해준다.

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-release bitnami/influxdb --version 5.13.5

잘 설치되었으면 InfluxDB token을 가져온다. token을 환경변수로 저장해두자.

kubectl get secret -n default my-release-influxdb -o jsonpath="{.data.admin-user-token}" | base64 --decode ; echo
echo 'export INFLUXDB_TOKEN=<TOKEN>' >> ~/.bashrc
source ~/.bashrc

다음으로 Bucket을 만들어준다.

kubectl exec -it <INFLUXDB_POD_NAME> -- influx bucket create -n UEData -o primary -t $INFLUXDB_TOKEN

2. InfluxDB 설정

이제 InfluxDB에 데이터를 넣어준다. 우선 InfluxDB 포트포워딩을 해주고,

kubectl port-forward svc/my-release-influxdb 8086:8086

Anaconda를 설치했으면 conda를 이용해 필요한 패키지를 설치한다.

conda create -n influxdb python=3.8
conda activate influxdb
pip install pandas influxdb_client

그리고 InfluxDB에 데이터를 넣어준다. 그 전에 qp 리포지토리를 클론해준다.

git clone -b f-release https://gerrit.o-ran-sc.org/r/ric-app/qp
cd qp/qp

instert.py 파일을 아래와 같이 수정해준다.

import pandas as pd
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime

org="primary"
token="<TOKEN>"
bucket="UEData"

class INSERTDATA:

   def __init__(self):
        self.client = InfluxDBClient(url= "http://localhost:8086", token=token, org=org)


def explode(df):
     for col in df.columns:
             if isinstance(df.iloc[0][col], list):
                     df = df.explode(col)
             d = df[col].apply(pd.Series)
             df[d.columns] = d
             df = df.drop(col, axis=1)
     return df


def jsonToTable(df):
     df.index = range(len(df))
     cols = [col for col in df.columns if isinstance(df.iloc[0][col], dict) or isinstance(df.iloc[0][col], list)]
     if len(cols) == 0:
             return df
     for col in cols:
             d = explode(pd.DataFrame(df[col], columns=[col]))
             d = d.dropna(axis=1, how='all')
             df = pd.concat([df, d], axis=1)
             df = df.drop(col, axis=1).dropna()
     return jsonToTable(df)


def time(df):
     df.index = pd.to_datetime(pd.date_range(start=datetime.datetime.utcnow(), freq='10ms', periods=len(df)))
     df['measTimeStampRf'] = df['measTimeStampRf'].apply(lambda x: str(x))

     return df


def populatedb():
     df = pd.read_json('cell.json.gz', lines=True)
     df = df[['cellMeasReport']].dropna()
     df = jsonToTable(df)
     df = time(df)
     db = INSERTDATA()
     write_api = db.client.write_api(write_options=SYNCHRONOUS)
     write_api.write(bucket="UEData",record=df, data_frame_measurement_name="liveCell",org=org)
     query_api = db.client.query_api()
     query = f'from(bucket: "{bucket}") |> range(start: -10000d)'
     result = query_api.query(org=org, query=query)
     results = []
     for table in result:
        for record in table.records:
            results.append((record.get_field(), record.get_value()))
     print(results)

print("---start algorithm----")
populatedb()
print("---end algorithm----")

insert.py 파일을 실행해준다.

python3 insert.py

마지막으로 aimlfw-dep/RECIPE_EXAMPLE/example_recipe_latest_stable.yaml 파일을 수정해준다.

traininghost:
  ip_address: localhost
...
datalake:
  influxdb:
    host: localhost
    port: 8086
    orgname: primary
    bucket: UEData
    token: <TOKEN>

수정사항을 반영해준다.

bin/uninstall.sh
bin/install.sh -f RECIPE_EXAMPLE/example_recipe_latest_stable.yaml

3. InfluxDB 확인

일단 QoE 데이터를 InfluxDB에 올리는 것 까지 되었을 것이다.

그럼 InfluxDB에 데이터가 잘 들어갔는지 확인해보자.

아래 명령어로 secret을 조회하여 InfluxDB의 admin 계정의 password를 확인할 수 있다.

kubectl get secret -n default my-release-influxdb -o jsonpath="{.data.admin-user-password}" | base64 --decode ; echo

아니면 계정을 생성해서 사용할 수도 있다.

kubectl exec -it -n default <INFLUXDB_POD_NAME> -- influx user create --org primary --name <USERNAME> --password <PASSWORD> -t $INFLUXDB_TOKEN

http://localhost:8086 에 접속해서 로그인을 해보자.

AIMLFW 환경설정
모델 학습 pipeline 구축하기