408 lines
16 KiB
Python
408 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
from ldap3 import Server, Connection, ALL, NTLM, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, AUTO_BIND_NO_TLS, SUBTREE
|
|
from ldap3.core.exceptions import LDAPCursorError
|
|
import json
|
|
import pandas as pd
|
|
from io import StringIO
|
|
from datetime import datetime, timedelta
|
|
# from email.message import EmailMessage
|
|
import smtplib
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from tabulate import tabulate
|
|
|
|
# global variables, overwritten by read_config() with the values from config.json
ad_audit_user = ""
ad_audit_password = ""
ad_server = ""
# failover AD server; this is the spelling read_config() assigns and the
# analyze_* functions read, so it must exist even before the config is loaded
ad_server_2 = ""
# older spelling, kept in case code outside this file still refers to it
ad_server2 = ""
report_email = ""

# development config flags
detailed_output = True  # results to stdout
verbose = False  # debug output

# shared buffer that collects the text of the email report
inactive_enabled_onprem_ad_users_html_report = StringIO()
|
|
|
|
def read_config():
    """
    Read config.json from the current working directory, validate it and copy
    its values into the module-level configuration variables.

    Required keys, all of which must hold strings: user (must contain the
    substring "admin"), password, ad_server, ad_server_2, report_email.

    :return: 0 if the file content is rejected (a message is printed and the
             globals are left untouched), None on success
    :raises OSError: if config.json cannot be opened
    """
    global ad_audit_user, ad_audit_password, ad_server, ad_server_2, report_email

    with open('config.json') as f:
        try:
            d = json.load(f)
        except json.JSONDecodeError:
            # malformed JSON is reported the same way as a failed validation
            print("Config file is not valid")
            return 0

    required_keys = ('user', 'password', 'ad_server', 'ad_server_2', 'report_email')

    # explicit checks instead of assert statements: asserts are removed when
    # Python runs with -O, and a missing key must not escape as a KeyError
    is_valid = (
        isinstance(d, dict)
        and all(isinstance(d.get(key), str) for key in required_keys)
        and 'admin' in d['user']
    )
    if not is_valid:
        print("Config file is not valid")
        return 0

    # move the values to global variables
    ad_audit_user = d['user']
    ad_audit_password = d['password']
    ad_server = d['ad_server']
    ad_server_2 = d['ad_server_2']
    report_email = d['report_email']
|
|
|
|
|
|
def _parse_date(date_string: str):
|
|
"""
|
|
Parse date string, internal function
|
|
:param date_string:
|
|
:return: datetime object
|
|
"""
|
|
|
|
# print(date_string)
|
|
# https://bugs.python.org/msg169952 BUG in Python
|
|
if ":" == date_string[-3:-2]:
|
|
date_string = date_string[:-3] + date_string[-2:]
|
|
# handle optional microseconds - https://strftime.org/ - %f if the parse fails (chain)
|
|
try:
|
|
return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f%z")
|
|
except ValueError:
|
|
return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S%z")
|
|
|
|
|
|
def send_email(message, subject="Inactive on-prem AD employee accounts (180d)", server='10.1.1.2',
               from_email='audit-tool@company.com', to_email='monitoring@company.com'):
    """
    Send the report as an HTML email (to Slack, Exchange, etc.).

    The message text is wrapped in a monospace <pre> block so that
    pre-formatted tables keep their alignment. No SMTP authentication is
    performed.

    :param message: report text that becomes the HTML body
    :param subject: subject line of the mail
    :param server: host name or address of the SMTP server
    :param from_email: sender address
    :param to_email: recipient address; defaults to the address that used to
                     be hard-coded
    :return: None
    """
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = to_email

    html_body = '<font face="Courier New, Courier, monospace"><pre>' + message + '</pre></font>'
    msg.attach(MIMEText(html_body, 'html'))

    # the context manager issues QUIT and closes the connection even when
    # sending raises, so a failed delivery no longer leaks the socket
    with smtplib.SMTP(server) as smtp:
        smtp.set_debuglevel(1)
        smtp.send_message(msg)

    print('successfully sent the mail.')
