Files
templates/lib/create_tenant.py
2026-01-22 12:37:26 +01:00

229 lines
8.2 KiB
Python

import argparse
import configparser
import os
import pathlib
import sys
from datetime import datetime
import pytz
from conpeek_setup import util
from conpeek_setup.api import operator_api
from conpeek_setup.api.tenant_api import TenantAPI
def validate_config(config):
sections_to_validate = [
"tenant"
]
for section in sections_to_validate:
if not config.has_section(section):
util.print_red(f"No {section} section in config")
raise Exception(f"No {section} section in config")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="")
parser.add_argument("--config", type=str, required=True, help="Path to the config file")
parser.add_argument("--user", type=str, required=True, help="Operator login")
parser.add_argument("--password", type=str, required=True, help="Operator password")
args = parser.parse_args()
config = configparser.ConfigParser()
config_directory = args.config
if not os.path.abspath(config_directory):
config_directory = os.path.join(pathlib.Path(__file__).parent.resolve(), config_directory)
config.read(config_directory)
validate_config(config)
app_domain = config["new_machine_network"]["app_installation_domain"]
api = operator_api.OperatorAPI(app_domain)
util.print_black_light(f"Wait for start TenantLoginAPI")
api.wait_for_start(f"{api.api_tenant_login_url}/tenant/check_connection")
util.print_green(f"TenantLoginAPI Started")
api.login(args.user, args.password)
util.print_black_light(f"Wait for start OperatorAPI")
api.wait_for_start()
util.print_green(f"OperatorAPI Started")
tenant = None
tenants = api.get_tenants()
for o_tenant in tenants:
tenant = o_tenant
break
if not tenant:
# Create Tenant if not exists
print("Creating tenant...")
tenant = api.create_tenant({
"name": config["tenant"]["domain"],
"domain": config["tenant"]["domain"],
"time_zone": "Europe/Warsaw",
"language": "pl",
"active": 1,
"estimated_number_of_users": 100,
"allow_to_exceed_concurrent_packages": 0,
"create_recording_checksum_file": 1,
"video_recording_enabled": 0,
"simultaneous_calls_limit": 100,
"anonymous_sessions": 1,
"access_network_id": 1,
"plugin_enabled_domain_id": 1,
"telco_network_id": 1
})
util.print_black_light(f"Created tenant ({tenant['id']}) {tenant['name']}.")
else:
util.print_yellow(f"Tenant {config['tenant']['domain']} exists, id={tenant['id']}.")
license_packages = api.get_license_packages(tenant_id=tenant["id"])
packages_count = {
"Super Admin": 3,
}
license_package_agreements = api.get_package_agreements(tenant_id=tenant["id"])
for license_package in license_package_agreements:
if license_package["operator_license_package_name"] in packages_count:
if license_package["commercial_license_package_amount"] >= packages_count[license_package["operator_license_package_name"]]:
util.print_black_light(f'Licenses {license_package["operator_license_package_name"]} exists.')
del packages_count[license_package["operator_license_package_name"]]
super_admin_package_id = None
package_activation_item_list = []
for package_license in license_packages:
if package_license["name"] == "Super Admin":
super_admin_package_id = package_license["id"]
if package_license["name"] in packages_count:
package_activation_item_list.append({
"license_package_amount": packages_count[package_license["name"]],
"license_package_type": "COMMERCIAL",
"concurrent_usage_limit": 0,
"operator_license_package_id": package_license["id"]
})
local_tz = pytz.timezone(tenant["time_zone"])
local_now = datetime.now(local_tz)
local_start_of_day = local_tz.localize(datetime(local_now.year, local_now.month, local_now.day, 0, 0, 0))
utc_start_of_day = local_start_of_day.astimezone(pytz.utc)
if package_activation_item_list:
api.create_package_activation_request(tenant_id=tenant["id"], params={
"activate_date": utc_start_of_day.strftime("%Y-%m-%d %H:%M:%S"),
"note": "Auto generated",
"package_activation_item_list": package_activation_item_list
})
# Create super administrators
util.print_black_light("Creating super administrators...")
super_admin_users_to_create = [
{
"username": "super1",
"extension": "1001"
},
{
"username": "super2",
"extension": "1002"
},
{
"username": "super3",
"extension": "1003"
}
]
super_admin_users_to_create_list = []
for user in super_admin_users_to_create:
super_admin_users_to_create_list.append(user["username"])
users_in_tenant = []
super_tenant_user_id = None
tenant_api = None
user_mapping = {}
users = api.get_users(tenant_id=tenant["id"])
for user in users:
if not super_tenant_user_id:
super_tenant_user_id = user["id"]
user_mapping[user["username"]] = user["id"]
if not tenant_api:
login_data = api.login_as_user(tenant_id=tenant["id"], tenant_user_id=super_tenant_user_id)
tenant_api = TenantAPI(app_domain)
tenant_api.initialize_user_login(login_data)
util.print_black_light(f"Wait for start TenantAPI")
tenant_api.wait_for_start()
util.print_green(f"TenantAPI Started")
users_in_tenant.append(user["username"])
if super_admin_package_id:
for user in super_admin_users_to_create:
if user["username"] in users_in_tenant:
util.print_black_light(f'User {user["username"]} exists.')
continue
d_user = api.create_user(tenant_id=tenant["id"], params={
"active": 1,
"username": user["username"],
"password": args.password,
"main_license_package_id": super_admin_package_id,
"language": "pl",
"change_avatar_allowed": 1,
"change_on_leave_allowed": 1,
"on_leave": 0,
"system_role": [],
"tenant_role": []
})
user_mapping[user["username"]] = d_user["id"]
if not super_tenant_user_id:
super_tenant_user_id = d_user["id"]
if not tenant_api:
login_data = api.login_as_user(tenant_id=tenant["id"], tenant_user_id=super_tenant_user_id)
tenant_api = TenantAPI(app_domain)
tenant_api.initialize_user_login(login_data)
util.print_black_light(f"Wait for start TenantAPI")
tenant_api.wait_for_start()
util.print_green(f"TenantAPI Started")
tenant_api.update_feature_tenant_user_call(d_user["id"], {
"call_enabled": 1,
"extension": user["extension"],
"click_to_dial_auto_answer": 1
})
tenant_api.update_feature_tenant_user_consultant(d_user["id"], {
"work_state": "ONLINE"
})
tenant_api.update_feature_tenant_user_consultant(d_user["id"], {
"audio_enabled": 1,
"file_enabled": 1,
"chat_enabled": 1
})
tenant_api.update_feature_tenant_user_hotdesk(d_user["id"], {
"hotdesk_enabled": 1,
"hotdesk_identifier": user["extension"],
"hotdesk_pin": user["extension"],
})
tenant_api.create_web_link({
"name": "Web",
"tenant_user_id": d_user["id"],
"audio": 1,
"chat": 1,
"video": 1,
"call_timeout": 30,
"priority": 100,
"state": "ON",
"user_device_type": "WEBAPP"
})
if not tenant_api:
sys.exit()