#!/usr/bin/env python3
"""Audit an on-prem Microsoft Active Directory over LDAPS.

The script reads its credentials from ``config.json``, queries the directory
with ldap3, prints the findings to stdout and mails a report about enabled
accounts that have not logged in for ``INACTIVITY_THRESHOLD_DAYS`` days.
"""
import html
import json
import os
import smtplib
import sys
from datetime import datetime, timedelta
# from email.message import EmailMessage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from io import StringIO

import pandas as pd
from ldap3 import Server, Connection, ALL, NTLM, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, AUTO_BIND_NO_TLS, SUBTREE
from ldap3.core.exceptions import LDAPCursorError
from tabulate import tabulate

# global variables, populated by read_config()
ad_audit_user = ""
ad_audit_password = ""
ad_server = ""
ad_server_2 = ""  # was declared as "ad_server2" although every reader uses "ad_server_2"
report_email = ""

# development config flags
detailed_output = True  # results to stdout
verbose = False  # debug output

# shared buffer that collects the body of the report mail
inactive_enabled_onprem_ad_users_html_report = StringIO()

# directory layout shared by all queries
AD_DOMAIN_NAME = 'company'
AD_DOMAIN_SUFFIX = "int"

# accounts without a logon for this many days are reported as inactive
INACTIVITY_THRESHOLD_DAYS = 180

# recipient used when neither the caller nor config.json provides one
DEFAULT_REPORT_RECIPIENT = "monitoring@company.com"

# ACCOUNTDISABLE bit of the userAccountControl attribute
UAC_ACCOUNT_DISABLED = 0x0002

# columns the user analysis relies on; created empty when no entry carries them
_REQUIRED_USER_COLUMNS = (
    "attributes.lastLogonTimestamp",
    "attributes.lastLogon",
    "attributes.userAccountControl",
)


def read_config():
    """
    Read config.json, validate it and copy the values into the module globals

    The file must be a JSON object with the string keys ``user`` (which has to
    contain "admin"), ``password``, ``ad_server``, ``ad_server_2`` and
    ``report_email``.

    :return: 0 if the file is unreadable or invalid, None on success
    """
    try:
        with open('config.json', encoding='utf-8') as f:
            d = json.load(f)
    except (OSError, json.JSONDecodeError) as err:
        print(f"Config file could not be read: {err}")
        return 0

    # explicit checks instead of assert: asserts are stripped under "python -O"
    # and a missing key used to escape as an unhandled KeyError
    string_keys = ('user', 'password', 'ad_server', 'ad_server_2', 'report_email')
    is_valid = (
        isinstance(d, dict)
        and all(isinstance(d.get(key), str) for key in string_keys)
        and 'admin' in d['user']
    )
    if not is_valid:
        print("Config file is not valid")
        return 0

    # move the values to global variables
    global ad_audit_user
    global ad_audit_password
    global ad_server
    global ad_server_2
    global report_email
    ad_audit_user = d['user']
    ad_audit_password = d['password']
    ad_server = d['ad_server']
    ad_server_2 = d['ad_server_2']
    report_email = d['report_email']
    return None


def _parse_date(date_string: str) -> datetime:
    """
    Parse date string, internal function

    Accepts "YYYY-MM-DD HH:MM:SS[.ffffff]" followed by a UTC offset.

    :param date_string: timestamp text including a UTC offset
    :return: datetime object (timezone aware)
    :raises ValueError: if the text matches neither supported format
    """
    # https://bugs.python.org/msg169952 - older Python versions reject the
    # colon inside the UTC offset ("+00:00"), so it is removed first
    if ":" == date_string[-3:-2]:
        date_string = date_string[:-3] + date_string[-2:]
    # handle optional microseconds - https://strftime.org/ - %f
    try:
        return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f%z")
    except ValueError:
        return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S%z")


def send_email(message, subject="Inactive on-prem AD employee accounts (180d)", server='10.1.1.2',
               from_email='audit-tool@company.com', to_email=None):
    """
    Send the report as an HTML email (to Slack, Exchange, etc.)

    :param message: plain-text report body; it is HTML-escaped before sending
    :param subject: subject line of the mail
    :param server: host name or address of the SMTP relay
    :param from_email: sender address
    :param to_email: recipient address; falls back to ``report_email`` from
        config.json and finally to DEFAULT_REPORT_RECIPIENT
    :return: None
    """
    recipient = to_email or report_email or DEFAULT_REPORT_RECIPIENT

    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = recipient

    # NOTE(review): the wrapper markup around the message was lost in the
    # source of this file; <pre> is used so that the fixed-width table produced
    # by tabulate keeps its alignment in HTML mail clients - confirm.
    html_body = '<pre>\n' + html.escape(message) + '\n</pre>'
    msg.attach(MIMEText(html_body, 'html'))

    # the context manager closes the SMTP session even if sending fails
    with smtplib.SMTP(server) as smtp:
        smtp.set_debuglevel(1 if verbose else 0)
        # smtp.login(from_email, ad_audit_password)  # user & password
        smtp.send_message(msg)
    print('successfully sent the mail.')


