292 lines
12 KiB
Python
292 lines
12 KiB
Python
import argparse
|
|
import configparser
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
from itertools import product
|
|
|
|
from conpeek_setup import util
|
|
from conpeek_setup.api import operator_api
|
|
|
|
|
|
def unpack_regex(regex):
    r"""Expand a simple digit-only regex into every number it can match.

    Recognised tokens are:

    * literal digit runs, e.g. ``48``
    * digit ranges in square brackets, e.g. ``[2-5]``
    * digit lists in square brackets, e.g. ``[234]``
    * the ``\d`` shorthand (expands to ``0`` .. ``9``)

    Every other character (including the ``^`` / ``$`` anchors) is skipped.
    Quantifiers such as ``{3}`` are NOT interpreted: the digits between the
    braces are picked up by the tokenizer as literal digits.

    Args:
        regex: Pattern string, e.g. ``r"^4822[1-3]\d$"``.

    Returns:
        A list of strings, one per combination of the expanded tokens, in
        the order produced by ``itertools.product``.
    """

    def expand_square_brackets(pattern):
        """Return the list of strings a single bracket / ``\\d`` token stands for."""
        if pattern == r'\d':
            return [str(digit) for digit in range(10)]

        content = pattern[1:-1]  # Remove the square brackets
        if '-' in content:
            # Range such as [2-5]; both bounds are inclusive.
            start, end = content.split('-')
            return [str(value) for value in range(int(start), int(end) + 1)]

        # Explicit digit list such as [234]
        return list(content)

    # Tokenize into bracket ranges, bracket lists, literal digit runs and \d.
    parts = re.findall(r'\[\d+-\d+\]|\[\d+\]|\d+|\\d', regex)

    expanded_parts = []
    for part in parts:
        is_bracket = part.startswith('[') and part.endswith(']')
        if is_bracket or part == r'\d':
            expanded_parts.append(expand_square_brackets(part))
        else:
            # Literal digits contribute exactly one alternative.
            expanded_parts.append([part])

    # Cartesian product of all alternatives, joined back into numbers.
    return [''.join(combination) for combination in product(*expanded_parts)]
|
|
|
|
|
|
# Upper bound on how many DDI numbers one pattern or range may expand to.
_MAX_DDI_NUMBERS = 1000

# Number normalization rules created on the operator gateway of every
# processed SIP trunk section.
_GATEWAY_NORMALIZATIONS = (
    {"direction": "outbound", "peer": "caller", "pattern": '^48([0-9]{9})$', "replacement": '\\1', "priority": 10},
    {"direction": "inbound", "peer": "callee", "pattern": '^([0-9]{9})$', "replacement": '48\\1', "priority": 10},
    {"direction": "inbound", "peer": "caller", "pattern": '^([0-9]{9})$', "replacement": '48\\1', "priority": 10},
    {"direction": "inbound", "peer": "caller", "pattern": '^\\+([0-9]*)$', "replacement": '\\1', "priority": 10},
    {"direction": "inbound", "peer": "caller", "pattern": '^00([0-9]*)$', "replacement": '\\1', "priority": 10},
    {"direction": "outbound", "peer": "callee", "pattern": '^([0-9]*)$', "replacement": '+\\1', "priority": 10},
)


def _resolve_config_path(config_path):
    """Return the config file path to read.

    Absolute paths, and relative paths that exist relative to the current
    working directory, are returned unchanged.  Any other relative path is
    resolved against the directory containing this script.
    """
    if os.path.isabs(config_path) or os.path.exists(config_path):
        return config_path
    return os.path.join(pathlib.Path(__file__).parent.resolve(), config_path)


def _abort_if_too_many(count, message):
    """Print ``message`` and exit with status 1 when ``count`` exceeds the DDI limit."""
    if count > _MAX_DDI_NUMBERS:
        util.print_red(message)
        sys.exit(1)


def _create_routing(api, gateway_id, src_pattern):
    """Create an operator routing for ``src_pattern`` on the given gateway."""
    api.create_operator_routing({
        "priority": 100,
        "src_number_to_charge": src_pattern,
        "src_presentation_number": ".*",
        "dst": ".*",
        "operator_gateway_id": gateway_id,
    })


