우선 pod를 생성해준다.
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-release bitnami/influxdb --version 5.13.5
잘 설치되었으면 InfluxDB token을 가져온다. token을 환경변수로 저장해두자.
kubectl get secret -n default my-release-influxdb -o jsonpath="{.data.admin-user-token}" | base64 --decode ; echo
echo 'export INFLUXDB_TOKEN=<TOKEN>' >> ~/.bashrc
source ~/.bashrc
다음으로 Bucket을 만들어준다.
kubectl exec -it <INFLUXDB_POD_NAME> -- influx bucket create -n UEData -o primary -t $INFLUXDB_TOKEN
이제 InfluxDB에 데이터를 넣어준다. 우선 InfluxDB 포트포워딩을 해주고,
kubectl port-forward svc/my-release-influxdb 8086:8086
Anaconda를 설치했으면 conda를 이용해 필요한 패키지를 설치한다.
conda create -n influxdb python=3.8
conda activate influxdb
pip install pandas influxdb_client
그리고 InfluxDB에 데이터를 넣어준다. 그 전에 qp 리포지토리를 클론해준다.
git clone -b f-release https://gerrit.o-ran-sc.org/r/ric-app/qp
cd qp/qp
instert.py
파일을 아래와 같이 수정해준다.
import pandas as pd
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
org="primary"
token="<TOKEN>"
bucket="UEData"
class INSERTDATA:
def __init__(self):
self.client = InfluxDBClient(url= "http://localhost:8086", token=token, org=org)
def explode(df):
for col in df.columns:
if isinstance(df.iloc[0][col], list):
df = df.explode(col)
d = df[col].apply(pd.Series)
df[d.columns] = d
df = df.drop(col, axis=1)
return df
def jsonToTable(df):
df.index = range(len(df))
cols = [col for col in df.columns if isinstance(df.iloc[0][col], dict) or isinstance(df.iloc[0][col], list)]
if len(cols) == 0:
return df
for col in cols:
d = explode(pd.DataFrame(df[col], columns=[col]))
d = d.dropna(axis=1, how='all')
df = pd.concat([df, d], axis=1)
df = df.drop(col, axis=1).dropna()
return jsonToTable(df)
def time(df):
df.index = pd.to_datetime(pd.date_range(start=datetime.datetime.utcnow(), freq='10ms', periods=len(df)))
df['measTimeStampRf'] = df['measTimeStampRf'].apply(lambda x: str(x))
return df
def populatedb():
df = pd.read_json('cell.json.gz', lines=True)
df = df[['cellMeasReport']].dropna()
df = jsonToTable(df)
df = time(df)
db = INSERTDATA()
write_api = db.client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket="UEData",record=df, data_frame_measurement_name="liveCell",org=org)
query_api = db.client.query_api()
query = f'from(bucket: "{bucket}") |> range(start: -10000d)'
result = query_api.query(org=org, query=query)
results = []
for table in result:
for record in table.records:
results.append((record.get_field(), record.get_value()))
print(results)
print("---start algorithm----")
populatedb()
print("---end algorithm----")
insert.py
파일을 실행해준다.
python3 insert.py
마지막으로 aimlfw-dep/RECIPE_EXAMPLE/example_recipe_latest_stable.yaml
파일을 수정해준다.
traininghost:
ip_address: localhost
...
datalake:
influxdb:
host: localhost
port: 8086
orgname: primary
bucket: UEData
token: <TOKEN>
수정사항을 반영해준다.
bin/uninstall.sh
bin/install.sh -f RECIPE_EXAMPLE/example_recipe_latest_stable.yaml
일단 QoE 데이터를 InfluxDB에 올리는 것 까지 되었을 것이다.
그럼 InfluxDB에 데이터가 잘 들어갔는지 확인해보자.
아래 명령어로 secret을 조회하여 InfluxDB의 admin 계정의 password를 확인할 수 있다.
kubectl get secret -n default my-release-influxdb -o jsonpath="{.data.admin-user-password}" | base64 --decode ; echo
아니면 계정을 생성해서 사용할 수도 있다.
kubectl exec -it -n default <INFLUXDB_POD_NAME> -- influx user create --org primary --name <USERNAME> --password <PASSWORD> -t $INFLUXDB_TOKEN
http://localhost:8086 에 접속해서 로그인을 해보자.