def _resolve_ms_ad_status_code(code: int):
    """
    Resolve the status code (userAccountControl) from MS AD

    :param code: numeric userAccountControl value
    :return: description text for known codes, otherwise the code itself
    """
    code_lookup_table = {
        512: "Enabled, PW policy enforced",
        514: "Disabled, PW policy enforced",
        544: "Enabled, PW policy NOT enforced",
        66048: "Enabled, PW policy NOT enforced"
    }
    return code_lookup_table.get(code, code)


def _connect_to_msaddcs(domain_name=AD_DOMAIN_NAME):
    """
    Open an NTLM-authenticated LDAPS connection to the configured AD server

    Uses the module globals filled by read_config(). The caller owns the
    returned connection and has to call ``unbind()`` on it.

    :param domain_name: NetBIOS domain name used as prefix of the login name
    :return: bound ldap3 Connection
    """
    server = Server(ad_server, port=636, use_ssl=True, get_info=ALL, connect_timeout=10,
                    allowed_referral_hosts=[(ad_server_2, True)])
    return Connection(server, user='{}\\{}'.format(domain_name, ad_audit_user),
                      password=ad_audit_password, authentication=NTLM, auto_bind=True)


def _entry_attribute(entry, name, default=""):
    """
    Read an attribute of an ldap3 entry, tolerating its absence

    :param entry: ldap3 entry
    :param name: attribute name
    :param default: value returned when the entry does not carry the attribute
    :return: the ldap3 attribute object or the default
    """
    try:
        return getattr(entry, name)
    except LDAPCursorError:
        return default


def analyze_ad_groups():
    """
    Print all security groups without members

    Groups whose DN starts with "CN=dyn" or "CN=app" are skipped.

    :return: None
    """
    conn = _connect_to_msaddcs()
    try:
        # Search for all security groups (bit 2147483648 of groupType)
        conn.search(
            search_base=f'dc={AD_DOMAIN_NAME},dc={AD_DOMAIN_SUFFIX}',
            search_filter='(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648))',
            search_scope=SUBTREE,
            attributes=['member']
        )

        # filter out dynamic groups manually (not supported by the search filter) based on the prefix
        excluded_prefixes = ('CN=dyn', 'CN=app')
        empty_groups = [
            entry.entry_dn for entry in conn.entries
            if len(entry.member) == 0 and not entry.entry_dn.startswith(excluded_prefixes)
        ]

        # tabulate and print the results
        empty_ad_security_groups = pd.DataFrame({'Empty Security Groups': empty_groups})
        print(empty_ad_security_groups)
    finally:
        # Close the connection, also when the search fails
        conn.unbind()


def analyze_ad_admins():
    """
    Print the users that are members of Domain Admins or Enterprise Admins

    :return: None
    """
    # TODO: diff the results against the prior run (ad_admins.csv). The file
    # used to be loaded here but was never compared or written, so the dead
    # read was removed.
    conn = _connect_to_msaddcs()
    try:
        search_base = f'dc={AD_DOMAIN_NAME},dc={AD_DOMAIN_SUFFIX}'
        # both groups are looked up in the audited domain; the Enterprise
        # Admins DN used to point at the placeholder "DC=your_domain,DC=com"
        users_container = f'CN=Users,DC={AD_DOMAIN_NAME},DC={AD_DOMAIN_SUFFIX}'
        search_filter = (
            '(&(objectClass=user)(|'
            f'(memberOf=CN=Domain Admins,{users_container})'
            f'(memberOf=CN=Enterprise Admins,{users_container})))'
        )
        conn.search(search_base=search_base, search_filter=search_filter, search_scope=SUBTREE,
                    attributes=['sAMAccountName', 'displayName'])
        admin_users = [entry.sAMAccountName.value for entry in conn.entries]

        # tabulate and print the results
        ad_admin_users = pd.DataFrame({'AD Domain Admin Users': admin_users})
        print(ad_admin_users)
    finally:
        # Close the connection, also when the search fails
        conn.unbind()


def analyze_ad_service_users():
    """
    Print every object that carries a servicePrincipalName

    :return: None
    """
    conn = _connect_to_msaddcs()
    try:
        # NOTE(review): this filter matches any object with an SPN. A narrower
        # candidate that was left commented out in the original:
        # '(&(objectClass=user)(servicePrincipalName=*)(!(objectCategory=computer))(userAccountControl:1.2.840.113556.1.4.803:=2))'
        search_filter = '(servicePrincipalName=*)'
        search_base = f'dc={AD_DOMAIN_NAME},dc={AD_DOMAIN_SUFFIX}'
        conn.search(search_base=search_base, search_filter=search_filter, search_scope=SUBTREE,
                    attributes=['sAMAccountName', 'displayName'])
        service_users = [entry.sAMAccountName.value for entry in conn.entries]

        # tabulate and print the results
        ad_service_users = pd.DataFrame({'AD Service Users': service_users})
        print(ad_service_users)
    finally:
        # Close the connection, also when the search fails
        conn.unbind()


