Python Pandas Dataframe’i Elasticsearch’e Yazmak

Merhabalar bu yazımızda Python Pandas Dataframe verisini Python kullanarak Elasticsearch’e yazacağız. Şükür ki Python çok gelişmiş ve zengin bir dil. Birileri biz kullanalım diye kütüphane yazmış, biz de bunu kullanarak dataframe’i Elasticsearch’e yazacağız. Bu yazıyı takip etmek için bazı ön koşullarımız olacak.

  • Elasticsearch 7.9.0 kurulu ve 9200 portundan erişilebilir durumdadır. Örneğin, benim elasticsearch sunucum aşağıdaki adreste şuan çalışıyor. Yüklü değil ise docker ile hızlı bir şekilde kullanıma hazır hale getirebilirsiniz. Browser’a localhost:9200 girdiğimizde:
  • Python3 yüklü ve çalışır durumdadır.
  • Kullanacağımız modülün adı elasticsearch. python -m pip install elasticsearch komutu ile bu modülü yüklemiş durumdasınız.
  • Kullanacağımız veri setine buradan ulaşabilirsiniz.
  • Jupyter kullanabiliyorsunuz.

O halde başlayalım.


Veri seti,  klasik perakende satış verisi.

Kütüphanelerimizi indirelim:

# Kütüphaneler
import pandas as pd
from elasticsearch import Elasticsearch, helpers

Elasticsearh sınıfından nesne yaratalım. Biliyoruz ki bu yazıda ağırlıklı olarak elasticsearch modülünü kullanacağız. Sizin elasticsearch sunucunuz farklı olabilir. Genelde: localhost:9200

# Elasticsearch sunucusuna bağlanan Elasticsearch sınıfından bir nesne yaratalım
es = Elasticsearch("localhost:9200")

Veriyi okuyup pandas dataframe olarak tutalım:

df= pd.read_csv("https://github.com/erkansirin78/datasets/raw/master/spark_book_data/retail-data/all/online-retail-dataset.csv")
df.head()
 InvoiceNoStockCodeDescriptionQuantityInvoiceDateUnitPriceCustomerIDCountry
053636585123AWHITE HANGING HEART T-LIGHT HOLDER612/1/2010 8:262.5517850.0United Kingdom
153636571053WHITE METAL LANTERN612/1/2010 8:263.3917850.0United Kingdom
253636584406BCREAM CUPID HEARTS COAT HANGER812/1/2010 8:262.7517850.0United Kingdom
353636584029GKNITTED UNION FLAG HOT WATER BOTTLE612/1/2010 8:263.3917850.0United Kingdom
453636584029ERED WOOLLY HOTTIE WHITE HEART.612/1/2010 8:263.3917850.0United Kingdom

Null değerleri düşürelim. Yazarken hata alabiliyor.

df.dropna(inplace=True)

Verimize uygun bir mapping (şema) oluşturalım:

online_retail_index =  {
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "custom_analyzer":
          {
            "type":"custom",
            "tokenizer":"standard",
            "filter":[
              "lowercase", "custom_edge_ngram","asciifolding"
            ]
          }
        },
        "filter": {
          "custom_edge_ngram": {
            "type": "edge_ngram",
            "min_gram":2,
            "max_gram": 10
            }
          }
        }
      }
    },
    "mappings": {
    "properties": {
      "InvoiceNo":    { "type": "keyword" },  
      "StockCode":  { "type": "keyword"  }, 
      "Description":   { "type": "text"  },
      "Quantity": {"type": "integer"},
      "InvoiceDate": {
        "type":   "date",
        "format": "MM/dd/yyyy hh:ss"
      },
      "UnitPrice": {"type": "float"},
      "CustomerID": {"type": "keyword"},
      "Country": {"type": "keyword"}
    }
  }
  }

Dataframe’i okuyup Elasticsearch’e yazmaya hazır hale getiren bir fonksiyon yazalım. Burada yazma işi olmuyor bulk api ile yazmak için veri yapısı değişiyor.

def dataframe_to_es(df, es_index):
    for df_idx, line in df.iterrows():
        yield {
            "_index": es_index,
            "_id":df_idx,
            "_source" : {
                "InvoiceNo": line[0],
                "StockCode": line[1],
                "Description": line[2],
                "Quantity": line[3],
                "InvoiceDate": line[4],
                "UnitPrice": line[5],
                "CustomerID": line[6],
                "Country": line[7]
            }
        }

Şimdi fonksiyonu kullanalım. Öncesinde eğer varsa indeksi silelim. Çünkü indeks veri aktarılırken otomatik yaratılacaktır. Bunu biraz da ilişkisel veri tabanındaki DROP TABLE IF EXISTS sql cümleciğine benzetebiliriz. Daha sonra fonksiyonu kullanarak yazma işlemini gerekleştirelim.

# Index varsa sil
try:
   es.indices.delete("online_retail_python")
   print("online_retail_python index deleted.")
except:
    print("No index")

# Indeksi yukarıdaki mappingse göre oluştur.
es.indices.create("online_retail_python", body=online_retail_index)
{'acknowledged': True,
'shards_acknowledged': True,
'index': 'online_retail_python'}
# Bulk api ile indeksle helpers.bulk(es, dataframe_to_es(df, "online_retail_python"), raise_on_error=False)

Çıktı:

(406829, [])

şeklinde olacaktır.

Basit bir arama yapalım:

keyword = "Coffee"
res = es.search(index='online_retail_python', body={
    "query": {
        "bool": {
            "should": [
                {
                    "match": {
                        "Description": keyword
                    }
                }
            ]
        }
    }
    
})

res sözlüğündeki sonuçları görelim.

res['hits']['hits'][:4]

Çıktısı:

[{'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '9103',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537192',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/5/2010 13:42',
   'UnitPrice': 12.75,
   'CustomerID': 16402.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '14891',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537624',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/7/2010 14:41',
   'UnitPrice': 12.75,
   'CustomerID': 12748.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '22385',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '538167',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/9/2010 18:58',
   'UnitPrice': 12.75,
   'CustomerID': 14713.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '17532',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537765',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/8/2010 12:08',
   'UnitPrice': 12.75,
   'CustomerID': 14606.0,
   'Country': 'United Kingdom'}}]

İşte bu kadar. Hayırlı olsun. Elasticsearch’ün JSON dünyasında kaybolmadan python yeteneklerinizi kullanarak bir dataframe’i Elasticsearch’e aktardınız.

Başka bir yazıda görüşmek dileğiyle, mutlu analizler…

Yazar Hakkında
Toplam 180 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara