import argparse import configparser import os import pathlib import sys from datetime import datetime import pytz from conpeek_setup import util from conpeek_setup.api import operator_api from conpeek_setup.api.tenant_api import TenantAPI def validate_config(config): sections_to_validate = [ "tenant" ] for section in sections_to_validate: if not config.has_section(section): util.print_red(f"No {section} section in config") raise Exception(f"No {section} section in config") if __name__ == "__main__": parser = argparse.ArgumentParser(description="") parser.add_argument("--config", type=str, required=True, help="Path to the config file") parser.add_argument("--user", type=str, required=True, help="Operator login") parser.add_argument("--password", type=str, required=True, help="Operator password") args = parser.parse_args() config = configparser.ConfigParser() config_directory = args.config if not os.path.abspath(config_directory): config_directory = os.path.join(pathlib.Path(__file__).parent.resolve(), config_directory) config.read(config_directory) validate_config(config) app_domain = config["new_machine_network"]["app_installation_domain"] api = operator_api.OperatorAPI(app_domain) util.print_black_light(f"Wait for start TenantLoginAPI") api.wait_for_start(f"{api.api_tenant_login_url}/tenant/check_connection") util.print_green(f"TenantLoginAPI Started") api.login(args.user, args.password) util.print_black_light(f"Wait for start OperatorAPI") api.wait_for_start() util.print_green(f"OperatorAPI Started") tenant = None tenants = api.get_tenants() for o_tenant in tenants: tenant = o_tenant break if not tenant: # Create Tenant if not exists print("Creating tenant...") tenant = api.create_tenant({ "name": config["tenant"]["domain"], "domain": config["tenant"]["domain"], "time_zone": "Europe/Warsaw", "language": "pl", "active": 1, "estimated_number_of_users": 100, "allow_to_exceed_concurrent_packages": 0, "create_recording_checksum_file": 1, "video_recording_enabled": 0, "simultaneous_calls_limit": 100, "anonymous_sessions": 1, "access_network_id": 1, "plugin_enabled_domain_id": 1, "telco_network_id": 1 }) util.print_black_light(f"Created tenant ({tenant['id']}) {tenant['name']}.") else: util.print_yellow(f"Tenant {config['tenant']['domain']} exists, id={tenant['id']}.") license_packages = api.get_license_packages(tenant_id=tenant["id"]) packages_count = { "Super Admin": 3, } license_package_agreements = api.get_package_agreements(tenant_id=tenant["id"]) for license_package in license_package_agreements: if license_package["operator_license_package_name"] in packages_count: if license_package["commercial_license_package_amount"] >= packages_count[license_package["operator_license_package_name"]]: util.print_black_light(f'Licenses {license_package["operator_license_package_name"]} exists.') del packages_count[license_package["operator_license_package_name"]] super_admin_package_id = None package_activation_item_list = [] for package_license in license_packages: if package_license["name"] == "Super Admin": super_admin_package_id = package_license["id"] if package_license["name"] in packages_count: package_activation_item_list.append({ "license_package_amount": packages_count[package_license["name"]], "license_package_type": "COMMERCIAL", "concurrent_usage_limit": 0, "operator_license_package_id": package_license["id"] }) local_tz = pytz.timezone(tenant["time_zone"]) local_now = datetime.now(local_tz) local_start_of_day = local_tz.localize(datetime(local_now.year, local_now.month, local_now.day, 0, 0, 0)) utc_start_of_day = local_start_of_day.astimezone(pytz.utc) if package_activation_item_list: api.create_package_activation_request(tenant_id=tenant["id"], params={ "activate_date": utc_start_of_day.strftime("%Y-%m-%d %H:%M:%S"), "note": "Auto generated", "package_activation_item_list": package_activation_item_list }) # Create super administrators util.print_black_light("Creating super administrators...") super_admin_users_to_create = [ { "username": "super1", "extension": "1001" }, { "username": "super2", "extension": "1002" }, { "username": "super3", "extension": "1003" } ] super_admin_users_to_create_list = [] for user in super_admin_users_to_create: super_admin_users_to_create_list.append(user["username"]) users_in_tenant = [] super_tenant_user_id = None tenant_api = None user_mapping = {} users = api.get_users(tenant_id=tenant["id"]) for user in users: if not super_tenant_user_id: super_tenant_user_id = user["id"] user_mapping[user["username"]] = user["id"] if not tenant_api: login_data = api.login_as_user(tenant_id=tenant["id"], tenant_user_id=super_tenant_user_id) tenant_api = TenantAPI(app_domain) tenant_api.initialize_user_login(login_data) util.print_black_light(f"Wait for start TenantAPI") tenant_api.wait_for_start() util.print_green(f"TenantAPI Started") users_in_tenant.append(user["username"]) if super_admin_package_id: for user in super_admin_users_to_create: if user["username"] in users_in_tenant: util.print_black_light(f'User {user["username"]} exists.') continue d_user = api.create_user(tenant_id=tenant["id"], params={ "active": 1, "username": user["username"], "password": args.password, "main_license_package_id": super_admin_package_id, "language": "pl", "change_avatar_allowed": 1, "change_on_leave_allowed": 1, "on_leave": 0, "system_role": [], "tenant_role": [] }) user_mapping[user["username"]] = d_user["id"] if not super_tenant_user_id: super_tenant_user_id = d_user["id"] if not tenant_api: login_data = api.login_as_user(tenant_id=tenant["id"], tenant_user_id=super_tenant_user_id) tenant_api = TenantAPI(app_domain) tenant_api.initialize_user_login(login_data) util.print_black_light(f"Wait for start TenantAPI") tenant_api.wait_for_start() util.print_green(f"TenantAPI Started") tenant_api.update_feature_tenant_user_call(d_user["id"], { "call_enabled": 1, "extension": user["extension"], "click_to_dial_auto_answer": 1 }) tenant_api.update_feature_tenant_user_consultant(d_user["id"], { "work_state": "ONLINE" }) tenant_api.update_feature_tenant_user_consultant(d_user["id"], { "audio_enabled": 1, "file_enabled": 1, "chat_enabled": 1 }) tenant_api.update_feature_tenant_user_hotdesk(d_user["id"], { "hotdesk_enabled": 1, "hotdesk_identifier": user["extension"], "hotdesk_pin": user["extension"], }) tenant_api.create_web_link({ "name": "Web", "tenant_user_id": d_user["id"], "audio": 1, "chat": 1, "video": 1, "call_timeout": 30, "priority": 100, "state": "ON", "user_device_type": "WEBAPP" }) if not tenant_api: sys.exit()