
Elasticsearch, Docker-compose, Python module

Naranjito 2021. 12. 9. 11:59

Set up the requirements file, the Dockerfile, and the docker-compose configuration.

 

1. Create a requirements.txt file.

pandas==1.2.4
elasticsearch==7.15.0
python-decouple==3.5
urllib3==1.25.11
mlxtend==0.19.0
networkx==2.6.3
matplotlib==3.5.1
prefixspan==0.5.2

 

2. Create a Dockerfile.

FROM continuumio/miniconda3

WORKDIR /usr/app

# Directories that docker-compose mounts from the host.
RUN mkdir result
RUN mkdir src

EXPOSE 8888

RUN pip install --no-cache-dir --upgrade pip

# Install the Python dependencies.
COPY ./requirements.txt .
RUN pip install -r requirements.txt

# Install Jupyter and generate its default configuration.
RUN pip install jupyter
RUN jupyter notebook --generate-config --allow-root

ENTRYPOINT jupyter notebook --allow-root --ip=0.0.0.0 --port=8888 --no-browser

 

3. Create a .env file.

ELASTIC_HOST=localhost
ELASTIC_PORT=9200
ELASTIC_PROTOCOL=http
ELASTIC_USER=user
ELASTIC_KEY=pass

 

4. Create a docker-compose.yml file.

There are two containers, es01 (Elasticsearch) and jupyter, which need to be connected on the same network.

version: '3.0'

services: ## Two services: es01 and jupyter.
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1 ## Image used to create the es01 container.
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - cluster.initial_master_nodes=es01
      - bootstrap.memory_lock=true
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic

  jupyter:
    build:
      context: . ## Build the image from the Dockerfile in the current directory.
    ports:
      - 8888:8888
    volumes:
      - ./result:/usr/app/result
      - ./src:/usr/app/src
    env_file:
      - ./.env
    networks:
      - elastic
    environment:
      - ELASTIC_HOST=es01
      - ELASTIC_PORT=9200

volumes:
  data01:
    driver: local

networks:
  elastic:
    driver: bridge
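
Once both containers are running (docker-compose up --build), the jupyter container reaches Elasticsearch through the service name es01 on the shared elastic network, not through localhost. A quick sanity check from a notebook cell, as a minimal sketch using urllib3 (already in requirements.txt) and assuming the default port and security settings above:

import urllib3

# "es01" resolves only inside the compose network "elastic".
http = urllib3.PoolManager()
response = http.request("GET", "http://es01:9200")
print(response.status)      # 200 when the cluster is up
print(response.data[:200])  # first bytes of the cluster info JSON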

 

Separate the monolithic code into modules.

 

1. Config module.

Create a config module that reads the configuration values from the environment.

from decouple import config

# python-decouple raises UndefinedValueError when a key is missing,
# so provide defaults instead of checking for None afterwards.
ELASTIC_HOST=config('ELASTIC_HOST', default='localhost')
ELASTIC_PORT=config('ELASTIC_PORT', default=9200, cast=int)
ELASTIC_USER=config('ELASTIC_USER', default='user')
ELASTIC_KEY=config('ELASTIC_KEY', default='123')
ELASTIC_PROTOCOL=config('ELASTIC_PROTOCOL', default='http')

ELASTIC_CONFIG={
    'host': ELASTIC_HOST,
    'port': ELASTIC_PORT,
    'user': ELASTIC_USER,
    'password': ELASTIC_KEY,
    'protocol': ELASTIC_PROTOCOL
}
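
For example, importing the module inside the jupyter container should show the values injected by docker-compose (ELASTIC_HOST is overridden to es01 there; the exact values depend on your .env):

from config import ELASTIC_CONFIG

print(ELASTIC_CONFIG)
# e.g. {'host': 'es01', 'port': 9200, 'user': 'user', 'password': 'pass', 'protocol': 'http'}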

 

2. Elasticsearch connection module.

It includes the connection handling plus insert, bulk insert, and search operations.

from config import ELASTIC_CONFIG
import ssl
from elasticsearch.connection import create_ssl_context
from elasticsearch import Elasticsearch, helpers
import urllib3


class ElasticsearchConnector:
    
    def __init__(self):
        print('creating')
        self.es=None

# 1. Create the connection to Elasticsearch.

    def connect(self):

        ssl_context = create_ssl_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        es=Elasticsearch(
            hosts=[{'host': ELASTIC_CONFIG['host'], 'port': ELASTIC_CONFIG['port']}],
            scheme=ELASTIC_CONFIG['protocol'],
            verify_certs=False,
            ssl_context=ssl_context,
            http_auth=(ELASTIC_CONFIG['user'], ELASTIC_CONFIG['password']))
        print(es)

        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        self.es=es

    def isConnected(self):
        return self.es is not None

    def getConnection(self):
        if not self.isConnected():
            self.connect()
        return self.es
 
# 2. Insert a single document into Elasticsearch; it accepts the index, the document body and the doc type.

    def insertData(self,index, body, doc_type):
        connection=self.getConnection()
        res=connection.index(index=index,body=body, doc_type=doc_type)
        print(res)
     
# 3. Bulk-insert documents into Elasticsearch.
     
    def bulkInsert(self, index, docs):
        connection=self.getConnection()
        result=helpers.bulk(connection, docs, index=index)
        print(result)
    
# 4. Search the data from Elasticsearch.

    def searchData(self, index, body):
        connection=self.getConnection()
        result=connection.search(index=index, body=body)
        # Each bulk-inserted document is wrapped as {"doc": ...}, so unwrap the "doc" field.
        records=list(map(lambda n:n["_source"]["doc"],result['hits']['hits']))
        return records
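
A quick way to confirm the module works, as a minimal sketch (it assumes the module is saved as es.py, matching the imports used below):

from es import ElasticsearchConnector

connector = ElasticsearchConnector()
es = connector.getConnection()
print(es.ping())   # True when the cluster is reachable
print(es.info())   # cluster name, version, etc.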

 

3. Read CSV module.

Create a script that reads data from a CSV file and inserts it into Elasticsearch.

from es import ElasticsearchConnector
from elasticsearch import helpers
import pandas as pd
import numpy as np

connector=ElasticsearchConnector()
### Define the index name once and reuse it below.
INDEX="ts_data_accident_2020" 

import os
cwd=os.path.abspath(os.getcwd())
df=pd.read_csv(cwd +'/data.csv')
df.head()

### Replace every NaN with an empty string so the rows serialize cleanly.
for column_name in df.columns:
    df[column_name]=df[column_name].replace(np.nan, '')
    
### Insert the rows one document at a time (the bulk helpers below are faster for large files).
### Note: these documents are not wrapped in a 'doc' field, unlike the bulk-inserted ones.
for i, row in df.iterrows():
    connector.insertData(index=INDEX, body=row.to_dict(), doc_type='accident') ### Any doc_type name can be used.
   
# 1. Bulk-insert way one: wrap each row in a 'doc' field together with its doc_type.
def doc_generator(data, doc_type='doc_type'):
    df_iter=data.iterrows()
    for index,document in df_iter:
        yield {
            "doc_type": doc_type,
            "doc":document.to_dict()
        }     

connector.bulkInsert(docs=doc_generator(df, doc_type='accident'), index=INDEX)

# 2. Bulk-insert way two: wrap each row in a 'doc' field only.
def doc_generator(data):
    df_iter=data.iterrows()
    for index,document in df_iter:
        yield {
            "doc":document.to_dict()
        } 

connector.bulkInsert(docs=doc_generator(df), index=INDEX)
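
After either bulk insert, the number of indexed documents can be checked on the same connection. A minimal sketch (the explicit refresh makes the freshly indexed documents visible to the count immediately):

es = connector.getConnection()
es.indices.refresh(index=INDEX)
print(es.count(index=INDEX))  # e.g. {'count': ..., ...}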

 

4. Data retrieval module.

Retrieve the data from Elasticsearch.

from es import ElasticsearchConnector
connector=ElasticsearchConnector()

body={
    "query":{
        "match_all":{}
    }
}
result=connector.searchData(index="ts_data_accident_2020", body=body)
result
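
Note that a match_all query returns only the first 10 hits by default; set "size" in the body to retrieve more, and wrap the records in a DataFrame if a table is more convenient. A minimal sketch, assuming the documents were bulk-inserted with the 'doc' wrapper used above:

import pandas as pd
from es import ElasticsearchConnector

connector = ElasticsearchConnector()
body = {"query": {"match_all": {}}, "size": 100}  # default size is 10
records = connector.searchData(index="ts_data_accident_2020", body=body)
df = pd.DataFrame(records)
df.head()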