Initial code commit
Just the hacky code.
This commit is contained in:
parent
c6633b686c
commit
a49d3f941c
408
ad-user-review.py
Normal file
408
ad-user-review.py
Normal file
@ -0,0 +1,408 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
from ldap3 import Server, Connection, ALL, NTLM, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, AUTO_BIND_NO_TLS, SUBTREE
|
||||
from ldap3.core.exceptions import LDAPCursorError
|
||||
import json
|
||||
import pandas as pd
|
||||
from io import StringIO
|
||||
from datetime import datetime, timedelta
|
||||
# from email.message import EmailMessage
|
||||
import smtplib
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from tabulate import tabulate
|
||||
|
||||
# global variables
|
||||
ad_audit_user = ""
|
||||
ad_audit_password = ""
|
||||
ad_server = ""
|
||||
ad_server2 = ""
|
||||
report_email = ""
|
||||
|
||||
# development config flags
|
||||
detailed_output = True # results to stdout
|
||||
verbose = False # debug output
|
||||
|
||||
inactive_enabled_onprem_ad_users_html_report = StringIO()
|
||||
|
||||
def read_config():
|
||||
"""
|
||||
Read config file and check if it is valid
|
||||
:return:
|
||||
"""
|
||||
|
||||
with open('config.json') as f:
|
||||
|
||||
# parse the JSON file and assert the values
|
||||
try:
|
||||
d = json.load(f)
|
||||
|
||||
assert 'admin' in d['user']
|
||||
assert isinstance(d['password'], str)
|
||||
assert isinstance(d['ad_server'], str)
|
||||
assert isinstance(d['ad_server_2'], str)
|
||||
assert isinstance(d['report_email'], str)
|
||||
|
||||
except AssertionError:
|
||||
print("Config file is not valid")
|
||||
return 0
|
||||
|
||||
# move the values to global variables
|
||||
global ad_audit_user
|
||||
global ad_audit_password
|
||||
global ad_server
|
||||
global ad_server_2
|
||||
global report_email
|
||||
|
||||
ad_audit_user = d['user']
|
||||
ad_audit_password = d['password']
|
||||
ad_server = d['ad_server']
|
||||
ad_server_2 = d['ad_server_2']
|
||||
report_email = d['report_email']
|
||||
|
||||
|
||||
def _parse_date(date_string: str):
|
||||
"""
|
||||
Parse date string, internal function
|
||||
:param date_string:
|
||||
:return: datetime object
|
||||
"""
|
||||
|
||||
# print(date_string)
|
||||
# https://bugs.python.org/msg169952 BUG in Python
|
||||
if ":" == date_string[-3:-2]:
|
||||
date_string = date_string[:-3] + date_string[-2:]
|
||||
# handle optional microseconds - https://strftime.org/ - %f if the parse fails (chain)
|
||||
try:
|
||||
return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f%z")
|
||||
except ValueError:
|
||||
return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S%z")
|
||||
|
||||
|
||||
def send_email(message, subject="Inactive on-prem AD employee accounts (180d)", server='10.1.1.2',
|
||||
from_email='audit-tool@company.com'):
|
||||
"""
|
||||
Send email (to Slack, Exchange, etc.
|
||||
:param message:
|
||||
:param to_email:
|
||||
:param subject:
|
||||
:param server:
|
||||
:param from_email:
|
||||
:return:
|
||||
"""
|
||||
|
||||
# msg = EmailMessage()
|
||||
msg = MIMEMultipart('alternative')
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = from_email
|
||||
msg['To'] = "monitoring@company.com"
|
||||
|
||||
message = '<font face="Courier New, Courier, monospace"><pre>' + message + '</pre></font>'
|
||||
part2 = MIMEText(message, 'html')
|
||||
# msg.set_content(message)
|
||||
msg.attach(part2)
|
||||
|
||||
server = smtplib.SMTP(server)
|
||||
server.set_debuglevel(1)
|
||||
# server.login(from_email, ad_audit_password) # user & password
|
||||
server.send_message(msg)
|
||||
server.quit()
|
||||
print('successfully sent the mail.')
|
||||
|
||||
|
||||
def _resolve_ms_ad_status_code(code : int):
|
||||
"""
|
||||
Resolve the status code from MS AD
|
||||
:param code:
|
||||
:return:
|
||||
"""
|
||||
|
||||
code_lookup_table = {
|
||||
512: "Enabled, PW policy enforced",
|
||||
514: "Disabled, PW policy enforced",
|
||||
544: "Enabled, PW policy NOT enforced",
|
||||
66048: "Enabled, PW policy NOT enforced"
|
||||
}
|
||||
|
||||
if code in code_lookup_table:
|
||||
return code_lookup_table[code]
|
||||
else:
|
||||
return code
|
||||
|
||||
|
||||
def _connect_to_msaddcs():
|
||||
pass
|
||||
|
||||
def analyze_ad_groups():
|
||||
# move global variables to local scope
|
||||
domain_name = 'company'
|
||||
domain_suffix = "int"
|
||||
server_name = ad_server
|
||||
failover_server_name = ad_server_2
|
||||
user_name = ad_audit_user
|
||||
password = ad_audit_password
|
||||
|
||||
# Setup the connection to the AD server
|
||||
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
|
||||
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
|
||||
auto_bind=True)
|
||||
|
||||
# Search for all empty security groups(, excluding dynamic groups)
|
||||
conn.search(
|
||||
search_base=f'dc={domain_name},dc={domain_suffix}',
|
||||
search_filter = '(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648))',
|
||||
# search_filter='(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648)(!(groupType:1.2.840.113556.1.4.803:=2147483648)))',
|
||||
search_scope=SUBTREE,
|
||||
attributes=['member']
|
||||
)
|
||||
# filer out dynamic groups manually (not supported by the search filter) based on the prefix
|
||||
empty_groups = [entry.entry_dn for entry in conn.entries if len(entry.member) == 0 and not any(entry.entry_dn.startswith(s) for s in ['CN=dyn', 'CN=app'])]
|
||||
|
||||
# tabulate the results
|
||||
empty_ad_security_groups = pd.DataFrame({'Empty Security Groups': empty_groups})
|
||||
|
||||
# Print the list of empty security groups
|
||||
print(empty_ad_security_groups)
|
||||
|
||||
# Close the connection
|
||||
conn.unbind()
|
||||
|
||||
|
||||
def analyze_ad_admins():
|
||||
# move global variables to local scope
|
||||
domain_name = 'company'
|
||||
domain_suffix = "int"
|
||||
server_name = ad_server
|
||||
failover_server_name = ad_server_2
|
||||
user_name = ad_audit_user
|
||||
password = ad_audit_password
|
||||
|
||||
# Setup the connection to the AD server
|
||||
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
|
||||
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
|
||||
auto_bind=True)
|
||||
|
||||
# diff the results against the prior run (if available)
|
||||
if os.path.isfile('ad_admins.csv'):
|
||||
prior_ad_admins = pd.read_csv('ad_admins.csv')
|
||||
|
||||
# Search for all users who are members of Domain Admins or Enterprise Admins group
|
||||
search_filter = f'(&(objectClass=user)(|(memberOf=CN=Domain Admins,CN=Users,DC={domain_name},DC={domain_suffix})(memberOf=CN=Enterprise Admins,CN=Users,DC=your_domain,DC=com)))'
|
||||
search_base = f'dc={domain_name},dc={domain_suffix}'
|
||||
search_scope = SUBTREE
|
||||
attributes = ['sAMAccountName', 'displayName']
|
||||
conn.search(search_base=search_base, search_filter=search_filter, search_scope=search_scope, attributes=attributes)
|
||||
admin_users = [entry.sAMAccountName.value for entry in conn.entries]
|
||||
|
||||
# Print the list of admin users
|
||||
# tabulate the results
|
||||
ad_admin_users = pd.DataFrame({'AD Domain Admin Users': admin_users})
|
||||
print(ad_admin_users)
|
||||
|
||||
# Close the connection
|
||||
conn.unbind()
|
||||
|
||||
def analyze_ad_service_users():
|
||||
# move global variables to local scope
|
||||
domain_name = 'company'
|
||||
domain_suffix = "int"
|
||||
server_name = ad_server
|
||||
failover_server_name = ad_server_2
|
||||
user_name = ad_audit_user
|
||||
password = ad_audit_password
|
||||
|
||||
# Setup the connection to the AD server
|
||||
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
|
||||
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
|
||||
auto_bind=True)
|
||||
|
||||
# Search for all service users
|
||||
search_filter = '(servicePrincipalName=*)'
|
||||
# search_filter = '(&(objectClass=user)(servicePrincipalName=*)(!(objectCategory=computer))(userAccountControl:1.2.840.113556.1.4.803:=2))'
|
||||
search_base = f'dc={domain_name},dc={domain_suffix}'
|
||||
search_scope = SUBTREE
|
||||
attributes = ['sAMAccountName', 'displayName']
|
||||
conn.search(search_base=search_base, search_filter=search_filter, search_scope=search_scope, attributes=attributes)
|
||||
service_users = [entry.sAMAccountName.value for entry in conn.entries]
|
||||
|
||||
# Print the list of service users
|
||||
# tabulate the results
|
||||
ad_service_users = pd.DataFrame({'AD Service Users': service_users})
|
||||
print(ad_service_users)
|
||||
|
||||
# Close the connection
|
||||
conn.unbind()
|
||||
|
||||
|
||||
def analyze_ad_users():
|
||||
"""
|
||||
Query AD for users
|
||||
:param server_name:
|
||||
:param user_name:
|
||||
:param password:
|
||||
:return:
|
||||
"""
|
||||
|
||||
# move global variables to local scope
|
||||
server_name = ad_server
|
||||
failover_server_name = ad_server_2
|
||||
user_name = ad_audit_user
|
||||
password = ad_audit_password
|
||||
|
||||
# scope: employee, contractor, etc.
|
||||
domain_name = 'company'
|
||||
domain_suffix = "int"
|
||||
|
||||
org_ou = "companynetwork"
|
||||
root_ou = "company"
|
||||
sub_ou = "users"
|
||||
|
||||
# print header
|
||||
format_string = '{:25} {:>6} {:19} {:35} {}'
|
||||
print(format_string.format('User', 'Logins', 'Last Login', 'State', 'Description'))
|
||||
|
||||
# global exception handling
|
||||
try:
|
||||
|
||||
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
|
||||
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
|
||||
auto_bind=True)
|
||||
|
||||
# ldap search query
|
||||
conn.search(
|
||||
search_base=f'OU={root_ou},OU={sub_ou},OU={org_ou},DC={domain_name},DC={domain_suffix}',
|
||||
search_filter='(objectClass=person)',
|
||||
search_scope=SUBTREE,
|
||||
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
|
||||
)
|
||||
|
||||
# check if we have any results, and if not exit the program
|
||||
if len(conn.entries) == 0:
|
||||
print("No results")
|
||||
return 0
|
||||
|
||||
# list of object entries from LDAP query
|
||||
onprem_ad_user_list = []
|
||||
|
||||
for e in conn.entries:
|
||||
# handle missing description
|
||||
try:
|
||||
desc = e.description
|
||||
except LDAPCursorError:
|
||||
desc = ""
|
||||
|
||||
# lookup the status code from ms ad
|
||||
status_text = _resolve_ms_ad_status_code(e.userAccountControl.value)
|
||||
|
||||
print(format_string.format(str(e.name), str(e.logonCount), str(e.lastLogon)[:19], str(status_text)[:35],
|
||||
desc))
|
||||
|
||||
# convert the entry to JSON, and append it to the list as a dictionary
|
||||
onprem_ad_user_entry_dict = json.loads(e.entry_to_json())
|
||||
onprem_ad_user_list.append(onprem_ad_user_entry_dict)
|
||||
|
||||
if verbose:
|
||||
print(e)
|
||||
|
||||
def _tabularize_ad_user_data(onprem_ad_user_entry_dict : dict):
|
||||
# convert the list of dictionaries to a pandas dataframe
|
||||
# normalize the data, flatten the JSON
|
||||
onprem_ad_user_df = pd.json_normalize(onprem_ad_user_entry_dict)
|
||||
|
||||
# due to issues with the processing in Elastic, we need to remove the following attributes
|
||||
onprem_ad_user_df.drop("attributes.msExchSafeSendersHash", inplace=True, axis=1)
|
||||
onprem_ad_user_df.drop("attributes.msExchBlockedSendersHash", inplace=True, axis=1)
|
||||
|
||||
# fill in the lastLogonTimestamp with the lastLogon value if the lastLogonTimestamp is empty
|
||||
onprem_ad_user_df["attributes.lastLogonTimestamp"].fillna(onprem_ad_user_df["attributes.lastLogon"], inplace=True)
|
||||
|
||||
# convert the date fields to datetime type, use UTC
|
||||
onprem_ad_user_df["attributes.lastLogonTimestamp"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(
|
||||
lambda x: _parse_date(str(x[0])))
|
||||
onprem_ad_user_df["attributes.lastLogonTimestamp"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(
|
||||
lambda x: pd.to_datetime(x, errors='coerce', utc=True))
|
||||
|
||||
# check if last logon is 180 days ago, localize local timestamp to UTC for the comparison
|
||||
onprem_ad_user_df["inactive"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(
|
||||
lambda x: pd.Timestamp.now().tz_localize('UTC') - timedelta(days=180) > x)
|
||||
|
||||
# simplify the account analysis by storing the user status in columns
|
||||
onprem_ad_user_df["inactive"] = onprem_ad_user_df["inactive"].astype(bool)
|
||||
onprem_ad_user_df["enabled"] = onprem_ad_user_df["attributes.userAccountControl"].apply(lambda x: 512 in x)
|
||||
return onprem_ad_user_df
|
||||
onprem_ad_users_df = _tabularize_ad_user_data(onprem_ad_user_list)
|
||||
|
||||
def _filter_inactive_and_non_disabled_users(onprem_ad_users_df : pd.DataFrame):
|
||||
# filter for the inactive users AND enabled users
|
||||
inactive_ad_users_df = onprem_ad_users_df.loc[onprem_ad_users_df["inactive"] & onprem_ad_users_df["enabled"]]
|
||||
return inactive_ad_users_df
|
||||
inactive_onprem_ad_users_df = _filter_inactive_and_non_disabled_users(onprem_ad_users_df)
|
||||
|
||||
# prepare the email report html part
|
||||
inactive_onprem_ad_users_count = inactive_onprem_ad_users_df.shape[0]
|
||||
|
||||
# print the inactive users if detailed output is enabled
|
||||
if detailed_output and inactive_onprem_ad_users_count > 0:
|
||||
print()
|
||||
print("Inactive users enabled:")
|
||||
|
||||
print(
|
||||
inactive_onprem_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]]
|
||||
)
|
||||
|
||||
inactive_ad_users_description_result = "Found " + str(inactive_onprem_ad_users_count) + \
|
||||
" inactive enabled users in on-prem AD."
|
||||
print(inactive_ad_users_description_result)
|
||||
if inactive_onprem_ad_users_count == 0:
|
||||
inactive_enabled_onprem_ad_users_html_report.write("No inactive enabled users found in on-prem AD.\r\n")
|
||||
print("Terminated successfully")
|
||||
exit(0)
|
||||
|
||||
def _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_ad_users_df : pd.DataFrame):
|
||||
inactive_enabled_onprem_ad_users_html_report.write("The following employee accounts are active in COMPANY " +
|
||||
"onprem AD, and have not logged in for 180 days:\r\n\r\n")
|
||||
inactive_enabled_onprem_ad_users_html_report.write(
|
||||
tabulate(
|
||||
inactive_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]],
|
||||
headers=["Name", "Last Logon"],
|
||||
tablefmt="psql",
|
||||
showindex=False)
|
||||
)
|
||||
inactive_enabled_onprem_ad_users_html_report.write("\r\nThese may have been missed within the Joiner / Leaver process\r\n")
|
||||
return inactive_enabled_onprem_ad_users_html_report
|
||||
inactive_enabled_ad_users_html_report = _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_onprem_ad_users_df)
|
||||
|
||||
print("Report Mail prepared, sending email...")
|
||||
send_email(message=inactive_enabled_ad_users_html_report.getvalue())
|
||||
|
||||
def __append_log_output_to_json_file(onprem_ad_users_df : pd.DataFrame):
|
||||
# serialize the dataframe to json and csv, keep lines for Elastic (append)
|
||||
log_output = StringIO()
|
||||
onprem_ad_users_df.to_json(path_or_buf=log_output, orient='records', lines=True)
|
||||
# append the JSON output to a file
|
||||
with open("data.json", "a+") as f:
|
||||
f.write(log_output.getvalue())
|
||||
__append_log_output_to_json_file(onprem_ad_users_df)
|
||||
|
||||
if detailed_output:
|
||||
# overwrite the output to a file
|
||||
onprem_ad_users_df.to_csv('data.csv', sep=";", index=False)
|
||||
|
||||
# global exception with failure message
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return 0
|
||||
|
||||
# close the connection, clean up
|
||||
finally:
|
||||
conn.unbind()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
read_config()
|
||||
analyze_ad_users()
|
||||
# analyze_ad_groups()
|
||||
analyze_ad_admins()
|
||||
# analyze_ad_service_users()
|
||||
# send_email(message=inactive_enabled_onprem_ad_users_html_report.getvalue())
|
Loading…
Reference in New Issue
Block a user