"""Format Unix server scan results into a reviewer-facing MUL workbook.

On import this module reads the developer, server and non-individual
solution workbooks from the ``Input`` directory under the current working
directory.  ``unix_format`` then reads ``ScanResults.xlsx`` from the
``Output`` directory and writes the formatted workbooks back to ``Output``.
"""

import functools
import os
import shutil
from datetime import datetime

import pandas as pd

# import openpyxl
# from UnixACLscan import serverScanLinux
from adhelper import get_user_details
from getDetails import getDetails
from getDetails_group import getDetailsGroup
from getLocation import getLocation

application_name = "Informatica"
reviewer_name = "Sylvia, Frank"

# Server name -> environment.  NOTE(review): nothing in this file populates
# these any more (generate_files below is commented out), so every row is
# currently reported as "Unidentified" -- confirm where they should be filled.
server_env = {}
server_env_orig = {}

curr_dir = os.getcwd()
input_dir = f"{curr_dir}\\Input"
output_dir = f"{curr_dir}\\Output"
server_file = f"{input_dir}\\Unix Servers.xlsx"
developers_file = f"{input_dir}\\Developers.xlsx"
# NOTE(review): identical to developers_file although it is read as a
# multi-sheet workbook with "Cyberark Data" / "Deny Data" sheets -- looks like
# a copy/paste slip; confirm the intended file name.
non_indiv_sol_file = f"{input_dir}\\Developers.xlsx"

try:
    df_developers = pd.read_excel(developers_file)
    df_server = pd.read_excel(server_file)
    # multi-sheet workbook, read as a dictionary of dataframes
    # because of sheet_name=None option
    df_non_indiv_sol = pd.read_excel(non_indiv_sol_file, sheet_name=None)
    cyber_acct = df_non_indiv_sol["Cyberark Data"]["Account"].tolist()
    deny_acct = df_non_indiv_sol["Deny Data"]["Account"].tolist()
except Exception as e:
    print("Exception thrown while trying to read input files.\n\n")
    print(e)
    # Exit with a failure status; a bare exit() reports success (code 0) and
    # is only available when the ``site`` module is loaded.
    raise SystemExit(1) from e

scan_results_file = f"{output_dir}\\ScanResults.xlsx"
manager_colnames = ("L0manager", "L1manager", "L2manager", "L3manager", "L4manager")

# Placeholder written to "Entitlement" when a server grants several
# world permissions above read.
MULTIPLE_ENTITLEMENTS = "Multiple, see supplemental documents"


def log(msg):
    """Log a message to stdout with timestamp."""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")


# FIXME: Currently below function is not used
# def generate_files(app_name):
#     global server_env
#     server_count = 0
#     server_file_copied_count = 0
#     input_dir = os.getcwd() + "\\Input\\"
#     output_dir = os.getcwd() + "\\Output\\"
#     for f in os.listdir(output_dir):
#         os.remove(output_dir + f)
#     df_server = pd.read_excel(input_dir + "Unix Servers.xlsx", "Sheet1")
#     df_server_len = len(df_server.index)
#     for i in range(df_server_len):
#         if app_name.lower() == df_server.at[i, "Application"].lower():
#             server_env[df_server.at[i, "Unix Server"]] = df_server.at[i, "Environment"]
#             server_path = df_server.at[i, "Path"]
#             server_count = server_count + 1
#             try:
#                 for f in os.listdir(server_path):
#                     file_name, ext = os.path.splitext(f)
#                     if df_server.at[i, "Unix Server"].lower() in str(file_name).lower():
#                         server_env_orig[file_name] = df_server.at[i, "Environment"]
#                         shutil.copy2(
#                             os.path.join(server_path, f"{file_name}{ext}"),
#                             os.path.join(output_dir, f"{file_name}{ext}"),
#                         )
#                         server_file_copied_count = server_file_copied_count + 1
#             except:
#                 raise RuntimeError("Could Not copy files")
#     if server_count != server_file_copied_count:
#         raise RuntimeError("All server scans not found")


def color_yellow(val: str) -> str:
    """Return the cell CSS for *val*: yellow for excluded rows, else none."""
    if val in (
        "Excluded from Review - Permissions",
        "Excluded from Review - Developer",
    ):
        return "background-color : yellow"
    # Always return a string so the Styler never receives None.
    return ""


def dump_group_info_file(groups: list[str]):
    """Fetch group details and write to 'Group Details.xlsx' file.

    Each distinct name in *groups* is looked up once through
    ``getDetailsGroup`` and written as one row (name, description, notes).
    """
    rows = []
    # dict.fromkeys de-duplicates while keeping first-seen order, so every
    # group is fetched only once.
    for group in dict.fromkeys(groups):
        details = getDetailsGroup(group)  # FIXME: replace this function
        rows.append(
            {
                "Group Name": group,
                "Description": details["desc"],
                "Notes": details["notes"],
            }
        )
    # Build the frame in one go: DataFrame.append returned a new frame (the
    # result was being discarded) and no longer exists in pandas 2.x.
    df_group_info = pd.DataFrame(rows, columns=["Group Name", "Description", "Notes"])
    filepath = f"{output_dir}\\Group Details.xlsx"
    df_group_info.to_excel(filepath, index=False)
    log(f"Details of {len(df_group_info.index)} groups written to '{filepath}'")