def _create_ddi(api, tenant_id, gateway_id, number):
    """Create one fax+voice DDI for ``number`` bound to the given gateway."""
    api.create_ddi(tenant_id, {
        # Always sent as a string so all three pattern kinds produce the same payload type.
        "e164number": str(number),
        "fax_enabled": 1,
        "voice_enabled": 1,
        "sms_enabled": 0,
        "operator_gateway_id": gateway_id,
    })


def _provision_ddi_pattern(api, tenant_id, gateway_id, ddi_pattern):
    """Create routings and DDIs for one entry of a section's ``ddi`` option.

    Three entry forms are handled:

    * ``^...$``   -- a regex: one routing for the regex itself, one DDI per
      number returned by ``unpack_regex``.
    * ``from-to`` -- an inclusive numeric range: one routing and one DDI per
      number.
    * anything else -- a single number: one routing and one DDI.

    Exits the process with status 1 when a regex or range expands to more
    than ``_MAX_DDI_NUMBERS`` numbers; the check runs before anything is
    created for that entry.
    """
    if ddi_pattern.startswith("^") and ddi_pattern.endswith("$"):
        ddi_list = unpack_regex(ddi_pattern)
        _abort_if_too_many(len(ddi_list), "There cannot be more than 1000 numbers -- invalid regexp pattern")

        _create_routing(api, gateway_id, ddi_pattern)
        for ddi_number in ddi_list:
            _create_ddi(api, tenant_id, gateway_id, ddi_number)

    elif "-" in ddi_pattern:
        bounds = ddi_pattern.split("-")
        from_number = int(bounds[0])
        to_number = int(bounds[1])

        # A range object knows its length without materialising the numbers.
        numbers = range(from_number, to_number + 1)
        _abort_if_too_many(len(numbers), "There cannot be more than 1000 numbers")

        for number in numbers:
            _create_routing(api, gateway_id, f"^{number}$")
            _create_ddi(api, tenant_id, gateway_id, number)

    else:
        _create_routing(api, gateway_id, f"^{ddi_pattern}$")
        _create_ddi(api, tenant_id, gateway_id, ddi_pattern)


