Python Pandas Dataframe’i Elasticsearch’e Yazmak

Merhabalar bu yazımızda Python Pandas Dataframe verisini Python kullanarak Elasticsearch’e yazacağız. Şükür ki Python çok gelişmiş ve zengin bir dil. Birileri biz kullanalım diye kütüphane yazmış, biz de bunu kullanarak dataframe’i Elasticsearch’e yazacağız. Bu yazıyı takip etmek için bazı ön koşullarımız olacak.

  • Elasticsearch 7.9.0 kurulu ve 9200 portundan erişilebilir durumdadır. Örneğin, benim elasticsearch sunucum aşağıdaki adreste şuan çalışıyor. Yüklü değil ise docker ile hızlı bir şekilde kullanıma hazır hale getirebilirsiniz. Browser’a localhost:9200 girdiğimizde:
{
  "name" : "elasticsearch",
  "cluster_name" : "Atscale-ELK-Cluster",
  "cluster_uuid" : "nqlxX4saQFKMVTbrZ0YHgQ",
  "version" : {
    "number" : "7.9.0",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "a479a2a7fce0389512d6a9361301708b92dff667",
    "build_date" : "2020-08-11T21:36:48.204330Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

 

  • Python3 yüklü ve çalışır durumdadır.
  • Kullanacağımız modülün adı elasticsearch. python -m pip install elasticsearch komutu ile bu modülü yüklemiş durumdasınız.
  • Kullanacağımız veri setine buradan ulaşabilirsiniz.
  • Jupyter kullanabiliyorsunuz.

O halde başlayalım.


Veri seti,  klasik perakende satış verisi.

Kütüphanelerimizi indirelim:

# Kütüphaneler
import pandas as pd
from elasticsearch import Elasticsearch, helpers

Elasticsearh sınıfından nesne yaratalım. Biliyoruz ki bu yazıda ağırlıklı olarak elasticsearch modülünü kullanacağız. Sizin elasticsearch sunucunuz farklı olabilir. Genelde: localhost:9200

# Elasticsearch sunucusuna bağlanan Elasticsearch sınıfından bir nesne yaratalım
es = Elasticsearch("localhost:9200")

Veriyi okuyup pandas dataframe olarak tutalım:

df= pd.read_csv("https://github.com/erkansirin78/datasets/raw/master/spark_book_data/retail-data/all/online-retail-dataset.csv")
df.head()
  InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country
0 536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 12/1/2010 8:26 2.55 17850.0 United Kingdom
1 536365 71053 WHITE METAL LANTERN 6 12/1/2010 8:26 3.39 17850.0 United Kingdom
2 536365 84406B CREAM CUPID HEARTS COAT HANGER 8 12/1/2010 8:26 2.75 17850.0 United Kingdom
3 536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 12/1/2010 8:26 3.39 17850.0 United Kingdom
4 536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 12/1/2010 8:26 3.39 17850.0 United Kingdom

Null değerleri düşürelim. Yazarken hata alabiliyor.

df.dropna(inplace=True)

Verimize uygun bir mapping (şema) oluşturalım:

online_retail_index =  {
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "custom_analyzer":
          {
            "type":"custom",
            "tokenizer":"standard",
            "filter":[
              "lowercase", "custom_edge_ngram","asciifolding"
            ]
          }
        },
        "filter": {
          "custom_edge_ngram": {
            "type": "edge_ngram",
            "min_gram":2,
            "max_gram": 10
            }
          }
        }
      }
    },
    "mappings": {
    "properties": {
      "InvoiceNo":    { "type": "keyword" },  
      "StockCode":  { "type": "keyword"  }, 
      "Description":   { "type": "text"  },
      "Quantity": {"type": "integer"},
      "InvoiceDate": {
        "type":   "date",
        "format": "MM/dd/yyyy hh:ss"
      },
      "UnitPrice": {"type": "float"},
      "CustomerID": {"type": "keyword"},
      "Country": {"type": "keyword"}
    }
  }
  }