def dump_supplemental_docs(scan_results: dict[str, pd.DataFrame]):
    """Create supplemental documents for prod and pre-prod.

    For each environment that has at least one server in ``server_env``, the
    owner, group and world entitlement sheets of *scan_results* are filtered
    to that environment's servers and written to one workbook.
    """
    senv_dict = {
        "Prod": [str(k).lower() for (k, v) in server_env.items() if v == "Production"],
        "Pre Prod": [
            str(k).lower() for (k, v) in server_env.items() if v == "Pre-Production"
        ],
    }
    sheet_names = ("Owner entitlements", "Group entitlements", "World entitlements")
    for env, env_servers in senv_dict.items():
        if not env_servers:
            continue
        dfs = {}
        for k in sheet_names:
            sheet = scan_results[k]
            # env_servers is lower-cased, so compare case-insensitively.
            in_env = sheet["Server Name"].astype(str).str.lower().isin(env_servers)
            dfs[k] = sheet.loc[in_env].fillna("")
        fpath = f"{output_dir}\\{application_name} Supplemental Document - {env}.xlsx"
        # The context manager saves and closes the workbook on exit.
        with pd.ExcelWriter(fpath, mode="w") as writer:
            for k, v in dfs.items():
                v.to_excel(writer, sheet_name=k, index=False)


def _world_write_permissions(df_world: pd.DataFrame) -> dict[str, list[str]]:
    """Map each server to its world permissions that include write access."""
    servers_from_world: dict[str, list[str]] = {}
    for server, permission in zip(df_world["Server Name"], df_world["Entitlement"]):
        if "write" in str(permission).lower():
            servers_from_world.setdefault(server, []).append(permission)
    return servers_from_world


def _world_entitlement(permissions: list[str]) -> str:
    """Collapse a server's world permissions into one "Entitlement" value."""
    if len(permissions) > 1:
        return MULTIPLE_ENTITLEMENTS
    return permissions[0]


def _review_fields(row) -> dict:
    """Compute the Action, solution, name and owner values for one MUL row."""
    user_id = row["UserID"]
    uid_lower = str(user_id).lower()
    company = row["Company"]
    emp_type = str(row["EmployeeType"]).lower()
    name = row["Name"]
    owner = row["L0manager"]
    permission = str(row["Entitlement"])
    solution = row["Non-Individual Solution"]

    # Only permissions above read need a reviewer decision.
    if "write" in permission.lower() or permission == MULTIPLE_ENTITLEMENTS:
        action = "Please Select Action"
    else:
        action = "Excluded from Review - Permissions"

    # service/support accounts
    if emp_type in ("svc", "spt", "") and uid_lower.startswith(("svc", "spt")):
        # Fill names for svc and spt accounts
        if name == "":
            name = user_id
        # Non individual solution column
        if user_id in cyber_acct:
            solution = "CyberArk - Access to the Non-Individual Account is controlled thru CyberArk"
        elif user_id in deny_acct:
            solution = "Deny Local Login – Access to the Non-Individual Account is controlled thru Deny Local Login"
        elif company in (
            "AGT",
            "GTS",
            "Aegon Global Technology",
            "Global Technology Services",
        ):
            solution = "N/A – GTS Monitors Account"
        # Owner/Manager for service/support account
        if owner == "":
            manager = getLocation(user_id)
            owner = manager if manager != "Not Found" else "N/A"
    # Not a service or support account
    else:
        solution = "N/A – Id is a User or Group"
        if name == "":
            user_details = get_user_details(user_id)
            found_name = user_details["User Name"]
            name = found_name if found_name != "Not Found" else user_id
        if owner == "":
            owner = "N/A"

    return {
        "Action": action,
        "Non-Individual Solution": solution,
        "Name": name,
        "L0manager": owner,
    }


