# Elasticsearch and tabular integration

Elasticsearch is a NoSQL document store that indexes JSON records.
In the following, a Winlogbeat index holding Windows Event Log data is queried.
The Elasticsearch SQL endpoint is used to define the query, and the result is retrieved as JSON.
The data is read into in-memory dataframe objects, which support data-manipulation tasks.

In-memory processing becomes difficult when datasets grow large.
Therefore, two popular in-memory dataframe libraries are compared:

1.) Pandas
2.) Polars

The memory footprint is assessed because runtime memory is the limiting factor for both implementations.

## Versions

In [1]:
!pip show pandas | grep -E 'Name:|Version:'

Name: pandas
Version: 2.1.4


In [2]:
!pip show polars | grep -E 'Name:|Version:'

Name: polars
Version: 0.20.26


## Elasticsearch API

The Elasticsearch API uses HTTP and is available on port 9200.

Each index matching `winlogbeat-*` contains the data of one period; the index is rotated periodically.

Here the Elasticsearch Query DSL is used to retrieve an event timeline in descending time order.

The resulting JSON is piped to the `jq` utility, which filters and pretty-prints it on the command line.
Only the first JSON record is shown.

The output shows the index and the timestamp.

In [21]:
%%bash
curl -s -X GET "http://192.168.20.106:9200/winlogbeat-*/_search" -H 'Content-Type: application/json' -d '{
  "size": 1,
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}' | jq '.hits.hits[0] | {index: ._index, timestamp: ._source["@timestamp"]}'


{
  "index": "winlogbeat-7.10.0-2024.05.15-000008",
  "timestamp": "2024-05-15T15:57:22.877Z"
}
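
The same request can be prepared from Python. The sketch below only builds the request body and reproduces the `jq` filter on an already parsed response, so it runs without a server. The helper names `latest_event_query` and `summarize_first_hit` are illustrative and not part of any Elasticsearch client, and the sample response is trimmed to the two fields shown above.

```python
import json


def latest_event_query(size=1):
    """Build a Query DSL body that returns the newest events first."""
    return {"size": size, "sort": [{"@timestamp": {"order": "desc"}}]}


def summarize_first_hit(response):
    """Mimic the jq filter: keep the index name and timestamp of the first hit."""
    hit = response["hits"]["hits"][0]
    return {"index": hit["_index"], "timestamp": hit["_source"]["@timestamp"]}


# Sample response, trimmed to the fields used by the filter
# (values copied from the output shown above)
sample_response = {
    "hits": {
        "hits": [
            {
                "_index": "winlogbeat-7.10.0-2024.05.15-000008",
                "_source": {"@timestamp": "2024-05-15T15:57:22.877Z"},
            }
        ]
    }
}

print(json.dumps(latest_event_query(), indent=2))
print(summarize_first_hit(sample_response))
```

Against a live cluster, the body could be sent with `requests.post("http://192.168.20.106:9200/winlogbeat-*/_search", json=latest_event_query())` and the parsed response passed to `summarize_first_hit`.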


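The following Bash command sends a SQL query to the `_sql/translate` endpoint, which returns the equivalent Query DSL request instead of executing the query.

`LIMIT 1` is a common SQL clause; it appears as `"size": 1` in the translated request.
The output is further limited with the `head` command, so only the first three lines of the translation are shown.

Without an `ORDER BY` clause, the order of the returned records does not represent a timeline but the order of the records in the index.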

In [27]:
%%bash
curl -s -X POST "http://192.168.20.106:9200/_sql/translate" -H 'Content-Type: application/json' -d '{
  "query": "SELECT * FROM \"winlogbeat-7.10.0-2024.05.15-*\" LIMIT 1"
}' | jq | head -n 3


{
  "size": 1,
  "_source": {

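To obtain a timeline from the SQL endpoint, the query needs an explicit `ORDER BY` on the timestamp field. A minimal sketch of such a request body follows; the helper name `timeline_sql_payload` is hypothetical, and the index pattern is the one used in this notebook.

```python
import json


def timeline_sql_payload(index_pattern, limit=1):
    """Build a request body for the SQL endpoint that sorts newest-first."""
    # Double quotes are required because the names contain '-', '*' and '@'
    query = (
        f'SELECT * FROM "{index_pattern}" '
        f'ORDER BY "@timestamp" DESC LIMIT {int(limit)}'
    )
    return {"query": query}


payload = timeline_sql_payload("winlogbeat-7.10.0-2024.05.15-*")
print(json.dumps(payload))
```

The resulting dictionary can be posted to either `_sql?format=json` or `_sql/translate`, in the same way as the `curl` call above.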

## Elasticsearch tabular-integration and Pandas

Pandas is the de facto standard for manipulating small to medium-sized datasets in data science.
It offers robust functions for in-memory data manipulation and for integrating features from tabular data.

In the following, nested JSON data is expanded into flat columns, which allows a simple feature selection for further processing.
The data is returned by Elasticsearch in response to an SQL query.
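
The expansion of nested JSON can be illustrated without Pandas. The sketch below is a simplified pure-Python stand-in, not the `pd.json_normalize` implementation: it turns nested dictionaries into dotted keys similar to the column names used in this notebook (`host.hostname`, `winlog.event_id`) and leaves lists untouched. The function name `flatten_record` and the sample record are made up for illustration.

```python
def flatten_record(record, parent_key="", sep="."):
    """Flatten nested dicts into a single dict with dotted keys."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects and merge their flattened keys
            flat.update(flatten_record(value, full_key, sep))
        else:
            # Scalars and lists are kept as they are
            flat[full_key] = value
    return flat


sample = {
    "@timestamp": "2024-05-15T15:57:18.471Z",
    "host": {"hostname": "win10"},
    "winlog": {"event_id": 13, "task": "Registry value set"},
}
flat = flatten_record(sample)
print(sorted(flat))
```

Once every record is flat, selecting features is a matter of picking keys (or, in a dataframe, columns) by name.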

The SQL endpoint returns the result in pages: each response carries a cursor, which is sent back to request the next portion of the data.
This simplifies batch processing of large datasets.

In [64]:
import requests
import pandas as pd
import json

# Function to recursively normalize nested columns in a DataFrame
def recursively_normalize(data):
    df = pd.json_normalize(data)
    while True:
        nested_cols = [col for col in df.columns if isinstance(df[col].iloc[0], (dict, list))]
        if not nested_cols:
            break
        for col in nested_cols:
            if isinstance(df[col].iloc[0], dict):
                normalized = pd.json_normalize(df[col])
                df = df.drop(columns=[col]).join(normalized)
            elif isinstance(df[col].iloc[0], list):
                df = df.explode(col)
                normalized = pd.json_normalize(df[col])
                df = df.drop(columns=[col]).join(normalized)
    return df

# Function to fetch the next batch using the cursor
def fetch_next_batch(cursor):
    response = requests.post(
        f"{base_url}/_sql?format=json",
        headers={"Content-Type": "application/json"},
        json={"cursor": cursor}
    ).json()
    return response

# Elasticsearch base URL
base_url = "http://192.168.20.106:9200"
# Index name
index = "winlogbeat-*"

# SQL query for initial search
sql_query = """
SELECT "@timestamp", host.hostname, host.ip, log.level, winlog.event_id, winlog.task, message FROM "winlogbeat-7.10.0-2024.05.15-*"
LIMIT 5000
"""

# Initial search request to start scrolling
initial_response = requests.post(
    f"{base_url}/_sql?format=json",
    headers={"Content-Type": "application/json"},
    json={
        "query": sql_query,
        "field_multi_value_leniency": True
    }
).json()

# Extract the cursor for scrolling
cursor = initial_response.get('cursor')
rows = initial_response.get('rows')
columns = [col['name'] for col in initial_response['columns']]

# Initialize CSV file (assumes the first batch is not empty).
# Note: this first batch is written to the CSV only; the JSON file is filled in the loop below.
if rows:
    df = pd.DataFrame(rows, columns=columns)
    df = recursively_normalize(df.to_dict(orient='records'))
    df.to_csv("lab_logs_normal_activity.csv", mode='w', index=False, header=True)

# Track total documents retrieved
total_documents_retrieved = len(rows)
print(f"Retrieved {total_documents_retrieved} documents.")

# Loop to fetch subsequent batches of documents until no more documents are left
while cursor:
    # Fetch next batch of documents using cursor
    response = fetch_next_batch(cursor)
    
    # Update cursor for the next batch
    cursor = response.get('cursor')
    rows = response.get('rows')
    
    # If no rows, break out of the loop
    if not rows:
        break
    
    # Normalize data and append to CSV
    df = pd.DataFrame(rows, columns=columns)
    df = recursively_normalize(df.to_dict(orient='records'))
    
    # Append to CSV file without headers
    df.to_csv("lab_logs_normal_activity.csv", mode='a', index=False, header=False)
    
    # Convert DataFrame to JSON, line by line
    json_lines = df.to_json(orient='records', lines=True).splitlines()
    # Append each line to an existing JSON file
    with open("lab_logs_normal_activity.json", 'a') as file:
        for line in json_lines:
            file.write(line + '\n')  # Append each line and add a newline
        
    # Update total documents retrieved
    total_documents_retrieved += len(rows)
    
    print(f"Retrieved {total_documents_retrieved} documents.")

print("Files have been written.")


Retrieved 1000 documents.
Retrieved 2000 documents.
Retrieved 3000 documents.
Retrieved 4000 documents.
Retrieved 5000 documents.
Files have been written.
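
The cursor handling used above can be condensed into a generator. This is a minimal sketch under stated assumptions, not the code that produced the output: `iter_sql_pages` and `fake_post` are hypothetical names, and the HTTP call is replaced by a callable so the paging logic runs without a server. The `fetch_size` request parameter sets the page size.

```python
def iter_sql_pages(post, query, fetch_size=1000):
    """Yield (columns, rows) for every page of an Elasticsearch SQL result.

    `post` is any callable that takes a request body (dict) and returns the
    parsed JSON response (dict).
    """
    response = post({"query": query, "fetch_size": fetch_size})
    # Column metadata is read once, from the first page
    columns = [col["name"] for col in response["columns"]]
    while True:
        rows = response.get("rows") or []
        if rows:
            yield columns, rows
        cursor = response.get("cursor")
        if not cursor:
            break  # the last page carries no cursor
        response = post({"cursor": cursor})


# Stand-in for the HTTP call: serves two canned pages (made-up data)
def fake_post(body):
    if "query" in body:
        return {"columns": [{"name": "message"}], "rows": [["a"], ["b"]], "cursor": "c1"}
    return {"rows": [["c"]]}


total = 0
for columns, rows in iter_sql_pages(fake_post, "SELECT message FROM logs"):
    total += len(rows)
print(f"Retrieved {total} rows with columns {columns}")
```

With `requests`, the callable could be `lambda body: requests.post(f"{base_url}/_sql?format=json", json=body).json()`, which matches the calls made in the cell above.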


## Alternative approach with Polars

Polars is a newer dataframe library that challenges Pandas.
It is supposed to be more memory efficient because its backend is written in Rust.

In [None]:
%pip install polars

In [63]:
import requests
import polars as pl
import json

# Function to recursively unnest nested columns in a DataFrame
def recursively_unnest(df):
    nested = True
    while nested:
        nested = False
        for col in df.columns:
            if df[col].dtype == pl.List:
                df = df.explode(col)
                nested = True
            elif df[col].dtype == pl.Struct:
                df = df.unnest(col)
                nested = True
    return df

# Function to fetch the next batch using the cursor
def fetch_next_batch(cursor):
    response = requests.post(
        f"{base_url}/_sql?format=json",
        headers={"Content-Type": "application/json"},
        json={"cursor": cursor}
    ).json()
    return response

# Elasticsearch base URL
base_url = "http://192.168.20.106:9200"
# Index name
index = "winlogbeat-*"

# SQL query for initial search
sql_query = """
SELECT "@timestamp", host.hostname, host.ip, log.level, winlog.event_id, winlog.task, message FROM "winlogbeat-7.10.0-2024.05.15-*"
LIMIT 5000
"""

# Initial search request to start scrolling
initial_response = requests.post(
    f"{base_url}/_sql?format=json",
    headers={"Content-Type": "application/json"},
    json={
        "query": sql_query,
        "field_multi_value_leniency": True
    }
).json()

# Extract the cursor for scrolling
cursor = initial_response.get('cursor')
rows = initial_response.get('rows')
columns = [col['name'] for col in initial_response['columns']]

# Initialize CSV file (assumes the first batch is not empty).
# Note: this first batch is written to the CSV only; the JSON file is filled in the loop below.
if rows:
    df = pl.DataFrame(rows, schema=columns)
    df = recursively_unnest(df)
    df.write_csv("lab_logs_normal_activity.csv", include_header=True)

# Track total documents retrieved
total_documents_retrieved = len(rows)
print(f"Retrieved {total_documents_retrieved} documents.")

# Loop to fetch subsequent batches of documents until no more documents are left
while cursor:
    # Fetch next batch of documents using cursor
    response = fetch_next_batch(cursor)
    
    # Update cursor for the next batch
    cursor = response.get('cursor')
    rows = response.get('rows')
    
    # If no rows, break out of the loop
    if not rows:
        break
    
    # Normalize data and append to CSV
    df = pl.DataFrame(rows, schema=columns)
    df = recursively_unnest(df)
    
    # Manually write the CSV to avoid headers
    with open("lab_logs_normal_activity.csv", 'a') as f:
        df.write_csv(f, include_header=False)
    
    # Convert DataFrame to JSON, line by line
    json_lines = [json.dumps(record) for record in df.to_dicts()]
    # Append each line to an existing JSON file
    with open("lab_logs_normal_activity.json", 'a') as file:
        for line in json_lines:
            file.write(line + '\n')  # Append each line and add a newline
        
    # Update total documents retrieved
    total_documents_retrieved += len(rows)
    
    print(f"Retrieved {total_documents_retrieved} documents.")

print("Files have been written.")


Retrieved 1000 documents.
Retrieved 2000 documents.
Retrieved 3000 documents.
Retrieved 4000 documents.
Retrieved 5000 documents.
Files have been written.
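
Both export cells append to `lab_logs_normal_activity.json`, so the file grows with every run. A minimal sketch of a helper that overwrites the file for the first batch and appends afterwards is shown below. It assumes the records are plain dictionaries (for example from `df.to_dicts()`); `write_ndjson_batch` is a hypothetical name, and the temporary path exists only for the demonstration.

```python
import json
import os
import tempfile


def write_ndjson_batch(path, records, first_batch):
    """Write records as JSON Lines; overwrite on the first batch, append after."""
    mode = "w" if first_batch else "a"
    with open(path, mode, encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")


# Demonstration with made-up records in a temporary directory
demo_path = os.path.join(tempfile.mkdtemp(), "demo.json")
batches = [[{"id": 1}, {"id": 2}], [{"id": 3}]]
for number, batch in enumerate(batches):
    write_ndjson_batch(demo_path, batch, first_batch=(number == 0))

with open(demo_path, encoding="utf-8") as handle:
    line_count = sum(1 for _ in handle)
print(line_count)
```

Writing one JSON object per line keeps the file readable by `pd.read_json(..., lines=True)` and `pl.scan_ndjson`, which are used in the comparison that follows.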


## Memory footprint and profile comparison

A column schema is derived from a small sample of the JSON file and applied in both cases to make the comparison fairer.

In [27]:
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])?  y


In [None]:
!pip install git+https://github.com/H4dr1en/jupyterflame.git

In [None]:
# directly on the shell within the conda env: conda install -y perl

In [28]:
%load_ext jupyterflame

The jupyterflame extension is already loaded. To reload it, use:
  %reload_ext jupyterflame


In [29]:
import pandas as pd

# Read a small chunk of the JSON file
file_path = "lab_logs_normal_activity.json"
pd_df = pd.read_json(file_path, lines=True, nrows=10)

print(pd_df.dtypes)

@timestamp         object
host.hostname      object
host.ip            object
log.level          object
winlog.event_id     int64
winlog.task        object
message            object
dtype: object


In [46]:
import polars as pl

# Define the mapping from Pandas dtype to Polars dtype
dtype_mapping = {
    "object": pl.Utf8,
    "int64": pl.Int64,
    "float64": pl.Float64,
    # Add more mappings if needed
}

pandas_dtype_mapping = {
    "object": "str",
    "int64": "int64",
    "float64": "float64",
    # Add more mappings if needed
}


# Generate the schema for Polars from Pandas dtype
polars_schema = {col: dtype_mapping[str(dtype)] for col, dtype in pd_df.dtypes.items()}
print("Polars Schema:", polars_schema)

pandas_schema = {col: pandas_dtype_mapping[str(dtype)] for col, dtype in pd_df.dtypes.items()}
print("Pandas Schema:", pandas_schema)

Polars Schema: {'@timestamp': String, 'host.hostname': String, 'host.ip': String, 'log.level': String, 'winlog.event_id': Int64, 'winlog.task': String, 'message': String}
Pandas Schema: {'@timestamp': 'str', 'host.hostname': 'str', 'host.ip': 'str', 'log.level': 'str', 'winlog.event_id': 'int64', 'winlog.task': 'str', 'message': 'str'}


In [53]:
def test_polars():
    # Lazily scan the newline-delimited JSON file
    lazy_df = pl.scan_ndjson(file_path)

    # Collect the LazyFrame to a DataFrame
    pl_df = lazy_df.collect()

    # Convert columns to the data types defined in polars_schema
    pl_df = pl_df.with_columns([pl.col(col).cast(dtype) for col, dtype in polars_schema.items()])

    # Print the DataFrame, its row count, and its estimated memory usage
    print(pl_df)

    num_rows_polars = pl_df.shape[0]

    print(f"Polars DataFrame number of rows:  {num_rows_polars}")
    print(f"Polars DataFrame memory usage: {pl_df.estimated_size() / (1024 ** 2):.2f} MB")

In [48]:
%%flame -q --inverted
test_polars()

shape: (8_000, 7)
┌──────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ @timestamp   ┆ host.hostna ┆ host.ip     ┆ log.level   ┆ winlog.even ┆ winlog.task ┆ message     │
│ ---          ┆ me          ┆ ---         ┆ ---         ┆ t_id        ┆ ---         ┆ ---         │
│ str          ┆ ---         ┆ str         ┆ str         ┆ ---         ┆ str         ┆ str         │
│              ┆ str         ┆             ┆             ┆ i64         ┆             ┆             │
╞══════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╡
│ 2024-05-15T1 ┆ win10       ┆ fe80::24b4: ┆ information ┆ 13          ┆ Registry    ┆ Registry    │
│ 5:57:18.471Z ┆             ┆ 3691:44a6:3 ┆             ┆             ┆ value set   ┆ value set:  │
│              ┆             ┆ 8a1         ┆             ┆             ┆ (rule:      ┆ RuleName: … │
│              ┆             ┆             ┆             ┆             ┆ 

In [49]:
%%prun
test_polars()

shape: (8_000, 7)
┌──────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ @timestamp   ┆ host.hostna ┆ host.ip     ┆ log.level   ┆ winlog.even ┆ winlog.task ┆ message     │
│ ---          ┆ me          ┆ ---         ┆ ---         ┆ t_id        ┆ ---         ┆ ---         │
│ str          ┆ ---         ┆ str         ┆ str         ┆ ---         ┆ str         ┆ str         │
│              ┆ str         ┆             ┆             ┆ i64         ┆             ┆             │
╞══════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╡
│ 2024-05-15T1 ┆ win10       ┆ fe80::24b4: ┆ information ┆ 13          ┆ Registry    ┆ Registry    │
│ 5:57:18.471Z ┆             ┆ 3691:44a6:3 ┆             ┆             ┆ value set   ┆ value set:  │
│              ┆             ┆ 8a1         ┆             ┆             ┆ (rule:      ┆ RuleName: … │
│              ┆             ┆             ┆             ┆             ┆ 

         256 function calls (253 primitive calls) in 0.020 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        2    0.014    0.007    0.014    0.007 {method 'collect' of 'builtins.PyLazyFrame' objects}
        1    0.003    0.003    0.003    0.003 {built-in method new_from_ndjson}
        2    0.001    0.001    0.001    0.001 {built-in method posix.stat}
        1    0.000    0.000    0.000    0.000 {method 'as_str' of 'builtins.PyDataFrame' objects}
        1    0.000    0.000    0.020    0.020 <string>:1(<module>)
        1    0.000    0.000    0.020    0.020 2832609216.py:1(test_polars)
        1    0.000    0.000    0.000    0.000 socket.py:543(send)
        2    0.000    0.000    0.000    0.000 wrap.py:12(wrap_df)
        6    0.000    0.000    0.000    0.000 iostream.py:610(write)
        1    0.000    0.000    0.020    0.020 {built-in method builtins.exec}
        2    0.000    0.000    0.014    0.007 frame.py:16

In [50]:
def test_pandas():
    # Load the JSON file into a Pandas DataFrame
    pd_df = pd.read_json(file_path, lines=True, dtype=pandas_schema)
    pd_memory_usage = pd_df.memory_usage(deep=True).sum()

    # Get the number of rows in the Pandas DataFrame
    num_rows_pandas = pd_df.shape[0]

    print(pd_df)

    print(f"Pandas DataFrame number of rows:  {num_rows_pandas}")
    print(f"Pandas DataFrame memory usage: {pd_memory_usage / (1024 ** 2):.2f} MB")


In [51]:
%%flame -q --inverted
test_pandas()

                    @timestamp host.hostname                    host.ip  \
0     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
1     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
2     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
3     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
4     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
...                        ...           ...                        ...   
7995  2024-05-15T16:10:07.128Z         win10  fe80::24b4:3691:44a6:38a1   
7996  2024-05-15T16:10:07.136Z         win10  fe80::24b4:3691:44a6:38a1   
7997  2024-05-15T16:10:07.136Z         win10  fe80::24b4:3691:44a6:38a1   
7998  2024-05-15T16:10:07.149Z         win10  fe80::24b4:3691:44a6:38a1   
7999  2024-05-15T16:10:07.149Z         win10  fe80::24b4:3691:44a6:38a1   

        log.level  winlog.event_id                               winlog.task  \
0     information  

In [52]:
%%prun
test_pandas()

                    @timestamp host.hostname                    host.ip  \
0     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
1     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
2     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
3     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
4     2024-05-15T15:57:18.471Z         win10  fe80::24b4:3691:44a6:38a1   
...                        ...           ...                        ...   
7995  2024-05-15T16:10:07.128Z         win10  fe80::24b4:3691:44a6:38a1   
7996  2024-05-15T16:10:07.136Z         win10  fe80::24b4:3691:44a6:38a1   
7997  2024-05-15T16:10:07.136Z         win10  fe80::24b4:3691:44a6:38a1   
7998  2024-05-15T16:10:07.149Z         win10  fe80::24b4:3691:44a6:38a1   
7999  2024-05-15T16:10:07.149Z         win10  fe80::24b4:3691:44a6:38a1   

        log.level  winlog.event_id                               winlog.task  \
0     information  

         46681 function calls (46472 primitive calls) in 0.113 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.029    0.029    0.029    0.029 {built-in method pandas._libs.json.ujson_loads}
        6    0.010    0.002    0.010    0.002 {pandas._libs.lib.memory_usage_of_objects}
        1    0.009    0.009    0.012    0.012 {method 'read' of '_io.TextIOWrapper' objects}
     8001    0.005    0.000    0.005    0.000 construction.py:915(<genexpr>)
        1    0.005    0.005    0.005    0.005 {pandas._libs.lib.dicts_to_array}
       94    0.004    0.000    0.004    0.000 {method 'split' of 'str' objects}
        1    0.003    0.003    0.009    0.009 {pandas._libs.lib.fast_unique_multiple_list_gen}
        1    0.003    0.003    0.010    0.010 _json.py:960(_combine_lines)
        1    0.003    0.003    0.003    0.003 {built-in method _codecs.utf_8_decode}
     64/8    0.003    0.000    0.003    0.000 {method 'joi

# Results

Polars and Pandas both processed the same data (8000 rows, categorical data represented as strings).


## Versions


* Pandas: 2.1.4
* Polars: 0.20.26

## Memory usage comparison

File on disk: 6.0 MB (`du -sh`), 8000 rows, 7 columns.

* Polars: 4.76 MB
* Pandas: 7.56 MB

-> Polars was more memory efficient: it used ~1.6 times less memory.


## Profile comparison

* Polars: 256 function calls (253 primitive calls) in 0.020 seconds
* Pandas: 46681 function calls (46472 primitive calls) in 0.113 seconds

-> Polars was ~5.6 times faster and needed ~180x fewer function calls (total and primitive).
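
The ratios quoted in the results can be recomputed from the measured figures. The short sketch below copies the numbers from the two comparisons above; the variable names are chosen for illustration only.

```python
# Figures copied from the memory usage and profile comparisons above
pandas_mb, polars_mb = 7.56, 4.76
pandas_seconds, polars_seconds = 0.113, 0.020
pandas_calls, polars_calls = 46681, 256

memory_ratio = pandas_mb / polars_mb            # about 1.59
time_ratio = pandas_seconds / polars_seconds    # about 5.65
call_ratio = pandas_calls / polars_calls        # about 182

print(f"memory: {memory_ratio:.2f}x, time: {time_ratio:.2f}x, calls: {call_ratio:.0f}x")
```

These are single measurements on 8000 rows, so the ratios describe this run rather than a general benchmark.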


## Conclusion

In this measurement, Polars used less memory and ran faster than Pandas on the same data, so Polars should be preferred whenever possible.