Set up the requirement and docker-compose.
1. Create requirements.txt file.
pandas==1.2.4
elasticsearch==7.15.0
python-decouple==3.5
urllib3==1.25.11
mlxtend==0.19.0
networkx==2.6.3
matplotlib==3.5.1
prefixspan==0.5.2
2. Create the Dockerfile.
FROM continuumio/miniconda3
WORKDIR /usr/app
RUN mkdir result
RUN mkdir src
EXPOSE 8888
RUN pip install --no-cache-dir --upgrade pip
COPY ./requirements.txt .
RUN pip install -r requirements.txt
RUN pip install jupyter
RUN jupyter notebook --generate-config --allow-root
ENTRYPOINT jupyter notebook --allow-root --ip=0.0.0.0 --port=8888 --no-browser
3. Create .env file.
ELASTIC_HOST=localhost
ELASTIC_PORT=9200
ELASTIC_PROTOCOL=http
ELASTIC_USER=user
ELASTIC_KEY=pass
4. Create docker-compose.yml file.
There are two different containers — elasticsearch and jupyter — which need to be connected.
version: '3.0'
services: ## There are two different containers which are es01, jupyter
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1 ## Image to be used when creating the es01.
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- cluster.initial_master_nodes=es01
- bootstrap.memory_lock=true
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
jupyter:
build:
context: . ## Create the container using by current directory.
ports:
- 8888:8888
volumes:
- ./result:/usr/app/result
- ./src:/usr/app/src
env_file:
- ./.env
networks:
- elastic
environment:
- ELASTIC_HOST=es01
- ELASTIC_PORT=9200
volumes:
data01:
driver: local
networks:
elastic:
driver: bridge
Separate the monolithic code into modules.
1. Config module.
Create the config module and store config values in the module.
from decouple import config

# Elasticsearch connection settings, read from the environment (.env file)
# via python-decouple.
#
# Bug fix: `config('X')` raises decouple.UndefinedValueError when the
# variable is missing — it never returns None — so the original
# `if X == None:` fallback branches were dead code. Passing `default=`
# (and `cast=int` for the port) makes the fallbacks actually work.
ELASTIC_HOST = config('ELASTIC_HOST', default='localhost')  # bare host; the protocol lives in ELASTIC_PROTOCOL
ELASTIC_PORT = config('ELASTIC_PORT', default=9200, cast=int)
ELASTIC_USER = config('ELASTIC_USER', default='user')
ELASTIC_KEY = config('ELASTIC_KEY', default='123')
ELASTIC_PROTOCOL = config('ELASTIC_PROTOCOL', default='http')

# Single dict consumed by the Elasticsearch connection module.
ELASTIC_CONFIG = {
    'host': ELASTIC_HOST,
    'port': ELASTIC_PORT,
    'user': ELASTIC_USER,
    'password': ELASTIC_KEY,
    'protocol': ELASTIC_PROTOCOL,
}
2. Elasticsearch connection module.
It includes query, insert, and search operations.
from config import ELASTIC_CONFIG
import ssl
from elasticsearch.connection import create_ssl_context
from elasticsearch import Elasticsearch, helpers
import urllib3
class ElasticsearchConnector:
    """Lazily-connected wrapper around an Elasticsearch 7.x client.

    Reads connection settings from ELASTIC_CONFIG and creates the client
    on first use. TLS certificate verification is disabled, so this is
    only suitable for local development against self-signed certs.
    """

    def __init__(self):
        print('creating')
        # Client is created lazily by connect() on first use.
        self.es = None

    # 1. Create the connection to Elasticsearch.
    def connect(self):
        """Open the client connection and cache it on self.es."""
        # Accept self-signed certificates: no hostname check, no CA check.
        ssl_context = create_ssl_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        es = Elasticsearch(
            hosts=[{'host': ELASTIC_CONFIG['host'], 'port': ELASTIC_CONFIG['port']}],
            scheme=ELASTIC_CONFIG['protocol'],
            verify_certs=False,
            ssl_context=ssl_context,
            http_auth=(ELASTIC_CONFIG['user'], ELASTIC_CONFIG['password']))
        print(es)
        # Silence the InsecureRequestWarning that verify_certs=False triggers.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.es = es

    def isConnected(self):
        """Return True when a client has already been created."""
        return self.es is not None

    def getConnection(self):
        """Return the cached client, connecting first if needed."""
        if not self.isConnected():
            self.connect()
        return self.es

    # 2. Insert a single document into the given index.
    def insertData(self, index, body, doc_type):
        # NOTE(review): doc_type is deprecated in Elasticsearch 7.x; kept
        # for backward compatibility with existing callers.
        connection = self.getConnection()
        res = connection.index(index=index, body=body, doc_type=doc_type)
        print(res)
        # Bug fix: callers assigning this result previously got None.
        return res

    # 3. Bulk-insert an iterable of documents into the given index.
    def bulkInsert(self, index, docs):
        connection = self.getConnection()
        result = helpers.bulk(connection, docs, index=index)
        print(result)
        # Bug fix: return the (success_count, errors) tuple instead of None.
        return result

    # 4. Search the given index and return the "doc" payload of each hit.
    def searchData(self, index, body):
        # Assumes each hit's _source contains a "doc" key, i.e. documents
        # were inserted via the doc_generator wrappers — TODO confirm for
        # documents inserted any other way.
        connection = self.getConnection()
        result = connection.search(index=index, body=body)
        return [hit["_source"]["doc"] for hit in result['hits']['hits']]
3. Read csv module.
Create a script that reads the CSV file and inserts its rows into Elasticsearch.
from es import ElasticsearchConnector
from elasticsearch import helpers
import os

import numpy as np
import pandas as pd

connector = ElasticsearchConnector()

# Single place to define the target index name for the code below.
INDEX = "ts_data_accident_2020"

# Load the CSV that sits in the current working directory.
cwd = os.path.abspath(os.getcwd())
df = pd.read_csv(cwd + '/data.csv')
df.head()

# Replace every NaN with an empty string in one vectorized pass
# (replaces the original per-column replace() loop).
df = df.fillna('')

# Row-by-row insert. Any doc_type name can be defined.
for _, row in df.iterrows():
    connector.insertData(index=INDEX, body=row.to_dict(), doc_type='accident')

# 1. Way one to insert the data: bulk actions carrying an explicit doc_type.
# (Renamed from doc_generator — the original defined two functions with the
# same name, the second silently shadowing the first.)
def doc_generator_with_type(data, doc_type='doc_type'):
    """Yield one bulk-API action per DataFrame row, tagged with doc_type."""
    for _, document in data.iterrows():
        yield {
            "doc_type": doc_type,
            "doc": document.to_dict(),
        }

connector.bulkInsert(docs=doc_generator_with_type(df, doc_type='accident'), index=INDEX)

# 2. Way two to insert the data: bulk actions with only the row payload.
def doc_generator(data):
    """Yield one bulk-API action per DataFrame row."""
    for _, document in data.iterrows():
        yield {
            "doc": document.to_dict(),
        }

connector.bulkInsert(docs=doc_generator(df), index=INDEX)
4. Receive the data module.
Receive the data from Elasticsearch.
from es import ElasticsearchConnector

# Fetch every stored document back out of Elasticsearch.
connector = ElasticsearchConnector()

# match_all returns all documents in the index (up to the default page size).
body = {
    "query": {
        "match_all": {},
    },
}

result = connector.searchData(index="ts_data_accident_2020", body=body)
result