Dataframe’i okuyup Elasticsearch’e yazmaya hazır hale getiren bir fonksiyon yazalım. Burada yazma işi olmuyor bulk api ile yazmak için veri yapısı değişiyor.

def dataframe_to_es(df, es_index):
    for df_idx, line in df.iterrows():
        yield {
            "_index": es_index,
            "_id":df_idx,
            "_source" : {
                "InvoiceNo": line[0],
                "StockCode": line[1],
                "Description": line[2],
                "Quantity": line[3],
                "InvoiceDate": line[4],
                "UnitPrice": line[5],
                "CustomerID": line[6],
                "Country": line[7]
            }
        }

Şimdi fonksiyonu kullanalım. Öncesinde eğer varsa indeksi silelim. Çünkü indeks veri aktarılırken otomatik yaratılacaktır. Bunu biraz da ilişkisel veri tabanındaki DROP TABLE IF EXISTS sql cümleciğine benzetebiliriz. Daha sonra fonksiyonu kullanarak yazma işlemini gerekleştirelim.

# Index varsa sil
try:
   es.indices.delete("online_retail_python")
   print("online_retail_python index deleted.")
except:
    print("No index")

# Indeksi yukarıdaki mappingse göre oluştur.
es.indices.create("online_retail_python", body=online_retail_index)
{'acknowledged': True,
'shards_acknowledged': True,
'index': 'online_retail_python'}
# Bulk api ile indeksle helpers.bulk(es, dataframe_to_es(df, "online_retail_python"), raise_on_error=False)

Çıktı:

(406829, [])

şeklinde olacaktır.

Basit bir arama yapalım:

keyword = "Coffee"
res = es.search(index='online_retail_python', body={
    "query": {
        "bool": {
            "should": [
                {
                    "match": {
                        "Description": keyword
                    }
                }
            ]
        }
    }
    
})

res sözlüğündeki sonuçları görelim.

res['hits']['hits'][:4]

Çıktısı:

[{'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '9103',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537192',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/5/2010 13:42',
   'UnitPrice': 12.75,
   'CustomerID': 16402.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '14891',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537624',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/7/2010 14:41',
   'UnitPrice': 12.75,
   'CustomerID': 12748.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '22385',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '538167',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/9/2010 18:58',
   'UnitPrice': 12.75,
   'CustomerID': 14713.0,
   'Country': 'United Kingdom'}},
 {'_index': 'online_retail_python',
  '_type': '_doc',
  '_id': '17532',
  '_score': 5.77428,
  '_source': {'InvoiceNo': '537765',
   'StockCode': '20748',
   'Description': 'KENSINGTON COFFEE SET',
   'Quantity': 1,
   'InvoiceDate': '12/8/2010 12:08',
   'UnitPrice': 12.75,
   'CustomerID': 14606.0,
   'Country': 'United Kingdom'}}]

İşte bu kadar. Hayırlı olsun. Elasticsearch’ün JSON dünyasında kaybolmadan python yeteneklerinizi kullanarak bir dataframe’i Elasticsearch’e aktardınız.

Başka bir yazıda görüşmek dileğiyle, mutlu analizler…

Yazar Hakkında
Toplam 173 yazı
Erkan ŞİRİN
Erkan ŞİRİN
2014'ten beri hem akademik alanda hem de sektörde pratik anlamda büyük veri ve veri bilimi ile ilgili çalışmalar yürütmektedir. Büyük veri ve veri bilimi ile ilgili birçok kurum ve şirkete danışmanlık ve eğitimler vermekte, projeler icra etmektedir. Çalışma alanları: büyük veri platformlarının kurulum ve yönetimi, büyük veri üzerinde makine öğrenmesi, olağan dışılık ve sahtecilik tespiti, akan veri işleme ve veri hazırlama sürecidir.
Yorumlar (Yorum yapılmamış)

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara