Python Pandas Dataframe’i Elasticsearch’e Yazmak

Merhabalar bu yazımızda Python Pandas Dataframe verisini Python kullanarak Elasticsearch’e yazacağız. Şükür ki Python çok gelişmiş ve zengin bir dil. Birileri biz kullanalım diye kütüphane yazmış, biz de bunu kullanarak dataframe’i Elasticsearch’e yazacağız. Bu yazıyı takip etmek için bazı ön koşullarımız olacak.

  • Elasticsearch 7.9.0 kurulu ve 9200 portundan erişilebilir durumdadır. Örneğin, benim elasticsearch sunucum aşağıdaki adreste şuan çalışıyor. Yüklü değil ise docker ile hızlı bir şekilde kullanıma hazır hale getirebilirsiniz. Browser’a localhost:9200 gitdiğimizde:
{
  "name" : "elasticsearch",
  "cluster_name" : "Atscale-ELK-Cluster",
  "cluster_uuid" : "nqlxX4saQFKMVTbrZ0YHgQ",
  "version" : {
    "number" : "7.9.0",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "a479a2a7fce0389512d6a9361301708b92dff667",
    "build_date" : "2020-08-11T21:36:48.204330Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

 

  • Python3 yüklü ve çalışır durumdadır.
  • Kullanacağımız modülün adı elasticsearch. python -m pip install elasticsearch komutu ile bu modülü yüklemiş durumdasınız.
  • Kullanacağımız veri setine buradan ulaşabilirsiniz.
  • Jupyter kullanabiliyorsunuz.

O halde başlayalım.


Veri seti,  bir market müşterilerine ait üretilmiş sentetik bir veri seti. Müşterinin gelir, yaş ve cinsiyet gibi özelliklerine göre ne kadar alış-veriş yaptığıyla ilgili.

Kütüphanelerimizi indirelim:

# Kütüphaneler
import pandas as pd
from elasticsearch import Elasticsearch, helpers

Elasticsearh sınıfından nesne yaratalım. Biliyoruz ki bu yazıda ağırlıklı olarak elasticsearch modülünü kullanacağız. Sizin elasticsearch sunucunuz farklı olabilir. Genelde: localhost:9200

# Elasticsearch sunucusuna bağlanan Elasticsearch sınıfından bir nesne yaratalım
es = Elasticsearch("localhost:9200")

Veriyi okuyup dataframe olarak tutalım:

df= pd.read_csv("https://github.com/erkansirin78/datasets/raw/master/spark_book_data/retail-data/all/online-retail-dataset.csv")
df.head()
  InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country
0 536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 12/1/2010 8:26 2.55 17850.0 United Kingdom
1 536365 71053 WHITE METAL LANTERN 6 12/1/2010 8:26 3.39 17850.0 United Kingdom
2 536365 84406B CREAM CUPID HEARTS COAT HANGER 8 12/1/2010 8:26 2.75 17850.0 United Kingdom
3 536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 12/1/2010 8:26 3.39 17850.0 United Kingdom
4 536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 12/1/2010 8:26 3.39 17850.0 United Kingdom

Null değerleri düşürelim

df.dropna(inplace=True)

Verimize uygun bir mapping (şema) oluşturalım:

online_retail_index =  {
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "custom_analyzer":
          {
            "type":"custom",
            "tokenizer":"standard",
            "filter":[
              "lowercase", "custom_edge_ngram","asciifolding"
            ]
          }
        },
        "filter": {
          "custom_edge_ngram": {
            "type": "edge_ngram",
            "min_gram":2,
            "max_gram": 10
            }
          }
        }
      }
    },
    "mappings": {
    "properties": {
      "InvoiceNo":    { "type": "keyword" },  
      "StockCode":  { "type": "keyword"  }, 
      "Description":   { "type": "text"  },
      "Quantity": {"type": "integer"},
      "InvoiceDate": {
        "type":   "date",
        "format": "MM/dd/yyyy hh:ss"
      },
      "UnitPrice": {"type": "float"},
      "CustomerID": {"type": "keyword"},
      "Country": {"type": "keyword"}
    }
  }
  }

Dataframe’i okuyup Elasticsearche yazan bir fonksiyon yazalım:

def dataframe_to_es(df, es_index):
    for df_idx, line in df.iterrows():
        yield {
            "_index": es_index,
            "_id":df_idx,
            "_source" : {
                "InvoiceNo": line[0],
                "StockCode": line[1],
                "Description": line[2],
                "Quantity": line[3],
                "InvoiceDate": line[4],
                "UnitPrice": line[5],
                "CustomerID": line[6],
                "Country": line[7]
            }
        }

 

Şimdi fonksiyonu kullanalım, ancak eğer varsa indeksi silelim. Çünkü indeks veri aktarılırken otomatik yaratılacakt. Bunu biraz da ilişkisel veri tabanındaki CREATE TABLE IF NOT EXIST sql cümleciğine benzetebiliriz. Daha sonra fonksiyonu kullanarak aktarma işlemini gerekleştirelim.

try:
    es.indices.delete("online_retail_python")
except:
    print("No index") 
    
helpers.bulk(es, dataframe_to_es(df, "online_retail_python"), raise_on_error=False)

 

Çıktı:

(406829, [])

şeklinde olacaktır.

Basit bir arama yapalım:

keyword = "Coffee"
res = es.search(index='online_retail_python', body={
    "query": {
        "bool": {
            "should": [
                {
                    "match": {
                        "Description": keyword
                    }
                }
            ]
        }
    }
    
})

res sözlüğündeki sonuçları görelim.

res['hits']['hits'][:4]

Çıktısı:

[{'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '9103',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537192',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/5/2010 13:42',
   'UnitPrice': 12.75,
   'CustomerID': 16402.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '14891',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537624',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/7/2010 14:41',
   'UnitPrice': 12.75,
   'CustomerID': 12748.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '22385',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '538167',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/9/2010 18:58',
   'UnitPrice': 12.75,
   'CustomerID': 14713.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '17532',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537765',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/8/2010 12:08',
   'UnitPrice': 12.75,
   'CustomerID': 14606.0,
   'Country': 'United Kingdom'}}]

İşte bu kadar. Hayırlı olsun. Elasticsearch’ün json dünyasında kaybolmadan python yeteneklerinizi kullanarak bir dataframe’i Elasticsearch’e aktardınız.

Başka bir yazıda görüşmek dileğiyle, mutlu analizler…

Yazar Hakkında
Toplam 157 yazı
Erkan ŞİRİN
Erkan ŞİRİN
2014'ten beri hem akademik alanda hem de sektörde pratik anlamda büyük veri ve veri bilimi ile ilgili çalışmalar yürütmektedir. Büyük veri ve veri bilimi ile ilgili birçok kurum ve şirkete danışmanlık ve eğitimler vermekte, projeler icra etmektedir. Çalışma alanları: büyük veri platformlarının kurulum ve yönetimi, büyük veri üzerinde makine öğrenmesi, olağan dışılık ve sahtecilik tespiti, akan veri işleme ve veri hazırlama sürecidir.
Yorumlar (Yorum yapılmamış)

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara