Machine Learning with Elasticsearch using Eland

Introduction to Eland

Eland is a Python package that provides a pandas-like interface for Elasticsearch data. It allows data scientists and analysts to work with Elasticsearch data using familiar pandas operations, making it easier to analyze and manipulate large datasets stored in Elasticsearch.

Python Libraries for Elasticsearch

When working with Elasticsearch in Python, you have three main options:

  1. elasticsearch-py: The official low-level Python client for Elasticsearch
  2. elasticsearch-dsl-py: A higher-level, more Pythonic library built on top of elasticsearch-py
  3. eland: A pandas-like interface for Elasticsearch data analysis and machine learning

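The first two libraries express the same search at different levels of abstraction, as shown below; Eland, covered in the rest of this guide, targets DataFrame-style analysis instead: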

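# Using elasticsearch-py (low-level API)
from elasticsearch import Elasticsearch

# Recent client versions require an explicit host; adjust the URL for your cluster
client = Elasticsearch("http://localhost:9200")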

response = client.search(
    index="my-index",
    body={
      "query": {
        "bool": {
          "must": [{"match": {"title": "python"}}],
          "must_not": [{"match": {"description": "beta"}}],
          "filter": [{"term": {"category": "search"}}]
        }
      },
      "aggs" : {
        "per_tag": {
          "terms": {"field": "tags"},
          "aggs": {
            "max_lines": {"max": {"field": "lines"}}
          }
        }
      }
    }
)

# Using elasticsearch-dsl-py (higher-level API)
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch("http://localhost:9200")

s = Search(using=client, index="my-index") \
    .filter("term", category="search") \
    .query("match", title="python")   \
    .exclude("match", description="beta")

s.aggs.bucket('per_tag', 'terms', field='tags') \
    .metric('max_lines', 'max', field='lines')

response = s.execute()
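For comparison, the same search can be approximated with Eland. The sketch below is a minimal illustration, not a canonical recipe: `build_bool_query` and `max_lines_per_tag` are hypothetical helper names, the URL and index are placeholders, and it assumes `tags` is a keyword field and `lines` is numeric. Eland is imported lazily so that the query builder can be used without a running cluster.

```python
def build_bool_query():
    """Return the bool query from the two examples above as a plain dict."""
    return {
        "bool": {
            "must": [{"match": {"title": "python"}}],
            "must_not": [{"match": {"description": "beta"}}],
            "filter": [{"term": {"category": "search"}}],
        }
    }


def max_lines_per_tag(es_url="http://localhost:9200", index="my-index"):
    """Run the search through Eland (hypothetical helper; needs a reachable cluster)."""
    # Imported lazily so build_bool_query() works without these packages
    import eland as ed
    from elasticsearch import Elasticsearch

    client = Elasticsearch(es_url)
    df = ed.DataFrame(es_client=client, es_index_pattern=index)

    # es_query() applies a raw query DSL clause as a filter on the DataFrame
    matching = df.es_query(build_bool_query())

    # Counterpart of the per_tag / max_lines aggregation
    return matching[["tags", "lines"]].groupby("tags").max()
```

The result is a small pandas-style table with one row per tag, rather than the raw response dictionary returned by the other two clients.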

Why Eland?

  • Seamless integration with pandas
  • Efficient handling of large datasets
  • Native support for Elasticsearch data types
  • Ability to perform complex aggregations
  • Memory-efficient operations
  • Machine learning model deployment capabilities

Getting Started

Installation

pip install eland

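Basic Connection

import eland as ed
from elasticsearch import Elasticsearch

# Connect to Elasticsearch (Eland uses the official Python client)
es = Elasticsearch("http://localhost:9200")

# Create a DataFrame backed by an index; the data itself stays in Elasticsearch
df = ed.DataFrame(es_client=es, es_index_pattern="my_index")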
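A local, unsecured node is rarely what a real deployment looks like. The sketch below shows one possible way to assemble client settings from environment variables, assuming the 8.x Python client and its `api_key` and `basic_auth` parameters. The variable names (`ES_URL`, `ES_API_KEY`, `ES_USER`, `ES_PASSWORD`) and both helper functions are hypothetical, chosen only for illustration.

```python
import os


def connection_kwargs(env=None):
    """Build keyword arguments for elasticsearch.Elasticsearch (hypothetical helper)."""
    env = os.environ if env is None else env
    kwargs = {"hosts": env.get("ES_URL", "http://localhost:9200")}

    # Prefer an API key; fall back to username/password if both are present
    if env.get("ES_API_KEY"):
        kwargs["api_key"] = env["ES_API_KEY"]
    elif env.get("ES_USER") and env.get("ES_PASSWORD"):
        kwargs["basic_auth"] = (env["ES_USER"], env["ES_PASSWORD"])
    return kwargs


def connect(env=None):
    """Create the client; imported lazily so the helper above has no dependencies."""
    from elasticsearch import Elasticsearch

    return Elasticsearch(**connection_kwargs(env))
```

The returned client can then be passed to `ed.DataFrame` through its `es_client` argument in the same way as the local client.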

Key Features

1. Data Loading and Exploration

Eland makes it easy to load and explore Elasticsearch data:

# Load data from an index
df = ed.DataFrame(es_client=es, es_index_pattern="my_index")

# Basic information about the dataset (info() prints its summary and returns None)
df.info()

# First few rows
print(df.head())

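2. Data Manipulation

Eland supports a subset of pandas operations and runs them as Elasticsearch queries and aggregations:

# Filtering
filtered_df = df[df['age'] > 25]

# Grouping and aggregation (select the columns first, then aggregate by name)
grouped = df[['category', 'value']].groupby('category').mean()

# Sorting: sort_values() is a pandas method, so sort an in-memory copy.
# On a large index, filter before calling to_pandas(), which downloads the data.
sorted_df = df.to_pandas().sort_values('timestamp', ascending=False)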
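It can help to picture what a filter turns into on the Elasticsearch side. The sketch below is only an approximation of the translation, written in plain Python: `RANGE_OPS`, `range_filter`, and `combine_and` are hypothetical names, and the exact query Eland generates may differ in shape.

```python
# Comparison operators and the range keywords they correspond to in the query DSL
RANGE_OPS = {">": "gt", ">=": "gte", "<": "lt", "<=": "lte"}


def range_filter(field, op, value):
    """Build a range clause for a comparison such as df['age'] > 25."""
    return {"range": {field: {RANGE_OPS[op]: value}}}


def combine_and(*clauses):
    """Combine clauses so that all of them must match, like `&` on boolean masks."""
    return {"bool": {"must": list(clauses)}}


# Roughly what df[(df['age'] > 25) & (df['age'] <= 40)] corresponds to
query = combine_and(range_filter("age", ">", 25), range_filter("age", "<=", 40))
```

Because the comparison runs as a query, only matching documents are ever considered, which is why filtering before `to_pandas()` keeps memory use down.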

Common Use Cases

1. Data Analysis Pipeline

import eland as ed
from elasticsearch import Elasticsearch

# Example of a complete analysis pipeline
def analyze_data():
    # Connect to Elasticsearch
    es = Elasticsearch("http://localhost:9200")

    # Load data
    df = ed.DataFrame(es_client=es, es_index_pattern="my_index")

    # Preprocess data: keep only documents that have a 'value'
    df_clean = df[df['value'].notna()]

    # Perform analysis: one aggregation per listed name, grouped by category
    results = df_clean[['category', 'value']].groupby('category').agg(
        ['mean', 'std', 'count']
    )

    return results

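2. Time Series Analysis

# Time series analysis example
def time_series_analysis():
    df = ed.DataFrame(es_client=es, es_index_pattern="time_series_index")

    # resample() and rolling() are pandas methods, so load the two columns
    # needed into memory and index them by time (assumes 'timestamp' is a date field)
    pdf = df[['timestamp', 'value']].to_pandas().set_index('timestamp').sort_index()

    # Resample by time period
    daily_stats = pdf['value'].resample('D').agg(['mean', 'sum'])

    # Calculate rolling statistics
    rolling_mean = pdf['value'].rolling(window=7).mean()

    return daily_stats, rolling_mean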
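For an index too large to load into pandas, the daily mean and sum can also be computed inside Elasticsearch with a `date_histogram` aggregation. The sketch below uses the low-level client directly rather than Eland. `daily_stats_request` and `fetch_daily_stats` are hypothetical helpers, and the field names follow the example above.

```python
def daily_stats_request(time_field="timestamp", value_field="value"):
    """Build search arguments that bucket documents by calendar day."""
    return {
        "size": 0,  # return aggregation buckets only, no documents
        "aggs": {
            "per_day": {
                "date_histogram": {"field": time_field, "calendar_interval": "day"},
                "aggs": {
                    "mean": {"avg": {"field": value_field}},
                    "sum": {"sum": {"field": value_field}},
                },
            }
        },
    }


def fetch_daily_stats(client, index="time_series_index"):
    """Run the aggregation and return (day, mean, sum) tuples."""
    response = client.search(index=index, **daily_stats_request())
    buckets = response["aggregations"]["per_day"]["buckets"]
    return [(b["key_as_string"], b["mean"]["value"], b["sum"]["value"]) for b in buckets]
```

Only one row per day travels over the network, so this scales to indices where a full `to_pandas()` would not fit in memory. A rolling mean can then be computed in pandas on the much smaller daily series.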

Machine Learning with Eland

Because an Eland DataFrame converts to pandas with to_pandas(), data stored in Elasticsearch can feed a standard Python machine learning workflow. The steps below cover data preparation, feature engineering, training, and deployment:

1. Data Preparation for ML

import eland as ed
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def prepare_ml_data():
    # Load data from Elasticsearch (es is the client created under Basic Connection)
    df = ed.DataFrame(es_client=es, es_index_pattern="ml_dataset")
    
    # Select features and target
    X = df[['feature1', 'feature2', 'feature3']]
    y = df['target']
    
    # Convert to pandas for sklearn compatibility
    X_pd = X.to_pandas()
    y_pd = y.to_pandas()
    
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X_pd, y_pd, test_size=0.2, random_state=42
    )
    
    # Scale the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    return X_train_scaled, X_test_scaled, y_train, y_test

2. Feature Engineering

def engineer_features():
    # The .dt accessor and factorize() come from pandas rather than Eland,
    # so engineer features on an in-memory copy of the data
    df = ed.DataFrame(es_client=es, es_index_pattern="ml_dataset").to_pandas()

    # Create new features
    df['feature_ratio'] = df['feature1'] / df['feature2']
    df['feature_interaction'] = df['feature1'] * df['feature2']

    # Time-based features (assumes 'timestamp' is mapped as a date field)
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek

    # Categorical encoding
    df['category_encoded'] = df['category'].factorize()[0]

    return df

3. Model Training and Evaluation

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

def train_and_evaluate():
    # Prepare data
    X_train, X_test, y_train, y_test = prepare_ml_data()
    
    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    
    # Evaluate
    y_pred = model.predict(X_test)
    print(classification_report(y_test, y_pred))
    
    return model

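4. Model Deployment to Elasticsearch

One of Eland’s key features is the ability to import trained tree-based models from scikit-learn, XGBoost, and LightGBM into Elasticsearch for inference. Because the model in step 3 was trained on scaled features, any input sent to the deployed model must be scaled the same way: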

from eland.ml import MLModel

def deploy_model_to_elasticsearch(model, feature_names, model_id="my_model"):
    # Deploy the model to Elasticsearch
    es_model = MLModel.import_model(
        es_client=es,
        model=model,
        model_id=model_id,
        feature_names=feature_names
    )
    return es_model
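Once imported, the returned `MLModel` object exposes a `predict` method that sends feature rows to Elasticsearch. Since large requests can time out, one option is to send rows in fixed-size batches. The helper below is a minimal sketch: `predict_in_batches` is a hypothetical name, the default batch size is arbitrary, and it works with any callable that maps a list of rows to one prediction per row.

```python
def predict_in_batches(predict_fn, rows, batch_size=1000):
    """Call predict_fn on consecutive slices of rows and collect the results in order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    predictions = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # Each call handles at most batch_size rows
        predictions.extend(predict_fn(batch))
    return predictions
```

With the function above, usage might look like `predict_in_batches(es_model.predict, X_test.tolist(), batch_size=500)`, where `X_test` is the scaled NumPy array from step 1. A suitable batch size depends on the cluster and is best found by experiment.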

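5. Anomaly Detection Example

Here’s a practical example of anomaly detection with scikit-learn’s IsolationForest. It runs entirely in memory on synthetic data; with real data, the pandas DataFrame would come from calling to_pandas() on an Eland DataFrame: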

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt

def anomaly_detection_example():
    # Generate sample data
    sample_size = 10000
    outlier_fraction = 0.01
    
    # Create a dataset with anomalies
    X = np.random.randn(sample_size, 2)
    outliers = np.random.uniform(low=-4, high=4, size=(int(sample_size * outlier_fraction), 2))
    X = np.vstack([X, outliers])
    
    # Convert to DataFrame
    df = pd.DataFrame(X, columns=['feature_1', 'feature_2'])
    
    # Train Isolation Forest
    iso_forest = IsolationForest(contamination=outlier_fraction, random_state=42)
    iso_forest.fit(df)
    
    # Predict anomalies
    predictions = iso_forest.predict(df)
    
    # Convert to binary labels (1 for anomaly, 0 for normal);
    # IsolationForest.predict returns -1 for outliers and 1 for inliers
    labels = np.where(predictions == -1, 1, 0)
    
    # Add labels to DataFrame
    df['is_anomaly'] = labels
    
    # Visualize results
    plt.figure(figsize=(10, 6))
    plt.scatter(df['feature_1'], df['feature_2'], c=df['is_anomaly'], cmap='viridis')
    plt.title('Anomaly Detection Results')
    plt.colorbar(label='Is Anomaly')
    plt.show()
    
    return df, iso_forest
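An IsolationForest cannot be imported into Elasticsearch as a model, so one way to make its output searchable is to write the labelled rows back as documents with `ed.pandas_to_eland()`. The sketch below assumes a reachable cluster for the upload step. `to_anomaly_flags`, `store_results`, and the `anomaly_results` index name are hypothetical.

```python
def to_anomaly_flags(predictions):
    """Map IsolationForest output (-1 outlier, 1 inlier) to 1 for anomaly, 0 for normal."""
    return [1 if p == -1 else 0 for p in predictions]


def store_results(pd_df, es_client, dest_index="anomaly_results"):
    """Upload a pandas DataFrame to Elasticsearch and return an Eland DataFrame over it."""
    # Imported lazily so to_anomaly_flags() can be used without Eland installed
    import eland as ed

    return ed.pandas_to_eland(
        pd_df=pd_df,
        es_client=es_client,
        es_dest_index=dest_index,
        es_if_exists="replace",  # overwrite a previous run of the same analysis
        es_refresh=True,         # make the documents searchable immediately
    )
```

After the upload, anomalous rows can be selected with an ordinary Eland filter such as `results[results['is_anomaly'] == 1]`.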

Limitations and Considerations for ML with Eland

While Eland provides powerful capabilities for machine learning with Elasticsearch, there are some important limitations to be aware of:

  1. Supported Model Types

    • Only tree-based models are currently supported for deployment to Elasticsearch:
      • DecisionTreeClassifier
      • DecisionTreeRegressor
      • RandomForestRegressor
      • RandomForestClassifier
      • XGBClassifier
      • XGBRegressor
      • LGBMRegressor
      • LGBMClassifier
    • Unsupervised learning models (like IsolationForest) are not supported
  2. Data Processing Limitations

    • While you can convert pandas DataFrames to Eland DataFrames using ed.pandas_to_eland(), Eland DataFrames can only be used for data processing
    • You cannot train models directly on Eland DataFrames - you need to convert back to pandas using to_pandas()
    • You cannot perform inference directly on data stored in Elasticsearch
  3. Performance Constraints

    • There are limits to how much data you can use for inference at once - large batches may result in timeouts
    • Model deployment requires an appropriate Elasticsearch license (X-Pack or Elastic Cloud)
  4. Workflow Considerations

    • For a complete ML pipeline, you’ll need to:
      1. Use Eland for data loading and preprocessing
      2. Convert to pandas for model training
      3. Deploy the trained model back to Elasticsearch for inference
      4. Use the deployed model for predictions
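The supported-model list above can be turned into a quick pre-flight check before attempting a deployment. This is a sketch based only on the class names listed in this section: `SUPPORTED_MODEL_CLASSES` and `is_deployable` are hypothetical, and matching on the class name is a heuristic rather than the check Eland itself performs.

```python
# Class names taken from the supported-model list above
SUPPORTED_MODEL_CLASSES = frozenset({
    "DecisionTreeClassifier",
    "DecisionTreeRegressor",
    "RandomForestRegressor",
    "RandomForestClassifier",
    "XGBClassifier",
    "XGBRegressor",
    "LGBMRegressor",
    "LGBMClassifier",
})


def is_deployable(model):
    """Return True if the model's class name appears in the supported list."""
    return type(model).__name__ in SUPPORTED_MODEL_CLASSES
```

Calling `is_deployable(model)` before `MLModel.import_model` gives an early, readable failure for unsupported estimators such as IsolationForest.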

Conclusion

Eland provides a bridge between Elasticsearch and Python’s data analysis ecosystem. Its pandas-like interface makes Elasticsearch data accessible to data scientists without giving up server-side filtering and aggregation. With its capabilities and limitations in mind, you can analyze large datasets stored in Elasticsearch, train models in pandas and scikit-learn, and deploy supported tree-based models back to Elasticsearch for inference.