mirror of
https://github.com/norandom/log2ml.git
synced 2025-01-22 14:13:43 +00:00
Polars vs Pandas for categorical log data, memory footprint analysis
This commit is contained in:
parent
f33270e60a
commit
8cea2d2eb5
655
Elasticsearch-Pandas-vs-Polars-May-15-2024.ipynb
Normal file
655
Elasticsearch-Pandas-vs-Polars-May-15-2024.ipynb
Normal file
@ -0,0 +1,655 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "fa00684b-2e50-4cf5-b8f3-bd28f583391b",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Elasticsearch and tabular integration\n",
|
||||
"\n",
|
||||
"Elasticsearch is a NoSQL database, which indexes JSON records.\n",
|
||||
"In the following the Winlog Beat index gets queried, which holds Windows EventLog data.\n",
|
||||
"The Elasticsearch SQL endpoint is used to define a query, and the resulting data is retrieved as a JSON stream.\n",
|
||||
"The data gets read into in-memory dataframe objects which allow data-manipulation tasks.\n",
|
||||
"\n",
|
||||
"In-memory processing can be difficult if the datasets grow large.\n",
|
||||
"Therefore a comparison is made between two polular in-memory dataframe libraries:\n",
|
||||
"\n",
|
||||
"1.) Pandas\n",
|
||||
"2.) Polars\n",
|
||||
"\n",
|
||||
"The memory footprint is assessed, because runtime memory is the limiting factor for the implementations."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "b7760198-a975-4810-b3d4-25554e4fe3c4",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Elasticsearch API\n",
|
||||
"\n",
|
||||
"The Elasticsearch API uses HTTP and is available on port 9200.\n",
|
||||
"\n",
|
||||
"The index \"winlogbeat-\" contains data from the period. It's a periodically rotating index.\n",
|
||||
"\n",
|
||||
"Here the Elasticsearch DSL is used, and an event timeline is being retrieved, in time-descending order.\n",
|
||||
"\n",
|
||||
"The resulting JSON data is piped to the `jq` utility, which is prettier on a command-line.\n",
|
||||
"Only the first JSON record is analyzed. \n",
|
||||
"\n",
|
||||
"The output shows the index and the timestamp."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 21,
|
||||
"id": "cce35135-52d7-484b-bbae-d1c876836433",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{\n",
|
||||
" \"index\": \"winlogbeat-7.10.0-2024.05.15-000008\",\n",
|
||||
" \"timestamp\": \"2024-05-15T15:57:22.877Z\"\n",
|
||||
"}\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"%%bash\n",
|
||||
"curl -s -X GET \"http://192.168.20.106:9200/winlogbeat-*/_search\" -H 'Content-Type: application/json' -d '{\n",
|
||||
" \"size\": 1,\n",
|
||||
" \"sort\": [\n",
|
||||
" {\n",
|
||||
" \"@timestamp\": {\n",
|
||||
" \"order\": \"desc\"\n",
|
||||
" }\n",
|
||||
" }\n",
|
||||
" ]\n",
|
||||
"}' | jq '.hits.hits[0] | {index: ._index, timestamp: ._source[\"@timestamp\"]}'\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "4e6efd1c-2f22-4f5b-9ad7-b569065f182d",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The following Bash command shows a SQL query.\n",
|
||||
"\n",
|
||||
"The `Limit 1` is a common SQL statement.\n",
|
||||
"The output is further limited with the `head` command. Only the first fields of the first record are shown.\n",
|
||||
"\n",
|
||||
"By default the order of records doesn't represent a timeline, but the order of records in the index."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 27,
|
||||
"id": "2d3f97cb-cc71-4d81-ad9c-df11125cd109",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{\n",
|
||||
" \"size\": 1,\n",
|
||||
" \"_source\": {\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"%%bash\n",
|
||||
"curl -s -X POST \"http://192.168.20.106:9200/_sql/translate\" -H 'Content-Type: application/json' -d '{\n",
|
||||
" \"query\": \"SELECT * FROM \\\"winlogbeat-7.10.0-2024.05.15-*\\\" LIMIT 1\"\n",
|
||||
"}' | jq | head -n 3\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "9e42a51f-e5a0-480d-9e2e-9744a288aef7",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Elasticsearch tabular-integration and Pandas\n",
|
||||
"\n",
|
||||
"Pandas is the de-facto standard for data-manipulation of small to medium datasets in Data Science.\n",
|
||||
"It offers robust functions for in-memory data transactions and tabular feature integration.\n",
|
||||
"\n",
|
||||
"In the following the expansion of JSON data is used to allow a simple feature selection for further processing.\n",
|
||||
"The data is returned from Elasticsearch, from an SQL query.\n",
|
||||
"\n",
|
||||
"The data is provided via a Scrolling API, which delivers a portion of the data each time.\n",
|
||||
"This simplifies batch processing of large datasets."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 64,
|
||||
"id": "f8747542-a2d1-4814-8dc2-acf172db2d0c",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Retrieved 1000 documents.\n",
|
||||
"Retrieved 2000 documents.\n",
|
||||
"Retrieved 3000 documents.\n",
|
||||
"Retrieved 4000 documents.\n",
|
||||
"Retrieved 5000 documents.\n",
|
||||
"Files have been written.\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"import requests\n",
|
||||
"import pandas as pd\n",
|
||||
"import json\n",
|
||||
"\n",
|
||||
"# Function to recursively normalize nested columns in a DataFrame\n",
|
||||
"def recursively_normalize(data):\n",
|
||||
" df = pd.json_normalize(data)\n",
|
||||
" while True:\n",
|
||||
" nested_cols = [col for col in df.columns if isinstance(df[col].iloc[0], (dict, list))]\n",
|
||||
" if not nested_cols:\n",
|
||||
" break\n",
|
||||
" for col in nested_cols:\n",
|
||||
" if isinstance(df[col].iloc[0], dict):\n",
|
||||
" normalized = pd.json_normalize(df[col])\n",
|
||||
" df = df.drop(columns=[col]).join(normalized)\n",
|
||||
" elif isinstance(df[col].iloc[0], list):\n",
|
||||
" df = df.explode(col)\n",
|
||||
" normalized = pd.json_normalize(df[col])\n",
|
||||
" df = df.drop(columns=[col]).join(normalized)\n",
|
||||
" return df\n",
|
||||
"\n",
|
||||
"# Function to fetch the next batch using the cursor\n",
|
||||
"def fetch_next_batch(cursor):\n",
|
||||
" response = requests.post(\n",
|
||||
" f\"{base_url}/_sql?format=json\",\n",
|
||||
" headers={\"Content-Type\": \"application/json\"},\n",
|
||||
" json={\"cursor\": cursor}\n",
|
||||
" ).json()\n",
|
||||
" return response\n",
|
||||
"\n",
|
||||
"# Elasticsearch base URL\n",
|
||||
"base_url = \"http://192.168.20.106:9200\"\n",
|
||||
"# Index name\n",
|
||||
"index = \"winlogbeat-*\"\n",
|
||||
"\n",
|
||||
"# SQL query for initial search\n",
|
||||
"sql_query = \"\"\"\n",
|
||||
"SELECT \"@timestamp\", host.hostname, host.ip, log.level, winlog.event_id, winlog.task, message FROM \"winlogbeat-7.10.0-2024.05.15-*\"\n",
|
||||
"LIMIT 5000\n",
|
||||
"\"\"\"\n",
|
||||
"\n",
|
||||
"# Initial search request to start scrolling\n",
|
||||
"initial_response = requests.post(\n",
|
||||
" f\"{base_url}/_sql?format=json\",\n",
|
||||
" headers={\"Content-Type\": \"application/json\"},\n",
|
||||
" json={\n",
|
||||
" \"query\": sql_query,\n",
|
||||
" \"field_multi_value_leniency\": True\n",
|
||||
" }\n",
|
||||
").json()\n",
|
||||
"\n",
|
||||
"# Extract the cursor for scrolling\n",
|
||||
"cursor = initial_response.get('cursor')\n",
|
||||
"rows = initial_response.get('rows')\n",
|
||||
"columns = [col['name'] for col in initial_response['columns']]\n",
|
||||
"\n",
|
||||
"# Initialize CSV file (assumes the first batch is not empty)\n",
|
||||
"if rows:\n",
|
||||
" df = pd.DataFrame(rows, columns=columns)\n",
|
||||
" df = recursively_normalize(df.to_dict(orient='records'))\n",
|
||||
" df.to_csv(\"lab_logs_normal_activity.csv\", mode='w', index=False, header=True)\n",
|
||||
"\n",
|
||||
"# Track total documents retrieved\n",
|
||||
"total_documents_retrieved = len(rows)\n",
|
||||
"print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
|
||||
"\n",
|
||||
"# Loop to fetch subsequent batches of documents until no more documents are left\n",
|
||||
"while cursor:\n",
|
||||
" # Fetch next batch of documents using cursor\n",
|
||||
" response = fetch_next_batch(cursor)\n",
|
||||
" \n",
|
||||
" # Update cursor for the next batch\n",
|
||||
" cursor = response.get('cursor')\n",
|
||||
" rows = response.get('rows')\n",
|
||||
" \n",
|
||||
" # If no rows, break out of the loop\n",
|
||||
" if not rows:\n",
|
||||
" break\n",
|
||||
" \n",
|
||||
" # Normalize data and append to CSV\n",
|
||||
" df = pd.DataFrame(rows, columns=columns)\n",
|
||||
" df = recursively_normalize(df.to_dict(orient='records'))\n",
|
||||
" \n",
|
||||
" # Append to CSV file without headers\n",
|
||||
" df.to_csv(\"lab_logs_normal_activity.csv\", mode='a', index=False, header=False)\n",
|
||||
" \n",
|
||||
" # Convert DataFrame to JSON, line by line\n",
|
||||
" json_lines = df.to_json(orient='records', lines=True).splitlines()\n",
|
||||
" # Append each line to an existing JSON file\n",
|
||||
" with open(\"lab_logs_normal_activity.json\", 'a') as file:\n",
|
||||
" for line in json_lines:\n",
|
||||
" file.write(line + '\\n') # Append each line and add a newline\n",
|
||||
" \n",
|
||||
" # Update total documents retrieved\n",
|
||||
" total_documents_retrieved += len(rows)\n",
|
||||
" \n",
|
||||
" print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
|
||||
"\n",
|
||||
"print(\"Files have been written.\")\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "1b236f1c-7060-43a0-b4e7-2b9697114a3e",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Alternative approach with polars\n",
|
||||
"\n",
|
||||
"Polars is a newer tabular-integration library, which challenges Pandas. \n",
|
||||
"It's supposed to me more memory efficient, because it's backend is written in Rust."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "78e37d61-4554-4bbb-99d9-ecbb2e892557",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"%pip install polars"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 63,
|
||||
"id": "328b8d13-3cc0-4239-b3e5-d98da9bb51ec",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Retrieved 1000 documents.\n",
|
||||
"Retrieved 2000 documents.\n",
|
||||
"Retrieved 3000 documents.\n",
|
||||
"Retrieved 4000 documents.\n",
|
||||
"Retrieved 5000 documents.\n",
|
||||
"Files have been written.\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"import requests\n",
|
||||
"import polars as pl\n",
|
||||
"import json\n",
|
||||
"\n",
|
||||
"# Function to recursively unnest nested columns in a DataFrame\n",
|
||||
"def recursively_unnest(df):\n",
|
||||
" nested = True\n",
|
||||
" while nested:\n",
|
||||
" nested = False\n",
|
||||
" for col in df.columns:\n",
|
||||
" if df[col].dtype == pl.List:\n",
|
||||
" df = df.explode(col)\n",
|
||||
" nested = True\n",
|
||||
" elif df[col].dtype == pl.Struct:\n",
|
||||
" df = df.unnest(col)\n",
|
||||
" nested = True\n",
|
||||
" return df\n",
|
||||
"\n",
|
||||
"# Function to fetch the next batch using the cursor\n",
|
||||
"def fetch_next_batch(cursor):\n",
|
||||
" response = requests.post(\n",
|
||||
" f\"{base_url}/_sql?format=json\",\n",
|
||||
" headers={\"Content-Type\": \"application/json\"},\n",
|
||||
" json={\"cursor\": cursor}\n",
|
||||
" ).json()\n",
|
||||
" return response\n",
|
||||
"\n",
|
||||
"# Elasticsearch base URL\n",
|
||||
"base_url = \"http://192.168.20.106:9200\"\n",
|
||||
"# Index name\n",
|
||||
"index = \"winlogbeat-*\"\n",
|
||||
"\n",
|
||||
"# SQL query for initial search\n",
|
||||
"sql_query = \"\"\"\n",
|
||||
"SELECT \"@timestamp\", host.hostname, host.ip, log.level, winlog.event_id, winlog.task, message FROM \"winlogbeat-7.10.0-2024.05.15-*\"\n",
|
||||
"LIMIT 5000\n",
|
||||
"\"\"\"\n",
|
||||
"\n",
|
||||
"# Initial search request to start scrolling\n",
|
||||
"initial_response = requests.post(\n",
|
||||
" f\"{base_url}/_sql?format=json\",\n",
|
||||
" headers={\"Content-Type\": \"application/json\"},\n",
|
||||
" json={\n",
|
||||
" \"query\": sql_query,\n",
|
||||
" \"field_multi_value_leniency\": True\n",
|
||||
" }\n",
|
||||
").json()\n",
|
||||
"\n",
|
||||
"# Extract the cursor for scrolling\n",
|
||||
"cursor = initial_response.get('cursor')\n",
|
||||
"rows = initial_response.get('rows')\n",
|
||||
"columns = [col['name'] for col in initial_response['columns']]\n",
|
||||
"\n",
|
||||
"# Initialize CSV file (assumes the first batch is not empty)\n",
|
||||
"if rows:\n",
|
||||
" df = pl.DataFrame(rows, schema=columns)\n",
|
||||
" df = recursively_unnest(df)\n",
|
||||
" df.write_csv(\"lab_logs_normal_activity.csv\", include_header=True)\n",
|
||||
"\n",
|
||||
"# Track total documents retrieved\n",
|
||||
"total_documents_retrieved = len(rows)\n",
|
||||
"print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
|
||||
"\n",
|
||||
"# Loop to fetch subsequent batches of documents until no more documents are left\n",
|
||||
"while cursor:\n",
|
||||
" # Fetch next batch of documents using cursor\n",
|
||||
" response = fetch_next_batch(cursor)\n",
|
||||
" \n",
|
||||
" # Update cursor for the next batch\n",
|
||||
" cursor = response.get('cursor')\n",
|
||||
" rows = response.get('rows')\n",
|
||||
" \n",
|
||||
" # If no rows, break out of the loop\n",
|
||||
" if not rows:\n",
|
||||
" break\n",
|
||||
" \n",
|
||||
" # Normalize data and append to CSV\n",
|
||||
" df = pl.DataFrame(rows, schema=columns)\n",
|
||||
" df = recursively_unnest(df)\n",
|
||||
" \n",
|
||||
" # Manually write the CSV to avoid headers\n",
|
||||
" with open(\"lab_logs_normal_activity.csv\", 'a') as f:\n",
|
||||
" df.write_csv(f, include_header=False)\n",
|
||||
" \n",
|
||||
" # Convert DataFrame to JSON, line by line\n",
|
||||
" json_lines = [json.dumps(record) for record in df.to_dicts()]\n",
|
||||
" # Append each line to an existing JSON file\n",
|
||||
" with open(\"lab_logs_normal_activity.json\", 'a') as file:\n",
|
||||
" for line in json_lines:\n",
|
||||
" file.write(line + '\\n') # Append each line and add a newline\n",
|
||||
" \n",
|
||||
" # Update total documents retrieved\n",
|
||||
" total_documents_retrieved += len(rows)\n",
|
||||
" \n",
|
||||
" print(f\"Retrieved {total_documents_retrieved} documents.\")\n",
|
||||
"\n",
|
||||
"print(\"Files have been written.\")\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "3dd720a7-c716-4d41-9ab4-37652acca137",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"source": [
|
||||
"## Memory footprint comparison"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 74,
|
||||
"id": "eefffe2a-f61c-47c8-90e3-d0de0ab932d6",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"@timestamp object\n",
|
||||
"host.hostname object\n",
|
||||
"host.ip object\n",
|
||||
"log.level object\n",
|
||||
"winlog.event_id int64\n",
|
||||
"winlog.task object\n",
|
||||
"message object\n",
|
||||
"dtype: object\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"import pandas as pd\n",
|
||||
"\n",
|
||||
"# Read a small chunk of the JSON file\n",
|
||||
"file_path = \"lab_logs_normal_activity.json\"\n",
|
||||
"pd_df = pd.read_json(file_path, lines=True, nrows=10)\n",
|
||||
"\n",
|
||||
"print(pd_df.dtypes)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 69,
|
||||
"id": "0b2be27e-a56c-411b-bbff-dc42e533ca80",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{'@timestamp': String, 'host.hostname': String, 'host.ip': String, 'log.level': String, 'winlog.event_id': Int64, 'winlog.task': String, 'message': String}\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"import polars as pl\n",
|
||||
"\n",
|
||||
"# Define the mapping from Pandas dtype to Polars dtype\n",
|
||||
"dtype_mapping = {\n",
|
||||
" \"object\": pl.Utf8,\n",
|
||||
" \"int64\": pl.Int64,\n",
|
||||
" \"float64\": pl.Float64,\n",
|
||||
" # Add more mappings if needed\n",
|
||||
"}\n",
|
||||
"\n",
|
||||
"# Generate the schema for Polars from Pandas dtype\n",
|
||||
"schema = {col: dtype_mapping[str(dtype)] for col, dtype in pd_df.dtypes.items()}\n",
|
||||
"print(schema)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 78,
|
||||
"id": "5ccc9d58-8e27-43d0-bf69-7f2ff44c9874",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"shape: (8_000, 7)\n",
|
||||
"┌──────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐\n",
|
||||
"│ @timestamp ┆ host.hostna ┆ host.ip ┆ log.level ┆ winlog.even ┆ winlog.task ┆ message │\n",
|
||||
"│ --- ┆ me ┆ --- ┆ --- ┆ t_id ┆ --- ┆ --- │\n",
|
||||
"│ str ┆ --- ┆ str ┆ str ┆ --- ┆ str ┆ str │\n",
|
||||
"│ ┆ str ┆ ┆ ┆ i64 ┆ ┆ │\n",
|
||||
"╞══════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╡\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
|
||||
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
|
||||
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
|
||||
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
|
||||
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
|
||||
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
|
||||
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
|
||||
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
|
||||
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 13 ┆ Registry ┆ Registry │\n",
|
||||
"│ 5:57:18.471Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ value set ┆ value set: │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ (rule: ┆ RuleName: … │\n",
|
||||
"│ ┆ ┆ ┆ ┆ ┆ Regi… ┆ │\n",
|
||||
"│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
|
||||
"│ 6:10:07.128Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
|
||||
"│ 6:10:07.136Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
|
||||
"│ 6:10:07.136Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
|
||||
"│ 6:10:07.149Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
|
||||
"│ 2024-05-15T1 ┆ win10 ┆ fe80::24b4: ┆ information ┆ 4663 ┆ Removable ┆ An attempt │\n",
|
||||
"│ 6:10:07.149Z ┆ ┆ 3691:44a6:3 ┆ ┆ ┆ Storage ┆ was made to │\n",
|
||||
"│ ┆ ┆ 8a1 ┆ ┆ ┆ ┆ access … │\n",
|
||||
"└──────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘\n",
|
||||
"Pandas DataFarme number of rows: 8000\n",
|
||||
"Polars DataFrame memory usage: 4.76 MB\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"# Read the JSON file using the defined schema\n",
|
||||
"lazy_df = pl.scan_ndjson(file_path)\n",
|
||||
"\n",
|
||||
"# Collect the LazyFrame to a DataFrame\n",
|
||||
"pl_df = lazy_df.collect()\n",
|
||||
"\n",
|
||||
"# Convert columns to the correct data types according to the schema\n",
|
||||
"pl_df = pl_df.with_columns([pl.col(col).cast(dtype) for col, dtype in schema.items()])\n",
|
||||
"\n",
|
||||
"# Print the DataFrame and its memory usage\n",
|
||||
"print(pl_df)\n",
|
||||
"\n",
|
||||
"num_rows_polars = pl_df.shape[0]\n",
|
||||
"\n",
|
||||
"print(f\"Pandas DataFarme number of rows: {num_rows_polars}\")\n",
|
||||
"print(f\"Polars DataFrame memory usage: {pl_df.estimated_size() / (1024 ** 2):.2f} MB\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 79,
|
||||
"id": "547f7253-cd62-44c6-8d7a-840dab2dbbbd",
|
||||
"metadata": {
|
||||
"tags": []
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" @timestamp host.hostname host.ip \\\n",
|
||||
"0 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"1 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"2 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"3 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"4 2024-05-15T15:57:18.471Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"... ... ... ... \n",
|
||||
"7995 2024-05-15T16:10:07.128Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"7996 2024-05-15T16:10:07.136Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"7997 2024-05-15T16:10:07.136Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"7998 2024-05-15T16:10:07.149Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"7999 2024-05-15T16:10:07.149Z win10 fe80::24b4:3691:44a6:38a1 \n",
|
||||
"\n",
|
||||
" log.level winlog.event_id winlog.task \\\n",
|
||||
"0 information 13 Registry value set (rule: RegistryEvent) \n",
|
||||
"1 information 13 Registry value set (rule: RegistryEvent) \n",
|
||||
"2 information 13 Registry value set (rule: RegistryEvent) \n",
|
||||
"3 information 13 Registry value set (rule: RegistryEvent) \n",
|
||||
"4 information 13 Registry value set (rule: RegistryEvent) \n",
|
||||
"... ... ... ... \n",
|
||||
"7995 information 4663 Removable Storage \n",
|
||||
"7996 information 4663 Removable Storage \n",
|
||||
"7997 information 4663 Removable Storage \n",
|
||||
"7998 information 4663 Removable Storage \n",
|
||||
"7999 information 4663 Removable Storage \n",
|
||||
"\n",
|
||||
" message \n",
|
||||
"0 Registry value set:\\nRuleName: InvDB-Ver\\nEven... \n",
|
||||
"1 Registry value set:\\nRuleName: InvDB-Path\\nEve... \n",
|
||||
"2 Registry value set:\\nRuleName: InvDB-Pub\\nEven... \n",
|
||||
"3 Registry value set:\\nRuleName: InvDB-CompileTi... \n",
|
||||
"4 Registry value set:\\nRuleName: InvDB-Ver\\nEven... \n",
|
||||
"... ... \n",
|
||||
"7995 An attempt was made to access an object.\\n\\nSu... \n",
|
||||
"7996 An attempt was made to access an object.\\n\\nSu... \n",
|
||||
"7997 An attempt was made to access an object.\\n\\nSu... \n",
|
||||
"7998 An attempt was made to access an object.\\n\\nSu... \n",
|
||||
"7999 An attempt was made to access an object.\\n\\nSu... \n",
|
||||
"\n",
|
||||
"[8000 rows x 7 columns]\n",
|
||||
"Pandas DataFarme number of rows: 8000\n",
|
||||
"Pandas DataFrame memory usage: 7.56 MB\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"# Load the JSON file into a Pandas DataFrame\n",
|
||||
"pd_df = pd.read_json(file_path, lines=True)\n",
|
||||
"pd_memory_usage = pd_df.memory_usage(deep=True).sum()\n",
|
||||
"\n",
|
||||
"# Get the number of rows in the Pandas DataFrame\n",
|
||||
"num_rows_pandas = pd_df.shape[0]\n",
|
||||
"\n",
|
||||
"print(pd_df)\n",
|
||||
"\n",
|
||||
"print(f\"Pandas DataFarme number of rows: {num_rows_pandas}\")\n",
|
||||
"print(f\"Pandas DataFrame memory usage: {pd_memory_usage / (1024 ** 2):.2f} MB\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "04937c37-16b4-4e03-9cf2-ac704e48f60e",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Result\n",
|
||||
"\n",
|
||||
"Polars and Pandas borth processed the same data (8000 rows, categorical data represented as strings).\n",
|
||||
"\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.11.5"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
Loading…
Reference in New Issue
Block a user