def _first_value_as_datetime(value):
    """
    Convert the first element of a multi-valued JSON attribute to a datetime

    :param value: list of timestamp strings, or a missing value (NaN / None)
    :return: timezone aware datetime, or pd.NaT if missing or unparsable
    """
    if not isinstance(value, (list, tuple)) or not value:
        return pd.NaT
    try:
        return _parse_date(str(value[0]))
    except ValueError:
        return pd.NaT


def _is_account_enabled(uac_values):
    """
    Tell whether a userAccountControl value describes an enabled account

    The account counts as enabled when the ACCOUNTDISABLE bit is not set, so
    544 and 66048 are recognised as enabled as well as 512.

    :param uac_values: list holding the numeric userAccountControl value
    :return: True if enabled, False if disabled or the value is missing
    """
    if not isinstance(uac_values, (list, tuple)) or not uac_values:
        return False
    try:
        return not (int(uac_values[0]) & UAC_ACCOUNT_DISABLED)
    except (TypeError, ValueError):
        return False


def _tabularize_ad_user_data(onprem_ad_user_entry_dict: list) -> pd.DataFrame:
    """
    Flatten the JSON user entries into a dataframe and derive status columns

    Adds the boolean columns "inactive" (last logon older than
    INACTIVITY_THRESHOLD_DAYS) and "enabled". Entries without a usable logon
    timestamp are not marked inactive.

    :param onprem_ad_user_entry_dict: list of dicts as produced by entry_to_json()
    :return: dataframe with one row per entry
    """
    # normalize the data, flatten the JSON
    onprem_ad_user_df = pd.json_normalize(onprem_ad_user_entry_dict)

    # due to issues with the processing in Elastic, we need to remove the following attributes
    # (errors="ignore": the columns only exist if at least one entry has them)
    onprem_ad_user_df = onprem_ad_user_df.drop(
        columns=["attributes.msExchSafeSendersHash", "attributes.msExchBlockedSendersHash"],
        errors="ignore",
    )

    for column in _REQUIRED_USER_COLUMNS:
        if column not in onprem_ad_user_df.columns:
            onprem_ad_user_df[column] = None

    # fill in the lastLogonTimestamp with the lastLogon value if the lastLogonTimestamp is empty;
    # assigned back explicitly because an in-place fillna on a column selection
    # does not reliably update the parent frame
    last_logon = onprem_ad_user_df["attributes.lastLogonTimestamp"].fillna(
        onprem_ad_user_df["attributes.lastLogon"])

    # convert the date fields to datetime type, use UTC
    parsed = [_first_value_as_datetime(value) for value in last_logon]
    onprem_ad_user_df["attributes.lastLogonTimestamp"] = pd.Series(
        pd.to_datetime(parsed, errors='coerce', utc=True), index=onprem_ad_user_df.index)

    # check if last logon is older than the threshold; "now" is taken in UTC
    # directly instead of labelling the local wall-clock time as UTC
    cutoff = pd.Timestamp.now(tz='UTC') - timedelta(days=INACTIVITY_THRESHOLD_DAYS)
    onprem_ad_user_df["inactive"] = (
        onprem_ad_user_df["attributes.lastLogonTimestamp"] < cutoff).astype(bool)

    # simplify the account analysis by storing the user status in columns
    onprem_ad_user_df["enabled"] = onprem_ad_user_df["attributes.userAccountControl"].apply(
        _is_account_enabled).astype(bool)
    return onprem_ad_user_df


def _filter_inactive_and_non_disabled_users(onprem_ad_users_df: pd.DataFrame) -> pd.DataFrame:
    """
    Select the rows that are both inactive and enabled

    :param onprem_ad_users_df: dataframe from _tabularize_ad_user_data()
    :return: filtered dataframe
    """
    return onprem_ad_users_df.loc[onprem_ad_users_df["inactive"] & onprem_ad_users_df["enabled"]]


def _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_ad_users_df: pd.DataFrame):
    """
    Append the inactive-users section to the shared report buffer

    :param inactive_ad_users_df: dataframe of inactive, enabled users
    :return: the module level report buffer (StringIO)
    """
    report = inactive_enabled_onprem_ad_users_html_report
    report.write("The following employee accounts are active in COMPANY " +
                 f"onprem AD, and have not logged in for {INACTIVITY_THRESHOLD_DAYS} days:\r\n\r\n")
    report.write(
        tabulate(
            inactive_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]],
            headers=["Name", "Last Logon"],
            tablefmt="psql",
            showindex=False)
    )
    report.write("\r\nThese may have been missed within the Joiner / Leaver process\r\n")
    return report


