diff --git a/ad-user-review.py b/ad-user-review.py new file mode 100644 index 0000000..3fd0036 --- /dev/null +++ b/ad-user-review.py @@ -0,0 +1,408 @@ +#!/usr/bin/env python3 + +import os +from ldap3 import Server, Connection, ALL, NTLM, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, AUTO_BIND_NO_TLS, SUBTREE +from ldap3.core.exceptions import LDAPCursorError +import json +import pandas as pd +from io import StringIO +from datetime import datetime, timedelta +# from email.message import EmailMessage +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from tabulate import tabulate + +# global variables +ad_audit_user = "" +ad_audit_password = "" +ad_server = "" +ad_server2 = "" +report_email = "" + +# development config flags +detailed_output = True # results to stdout +verbose = False # debug output + +inactive_enabled_onprem_ad_users_html_report = StringIO() + +def read_config(): + """ + Read config file and check if it is valid + :return: + """ + + with open('config.json') as f: + + # parse the JSON file and assert the values + try: + d = json.load(f) + + assert 'admin' in d['user'] + assert isinstance(d['password'], str) + assert isinstance(d['ad_server'], str) + assert isinstance(d['ad_server_2'], str) + assert isinstance(d['report_email'], str) + + except AssertionError: + print("Config file is not valid") + return 0 + + # move the values to global variables + global ad_audit_user + global ad_audit_password + global ad_server + global ad_server_2 + global report_email + + ad_audit_user = d['user'] + ad_audit_password = d['password'] + ad_server = d['ad_server'] + ad_server_2 = d['ad_server_2'] + report_email = d['report_email'] + + +def _parse_date(date_string: str): + """ + Parse date string, internal function + :param date_string: + :return: datetime object + """ + + # print(date_string) + # https://bugs.python.org/msg169952 BUG in Python + if ":" == date_string[-3:-2]: + date_string = date_string[:-3] + date_string[-2:] + # handle optional microseconds - https://strftime.org/ - %f if the parse fails (chain) + try: + return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f%z") + except ValueError: + return datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S%z") + + +def send_email(message, subject="Inactive on-prem AD employee accounts (180d)", server='10.1.1.2', + from_email='audit-tool@company.com'): + """ + Send email (to Slack, Exchange, etc. + :param message: + :param to_email: + :param subject: + :param server: + :param from_email: + :return: + """ + + # msg = EmailMessage() + msg = MIMEMultipart('alternative') + msg['Subject'] = subject + msg['From'] = from_email + msg['To'] = "monitoring@company.com" + + message = '
' + message + '
' + part2 = MIMEText(message, 'html') + # msg.set_content(message) + msg.attach(part2) + + server = smtplib.SMTP(server) + server.set_debuglevel(1) + # server.login(from_email, ad_audit_password) # user & password + server.send_message(msg) + server.quit() + print('successfully sent the mail.') + + +def _resolve_ms_ad_status_code(code : int): + """ + Resolve the status code from MS AD + :param code: + :return: + """ + + code_lookup_table = { + 512: "Enabled, PW policy enforced", + 514: "Disabled, PW policy enforced", + 544: "Enabled, PW policy NOT enforced", + 66048: "Enabled, PW policy NOT enforced" + } + + if code in code_lookup_table: + return code_lookup_table[code] + else: + return code + + +def _connect_to_msaddcs(): + pass + +def analyze_ad_groups(): + # move global variables to local scope + domain_name = 'company' + domain_suffix = "int" + server_name = ad_server + failover_server_name = ad_server_2 + user_name = ad_audit_user + password = ad_audit_password + + # Setup the connection to the AD server + server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)]) + conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM, + auto_bind=True) + + # Search for all empty security groups(, excluding dynamic groups) + conn.search( + search_base=f'dc={domain_name},dc={domain_suffix}', + search_filter = '(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648))', + # search_filter='(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=2147483648)(!(groupType:1.2.840.113556.1.4.803:=2147483648)))', + search_scope=SUBTREE, + attributes=['member'] + ) + # filer out dynamic groups manually (not supported by the search filter) based on the prefix + empty_groups = [entry.entry_dn for entry in conn.entries if len(entry.member) == 0 and not any(entry.entry_dn.startswith(s) for s in ['CN=dyn', 'CN=app'])] + + # tabulate the results + empty_ad_security_groups = pd.DataFrame({'Empty Security Groups': empty_groups}) + + # Print the list of empty security groups + print(empty_ad_security_groups) + + # Close the connection + conn.unbind() + + +def analyze_ad_admins(): + # move global variables to local scope + domain_name = 'company' + domain_suffix = "int" + server_name = ad_server + failover_server_name = ad_server_2 + user_name = ad_audit_user + password = ad_audit_password + + # Setup the connection to the AD server + server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)]) + conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM, + auto_bind=True) + + # diff the results against the prior run (if available) + if os.path.isfile('ad_admins.csv'): + prior_ad_admins = pd.read_csv('ad_admins.csv') + + # Search for all users who are members of Domain Admins or Enterprise Admins group + search_filter = f'(&(objectClass=user)(|(memberOf=CN=Domain Admins,CN=Users,DC={domain_name},DC={domain_suffix})(memberOf=CN=Enterprise Admins,CN=Users,DC=your_domain,DC=com)))' + search_base = f'dc={domain_name},dc={domain_suffix}' + search_scope = SUBTREE + attributes = ['sAMAccountName', 'displayName'] + conn.search(search_base=search_base, search_filter=search_filter, search_scope=search_scope, attributes=attributes) + admin_users = [entry.sAMAccountName.value for entry in conn.entries] + + # Print the list of admin users + # tabulate the results + ad_admin_users = pd.DataFrame({'AD Domain Admin Users': admin_users}) + print(ad_admin_users) + + # Close the connection + conn.unbind() + +def analyze_ad_service_users(): + # move global variables to local scope + domain_name = 'company' + domain_suffix = "int" + server_name = ad_server + failover_server_name = ad_server_2 + user_name = ad_audit_user + password = ad_audit_password + + # Setup the connection to the AD server + server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)]) + conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM, + auto_bind=True) + + # Search for all service users + search_filter = '(servicePrincipalName=*)' + # search_filter = '(&(objectClass=user)(servicePrincipalName=*)(!(objectCategory=computer))(userAccountControl:1.2.840.113556.1.4.803:=2))' + search_base = f'dc={domain_name},dc={domain_suffix}' + search_scope = SUBTREE + attributes = ['sAMAccountName', 'displayName'] + conn.search(search_base=search_base, search_filter=search_filter, search_scope=search_scope, attributes=attributes) + service_users = [entry.sAMAccountName.value for entry in conn.entries] + + # Print the list of service users + # tabulate the results + ad_service_users = pd.DataFrame({'AD Service Users': service_users}) + print(ad_service_users) + + # Close the connection + conn.unbind() + + +def analyze_ad_users(): + """ + Query AD for users + :param server_name: + :param user_name: + :param password: + :return: + """ + + # move global variables to local scope + server_name = ad_server + failover_server_name = ad_server_2 + user_name = ad_audit_user + password = ad_audit_password + + # scope: employee, contractor, etc. + domain_name = 'company' + domain_suffix = "int" + + org_ou = "companynetwork" + root_ou = "company" + sub_ou = "users" + + # print header + format_string = '{:25} {:>6} {:19} {:35} {}' + print(format_string.format('User', 'Logins', 'Last Login', 'State', 'Description')) + + # global exception handling + try: + + server = Server(server_name, port=636, use_ssl=True, get_info=ALL, connect_timeout=10, allowed_referral_hosts=[(failover_server_name, True)]) + conn = Connection(server, user='{}\\{}'.format(domain_name, user_name), password=password, authentication=NTLM, + auto_bind=True) + + # ldap search query + conn.search( + search_base=f'OU={root_ou},OU={sub_ou},OU={org_ou},DC={domain_name},DC={domain_suffix}', + search_filter='(objectClass=person)', + search_scope=SUBTREE, + attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES] + ) + + # check if we have any results, and if not exit the program + if len(conn.entries) == 0: + print("No results") + return 0 + + # list of object entries from LDAP query + onprem_ad_user_list = [] + + for e in conn.entries: + # handle missing description + try: + desc = e.description + except LDAPCursorError: + desc = "" + + # lookup the status code from ms ad + status_text = _resolve_ms_ad_status_code(e.userAccountControl.value) + + print(format_string.format(str(e.name), str(e.logonCount), str(e.lastLogon)[:19], str(status_text)[:35], + desc)) + + # convert the entry to JSON, and append it to the list as a dictionary + onprem_ad_user_entry_dict = json.loads(e.entry_to_json()) + onprem_ad_user_list.append(onprem_ad_user_entry_dict) + + if verbose: + print(e) + + def _tabularize_ad_user_data(onprem_ad_user_entry_dict : dict): + # convert the list of dictionaries to a pandas dataframe + # normalize the data, flatten the JSON + onprem_ad_user_df = pd.json_normalize(onprem_ad_user_entry_dict) + + # due to issues with the processing in Elastic, we need to remove the following attributes + onprem_ad_user_df.drop("attributes.msExchSafeSendersHash", inplace=True, axis=1) + onprem_ad_user_df.drop("attributes.msExchBlockedSendersHash", inplace=True, axis=1) + + # fill in the lastLogonTimestamp with the lastLogon value if the lastLogonTimestamp is empty + onprem_ad_user_df["attributes.lastLogonTimestamp"].fillna(onprem_ad_user_df["attributes.lastLogon"], inplace=True) + + # convert the date fields to datetime type, use UTC + onprem_ad_user_df["attributes.lastLogonTimestamp"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply( + lambda x: _parse_date(str(x[0]))) + onprem_ad_user_df["attributes.lastLogonTimestamp"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply( + lambda x: pd.to_datetime(x, errors='coerce', utc=True)) + + # check if last logon is 180 days ago, localize local timestamp to UTC for the comparison + onprem_ad_user_df["inactive"] = onprem_ad_user_df["attributes.lastLogonTimestamp"].apply( + lambda x: pd.Timestamp.now().tz_localize('UTC') - timedelta(days=180) > x) + + # simplify the account analysis by storing the user status in columns + onprem_ad_user_df["inactive"] = onprem_ad_user_df["inactive"].astype(bool) + onprem_ad_user_df["enabled"] = onprem_ad_user_df["attributes.userAccountControl"].apply(lambda x: 512 in x) + return onprem_ad_user_df + onprem_ad_users_df = _tabularize_ad_user_data(onprem_ad_user_list) + + def _filter_inactive_and_non_disabled_users(onprem_ad_users_df : pd.DataFrame): + # filter for the inactive users AND enabled users + inactive_ad_users_df = onprem_ad_users_df.loc[onprem_ad_users_df["inactive"] & onprem_ad_users_df["enabled"]] + return inactive_ad_users_df + inactive_onprem_ad_users_df = _filter_inactive_and_non_disabled_users(onprem_ad_users_df) + + # prepare the email report html part + inactive_onprem_ad_users_count = inactive_onprem_ad_users_df.shape[0] + + # print the inactive users if detailed output is enabled + if detailed_output and inactive_onprem_ad_users_count > 0: + print() + print("Inactive users enabled:") + + print( + inactive_onprem_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]] + ) + + inactive_ad_users_description_result = "Found " + str(inactive_onprem_ad_users_count) + \ + " inactive enabled users in on-prem AD." + print(inactive_ad_users_description_result) + if inactive_onprem_ad_users_count == 0: + inactive_enabled_onprem_ad_users_html_report.write("No inactive enabled users found in on-prem AD.\r\n") + print("Terminated successfully") + exit(0) + + def _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_ad_users_df : pd.DataFrame): + inactive_enabled_onprem_ad_users_html_report.write("The following employee accounts are active in COMPANY " + + "onprem AD, and have not logged in for 180 days:\r\n\r\n") + inactive_enabled_onprem_ad_users_html_report.write( + tabulate( + inactive_ad_users_df[["attributes.name", "attributes.lastLogonTimestamp"]], + headers=["Name", "Last Logon"], + tablefmt="psql", + showindex=False) + ) + inactive_enabled_onprem_ad_users_html_report.write("\r\nThese may have been missed within the Joiner / Leaver process\r\n") + return inactive_enabled_onprem_ad_users_html_report + inactive_enabled_ad_users_html_report = _prepare_html_email_report_section_inactive_enabled_onprem_ad_users(inactive_onprem_ad_users_df) + + print("Report Mail prepared, sending email...") + send_email(message=inactive_enabled_ad_users_html_report.getvalue()) + + def __append_log_output_to_json_file(onprem_ad_users_df : pd.DataFrame): + # serialize the dataframe to json and csv, keep lines for Elastic (append) + log_output = StringIO() + onprem_ad_users_df.to_json(path_or_buf=log_output, orient='records', lines=True) + # append the JSON output to a file + with open("data.json", "a+") as f: + f.write(log_output.getvalue()) + __append_log_output_to_json_file(onprem_ad_users_df) + + if detailed_output: + # overwrite the output to a file + onprem_ad_users_df.to_csv('data.csv', sep=";", index=False) + + # global exception with failure message + except Exception as e: + print(e) + return 0 + + # close the connection, clean up + finally: + conn.unbind() + + +if __name__ == '__main__': + read_config() + analyze_ad_users() + # analyze_ad_groups() + analyze_ad_admins() + # analyze_ad_service_users() + # send_email(message=inactive_enabled_onprem_ad_users_html_report.getvalue()) \ No newline at end of file