adding versions for polars versus pandas test

This commit is contained in:
Marius Ciepluch 2024-05-16 18:21:42 +02:00
parent e38d2ab4c8
commit f9cdeb781b
2 changed files with 685 additions and 47 deletions


@@ -21,6 +21,56 @@
"The memory footprint is assessed, because runtime memory is the limiting factor for the implementations."
]
},
{
"cell_type": "markdown",
"id": "0bd2a29a-68e0-42a6-b46d-13aa95eca26d",
"metadata": {},
"source": [
"## Versions"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "9ea2636c-b07e-44c8-ba5f-aace81dba245",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Name: pandas\n",
"Version: 2.1.4\n"
]
}
],
"source": [
"!pip show pandas | grep -E 'Name:|Version:'"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "590ac008-153d-4603-83bc-02f87a7a8d60",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Name: polars\n",
"Version: 0.20.26\n"
]
}
],
"source": [
"!pip show polars | grep -E 'Name:|Version:'"
]
},
{
"cell_type": "markdown",
"id": "b7760198-a975-4810-b3d4-25554e4fe3c4",


@@ -1,102 +1,690 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "eda4ddbf-c146-45b5-8a40-b9ac90f1465f",
"cell_type": "markdown",
"id": "fa00684b-2e50-4cf5-b8f3-bd28f583391b",
"metadata": {},
"outputs": [],
"source": [
"# Elasticsearch \n",
"# Elasticsearch and tabular integration\n",
"\n",
"\n"
"Elasticsearch is a NoSQL database, which indexes JSON records.\n",
"In the following the Winlog Beat index gets queried, which holds Windows EventLog data.\n",
"The Elasticsearch SQL endpoint is used to define a query, and the resulting data is retrieved as a JSON stream.\n",
"The data gets read into in-memory dataframe objects which allow data-manipulation tasks.\n",
"\n",
"In-memory processing can be difficult if the datasets grow large.\n",
"Therefore a comparison is made between two polular in-memory dataframe libraries:\n",
"\n",
"1.) Pandas\n",
"2.) Polars\n",
"\n",
"The memory footprint is assessed, because runtime memory is the limiting factor for the implementations."
]
},
{
"cell_type": "markdown",
"id": "a4127f02-a190-4d78-bddb-e660d2d55327",
"metadata": {},
"source": [
"## Versions"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 89,
"id": "51cfba52-67a8-4e5c-9afa-89fded16a03a",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Name: pandas\n",
"Version: 2.1.4\n"
]
}
],
"source": [
"!pip show pandas | grep -E 'Name:|Version:'"
]
},
{
"cell_type": "code",
"execution_count": 90,
"id": "ad9779b4-8a30-4188-b875-f2bae857e308",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Name: polars\n",
"Version: 0.20.26\n"
]
}
],
"source": [
"!pip show polars | grep -E 'Name:|Version:'"
]
},
{
"cell_type": "markdown",
"id": "b7760198-a975-4810-b3d4-25554e4fe3c4",
"metadata": {},
"source": [
"## Elasticsearch API\n",
"\n",
"The Elasticsearch API uses HTTP and is available on port 9200.\n",
"\n",
"The index \"winlogbeat-\" contains data from the period. It's a periodically rotating index.\n",
"\n",
"Here the Elasticsearch DSL is used, and an event timeline is being retrieved, in time-descending order.\n",
"\n",
"The resulting JSON data is piped to the `jq` utility, which is prettier on a command-line.\n",
"Only the first JSON record is analyzed. \n",
"\n",
"The output shows the index and the timestamp."
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "cce35135-52d7-484b-bbae-d1c876836433",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\n",
" \"index\": \"winlogbeat-7.10.0-2024.05.15-000008\",\n",
" \"timestamp\": \"2024-05-15T15:57:22.877Z\"\n",
"}\n"
]
}
],
"source": [
"%%bash\n",
"curl -s -X GET \"http://192.168.20.106:9200/winlogbeat-*/_search\" -H 'Content-Type: application/json' -d '{\n",
" \"size\": 1,\n",
" \"sort\": [\n",
" {\n",
" \"@timestamp\": {\n",
" \"order\": \"desc\"\n",
" }\n",
" }\n",
" ]\n",
"}' | jq '.hits.hits[0] | {index: ._index, timestamp: ._source[\"@timestamp\"]}'\n"
]
},
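{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the same timeline query can be issued from Python with the `requests` library (a minimal sketch, not part of the original run; it assumes the same Elasticsearch host as the curl call above)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"# Same query as the curl call above: the single newest event, sorted by @timestamp.\n",
"query = {\n",
"    \"size\": 1,\n",
"    \"sort\": [{\"@timestamp\": {\"order\": \"desc\"}}]\n",
"}\n",
"resp = requests.get(\n",
"    \"http://192.168.20.106:9200/winlogbeat-*/_search\",\n",
"    headers={\"Content-Type\": \"application/json\"},\n",
"    json=query,\n",
").json()\n",
"\n",
"# Reduce the hit to the same two fields that jq extracts above.\n",
"hit = resp[\"hits\"][\"hits\"][0]\n",
"print({\"index\": hit[\"_index\"], \"timestamp\": hit[\"_source\"][\"@timestamp\"]})"
]
},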
{
"cell_type": "markdown",
"id": "4e6efd1c-2f22-4f5b-9ad7-b569065f182d",
"metadata": {},
"source": [
"The following Bash command shows a SQL query.\n",
"\n",
"The `Limit 1` is a common SQL statement.\n",
"The output is further limited with the `head` command. Only the first fields of the first record are shown.\n",
"\n",
"By default the order of records doesn't represent a timeline, but the order of records in the index."
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "2d3f97cb-cc71-4d81-ad9c-df11125cd109",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\n",
" \"size\": 1,\n",
" \"_source\": {\n"
]
}
],
"source": [
"%%bash\n",
"curl -s -X POST \"http://192.168.20.106:9200/_sql/translate\" -H 'Content-Type: application/json' -d '{\n",
" \"query\": \"SELECT * FROM \\\"winlogbeat-7.10.0-2024.05.15-*\\\" LIMIT 1\"\n",
"}' | jq | head -n 3\n"
]
},
{
"cell_type": "markdown",
"id": "9e42a51f-e5a0-480d-9e2e-9744a288aef7",
"metadata": {},
"source": [
"## Elasticsearch tabular-integration and Pandas\n",
"\n",
"Pandas is the de-facto standard for data-manipulation of small to medium datasets in Data Science.\n",
"It offers robust functions for in-memory data transactions and tabular feature integration.\n",
"\n",
"In the following the expansion of JSON data is used to allow a simple feature selection for further processing.\n",
"The data is returned from Elasticsearch, from an SQL query.\n",
"\n",
"The data is provided via a Scrolling API, which delivers a portion of the data each time.\n",
"This simplifies batch processing of large datasets."
]
},
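{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cursor mechanism in a nutshell (a minimal sketch, not part of the original run; it assumes the same host and index pattern as the cells above): the first SQL request returns a page of rows plus a cursor, and follow-up requests send only the cursor until none is returned."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"base_url = \"http://192.168.20.106:9200\"\n",
"headers = {\"Content-Type\": \"application/json\"}\n",
"\n",
"# First request: the SQL endpoint returns the first page of rows plus a cursor.\n",
"page = requests.post(\n",
"    f\"{base_url}/_sql?format=json\",\n",
"    headers=headers,\n",
"    json={\"query\": 'SELECT \"@timestamp\" FROM \"winlogbeat-*\" LIMIT 50', \"field_multi_value_leniency\": True},\n",
").json()\n",
"total = len(page.get(\"rows\", []))\n",
"\n",
"# Follow-up requests: send only the cursor until Elasticsearch stops returning one.\n",
"while page.get(\"cursor\"):\n",
"    page = requests.post(f\"{base_url}/_sql?format=json\", headers=headers, json={\"cursor\": page[\"cursor\"]}).json()\n",
"    if not page.get(\"rows\"):\n",
"        break\n",
"    total += len(page.get(\"rows\", []))\n",
"\n",
"print(f\"Fetched {total} rows via cursor pagination.\")"
]
},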
{
"cell_type": "code",
"execution_count": 64,
"id": "f8747542-a2d1-4814-8dc2-acf172db2d0c",
"metadata": {
"tags": []
},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Retrieved 1000 documents.\n",
"Retrieved 2000 documents.\n",
"Retrieved 3000 documents.\n",
"Retrieved 4000 documents.\n",
"Retrieved 5000 documents.\n",
"Files have been written.\n"
]
}
],
"source": [
"import requests\n",
"import pandas as pd\n",
"import json\n",
"\n",
"# Function to recursively normalize nested columns in a DataFrame\n",
"def recursively_normalize(data):\n",
" df = pd.json_normalize(data)\n",
" while True:\n",
" nested_cols = [col for col in df.columns if isinstance(df[col].iloc[0], (dict, list))]\n",
" if not nested_cols:\n",
" break\n",
" for col in nested_cols:\n",
" if isinstance(df[col].iloc[0], dict):\n",
" normalized = pd.json_normalize(df[col])\n",
" df = df.drop(columns=[col]).join(normalized)\n",
" elif isinstance(df[col].iloc[0], list):\n",
" df = df.explode(col)\n",
" normalized = pd.json_normalize(df[col])\n",
" df = df.drop(columns=[col]).join(normalized)\n",
" return df\n",
"\n",
"# Function to fetch the next batch using the cursor\n",
"def fetch_next_batch(cursor):\n",
" response = requests.post(\n",
" f\"{base_url}/_sql?format=json\",\n",
" headers={\"Content-Type\": \"application/json\"},\n",
" json={\"cursor\": cursor}\n",
" ).json()\n",
" return response\n",
"\n",
"# Elasticsearch base URL\n",
"base_url = \"http://192.168.20.106:9200\"\n",
"# Index name\n",
"index = \"winlogbeat-*\"\n",
"\n",
"# SQL query for initial search\n",
"sql_query = \"\"\"\n",
"SELECT \"@timestamp\", host.hostname, host.ip, log.level, winlog.event_id, winlog.task, message FROM \"winlogbeat-7.10.0-2024.05.15-*\"\n",
"LIMIT 5000\n",
"\"\"\"\n",
"\n",
"# Initial search request to start scrolling\n",
"initial_response = requests.post(\n",
" f\"{base_url}/{index}/_search?scroll=1m\",\n",
" f\"{base_url}/_sql?format=json\",\n",
" headers={\"Content-Type\": \"application/json\"},\n",
" json={\n",
" \"size\": 10000, # Adjust the size as per your requirement\n",
" \"query\": {\"match_all\": {}}\n",
" \"query\": sql_query,\n",
" \"field_multi_value_leniency\": True\n",
" }\n",
").json()\n",
"\n",
"# Extract scroll ID from the initial response\n",
"scroll_id = initial_response[\"_scroll_id\"]\n",
"# Extract the cursor for scrolling\n",
"cursor = initial_response.get('cursor')\n",
"rows = initial_response.get('rows')\n",
"columns = [col['name'] for col in initial_response['columns']]\n",
"\n",
"# Process initial batch of documents\n",
"hits = initial_response[\"hits\"][\"hits\"]\n",
"data = [hit[\"_source\"] for hit in hits]\n",
"# Initialize CSV file (assumes the first batch is not empty)\n",
"if rows:\n",
" df = pd.DataFrame(rows, columns=columns)\n",
" df = recursively_normalize(df.to_dict(orient='records'))\n",
" df.to_csv(\"lab_logs_normal_activity.csv\", mode='w', index=False, header=True)\n",
"\n",
"# Track total documents retrieved\n",
"total_documents_retrieved = len(data)\n",
"total_documents_retrieved = len(rows)\n",
"print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
"\n",
"# Loop to fetch subsequent batches of documents until no more documents are left\n",
"while hits:\n",
" # Fetch next batch of documents using scroll API\n",
" response = requests.post(\n",
" f\"{base_url}/_search/scroll\",\n",
" json={\"scroll\": \"1m\", \"scroll_id\": scroll_id}\n",
" ).json()\n",
"while cursor:\n",
" # Fetch next batch of documents using cursor\n",
" response = fetch_next_batch(cursor)\n",
" \n",
" # Extract scroll ID from the response\n",
" scroll_id = response[\"_scroll_id\"]\n",
" # Update cursor for the next batch\n",
" cursor = response.get('cursor')\n",
" rows = response.get('rows')\n",
" \n",
" # Process batch of documents\n",
" hits = response[\"hits\"][\"hits\"]\n",
" \n",
" # If no hits, break out of the loop\n",
" if not hits:\n",
" # If no rows, break out of the loop\n",
" if not rows:\n",
" break\n",
" \n",
" # Extend data with new batch of documents\n",
" data.extend([hit[\"_source\"] for hit in hits])\n",
" # Normalize data and append to CSV\n",
" df = pd.DataFrame(rows, columns=columns)\n",
" df = recursively_normalize(df.to_dict(orient='records'))\n",
" \n",
" # Append to CSV file without headers\n",
" df.to_csv(\"lab_logs_normal_activity.csv\", mode='a', index=False, header=False)\n",
" \n",
" # Convert DataFrame to JSON, line by line\n",
" json_lines = df.to_json(orient='records', lines=True).splitlines()\n",
" # Append each line to an existing JSON file\n",
" with open(\"lab_logs_normal_activity.json\", 'a') as file:\n",
" for line in json_lines:\n",
" file.write(line + '\\n') # Append each line and add a newline\n",
" \n",
" # Update total documents retrieved\n",
" total_documents_retrieved += len(hits)\n",
" total_documents_retrieved += len(rows)\n",
" \n",
" print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
"\n",
"# Convert data to pandas DataFrame\n",
"df = pd.DataFrame(data)\n",
"\n",
"# Display DataFrame\n",
"print(df)\n"
"print(\"Files have been written.\")\n"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "deb60f70-f62d-4802-8928-5ea18bbc7b3e",
"metadata": {
"tags": []
},
"outputs": [],
"cell_type": "markdown",
"id": "1b236f1c-7060-43a0-b4e7-2b9697114a3e",
"metadata": {},
"source": [
"df.to_json(\"lab_logs_normal_activity_may_6_2024.json\")"
"## Alternative approach with polars\n",
"\n",
"Polars is a newer tabular-integration library, which challenges Pandas. \n",
"It's supposed to me more memory efficient, because it's backend is written in Rust."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9faae732-f464-4d11-924e-aeba4f293def",
"id": "78e37d61-4554-4bbb-99d9-ecbb2e892557",
"metadata": {},
"outputs": [],
"source": [
"%pip install polars"
]
},
{
"cell_type": "code",
"execution_count": 63,
"id": "328b8d13-3cc0-4239-b3e5-d98da9bb51ec",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Retrieved 1000 documents.\n",
"Retrieved 2000 documents.\n",
"Retrieved 3000 documents.\n",
"Retrieved 4000 documents.\n",
"Retrieved 5000 documents.\n",
"Files have been written.\n"
]
}
],
"source": [
"import requests\n",
"import polars as pl\n",
"import json\n",
"\n",
"# Function to recursively unnest nested columns in a DataFrame\n",
"def recursively_unnest(df):\n",
" nested = True\n",
" while nested:\n",
" nested = False\n",
" for col in df.columns:\n",
" if df[col].dtype == pl.List:\n",
" df = df.explode(col)\n",
" nested = True\n",
" elif df[col].dtype == pl.Struct:\n",
" df = df.unnest(col)\n",
" nested = True\n",
" return df\n",
"\n",
"# Function to fetch the next batch using the cursor\n",
"def fetch_next_batch(cursor):\n",
" response = requests.post(\n",
" f\"{base_url}/_sql?format=json\",\n",
" headers={\"Content-Type\": \"application/json\"},\n",
" json={\"cursor\": cursor}\n",
" ).json()\n",
" return response\n",
"\n",
"# Elasticsearch base URL\n",
"base_url = \"http://192.168.20.106:9200\"\n",
"# Index name\n",
"index = \"winlogbeat-*\"\n",
"\n",
"# SQL query for initial search\n",
"sql_query = \"\"\"\n",
"SELECT \"@timestamp\", host.hostname, host.ip, log.level, winlog.event_id, winlog.task, message FROM \"winlogbeat-7.10.0-2024.05.15-*\"\n",
"LIMIT 5000\n",
"\"\"\"\n",
"\n",
"# Initial search request to start scrolling\n",
"initial_response = requests.post(\n",
" f\"{base_url}/_sql?format=json\",\n",
" headers={\"Content-Type\": \"application/json\"},\n",
" json={\n",
" \"query\": sql_query,\n",
" \"field_multi_value_leniency\": True\n",
" }\n",
").json()\n",
"\n",
"# Extract the cursor for scrolling\n",
"cursor = initial_response.get('cursor')\n",
"rows = initial_response.get('rows')\n",
"columns = [col['name'] for col in initial_response['columns']]\n",
"\n",
"# Initialize CSV file (assumes the first batch is not empty)\n",
"if rows:\n",
" df = pl.DataFrame(rows, schema=columns)\n",
" df = recursively_unnest(df)\n",
" df.write_csv(\"lab_logs_normal_activity.csv\", include_header=True)\n",
"\n",
"# Track total documents retrieved\n",
"total_documents_retrieved = len(rows)\n",
"print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
"\n",
"# Loop to fetch subsequent batches of documents until no more documents are left\n",
"while cursor:\n",
" # Fetch next batch of documents using cursor\n",
" response = fetch_next_batch(cursor)\n",
" \n",
" # Update cursor for the next batch\n",
" cursor = response.get('cursor')\n",
" rows = response.get('rows')\n",
" \n",
" # If no rows, break out of the loop\n",
" if not rows:\n",
" break\n",
" \n",
" # Normalize data and append to CSV\n",
" df = pl.DataFrame(rows, schema=columns)\n",
" df = recursively_unnest(df)\n",
" \n",
" # Manually write the CSV to avoid headers\n",
" with open(\"lab_logs_normal_activity.csv\", 'a') as f:\n",
" df.write_csv(f, include_header=False)\n",
" \n",
" # Convert DataFrame to JSON, line by line\n",
" json_lines = [json.dumps(record) for record in df.to_dicts()]\n",
" # Append each line to an existing JSON file\n",
" with open(\"lab_logs_normal_activity.json\", 'a') as file:\n",
" for line in json_lines:\n",
" file.write(line + '\\n') # Append each line and add a newline\n",
" \n",
" # Update total documents retrieved\n",
" total_documents_retrieved += len(rows)\n",
" \n",
" print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
"\n",
"print(\"Files have been written.\")\n"
]
},
{
"cell_type": "markdown",
"id": "3dd720a7-c716-4d41-9ab4-37652acca137",
"metadata": {
"tags": []
},
"source": [
"## Memory footprint comparison"
]
},
{
"cell_type": "code",
"execution_count": 74,
"id": "eefffe2a-f61c-47c8-90e3-d0de0ab932d6",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"@timestamp object\n",
"host.hostname object\n",
"host.ip object\n",
"log.level object\n",
"winlog.event_id int64\n",
"winlog.task object\n",
"message object\n",
"dtype: object\n"
]
}
],
"source": [
"import pandas as pd\n",
"\n",
"# Read a small chunk of the JSON file\n",
"file_path = \"lab_logs_normal_activity.json\"\n",
"pd_df = pd.read_json(file_path, lines=True, nrows=10)\n",
"\n",
"print(pd_df.dtypes)"
]
},
{
"cell_type": "code",
"execution_count": 69,
"id": "0b2be27e-a56c-411b-bbff-dc42e533ca80",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'@timestamp': String, 'host.hostname': String, 'host.ip': String, 'log.level': String, 'winlog.event_id': Int64, 'winlog.task': String, 'message': String}\n"
]
}
],
"source": [
"import polars as pl\n",
"\n",
"# Define the mapping from Pandas dtype to Polars dtype\n",
"dtype_mapping = {\n",
" \"object\": pl.Utf8,\n",
" \"int64\": pl.Int64,\n",
" \"float64\": pl.Float64,\n",
" # Add more mappings if needed\n",
"}\n",
"\n",
"# Generate the schema for Polars from Pandas dtype\n",
"schema = {col: dtype_mapping[str(dtype)] for col, dtype in pd_df.dtypes.items()}\n",
"print(schema)\n"
]
},
{
"cell_type": "code",
"execution_count": 78,
"id": "5ccc9d58-8e27-43d0-bf69-7f2ff44c9874",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"shape: (8_000, 7)\n",
"┌──────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐\n",
"│ @timestamp ┆ host.hostna ┆ host.ip ┆ log.level ┆ winlog.even ┆ winlog.task ┆ message │\n",
"│ --- ┆ me ┆ --- ┆ --- ┆ t_id ┆ --- ┆ --- │\n",
"│ str ┆ --- ┆ str ┆ str ┆ --- ┆ str ┆ str │\n",
"│ ┆ str ┆ ┆ ┆ i64 ┆ ┆ │\n",
"╞══════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╡\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
"│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
"│ 6:10:07.128Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
"│ 6:10:07.136Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
"│ 6:10:07.136Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
"│ 6:10:07.149Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
"│ 6:10:07.149Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
"└──────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘\n",
"Pandas DataFarme number of rows: 8000\n",
"Polars DataFrame memory usage: 4.76 MB\n"
]
}
],
"source": [
"# Read the JSON file using the defined schema\n",
"lazy_df = pl.scan_ndjson(file_path)\n",
"\n",
"# Collect the LazyFrame to a DataFrame\n",
"pl_df = lazy_df.collect()\n",
"\n",
"# Convert columns to the correct data types according to the schema\n",
"pl_df = pl_df.with_columns([pl.col(col).cast(dtype) for col, dtype in schema.items()])\n",
"\n",
"# Print the DataFrame and its memory usage\n",
"print(pl_df)\n",
"\n",
"num_rows_polars = pl_df.shape[0]\n",
"\n",
"print(f\"Pandas DataFarme number of rows: {num_rows_polars}\")\n",
"print(f\"Polars DataFrame memory usage: {pl_df.estimated_size() / (1024 ** 2):.2f} MB\")"
]
},
{
"cell_type": "code",
"execution_count": 79,
"id": "547f7253-cd62-44c6-8d7a-840dab2dbbbd",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" @timestamp host.hostname host.ip \\\n",
"0 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
"1 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
"2 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
"3 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
"4 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
"... ... ... ... \n",
"7995 2024-05-15T16:10:07.128Z win10 fe80::24b4:3691:44a6:38a1 \n",
"7996 2024-05-15T16:10:07.136Z win10 fe80::24b4:3691:44a6:38a1 \n",
"7997 2024-05-15T16:10:07.136Z win10 fe80::24b4:3691:44a6:38a1 \n",
"7998 2024-05-15T16:10:07.149Z win10 fe80::24b4:3691:44a6:38a1 \n",
"7999 2024-05-15T16:10:07.149Z win10 fe80::24b4:3691:44a6:38a1 \n",
"\n",
" log.level winlog.event_id winlog.task \\\n",
"0 information 13 Registry value set (rule: RegistryEvent) \n",
"1 information 13 Registry value set (rule: RegistryEvent) \n",
"2 information 13 Registry value set (rule: RegistryEvent) \n",
"3 information 13 Registry value set (rule: RegistryEvent) \n",
"4 information 13 Registry value set (rule: RegistryEvent) \n",
"... ... ... ... \n",
"7995 information 4663 Removable Storage \n",
"7996 information 4663 Removable Storage \n",
"7997 information 4663 Removable Storage \n",
"7998 information 4663 Removable Storage \n",
"7999 information 4663 Removable Storage \n",
"\n",
" message \n",
"0 Registry value set:\\nRuleName: InvDB-Ver\\nEven... \n",
"1 Registry value set:\\nRuleName: InvDB-Path\\nEve... \n",
"2 Registry value set:\\nRuleName: InvDB-Pub\\nEven... \n",
"3 Registry value set:\\nRuleName: InvDB-CompileTi... \n",
"4 Registry value set:\\nRuleName: InvDB-Ver\\nEven... \n",
"... ... \n",
"7995 An attempt was made to access an object.\\n\\nSu... \n",
"7996 An attempt was made to access an object.\\n\\nSu... \n",
"7997 An attempt was made to access an object.\\n\\nSu... \n",
"7998 An attempt was made to access an object.\\n\\nSu... \n",
"7999 An attempt was made to access an object.\\n\\nSu... \n",
"\n",
"[8000 rows x 7 columns]\n",
"Pandas DataFarme number of rows: 8000\n",
"Pandas DataFrame memory usage: 7.56 MB\n"
]
}
],
"source": [
"# Load the JSON file into a Pandas DataFrame\n",
"pd_df = pd.read_json(file_path, lines=True)\n",
"pd_memory_usage = pd_df.memory_usage(deep=True).sum()\n",
"\n",
"# Get the number of rows in the Pandas DataFrame\n",
"num_rows_pandas = pd_df.shape[0]\n",
"\n",
"print(pd_df)\n",
"\n",
"print(f\"Pandas DataFarme number of rows: {num_rows_pandas}\")\n",
"print(f\"Pandas DataFrame memory usage: {pd_memory_usage / (1024 ** 2):.2f} MB\")"
]
},
{
"cell_type": "markdown",
"id": "04937c37-16b4-4e03-9cf2-ac704e48f60e",
"metadata": {},
"source": [
"# Result\n",
"\n",
"Polars and Pandas borth processed the same data (8000 rows, categorical data represented as strings).\n",
"\n",
"Polars needed about 40% less memory. "
]
},
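{
"cell_type": "markdown",
"metadata": {},
"source": [
"The relative saving can be recomputed from the two measurements above (a minimal sketch added for illustration, not part of the original run), reusing the `pl_df` and `pd_memory_usage` objects defined in the earlier cells."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Recompute the relative memory saving from the measurements above.\n",
"# Assumes `pl_df` (Polars) and `pd_memory_usage` (Pandas, in bytes) are still in memory.\n",
"pl_memory_usage = pl_df.estimated_size()\n",
"saving = 1 - pl_memory_usage / pd_memory_usage\n",
"print(f\"Polars uses {saving:.0%} less memory than Pandas for the same rows.\")"
]
},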
{
"cell_type": "code",
"execution_count": null,
"id": "e910ef56-2f6a-44f6-9db2-62c2aedc9a49",
"metadata": {},
"outputs": [],
"source": []