1
0
Fork 0
ad-user-review/ad-user-review.py

408 lines
16 KiB
Python

#!/usr/bin/env python3
import os
from ldap3 import Server, Connection, ALL, NTLM, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, AUTO_BIND_NO_TLS, SUBTREE
from ldap3.core.exceptions import LDAPCursorError
import json
import pandas as pd
from io import StringIO
from datetime import datetime, timedelta
# from email.message import EmailMessage
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from tabulate import tabulate
# global variables
ad_audit_user = ""
ad_audit_password = ""
ad_server = ""
ad_server2 = ""
report_email = ""
# development config flags
detailed_output = True # results to stdout
verbose = False # debug output
inactive_enabled_onprem_ad_users_html_report = StringIO()
def read_config():
"""
Read config file and check if it is valid
:return:
"""
with open('config.json') as f:
# parse the JSON file and assert the values
try:
d = json.load(f)
assert 'admin' in d['user']
assert isinstance(d['password'], str)
assert isinstance(d['ad_server'], str)
assert isinstance(d['ad_server_2'], str)
assert isinstance(d['report_email'], str)
except AssertionError:
print("Config file is not valid")
return 0
# move the values to global variables
global ad_audit_user
global ad_audit_password
global ad_server
global ad_server_2
global report_email
ad_audit_user = d['user']
ad_audit_password = d['password']
ad_server = d['ad_server']
ad_server_2 = d['ad_server_2']
report_email = d['report_email']
def _parse_date(date_string: str):
"""
Parse date string, internal function
:param date_string:
:return: datetime object
"""
# print(date_string)
# https://bugs.python.org/msg169952 BUG in Python
if ":" == date_string[-3:-2]:
date_string = date_string[:-3] + date_string[-2:]
# handle optional microseconds - https://strftime.org/ - %f if the parse fails (chain)
try:
return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f%z")
except ValueError:
return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S%z")
def send_email(message, subject="Inactive on-prem AD employee accounts (180d)", server='10.1.1.2',
from_email='audit-tool@company.com'):
"""
Send email (to Slack, Exchange, etc.
:param message:
:param to_email:
:param subject:
:param server:
:param from_email:
:return:
"""
# msg = EmailMessage()
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = from_email
msg['To'] = "monitoring@company.com"
message = '<font face="Courier New, Courier, monospace"><pre>' + message + '</pre></font>'
part2 = MIMEText(message, 'html')
# msg.set_content(message)
msg.attach(part2)
server = smtplib.SMTP(server)
server.set_debuglevel(1)
# server.login(from_email, ad_audit_password) # user & password
server.send_message(msg)
server.quit()
print('successfully sent the mail.')
def _resolve_ms_ad_status_code(code : int):
"""
Resolve the status code from MS AD
:param code:
:return:
"""
code_lookup_table = {
512: "Enabled, PW policy enforced",
514: "Disabled, PW policy enforced",
544: "Enabled, PW policy NOT enforced",
66048: "Enabled, PW policy NOT enforced"
}
if code in code_lookup_table:
return code_lookup_table[code]
else:
return code
def _connect_to_msaddcs():
pass
def analyze_ad_groups():
# move global variables to local scope
domain_name = 'company'
domain_suffix = "int"
server_name = ad_server
failover_server_name = ad_server_2
user_name = ad_audit_user
password = ad_audit_password
# Setup the connection to the AD server
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
auto_bind=True)
# Search for all empty security groups(, excluding dynamic groups)
conn.search(
search_base=f'dc={domain_name},dc={domain_suffix}',
search_filter = '(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648))',
# search_filter='(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648)(!(groupType:1.2.840.113556.1.4.803:=2147483648)))',
search_scope=SUBTREE,
attributes=['member']
)
# filer out dynamic groups manually (not supported by the search filter) based on the prefix
empty_groups = [entry.entry_dn for entry in conn.entries if len(entry.member) == 0 and not any(entry.entry_dn.startswith(s) for s in ['CN=dyn', 'CN=app'])]
# tabulate the results
empty_ad_security_groups = pd.DataFrame({'Empty Security Groups': empty_groups})
# Print the list of empty security groups
print(empty_ad_security_groups)
# Close the connection
conn.unbind()
def analyze_ad_admins():
# move global variables to local scope
domain_name = 'company'
domain_suffix = "int"
server_name = ad_server
failover_server_name = ad_server_2
user_name = ad_audit_user
password = ad_audit_password
# Setup the connection to the AD server
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
auto_bind=True)
# diff the results against the prior run (if available)
if os.path.isfile('ad_admins.csv'):
prior_ad_admins = pd.read_csv('ad_admins.csv')
# Search for all users who are members of Domain Admins or Enterprise Admins group
search_filter = f'(&(objectClass=user)(|(memberOf=CN=Domain Admins,CN=Users,DC={domain_name},DC={domain_suffix})(memberOf=CN=Enterprise Admins,CN=Users,DC=your_domain,DC=com)))'
search_base = f'dc={domain_name},dc={domain_suffix}'
search_scope = SUBTREE
attributes = ['sAMAccountName', 'displayName']
conn.search(search_base=search_base, search_filter=search_filter, search_scope=search_scope, attributes=attributes)
admin_users = [entry.sAMAccountName.value for entry in conn.entries]
# Print the list of admin users
# tabulate the results
ad_admin_users = pd.DataFrame({'AD Domain Admin Users': admin_users})
print(ad_admin_users)
# Close the connection
conn.unbind()
def analyze_ad_service_users():
# move global variables to local scope
domain_name = 'company'
domain_suffix = "int"
server_name = ad_server
failover_server_name = ad_server_2
user_name = ad_audit_user
password = ad_audit_password
# Setup the connection to the AD server
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
auto_bind=True)
# Search for all service users
search_filter = '(servicePrincipalName=*)'
# search_filter = '(&(objectClass=user)(servicePrincipalName=*)(!(objectCategory=computer))(userAccountControl:1.2.840.113556.1.4.803:=2))'
search_base = f'dc={domain_name},dc={domain_suffix}'
search_scope = SUBTREE
attributes = ['sAMAccountName', 'displayName']
conn.search(search_base=search_base, search_filter=search_filter, search_scope=search_scope, attributes=attributes)
service_users = [entry.sAMAccountName.value for entry in conn.entries]
# Print the list of service users
# tabulate the results
ad_service_users = pd.DataFrame({'AD Service Users': service_users})
print(ad_service_users)
# Close the connection
conn.unbind()
def analyze_ad_users():
"""
Query AD for users
:param server_name:
:param user_name:
:param password:
:return:
"""
# move global variables to local scope
server_name = ad_server
failover_server_name = ad_server_2
user_name = ad_audit_user
password = ad_audit_password
# scope: employee, contractor, etc.
domain_name = 'company'
domain_suffix = "int"
org_ou = "companynetwork"
root_ou = "company"
sub_ou = "users"
# print header
format_string = '{:25} {:>6} {:19} {:35} {}'
print(format_string.format('User', 'Logins', 'Last Login', 'State', 'Description'))
# global exception handling
try:
server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)])
conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM,
auto_bind=True)
# ldap search query
conn.search(
search_base=f'OU={root_ou},OU={sub_ou},OU={org_ou},DC={domain_name},DC={domain_suffix}',
search_filter='(objectClass=person)',
search_scope=SUBTREE,
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
)
# check if we have any results, and if not exit the program
if len(conn.entries) == 0:
print("No results")
return 0
# list of object entries from LDAP query
onprem_ad_user_list = []
for e in conn.entries:
# handle missing description
try:
desc = e.description
except LDAPCursorError:
desc = ""
# lookup the status code from ms ad
status_text = _resolve_ms_ad_status_code(e.userAccountControl.value)
print(format_string.format(str(e.name), str(e.logonCount), str(e.lastLogon)[:19], str(status_text)[:35],
desc))
# convert the entry to JSON, and append it to the list as a dictionary
onprem_ad_user_entry_dict = json.loads(e.entry_to_json())
onprem_ad_user_list.append(onprem_ad_user_entry_dict)
if verbose:
print(e)
def _tabularize_ad_user_data(onprem_ad_user_entry_dict : dict):
# convert the list of dictionaries to a pandas dataframe
# normalize the data, flatten the JSON
onprem_ad_user_df = pd.json_normalize(onprem_ad_user_entry_dict)
# due to issues with the processing in Elastic, we need to remove the following attributes
onprem_ad_user_df.drop("attributes.msExchSafeSendersHash", inplace=True, axis=1)
onprem_ad_user_df.drop("attributes.msExchBlockedSendersHash", inplace=True, axis=1)
# fill in the lastLogonTimestamp with the lastLogon value if the lastLogonTimestamp is empty
onprem_ad_user_df["attributes.lastLogonTimestamp"].fillna(onprem_ad_user_df["attributes.lastLogon"], inplace=True)
# convert the date fields to datetime type, use UTC
onprem_ad_user_df["attributes.lastLogonTimestamp"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(
lambda x: _parse_date(str(x[0])))
onprem_ad_user_df["attributes.lastLogonTimestamp"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(
lambda x: pd.to_datetime(x, errors='coerce', utc=True))
# check if last logon is 180 days ago, localize local timestamp to UTC for the comparison
onprem_ad_user_df["inactive"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(
lambda x: pd.Timestamp.now().tz_localize('UTC') - timedelta(days=180) > x)
# simplify the account analysis by storing the user status in columns
onprem_ad_user_df["inactive"] = onprem_ad_user_df["inactive"].astype(bool)
onprem_ad_user_df["enabled"] = onprem_ad_user_df["attributes.userAccountControl"].apply(lambda x: 512 in x)
return onprem_ad_user_df
onprem_ad_users_df = _tabularize_ad_user_data(onprem_ad_user_list)
def _filter_inactive_and_non_disabled_users(onprem_ad_users_df : pd.DataFrame):
# filter for the inactive users AND enabled users
inactive_ad_users_df = onprem_ad_users_df.loc[onprem_ad_users_df["inactive"] & onprem_ad_users_df["enabled"]]
return inactive_ad_users_df
inactive_onprem_ad_users_df = _filter_inactive_and_non_disabled_users(onprem_ad_users_df)
# prepare the email report html part
inactive_onprem_ad_users_count = inactive_onprem_ad_users_df.shape[0]
# print the inactive users if detailed output is enabled
if detailed_output and inactive_onprem_ad_users_count > 0:
print()
print("Inactive users enabled:")
print(
inactive_onprem_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]]
)
inactive_ad_users_description_result = "Found " + str(inactive_onprem_ad_users_count) + \
" inactive enabled users in on-prem AD."
print(inactive_ad_users_description_result)
if inactive_onprem_ad_users_count == 0:
inactive_enabled_onprem_ad_users_html_report.write("No inactive enabled users found in on-prem AD.\r\n")
print("Terminated successfully")
exit(0)
def _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_ad_users_df : pd.DataFrame):
inactive_enabled_onprem_ad_users_html_report.write("The following employee accounts are active in COMPANY " +
"onprem AD, and have not logged in for 180 days:\r\n\r\n")
inactive_enabled_onprem_ad_users_html_report.write(
tabulate(
inactive_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]],
headers=["Name", "Last Logon"],
tablefmt="psql",
showindex=False)
)
inactive_enabled_onprem_ad_users_html_report.write("\r\nThese may have been missed within the Joiner / Leaver process\r\n")
return inactive_enabled_onprem_ad_users_html_report
inactive_enabled_ad_users_html_report = _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_onprem_ad_users_df)
print("Report Mail prepared, sending email...")
send_email(message=inactive_enabled_ad_users_html_report.getvalue())
def __append_log_output_to_json_file(onprem_ad_users_df : pd.DataFrame):
# serialize the dataframe to json and csv, keep lines for Elastic (append)
log_output = StringIO()
onprem_ad_users_df.to_json(path_or_buf=log_output, orient='records', lines=True)
# append the JSON output to a file
with open("data.json", "a+") as f:
f.write(log_output.getvalue())
__append_log_output_to_json_file(onprem_ad_users_df)
if detailed_output:
# overwrite the output to a file
onprem_ad_users_df.to_csv('data.csv', sep=";", index=False)
# global exception with failure message
except Exception as e:
print(e)
return 0
# close the connection, clean up
finally:
conn.unbind()
if __name__ == '__main__':
read_config()
analyze_ad_users()
# analyze_ad_groups()
analyze_ad_admins()
# analyze_ad_service_users()
# send_email(message=inactive_enabled_onprem_ad_users_html_report.getvalue())