|
|
|
|
|
|
def _resolve_ms_ad_status_code(code : int):
|
|
"""
|
|
Resolve the status code from MS AD
|
|
:param code:
|
|
:return:
|
|
"""
|
|
|
|
code_lookup_table = {
|
|
512: "Enabled, PW policy enforced",
|
|
514: "Disabled, PW policy enforced",
|
|
544: "Enabled, PW policy NOT enforced",
|
|
66048: "Enabled, PW policy NOT enforced"
|
|
}
|
|
|
|
if code in code_lookup_table:
|
|
return code_lookup_table[code]
|
|
else:
|
|
return code
|
|
|
|
|
|
def _connect_to_msaddcs():
|
|
pass
|
|
|
|
def analyze_ad_groups():
    """
    Print the security groups that have no members.

    Binds to the configured AD server over LDAPS (port 636) with the audit
    account, searches the whole domain for security groups and prints the DNs
    of those whose member attribute is empty as a pandas DataFrame. Groups
    whose DN starts with "CN=dyn" or "CN=app" are left out.

    :return: None
    """
    domain_name = 'company'
    domain_suffix = "int"

    # dynamic groups cannot be excluded by the search filter, so they are
    # filtered out afterwards based on the prefix of their DN
    excluded_dn_prefixes = ('CN=dyn', 'CN=app')

    # Setup the connection to the AD server
    server = Server(ad_server, port=636, use_ssl=True, get_info=ALL, connect_timeout=10,
                    allowed_referral_hosts=[(ad_server_2, True)])
    conn = Connection(server, user='{}\\{}'.format(domain_name, ad_audit_user), password=ad_audit_password,
                      authentication=NTLM, auto_bind=True)

    # try/finally so the connection is released even if the search or the
    # result processing raises
    try:
        # groupType is compared with the bitwise-AND matching rule against
        # 2147483648 (0x80000000) to select security groups
        conn.search(
            search_base=f'dc={domain_name},dc={domain_suffix}',
            search_filter='(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648))',
            search_scope=SUBTREE,
            attributes=['member']
        )

        empty_groups = [
            entry.entry_dn
            for entry in conn.entries
            if len(entry.member) == 0 and not entry.entry_dn.startswith(excluded_dn_prefixes)
        ]

        # tabulate the results and print the list of empty security groups
        empty_ad_security_groups = pd.DataFrame({'Empty Security Groups': empty_groups})
        print(empty_ad_security_groups)
    finally:
        # Close the connection
        conn.unbind()
|
|
|
|
|
|
def analyze_ad_admins():
    """
    Print the accounts that are members of Domain Admins or Enterprise Admins.

    Binds to the configured AD server over LDAPS (port 636) with the audit
    account, searches the whole domain and prints the sAMAccountName of every
    matching user as a pandas DataFrame.

    :return: None
    """
    domain_name = 'company'
    domain_suffix = "int"

    # Setup the connection to the AD server
    server = Server(ad_server, port=636, use_ssl=True, get_info=ALL, connect_timeout=10,
                    allowed_referral_hosts=[(ad_server_2, True)])
    conn = Connection(server, user='{}\\{}'.format(domain_name, ad_audit_user), password=ad_audit_password,
                      authentication=NTLM, auto_bind=True)

    # TODO(review): a diff against the prior run (ad_admins.csv) was started
    # here but never implemented - the file was loaded and then ignored, and
    # nothing in this function writes it. The unused load has been removed.

    # Both groups are looked up in the Users container of the audited domain.
    # The Enterprise Admins clause previously pointed at the placeholder
    # "DC=your_domain,DC=com" and therefore could never match.
    # NOTE(review): a plain memberOf clause matches direct members only;
    # confirm whether nested group membership has to be covered as well.
    group_container = f'CN=Users,DC={domain_name},DC={domain_suffix}'
    search_filter = (
        '(&(objectClass=user)(|'
        f'(memberOf=CN=Domain Admins,{group_container})'
        f'(memberOf=CN=Enterprise Admins,{group_container})))'
    )
    search_base = f'dc={domain_name},dc={domain_suffix}'
    attributes = ['sAMAccountName', 'displayName']

    # try/finally so the connection is released even if the search or the
    # result processing raises
    try:
        conn.search(search_base=search_base, search_filter=search_filter, search_scope=SUBTREE,
                    attributes=attributes)
        admin_users = [entry.sAMAccountName.value for entry in conn.entries]

        # tabulate the results and print the list of admin users
        ad_admin_users = pd.DataFrame({'AD Domain Admin Users': admin_users})
        print(ad_admin_users)
    finally:
        # Close the connection
        conn.unbind()