def _append_log_output_to_json_file(onprem_ad_users_df: pd.DataFrame, path: str = "data.json"):
    """
    Append the dataframe as JSON lines to a log file (for Elastic)

    :param onprem_ad_users_df: dataframe to serialize
    :param path: file that receives one JSON document per line
    :return: None
    """
    log_output = onprem_ad_users_df.to_json(orient='records', lines=True)
    # make sure the next run starts on a fresh line
    if log_output and not log_output.endswith("\n"):
        log_output += "\n"
    with open(path, "a", encoding="utf-8") as f:
        f.write(log_output)


def analyze_ad_users():
    """
    Query AD for the employee accounts and report inactive, enabled ones

    Prints one line per account, mails the list of enabled accounts without a
    logon for INACTIVITY_THRESHOLD_DAYS days, appends all entries to data.json
    and (with detailed_output) rewrites data.csv. When no inactive account is
    found, neither the mail nor the files are produced.

    :return: 0 if the query returned nothing or an error occurred, otherwise None
    """
    # scope: employee, contractor, etc.
    org_ou = "companynetwork"
    root_ou = "company"
    sub_ou = "users"

    # print header
    format_string = '{:25} {:>6} {:19} {:35} {}'
    print(format_string.format('User', 'Logins', 'Last Login', 'State', 'Description'))

    # stays None if the connect fails, so the cleanup below cannot raise
    conn = None
    # global exception handling
    try:
        conn = _connect_to_msaddcs()

        # ldap search query
        conn.search(
            search_base=f'OU={root_ou},OU={sub_ou},OU={org_ou},DC={AD_DOMAIN_NAME},DC={AD_DOMAIN_SUFFIX}',
            search_filter='(objectClass=person)',
            search_scope=SUBTREE,
            attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
        )

        # check if we have any results, and if not leave
        if not conn.entries:
            print("No results")
            return 0

        # list of object entries from LDAP query
        onprem_ad_user_list = []
        for e in conn.entries:
            # not every entry carries all attributes, fall back to empty text
            desc = _entry_attribute(e, 'description')
            uac = _entry_attribute(e, 'userAccountControl', None)
            # lookup the status code from ms ad
            status_text = _resolve_ms_ad_status_code(uac.value) if uac is not None else ""
            print(format_string.format(str(e.name),
                                       str(_entry_attribute(e, 'logonCount')),
                                       str(_entry_attribute(e, 'lastLogon'))[:19],
                                       str(status_text)[:35],
                                       desc))

            # convert the entry to JSON, and append it to the list as a dictionary
            onprem_ad_user_list.append(json.loads(e.entry_to_json()))

            if verbose:
                print(e)

        onprem_ad_users_df = _tabularize_ad_user_data(onprem_ad_user_list)
        inactive_onprem_ad_users_df = _filter_inactive_and_non_disabled_users(onprem_ad_users_df)
        inactive_onprem_ad_users_count = inactive_onprem_ad_users_df.shape[0]

        # print the inactive users if detailed output is enabled
        if detailed_output and inactive_onprem_ad_users_count > 0:
            print()
            print("Inactive users enabled:")
            print(inactive_onprem_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]])

        print("Found " + str(inactive_onprem_ad_users_count) + " inactive enabled users in on-prem AD.")

        if inactive_onprem_ad_users_count == 0:
            inactive_enabled_onprem_ad_users_html_report.write(
                "No inactive enabled users found in on-prem AD.\r\n")
            print("Terminated successfully")
            # return instead of exit(0): the caller may run further analyses
            return None

        inactive_enabled_ad_users_html_report = \
            _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_onprem_ad_users_df)
        print("Report Mail prepared, sending email...")
        send_email(message=inactive_enabled_ad_users_html_report.getvalue())

        # serialize the dataframe to json, keep lines for Elastic (append)
        _append_log_output_to_json_file(onprem_ad_users_df)

        if detailed_output:
            # overwrite the output to a file
            onprem_ad_users_df.to_csv('data.csv', sep=";", index=False)

    # global exception with failure message
    except Exception as e:
        print(e)
        return 0

    # close the connection, clean up
    finally:
        if conn is not None:
            conn.unbind()

    return None


if __name__ == '__main__':
    # do not query the directory with empty credentials
    if read_config() == 0:
        sys.exit(1)
    analyze_ad_users()
    # analyze_ad_groups()
    analyze_ad_admins()
    # analyze_ad_service_users()
    # send_email(message=inactive_enabled_onprem_ad_users_html_report.getvalue())