def _provision_sip_trunk(api, config, tenant, section, output_directory, template_freeswitch_directory):
    """Generate the FreeSWITCH SIP profile and API objects for one ``sip_trunk_<n>`` section.

    Returns early (after printing a message) when the SIP profile file
    already exists in the output directory or when the operator gateway
    could not be created.
    """
    operator = "default"
    if config.has_option(section, "operator") and config[section]["operator"]:
        requested_operator = config[section]["operator"]
        if requested_operator not in util.Constants.sip_trunk_operators:
            util.print_red(f"Operator does not exists {requested_operator}. Using default sip trunk!")
        else:
            operator = requested_operator

    # The section name is "sip_trunk_<n>"; everything after the 10-char prefix is <n>.
    siptrunk_number = section[10:]
    proxy = config[section]["proxy"]

    internal_port = 5100 + int(siptrunk_number)
    trunk_name = f"ic{siptrunk_number}"
    trunk_name_with_port = f"{trunk_name}_{internal_port}"
    profile_name = f"gw1_external_{siptrunk_number}_{internal_port}"

    # Prefer the operator-specific template; fall back to the generic one.
    operator_template_directory = os.path.join(util.get_templates_path(), f"freeswitch/{operator}")
    sip_profile_template_path = os.path.join(operator_template_directory, "sip_profiles/gw_external.xml")
    if not os.path.exists(sip_profile_template_path):
        sip_profile_template_path = os.path.join(template_freeswitch_directory, "sip_profiles/gw_external.xml")

    sip_profile_output_path = os.path.join(output_directory, f"conf/sip_profiles/{profile_name}.xml")
    if os.path.exists(sip_profile_output_path):
        util.print_red(f"SIP profile {sip_profile_output_path} exists.")
        return
    util.copy_file(sip_profile_template_path, sip_profile_output_path)

    trunk_username = config[section]["username"] if config.has_option(section, "username") else ""
    trunk_password = config[section]["password"] if config.has_option(section, "password") else ""

    # Placeholder token -> value; applied to the copied profile in this order.
    sip_profile_replacements = {
        'SIP_PROFILE_NAME': profile_name,
        'SIP_TRUNK_NAME_WITH_PORT': trunk_name_with_port,
        'SIP_TRUNK_PROXY': proxy,
        'SIP_TRUNK_INTERNAL_PORT': str(internal_port),
        'SIP_TRUNK_USERNAME': trunk_username,
        'SIP_TRUNK_PASSWORD': trunk_password,
        'EXTERNAL_IP': config["new_machine_network"]["external_ip"],
        'INTERNAL_IP': config["new_machine_network"]["internal_ip"],
    }
    for placeholder, value in sip_profile_replacements.items():
        util.basic_on_location_sed(sip_profile_output_path, placeholder, value)

    # Reuse an operator gateway with the trunk's name when one exists.
    operator_gateway = next(
        (gateway for gateway in api.get_operator_gateways() if gateway["name"] == trunk_name),
        None,
    )

    # Make sure the tenant has a telco network bound to this trunk's port.
    telco_network_exists = any(
        network["gateway_external_sip_port"] == internal_port
        for network in api.get_tenant_telco_networks(tenant["id"])
    )
    if not telco_network_exists:
        api.create_tenant_telco_network(tenant["id"], {
            "8021q_vlan_id": 0,
            "gateway_external_sip_port": internal_port,
            "telco_network_id": 1,
        })

    if not operator_gateway:
        util.print_black_light(f"Creating operator gateway {trunk_name}")
        operator_gateway = api.create_operator_gateway({
            "name": trunk_name,
            "validate_e164": True,
            "presentation_preservation_policy": "DISABLED",
            "telco_network_id": 1,
            "username": trunk_username,
            "password": trunk_password,
        })

    if not operator_gateway:
        util.print_red(f"Cant insert to operator_gateway {trunk_name}.")
        return

    for normalization in _GATEWAY_NORMALIZATIONS:
        # Pass a copy so the shared rule definitions can never be mutated by the API layer.
        api.create_operator_gateway_normalization(operator_gateway["id"], dict(normalization))

    # The "ddi" option is a list of entries separated by "," or ";".
    ddi_entries = config[section]["ddi"].replace(",", ";").split(";")
    for ddi_entry in ddi_entries:
        ddi_pattern = ddi_entry.strip()
        if not ddi_pattern:
            # Skip blanks left by trailing or doubled separators.
            continue
        _provision_ddi_pattern(api, tenant["id"], operator_gateway["id"], ddi_pattern)


def main():
    """Script entry point: read the config, log in and provision every ``sip_trunk_*`` section.

    Exits with status 1 when the config file cannot be read, when no tenant
    exists yet, or when a DDI pattern expands to too many numbers.
    """
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--config", type=str, required=True, help="Path to the config file")
    parser.add_argument("--user", type=str, required=True, help="Operator login")
    parser.add_argument("--password", type=str, required=True, help="Operator password")
    parser.add_argument("--output-path", type=str, required=False, help="Path to the output")
    args = parser.parse_args()

    config = configparser.ConfigParser()
    config_path = _resolve_config_path(args.config)
    # ConfigParser.read() skips unreadable files silently and returns the
    # list of files it actually parsed, so an empty result means failure.
    if not config.read(config_path):
        util.print_red(f"Cannot read config file {config_path}")
        sys.exit(1)

    if args.output_path:
        util.Settings.output_directory = args.output_path

    output_directory = os.path.join(util.get_output_path(), "freeswitch")
    template_freeswitch_directory = os.path.join(util.get_templates_path(), "freeswitch/conf")

    app_domain = config["new_machine_network"]["app_installation_domain"]
    api = operator_api.OperatorAPI(app_domain)
    api.login(args.user, args.password)

    # Everything is provisioned for the first tenant the API returns.
    tenant = next(iter(api.get_tenants()), None)
    if not tenant:
        util.print_red("Tenant not exists. First you need to create tenant and the run this script.")
        sys.exit(1)

    # sip trunks
    for section in config:
        if not section.startswith("sip_trunk_"):
            continue
        _provision_sip_trunk(api, config, tenant, section, output_directory, template_freeswitch_directory)


if __name__ == "__main__":
    main()