|
|
|
|
def analyze_ad_service_users():
    """
    Print the accounts that have a servicePrincipalName set.

    Binds to the configured AD server over LDAPS (port 636) with the audit
    account, searches the whole domain and prints the sAMAccountName of every
    match as a pandas DataFrame.

    :return: None
    """
    domain_name = 'company'
    domain_suffix = "int"

    # Setup the connection to the AD server
    server = Server(ad_server, port=636, use_ssl=True, get_info=ALL, connect_timeout=10,
                    allowed_referral_hosts=[(ad_server_2, True)])
    conn = Connection(server, user='{}\\{}'.format(domain_name, ad_audit_user), password=ad_audit_password,
                      authentication=NTLM, auto_bind=True)

    # Search for all service users.
    # NOTE(review): the filter is not restricted to user objects, so anything
    # carrying an SPN is returned - presumably computer accounts too. A
    # narrower alternative that was considered:
    # '(&(objectClass=user)(servicePrincipalName=*)(!(objectCategory=computer))(userAccountControl:1.2.840.113556.1.4.803:=2))'
    search_filter = '(servicePrincipalName=*)'
    search_base = f'dc={domain_name},dc={domain_suffix}'
    attributes = ['sAMAccountName', 'displayName']

    # try/finally so the connection is released even if the search or the
    # result processing raises
    try:
        conn.search(search_base=search_base, search_filter=search_filter, search_scope=SUBTREE,
                    attributes=attributes)
        service_users = [entry.sAMAccountName.value for entry in conn.entries]

        # tabulate the results and print the list of service users
        ad_service_users = pd.DataFrame({'AD Service Users': service_users})
        print(ad_service_users)
    finally:
        # Close the connection
        conn.unbind()
|
|
|
|
|
|
def _is_account_enabled(user_account_control):
    """Return True unless the ACCOUNTDISABLE bit (0x2) is set in a userAccountControl value."""
    value = user_account_control
    # the normalized JSON holds attribute values as lists (the previous code
    # tested "512 in x"); accept a bare number as well
    if isinstance(value, (list, tuple)):
        if not value:
            return False
        value = value[0]
    try:
        return not (int(value) & 0x2)
    except (TypeError, ValueError):
        # missing or non-numeric value: do not report the account as enabled
        return False


def _to_utc_timestamp(raw_value):
    """Convert one logon attribute value into a UTC pandas Timestamp, or NaT if it is missing."""
    # values are expected to be lists whose first element is the date string;
    # anything else (e.g. NaN for an absent attribute) has no usable date
    if not isinstance(raw_value, (list, tuple)) or not raw_value:
        return pd.NaT
    return pd.to_datetime(_parse_date(str(raw_value[0])), errors='coerce', utc=True)


def _tabularize_ad_user_data(onprem_ad_user_entry_dict):
    """Flatten the list of entry dictionaries into a DataFrame and add the 'inactive' and 'enabled' columns."""
    # convert the list of dictionaries to a pandas dataframe, flatten the JSON
    onprem_ad_user_df = pd.json_normalize(onprem_ad_user_entry_dict)

    # due to issues with the processing in Elastic, the following attributes
    # are removed; errors="ignore" because the columns only exist when at
    # least one entry carries them
    onprem_ad_user_df = onprem_ad_user_df.drop(
        columns=["attributes.msExchSafeSendersHash", "attributes.msExchBlockedSendersHash"],
        errors="ignore",
    )

    # fill in the lastLogonTimestamp with the lastLogon value if it is empty;
    # assigned back explicitly because an in-place fillna on a column
    # selection is not guaranteed to modify the frame
    onprem_ad_user_df["attributes.lastLogonTimestamp"] = (
        onprem_ad_user_df["attributes.lastLogonTimestamp"].fillna(onprem_ad_user_df["attributes.lastLogon"])
    )

    # convert the date field to datetime type, use UTC
    onprem_ad_user_df["attributes.lastLogonTimestamp"] = (
        onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(_to_utc_timestamp)
    )

    # inactive = last logon more than 180 days ago; the cut-off is taken from
    # the current time in UTC and computed once for all rows.
    # Rows without a usable date (NaT) compare as False, i.e. not inactive.
    cutoff = pd.Timestamp.now(tz='UTC') - timedelta(days=180)
    onprem_ad_user_df["inactive"] = (
        onprem_ad_user_df["attributes.lastLogonTimestamp"].apply(lambda ts: cutoff > ts).astype(bool)
    )

    # simplify the account analysis by storing the user status in a column;
    # every state without the disable bit counts as enabled (512, 544, 66048, ...)
    onprem_ad_user_df["enabled"] = onprem_ad_user_df["attributes.userAccountControl"].apply(_is_account_enabled)
    return onprem_ad_user_df


def _filter_inactive_and_non_disabled_users(onprem_ad_users_df):
    """Return the rows that are both inactive and enabled."""
    return onprem_ad_users_df.loc[onprem_ad_users_df["inactive"] & onprem_ad_users_df["enabled"]]


def _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_ad_users_df):
    """Append the table of inactive enabled users to the shared report buffer and return the buffer."""
    inactive_enabled_onprem_ad_users_html_report.write("The following employee accounts are active in COMPANY " +
                                                       "onprem AD, and have not logged in for 180 days:\r\n\r\n")
    inactive_enabled_onprem_ad_users_html_report.write(
        tabulate(
            inactive_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]],
            headers=["Name", "Last Logon"],
            tablefmt="psql",
            showindex=False)
    )
    inactive_enabled_onprem_ad_users_html_report.write(
        "\r\nThese may have been missed within the Joiner / Leaver process\r\n")
    return inactive_enabled_onprem_ad_users_html_report