def unix_format():
    """Format unix server report.

    Reads every sheet of ``scan_results_file``, writes the group details
    workbook, merges owner, group and world-derived logon rows into a single
    list, fills the reviewer columns and writes
    '<application_name> - MUL.xlsx' to ``output_dir``.
    """
    scan_results = pd.read_excel(scan_results_file, sheet_name=None)
    # result = functools.reduce(lambda x, y: x|y, [df_owner[m].isin(df_developers["Developers"]) for m in cols])
    df_owner = scan_results["Owner entitlements"].fillna("")
    df_group = scan_results["Group entitlements"].fillna("")
    df_world = scan_results["World entitlements"].fillna("")
    df_server_logon_access = scan_results["Server logon access"].fillna("")

    # sheet 1 - Owner entitlements
    log("Formatting sheet 1 - Owner entitlements")
    df_owner["Permission Source"] = "Owner"
    # TODO: check for developers here
    df_owner.drop(
        ["Path", "CostCentre", "L1manager", "L2manager", "L3manager", "L4manager"],
        axis=1,
        inplace=True,
    )
    df_owner.drop_duplicates(inplace=True)
    df_owner.reset_index(drop=True, inplace=True)

    # sheet 2 - Group entitlements
    log("Formatting sheet 2 - Group entitlements")
    df_group.drop(["Path"], axis=1, inplace=True)
    df_group.drop_duplicates(inplace=True)
    df_group.reset_index(drop=True, inplace=True)
    df_group["Permission Source"] = "Group"
    df_group["EmployeeType"] = "Group"
    df_group["Company"] = ""
    df_group["L0manager"] = ""
    df_group.rename(columns={"Group/User": "UserID"}, inplace=True)
    df_group["Name"] = df_group["UserID"]
    dump_group_info_file(df_group["UserID"].tolist())  # write group details

    # sheet 3 - World entitlements
    log("Formatting sheet 3 - World entitlements")
    df_world.drop(["Path", "Group/User"], axis=1, inplace=True)
    df_world.drop_duplicates(inplace=True)
    df_world.reset_index(drop=True, inplace=True)
    # a dictionary of server: [permissions] where permission is greater than read.
    servers_from_world = _world_write_permissions(df_world)

    # sheet 4 - Server logon access
    log("Formatting sheet 4 - Server logon access")
    world_filter = df_server_logon_access["Server Name"].isin(list(servers_from_world))
    # Copy the slice so the in-place edits below act on an independent frame.
    df_server_logon_access = df_server_logon_access.loc[world_filter].copy()
    # TODO: check for developers here
    df_server_logon_access.drop(
        [
            "Account Name",
            "CostCentre",
            "L1manager",
            "L2manager",
            "L3manager",
            "L4manager",
        ],
        axis=1,
        inplace=True,
    )
    df_server_logon_access.drop_duplicates(inplace=True)
    df_server_logon_access.reset_index(drop=True, inplace=True)
    df_server_logon_access["Permission Source"] = "World"
    # if there are multiple permissions, put the "Multiple, ..." placeholder,
    # else put the permission in "Entitlement" column.  Assigned as a whole
    # column because rows yielded by iterrows() are copies.
    df_server_logon_access["Entitlement"] = df_server_logon_access["Server Name"].map(
        lambda server: _world_entitlement(servers_from_world[server])
    )

    mul = pd.concat([df_owner, df_group, df_server_logon_access], axis=0)
    # Columns missing from one of the sheets come back as NaN after concat;
    # the string handling below expects "".
    mul = mul.fillna("")
    mul.drop_duplicates(inplace=True)
    mul.reset_index(drop=True, inplace=True)

    # Case-insensitive environment lookup; "Unidentified" is the default.
    env_lookup = {
        str(name).lower(): env for name, env in server_env.items() if env is not None
    }
    mul["Environment"] = (
        mul["Server Name"].astype(str).str.lower().map(env_lookup).fillna("Unidentified")
    )

    reviewer_details = get_user_details(reviewer_name)
    mul["Reviewer"] = reviewer_name
    mul["Reviewer Email"] = reviewer_details["User Email"]
    mul["Escalation Reviewer"] = reviewer_details["Manager Name"]
    mul["Escalation Reviewer Email"] = reviewer_details["Manager Email"]
    mul["Non-Individual Solution"] = "Please Select Choice"
    mul["Action"] = "Please Select Action"

    # Compute the per-row values first, then write them back column by
    # column: assigning to a row from iterrows() does not touch the frame.
    reviewed = [_review_fields(row) for _, row in mul.iterrows()]
    for col in ("Action", "Non-Individual Solution", "Name", "L0manager"):
        mul[col] = [fields[col] for fields in reviewed]

    mul.rename(
        columns={
            "UserID": "User ID",
            "Entitlement": "Permissions",
            "Name": "User Name",
            "EmployeeType": "Employee Type",
            "L0manager": "Manager/Owner",
        },
        inplace=True,
    )
    columns_order = (
        "Reviewer",
        "Action",
        "Non-Individual Solution",
        "User ID",
        "User Name",
        "Server Name",
        "Permission Source",
        "Permissions",
        "Environment",
        "Employee Type",
        "Company",
        "Manager/Owner",
        "Escalation Reviewer",
        "Reviewer Email",
        "Escalation Reviewer Email",
    )
    # A tuple key is treated as a single column label; select with a list.
    mul = mul[list(columns_order)]
    mul = mul.sort_values(by=["Action"], ascending=False)

    styler = mul.style
    # Styler.applymap was renamed to Styler.map in newer pandas releases.
    style_cells = styler.map if hasattr(styler, "map") else styler.applymap
    styled = style_cells(color_yellow)
    styled.to_excel(
        f"{output_dir}\\{application_name} - MUL.xlsx", sheet_name="MUL", index=False
    )


# generate_files(application_name)
# serverScanLinux(os.getcwd()+'\\Output')
if __name__ == "__main__":
    unix_format()