def _append_log_output_to_json_file(onprem_ad_users_df):
    """Append the DataFrame to data.json as one JSON record per line (for Elastic)."""
    log_output = StringIO()
    onprem_ad_users_df.to_json(path_or_buf=log_output, orient='records', lines=True)
    with open("data.json", "a+") as f:
        f.write(log_output.getvalue())


def analyze_ad_users():
    """
    Query AD for users and report enabled accounts without a recent logon.

    Uses the module-level configuration (server, failover server, audit user
    and password). All person objects below the configured OU are listed on
    stdout and flattened into a DataFrame; accounts that are enabled and whose
    last logon is more than 180 days ago are reported by email. The full data
    set is appended to data.json and, if detailed_output is set, written to
    data.csv.

    :return: 0 if the search returned nothing or an error occurred, otherwise
             None
    """
    # move global variables to local scope
    server_name = ad_server
    failover_server_name = ad_server_2
    user_name = ad_audit_user
    password = ad_audit_password

    # scope: employee, contractor, etc.
    domain_name = 'company'
    domain_suffix = "int"

    org_ou = "companynetwork"
    root_ou = "company"
    sub_ou = "users"

    # print header
    format_string = '{:25} {:>6} {:19} {:35} {}'
    print(format_string.format('User', 'Logins', 'Last Login', 'State', 'Description'))

    # stays None until the bind succeeded, so the cleanup below cannot fail
    # on a connection that was never created
    conn = None

    # global exception handling
    try:
        server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10,
                        allowed_referral_hosts=[(failover_server_name, True)])
        conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password,
                          authentication=NTLM, auto_bind=True)

        # ldap search query
        conn.search(
            search_base=f'OU={root_ou},OU={sub_ou},OU={org_ou},DC={domain_name},DC={domain_suffix}',
            search_filter='(objectClass=person)',
            search_scope=SUBTREE,
            attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
        )

        # check if we have any results, and if not stop here
        if not conn.entries:
            print("No results")
            return 0

        # list of object entries from LDAP query
        onprem_ad_user_list = []

        for entry in conn.entries:
            # handle missing description
            try:
                desc = entry.description
            except LDAPCursorError:
                desc = ""

            # lookup the status code from ms ad
            status_text = _resolve_ms_ad_status_code(entry.userAccountControl.value)

            print(format_string.format(str(entry.name), str(entry.logonCount), str(entry.lastLogon)[:19],
                                       str(status_text)[:35], desc))

            # convert the entry to JSON, and append it to the list as a dictionary
            onprem_ad_user_list.append(json.loads(entry.entry_to_json()))

            if verbose:
                print(entry)

        onprem_ad_users_df = _tabularize_ad_user_data(onprem_ad_user_list)
        inactive_onprem_ad_users_df = _filter_inactive_and_non_disabled_users(onprem_ad_users_df)
        inactive_onprem_ad_users_count = inactive_onprem_ad_users_df.shape[0]

        # print the inactive users if detailed output is enabled
        if detailed_output and inactive_onprem_ad_users_count > 0:
            print()
            print("Inactive users enabled:")
            print(inactive_onprem_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]])

        print("Found " + str(inactive_onprem_ad_users_count) + " inactive enabled users in on-prem AD.")

        if inactive_onprem_ad_users_count == 0:
            # nothing to report: no email is sent. The process is no longer
            # terminated here, so the log files below are still written and
            # the caller can continue with its next step.
            inactive_enabled_onprem_ad_users_html_report.write("No inactive enabled users found in on-prem AD.\r\n")
        else:
            inactive_enabled_ad_users_html_report = \
                _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_onprem_ad_users_df)
            print("Report Mail prepared, sending email...")
            send_email(message=inactive_enabled_ad_users_html_report.getvalue())

        # serialize the dataframe to json, keep lines for Elastic (append)
        _append_log_output_to_json_file(onprem_ad_users_df)

        if detailed_output:
            # overwrite the output to a file
            onprem_ad_users_df.to_csv('data.csv', sep=";", index=False)

    # global exception with failure message
    except Exception as e:
        print(e)
        return 0

    # close the connection, clean up
    finally:
        if conn is not None:
            conn.unbind()
|
|
|
|
|
|
if __name__ == '__main__':
    # read_config() returns 0 when config.json is rejected; stop with a
    # non-zero exit status instead of continuing without a configuration
    if read_config() == 0:
        raise SystemExit(1)

    analyze_ad_users()
    # analyze_ad_groups()
    analyze_ad_admins()
    # analyze_ad_service_users()
    # send_email(message=inactive_enabled_onprem_ad_users_html_report.getvalue())