Tuesday, 4 December 2018

CUCM Finding Incorrect DMS Recording Configuration

Overview
Tool to check the recording configuration for a list of DNs specified in a CSV file, such as a user export from the call recording application.
Requires Python 3 to run. Many Linux distros have Python installed by default. For Windows the easiest install is the official Python Windows version, though Miniconda works fine too:
Miniconda distribution of Python 3: https://conda.io/miniconda.html
Official Python distribution: https://www.python.org/downloads/

The lxml, Requests, urllib3 and Zeep libraries are required to work.

Version History
Written by Chris Perkins in 2018:
v1.0 – initial release.
v1.1 – fixes some edge cases.

All testing was done using Windows with CUCM v11.5.

Using the Tool
For a list of DNs in a CSV file, the tool finds phones (tkclass=1) & device profiles (tkclass=254) where any of the following is true: the built-in bridge isn't on (phones only), privacy isn't off, automatic call recording isn't enabled, the recording profile is missing or doesn't match, or the recording media source isn't phone preferred. It can optionally output the results to another CSV file.
The input CSV file should contain the list of DNs in a single column, with no header, like the below:
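
As a rough illustration of the expected layout, the sketch below reads a single-column, header-less list of DNs. The DNs shown (1001 to 1003) and the helper name read_dns are made up for the example and are not part of the tool:

```python
import csv
import io

def read_dns(handle):
    """Return the first column of every non-blank row (hypothetical helper)."""
    return [row[0].strip() for row in csv.reader(handle) if row and row[0].strip()]

# Hypothetical file contents: one DN per line, no header row
sample = io.StringIO("1001\n1002\n\n1003\n")
print(read_dns(sample))
```

Blank lines are skipped here; the tool itself reads the file with the utf-8-sig encoding so that a byte order mark from an Excel export doesn't end up in the first DN.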

It connects to CUCM via the AXL API, so the AXL schema for the version of CUCM in use is required. This is downloaded from CUCM via Application > Plugins > Cisco AXL Toolkit. The required files contained within the .zip file are AXLAPI.wsdl, AXLEnums.xsd and AXLSoap.xsd.
Different CUCM servers are defined in JSON formatted files, allowing for multiple CUCM clusters running different versions (and thus different AXL schemas). Load the JSON file via File > Load AXL:
 
It will then prompt for the password:

It will then prompt for the input CSV file:

If you wish to save the output in a CSV file, enter the filename into the text box:

Click Check Recording Config & the results will be displayed & optionally saved.

Customising the Tool
The configuration for connecting via AXL to a CUCM cluster & what recording profile(s) to check against are stored in JSON format, for example:
[
{
    "fqdn": "cucm-emea-pub.somewhere.com",
    "username": "AppAdmin",
    "wsdl_file": "file://C://temp//AXLAPI.wsdl",
    "subquery": "(dnmap.fkrecordingprofile!=(SELECT rp.pkid FROM recordingprofile rp WHERE rp.name LIKE 'NICE_NTR_ABITL_RP') AND dnmap.fkrecordingprofile!=(SELECT rp.pkid FROM recordingprofile rp WHERE rp.name LIKE 'NICE_NTR_RP'))"
}
]


  • The JSON file starts with [ and ends with ].
  • “fqdn” should be the FQDN or IP address of the target CUCM publisher.
  • “username” is an application or end user with the Standard AXL API Access role.
  • “wsdl_file” points to the location of the AXL schema, note the slightly different path syntax for Windows.
  • “subquery” is an SQL query that specifies the name of the recording profile, simply paste it into the quotes after LIKE.
It is possible to simultaneously check against multiple recording profiles by joining 2 sub-queries via AND:
"subquery": "(dnmap.fkrecordingprofile!=(SELECT rp.pkid FROM recordingprofile rp WHERE rp.name LIKE 'NICE_NTR_RP') AND dnmap.fkrecordingprofile!=(SELECT rp.pkid FROM recordingprofile rp WHERE rp.name LIKE 'RED_BOX_RP'))"
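
If several recording profiles are in play, the sub-query string can be generated rather than hand-edited. This is only a sketch: build_subquery is a hypothetical helper, not something the tool provides, and it simply reproduces the pattern shown above:

```python
def build_subquery(profile_names):
    """Join one '!=' clause per recording profile with AND (hypothetical helper)."""
    if not profile_names:
        raise ValueError("At least one recording profile name is required")
    clauses = []
    for name in profile_names:
        # Reject quotes so the name can't break out of the SQL string literal
        if "'" in name or '"' in name:
            raise ValueError("Invalid recording profile name: " + name)
        clauses.append(
            "dnmap.fkrecordingprofile!=(SELECT rp.pkid FROM recordingprofile rp "
            "WHERE rp.name LIKE '{}')".format(name)
        )
    return "(" + " AND ".join(clauses) + ")"

print(build_subquery(["NICE_NTR_RP", "RED_BOX_RP"]))
```

The printed string can then be pasted in as the "subquery" value in the JSON file.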

If you've adjusted the CallManager service parameters so that built-in bridge is on by default & privacy is off by default, change the SQL queries as follows:

            # Check for phones (tkclass=1)
            sql_statement = "SELECT d.name, d.description, n.dnorpattern, n.description AS ndescription FROM device d INNER JOIN devicenumplanmap dnmap ON dnmap.fkdevice=d.pkid INNER JOIN numplan n ON dnmap.fknumplan=n.pkid " \
                "INNER JOIN deviceprivacydynamic dpd ON dpd.fkdevice=d.pkid INNER JOIN recordingdynamic rd ON rd.fkdevicenumplanmap=dnmap.pkid WHERE (d.tkclass=1 AND n.dnorpattern='" \
                + dn + \
                "') AND (d.tkstatus_builtinbridge=0 OR dpd.tkstatus_callinfoprivate=1 OR " \
                + axl_json['subquery'] + \
                " OR dnmap.fkrecordingprofile IS NULL OR dnmap.tkpreferredmediasource!=2 OR rd.tkrecordingflag!=1) ORDER BY d.name"


            # Check for device profiles (tkclass=254)
            sql_statement = "SELECT d.name, d.description, n.dnorpattern, n.description AS ndescription FROM device d INNER JOIN devicenumplanmap dnmap ON dnmap.fkdevice=d.pkid INNER JOIN numplan n ON dnmap.fknumplan=n.pkid " \
                "INNER JOIN deviceprivacydynamic dpd ON dpd.fkdevice=d.pkid INNER JOIN recordingdynamic rd ON rd.fkdevicenumplanmap=dnmap.pkid WHERE (d.tkclass=254 AND n.dnorpattern='" \
                + dn + \
                "') AND (dpd.tkstatus_callinfoprivate=1 OR " \
                + axl_json['subquery'] + \
                " OR dnmap.fkrecordingprofile IS NULL OR dnmap.tkpreferredmediasource!=2 OR rd.tkrecordingflag!=1) ORDER BY d.name"
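
Since the DN from the CSV file is concatenated straight into the SQL statement, it may be worth sanity-checking each value first. The sketch below is an optional extra rather than part of the tool; safe_dn and the set of allowed characters (digits, *, #, + and a backslash for escaped + patterns) are assumptions to adjust for your dial plan:

```python
import re

# Assumed character set for a DN: digits, *, #, + and backslash
DN_PATTERN = re.compile(r"[0-9*#+\\]+")

def safe_dn(dn):
    """Return the stripped DN, or raise ValueError if it contains unexpected characters."""
    dn = dn.strip()
    if not DN_PATTERN.fullmatch(dn):
        raise ValueError("Unexpected characters in DN: " + repr(dn))
    return dn

print(safe_dn(" 1001 "))
```

Calling safe_dn(dn) before building sql_statement would stop a stray quote in the CSV file from producing a malformed query.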


Source Code

#!/usr/bin/env python
# v1.1 - written by Chris Perkins in 2018
# For a list of DNs in a CSV file, find phones (tkclass=1) & device profiles (tkclass=254) where built-in bridge isn’t on or privacy isn’t off, automatic call recording isn't enabled,
# recording profile doesn't match & recording media source isn't phone preferred. Optionally output to another CSV file

# v1.1 - fixes some edge cases
# v1.0 - original release

# Original AXL SQL query code courtesy of Jonathan Els - https://afterthenumber.com/2018/04/27/serializing-thin-axl-sql-query-responses-with-python-zeep/

# To Do:
# Improve the GUI

import sys, json, csv
import tkinter as tk
import requests
from tkinter import ttk
from tkinter import filedialog, simpledialog, messagebox
from collections import OrderedDict
from zeep import Client
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.plugins import HistoryPlugin
from zeep.exceptions import Fault
from zeep.helpers import serialize_object
from requests import Session
from requests.auth import HTTPBasicAuth
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from lxml import etree

# GUI and main code
class DNRecordingCheckerFrame(tk.Frame):

    def __init__(self, parent):
        """Constructor checks parameters and initialise variables"""
        self.axl_input_filename = None
        self.axl_password = ""
        self.csv_input_filename = None
        tk.Frame.__init__(self, parent)
        parent.geometry("320x480")
        self.pack(fill=tk.BOTH, expand=True)
        menu_bar = tk.Menu(self)
        file_menu = tk.Menu(menu_bar, tearoff=0)
        file_menu.add_command(label="Load AXL", command=self.open_json_file_dialog)
        file_menu.add_separator()
        file_menu.add_command(label="Exit", command=self.quit)
        menu_bar.add_cascade(label="File", menu=file_menu)
        parent.config(menu=menu_bar)
        tk.Label(self, text="Output Filename:").place(relx=0.2, rely=0.0, height=22, width=200)
        self.output_csv_text = tk.StringVar()
        tk.Entry(self, textvariable=self.output_csv_text).place(relx=0.2, rely=0.05, height=22, width=200)
        tk.Button(self, text="Check Recording Config", command=self.check_recording).place(relx=0.265, rely=0.12, height=22, width=160)
        self.results_count_text = tk.StringVar()
        self.results_count_text.set("Results Found: ")
        tk.Label(self, textvariable=self.results_count_text).place(relx=0.35, rely=0.18, height=22, width=110)
        list_box_frame = tk.Frame(self, bd=2, relief=tk.SUNKEN)
        list_box_scrollbar_y = tk.Scrollbar(list_box_frame)
        list_box_scrollbar_x = tk.Scrollbar(list_box_frame, orient=tk.HORIZONTAL)
        self.list_box = tk.Listbox(list_box_frame, xscrollcommand=list_box_scrollbar_x.set, yscrollcommand=list_box_scrollbar_y.set)
        list_box_frame.place(relx=0.02, rely=0.22, relheight=0.75, relwidth=0.96)
        list_box_scrollbar_y.place(relx=0.94, rely=0.0, relheight=1.0, relwidth=0.06)
        list_box_scrollbar_x.place(relx=0.0, rely=0.94, relheight=0.06, relwidth=0.94)
        self.list_box.place(relx=0.0, rely=0.0, relheight=0.94, relwidth=0.94)
        list_box_scrollbar_y.config(command=self.list_box.yview)
        list_box_scrollbar_x.config(command=self.list_box.xview)

    def element_list_to_ordered_dict(self, elements):
        """Convert list to OrderedDict"""
        return [OrderedDict((element.tag, element.text) for element in row) for row in elements]


    def sql_query(self, service, sql_statement):
        """Execute SQL query via AXL and return results"""
        try:
            axl_resp = service.executeSQLQuery(sql=sql_statement)
            try:
                return self.element_list_to_ordered_dict(serialize_object(axl_resp)["return"]["rows"])
            except KeyError:
                # Single tuple response
                return self.element_list_to_ordered_dict(serialize_object(axl_resp)["return"]["row"])
            except TypeError:
                # No SQL tuples
                return serialize_object(axl_resp)["return"]
        except requests.exceptions.ConnectionError as e:
            tk.messagebox.showerror(title="Error", message=str(e))
            return None

    def read_axl(self, dn_list, output_filename):
        """Check configuration via AXL SQL query"""
        try:
            self.list_box.delete(0, tk.END)
            self.results_count_text.set("Results Found: ")
            with open(self.axl_input_filename) as f:
                axl_json_data = json.load(f)
                for axl_json in axl_json_data:
                    try:
                        if not axl_json['fqdn']:
                            tk.messagebox.showerror(title="Error", message="FQDN must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="FQDN must be specified.")
                        return
                    try:
                        if not axl_json['username']:
                            tk.messagebox.showerror(title="Error", message="Username must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="Username must be specified.")
                        return
                    try:
                        if not axl_json['wsdl_file']:
                            tk.messagebox.showerror(title="Error", message="WSDL file must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="WSDL file must be specified.")
                        return
                    try:
                        if not axl_json['subquery']:
                            tk.messagebox.showerror(title="Error", message="Subquery must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="Subquery must be specified.")
                        return
        except FileNotFoundError:
            messagebox.showerror(title="Error", message="Unable to open JSON file.")
            return
        except json.decoder.JSONDecodeError:
            messagebox.showerror(title="Error", message="Unable to parse JSON file.")
            return

        axl_binding_name = "{http://www.cisco.com/AXLAPIService/}AXLAPIBinding"
        axl_address = "https://{fqdn}:8443/axl/".format(fqdn=axl_json['fqdn'])
        session = Session()
        session.verify = False
        session.auth = HTTPBasicAuth(axl_json['username'], self.axl_password)
        transport = Transport(cache=SqliteCache(), session=session, timeout=60)
        history = HistoryPlugin()
        try:
            client = Client(wsdl=axl_json['wsdl_file'], transport=transport, plugins=[history])
        except FileNotFoundError as e:
            tk.messagebox.showerror(title="Error", message=str(e))
            return
        axl = client.create_service(axl_binding_name, axl_address)

        # For each DN read from CSV file
        cntr = 0
        result_list = [["Device Name", "Device Description", "DN", "DN Description"]]
        for dn in dn_list:
            # Check for phones (tkclass=1)
            sql_statement = "SELECT d.name, d.description, n.dnorpattern, n.description AS ndescription FROM device d INNER JOIN devicenumplanmap dnmap ON dnmap.fkdevice=d.pkid INNER JOIN numplan n ON dnmap.fknumplan=n.pkid " \
                "INNER JOIN deviceprivacydynamic dpd ON dpd.fkdevice=d.pkid INNER JOIN recordingdynamic rd ON rd.fkdevicenumplanmap=dnmap.pkid WHERE (d.tkclass=1 AND n.dnorpattern='" \
                + dn + \
                "') AND (d.tkstatus_builtinbridge!=1 OR dpd.tkstatus_callinfoprivate!=0 OR " \
                + axl_json['subquery'] + \
                " OR dnmap.fkrecordingprofile IS NULL OR dnmap.tkpreferredmediasource!=2 OR rd.tkrecordingflag!=1) ORDER BY d.name"
            try:
                for row in self.sql_query(service=axl, sql_statement=sql_statement):
                    try:
                        # Handle None results
                        if row['name'] is None:
                            d_name = ""
                        else:
                            d_name = row['name']
                        if row['description'] is None:
                            d_description = ""
                        else:
                            d_description = row['description']
                        if row['dnorpattern'] is None:
                            n_dnorpattern = ""
                        else:
                            n_dnorpattern = row['dnorpattern']
                        if row['ndescription'] is None:
                            n_description = ""
                        else:
                            n_description = row['ndescription']

                        # n_description is the variable assigned above (n_ndescription was undefined)
                        self.list_box.insert(tk.END, d_name + ' "' + d_description + '", ' + n_dnorpattern + ' "' + n_description + '"')
                        result_list.append(list(row.values()))
                        cntr += 1
                    except TypeError:
                        continue
            except TypeError:
                pass
            except Fault as thin_axl_error:
                tk.messagebox.showerror(title="Error", message=thin_axl_error.message)
                return

            # Check for device profiles (tkclass=254)
            sql_statement = "SELECT d.name, d.description, n.dnorpattern, n.description AS ndescription FROM device d INNER JOIN devicenumplanmap dnmap ON dnmap.fkdevice=d.pkid INNER JOIN numplan n ON dnmap.fknumplan=n.pkid " \
                "INNER JOIN deviceprivacydynamic dpd ON dpd.fkdevice=d.pkid INNER JOIN recordingdynamic rd ON rd.fkdevicenumplanmap=dnmap.pkid WHERE (d.tkclass=254 AND n.dnorpattern='" \
                + dn + \
                "') AND (dpd.tkstatus_callinfoprivate!=0 OR " \
                + axl_json['subquery'] + \
                " OR dnmap.fkrecordingprofile IS NULL OR dnmap.tkpreferredmediasource!=2 OR rd.tkrecordingflag!=1) ORDER BY d.name"
            try:
                for row in self.sql_query(service=axl, sql_statement=sql_statement):
                    try:
                        # Handle None results
                        if row['name'] is None:
                            d_name = ""
                        else:
                            d_name = row['name']
                        if row['description'] is None:
                            d_description = ""
                        else:
                            d_description = row['description']
                        if row['dnorpattern'] is None:
                            n_dnorpattern = ""
                        else:
                            n_dnorpattern = row['dnorpattern']
                        if row['ndescription'] is None:
                            n_description = ""
                        else:
                            n_description = row['ndescription']

                        # n_description is the variable assigned above (n_ndescription was undefined)
                        self.list_box.insert(tk.END, d_name + ' "' + d_description + '", ' + n_dnorpattern + ' "' + n_description + '"')
                        result_list.append(list(row.values()))
                        cntr += 1
                    except TypeError:
                        continue
            except TypeError:
                pass
            except Fault as thin_axl_error:
                tk.messagebox.showerror(title="Error", message=thin_axl_error.message)
                return
        self.results_count_text.set("Results Found: " + str(cntr))

        # Output to CSV file if required
        try:
            if len(output_filename) != 0:
                with open(output_filename, 'w', newline='') as csv_file:
                    writer = csv.writer(csv_file)
                    writer.writerows(result_list)
        except OSError:
            tk.messagebox.showerror(title="Error", message="Unable to write CSV file.")

    def check_recording(self):
        """Validate parameters, read CSV file of DNs and then call AXL query"""
        if not self.axl_input_filename:
            tk.messagebox.showerror(title="Error", message="No AXL file selected.")
            return
        if not self.csv_input_filename:
            tk.messagebox.showerror(title="Error", message="No CSV file selected.")
            return
        # Parse input CSV file
        dn_list = []
        try:
            with open(self.csv_input_filename, encoding='utf-8-sig') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Skip blank lines, which csv.reader returns as empty lists
                    if row:
                        dn_list.append(row[0])
        except FileNotFoundError:
            tk.messagebox.showerror(title="Error", message="Unable to open CSV file.")
            return

        output_string = self.output_csv_text.get()
        if len(output_string) == 0:
            self.read_axl(dn_list, '')
        else:
            self.read_axl(dn_list, output_string)

    def open_json_file_dialog(self):
        """Dialogue to prompt for JSON file to open and AXL password"""
        self.axl_input_filename = tk.filedialog.askopenfilename(initialdir="/", filetypes=(("JSON files", "*.json"),("All files", "*.*")))
        self.axl_password = tk.simpledialog.askstring("Input", "AXL Password?", show='*')
        self.csv_input_filename = tk.filedialog.askopenfilename(initialdir="/", filetypes=(("CSV files", "*.csv"),("All files", "*.*")))

if __name__ == "__main__":
    disable_warnings(InsecureRequestWarning)
    # Initialise TKinter GUI objects
    root = tk.Tk()
    root.title("DN Recording Checker v1.1")
    DNRecordingCheckerFrame(root)
    root.mainloop()

Monday, 22 October 2018

CUCM Daylight Savings Problems

Daylight savings time can be a problem, as the start & stop dates aren't necessarily the same every year due to legislative changes. For example this year Brazil changed the start date for DST, thus causing CUCM to change the time displayed on phones a week earlier than is correct.
CUCM uses the Olson timezone database, for which Cisco fairly regularly release patches to match changes to timezones around the world, so it is worth updating if you work on a multi-national deployment.
Unfortunately there's not a way to view when CUCM thinks it should be changing to DST from the GUI, but with a little bit of SQL we can get at this information to validate it.

First run show timezone list to view the available timezone names:

admin:show timezone list

   0 - Africa/Abidjan
   1 - Africa/Accra
   2 - Africa/Addis_Ababa
   3 - Africa/Algiers
   4 - Africa/Asmara


Then select * from typetimezone:

admin:run sql select * from typetimezone where name like 'America/Sao_Paulo'
enum name              description          moniker                    bias stddate             stdbias dstdate              dstbias abbreviation legacyname                             
==== ================= ==================== ========================== ==== =================== ======= ==================== ======= ============ =======================================
17   America/Sao_Paulo (GMT-03:00) Brasilia TIMEZONE_AMERICA_SAO_PAULO 180  0/2/0/3,00:00:00:00 0       0/10/0/3,00:00:00:00 -60     BST          E. South America Standard/Daylight Time


  • bias - how many minutes from UTC the time difference is
  • stddate - start date for standard time
  • stdbias - standard time offset from bias (if applicable)
  • dstdate - start date for daylight savings time
  • dstbias - DST offset from bias
The key to decoding the stddate & dstdate fields is that the format is day of week/month/year (not used)/week of the month, followed by the time. So looking at dstdate for Sao Paulo time above, 0 = Sunday, 10 = October, 0 = ignored, 3 = 3rd week & 00:00:00:00 = midnight in 24h time. In other words the clock changes by -60 minutes from the normal bias (i.e. 1 hour ahead) on the 3rd Sunday in October at midnight. Except this year it's not supposed to be until November 4th at midnight, & the install I ran the commands against hasn't been patched to update the Olson timezone data.
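
To check a value against the calendar, the decoding described above can be sketched in Python. The function names are made up for this example, the fourth time component is assumed to be a sub-second value & is ignored, and a week number that doesn't exist in the month will raise a ValueError:

```python
from datetime import date, datetime, time

def decode_tz_date(field):
    """Split e.g. '0/10/0/3,00:00:00:00' into (day of week, month, week, time)."""
    date_part, time_part = field.split(",")
    day_of_week, month, _year, week = (int(p) for p in date_part.split("/"))
    hours, minutes, seconds, _fraction = (int(p) for p in time_part.split(":"))
    return day_of_week, month, week, time(hours, minutes, seconds)

def transition_date(field, year):
    """Work out the calendar date of the change for a given year."""
    day_of_week, month, week, at = decode_tz_date(field)
    # CUCM uses 0 = Sunday, Python's weekday() uses 0 = Monday
    target = (day_of_week - 1) % 7
    first = date(year, month, 1)
    offset = (target - first.weekday()) % 7
    day = 1 + offset + 7 * (week - 1)
    return datetime.combine(date(year, month, day), at)

print(transition_date("0/10/0/3,00:00:00:00", 2018))
```

For the unpatched Sao Paulo entry above this gives 21 October 2018, i.e. the 3rd Sunday in October.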

There's a bunch more useful information in this TechNote.

Friday, 7 September 2018

4300 / 4400 Forwarding CPU Architecture and Utilisation

The 4300 & 4400 series routers run IOS XE & behave quite differently from previous branch router models (e.g. 2900 or 3900 series). They use dedicated CPU cores to handle forwarding traffic: the 4300 series uses specific cores on the Intel Atom C2000 series CPU, while the 4400 series has a dedicated Cavium Octeon series CPU for this purpose. There's separation of the control plane via a dedicated CPU core, with packet forwarding spread across multiple cores. However the IOSd process, which handles the CLI, runs on a separate CPU from the control plane. As a result show process cpu only displays the CPU utilisation by the IOSd process, which will generally be very low. Instead show platform hardware qfp active datapath utilization must be used to see the CPU utilisation by the forwarding CPU(s).

Here is the show process cpu history output of a 4331 router that is handling so much traffic punted to the control plane that it isn't able to process OSPF hellos & keeps dropping OSPF adjacencies:
wanr2#sh proc cpu history
                                           11111
      444444444444666664444444444666666666600000777775555566666555
  100
   90
   80
   70
   60
   50
   40
   30
   20
   10             *****          *******************************
     0....5....1....1....2....2....3....3....4....4....5....5....6
               0    5    0    5    0    5    0    5    0    5    0
               CPU% per second (last 60 seconds)
      1 1  1    1     1
      080562778529765604564547466449777678786796886767577678698777
  100
   90
   80
   70
   60
   50
   40
   30
   20
   10 ###**#****##***** ** * * **  *****#*****#*##*#****##*##*##
     0....5....1....1....2....2....3....3....4....4....5....5....6
               0    5    0    5    0    5    0    5    0    5    0
               CPU% per minute (last 60 minutes)
              * = maximum CPU%   # = average CPU%

For comparison, the forwarding CPU(s) are at 100%:
wanr2#sh plat hard qfp act datapath utilization
  CPP 0: Subdev 0            5 secs        1 min        5 min       60 min
Input:  Priority (pps)            2            2            2            2
                 (bps)         2744         2056         2072         2064
    Non-Priority (pps)        98504        98368        98543        77967
                 (bps)     48603624     48562944     48578680     38524136
           Total (pps)        98506        98370        98545        77969
                 (bps)     48606368     48565000     48580752     38526200
Output: Priority (pps)            2            3            2            3
                 (bps)         2704         3240         3152         3096
    Non-Priority (pps)          128          125          103          111
                 (bps)       243736       218816       174288       224744
           Total (pps)          130          128          105          114
                 (bps)       246440       222056       177440       227840
Processing: Load (pct)          100          100          100           79
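
If this output is being collected by a script (e.g. over SSH), the load figures can be pulled out with a few lines of Python. This is only a sketch: parse_qfp_load is a made-up name & it assumes the four columns shown above:

```python
import re

COLUMNS = ("5 secs", "1 min", "5 min", "60 min")

def parse_qfp_load(output):
    """Return the 'Processing: Load (pct)' values keyed by interval, or None if absent."""
    for line in output.splitlines():
        match = re.match(r"\s*Processing:\s+Load\s+\(pct\)\s+(.*)", line)
        if match:
            values = [int(value) for value in match.group(1).split()]
            return dict(zip(COLUMNS, values))
    return None

sample = "Processing: Load (pct)          100          100          100           79"
print(parse_qfp_load(sample))
```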

If you know which CPU cores are assigned to which roles, you can also use show processes cpu platform sorted (0 is always IOSd):
wanr2#sh processes cpu platform sorted
CPU utilization for five seconds: 4%, one minute: 5%, five minutes: 4%
Core 0: CPU utilization for five seconds: 8%, one minute: 4%, five minutes: 3%
Core 1: CPU utilization for five seconds: 24%, one minute: 7%, five minutes: 4%
Core 2: CPU utilization for five seconds: 4%, one minute: 4%, five minutes: 3%
Core 3: CPU utilization for five seconds: 3%, one minute: 4%, five minutes: 4%
Core 4: CPU utilization for five seconds: 8%, one minute: 7%, five minutes: 7%
Core 5: CPU utilization for five seconds: 1%, one minute: 0%, five minutes: 0%
Core 6: CPU utilization for five seconds: 14%, one minute: 12%, five minutes: 11%
Core 7: CPU utilization for five seconds: 0%, one minute: 0%, five minutes: 0%

Note that these router platforms also have an optional higher throughput license, which unlocks more CPU cores for forwarding. If this feature has been licensed, it is enabled via the platform hardware throughput level command, which requires a reboot:
wanr2(config)#plat hardware throughput level ?
  100000  throughput in kbps
  300000  throughput in kbps
wanr2(config)#platform hardware throughput level 300000
         Feature Name:throughput
 
PLEASE  READ THE  FOLLOWING TERMS  CAREFULLY. INSTALLING THE LICENSE OR
LICENSE  KEY  PROVIDED FOR  ANY CISCO  PRODUCT  FEATURE  OR  USING SUCH
PRODUCT  FEATURE  CONSTITUTES  YOUR  FULL ACCEPTANCE  OF  THE FOLLOWING
TERMS. YOU MUST NOT PROCEED FURTHER IF YOU ARE NOT WILLING TO  BE BOUND
BY ALL THE TERMS SET FORTH HEREIN.
 
Use of this product feature requires  an additional license from Cisco,
together with an additional  payment.  You may use this product feature
on an evaluation basis, without payment to Cisco, for 60 days. Your use
of the  product,  including  during the 60 day  evaluation  period,  is
subject to the Cisco end user license agreement
http://www.cisco.com/en/US/docs/general/warranty/English/EU1KEN_.html
If you use the product feature beyond the 60 day evaluation period, you
must submit the appropriate payment to Cisco for the license. After the
60 day  evaluation  period,  your  use of the  product  feature will be
governed  solely by the Cisco  end user license agreement (link above),
together  with any supplements  relating to such product  feature.  The
above  applies  even if the evaluation  license  is  not  automatically
terminated  and you do  not receive any notice of the expiration of the
evaluation  period.  It is your  responsibility  to  determine when the
evaluation  period is complete and you are required to make  payment to
Cisco for your use of the product feature beyond the evaluation period.
 
Your  acceptance  of  this agreement  for the software  features on one
product  shall be deemed  your  acceptance  with  respect  to all  such
software  on all Cisco  products  you purchase  which includes the same
software.  (The foregoing  notwithstanding, you must purchase a license
for each software  feature you use past the 60 days evaluation  period,
so  that  if you enable a software  feature on  1000  devices, you must
purchase 1000 licenses for use past  the 60 day evaluation period.)
 
Activation  of the  software command line interface will be evidence of
your acceptance of this agreement.
 
 
ACCEPT? (yes/[no]): yes
% The config will take effect on next reboot

The current throughput level can be confirmed via show platform hardware throughput level:
wanr2#show plat hard throughput level
The current throughput level is 100000 kb/s

Further useful information can be found here:

Sunday, 24 June 2018

CUCM Dial Plan Analysis for Unused DNs

I've been tinkering with Python again. This is an updated version of a tool I wrote whilst working at AT&T, for the MACD team to aid with finding spare numbers within a direct dial range to use for DNs.

Overview
Tool to analyse CUCM dial plan to find unused phone numbers (i.e. no DN, translation pattern, route pattern, etc. that matches it), requires Python 3 to run.
For Windows the easiest install is the official Python Windows version, or Miniconda works fine too:
Miniconda distribution of Python 3: https://conda.io/miniconda.html
Official Python distribution: https://www.python.org/downloads/

The lxml, Requests, urllib3 and Zeep libraries are required to work.

Version History
Written by Chris Perkins in 2017 and 2018:
v1.0 – initial release with only CSV file support and CLI usage.
v1.1 – added GUI.
v1.2 – bug fixes.
v1.3 – added AXL support.
v1.4 - GUI adjustments & fixes some edge cases.

All testing was done using Windows. CSV files tested with CUCM v9.1 and v10.5, AXL tested with CUCM v11.5.

Using With CSV Files
This method imports dial plan information from CUCM via CSV files. These are created from within CUCM via Call Routing > Route Plan Report > View in file.

Therefore before using the tool, export the Route Plan Report from the CUCM cluster that you want to find unused numbers for.

Load the CSV file via File > Load CSV:


Then select a direct dial range from the drop down list under DN Range:


Click Find Unused DNs. It will then process the CSV file and find numbers in the selected range that aren’t currently in use. The list of unused DNs is in the format directory number / partition, so you can easily see which numbers and which partition the search is working on:

Unused DNs lists how many unused directory numbers were found during the dial plan analysis.
Dial Plan Entries Parsed lists how many possible numbers it had to analyse to find the unused DNs.
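
The general idea of the analysis can be sketched as follows. This is a simplified illustration rather than the tool's actual code: it only understands digits, the X wildcard and [ ] sets, ignores partitions, and silently skips any other pattern (including those with * or #). The function names and sample patterns are made up:

```python
import re

# Only patterns built from digits, X and [ ] sets are handled in this sketch
SUPPORTED = re.compile(r"(?:[0-9X]|\[\^?[0-9-]+\])+")

def pattern_to_regex(pattern):
    """Convert a CUCM pattern such as 30[23]XX to a compiled regex, or None if unsupported."""
    if not SUPPORTED.fullmatch(pattern):
        return None
    return re.compile(pattern.replace("X", "[0-9]"))

def find_unused(range_start, range_end, patterns):
    """Return every number in the range that no supported pattern matches."""
    regexes = [r for r in (pattern_to_regex(p) for p in patterns) if r is not None]
    width = len(range_start)
    unused = []
    for number in range(int(range_start), int(range_end) + 1):
        candidate = str(number).zfill(width)
        if not any(regex.fullmatch(candidate) for regex in regexes):
            unused.append(candidate)
    return unused

print(find_unused("30200", "30209", ["3020[0-4]", "302X9", "*1"]))
```

Here 30200 to 30204 are covered by the first pattern & 30209 by the second, so 30205 to 30208 are reported as unused.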

Using with AXL
This method imports dial plan information from CUCM using the AXL API. The AXL schema for the version of CUCM in use is required, which is downloaded from CUCM via Application > Plugins > Cisco AXL Toolkit. The required files contained within the .zip file are AXLAPI.wsdl, AXLEnums.xsd and AXLSoap.xsd.
Different CUCM servers are defined in JSON formatted files, allowing for multiple CUCM clusters running different versions (and thus different AXL schemas). Load the JSON file via File > Load AXL:

It will then prompt for the password:

After this the process is identical to working with CSV files.

Customising the Tool
The direct dial ranges to search for can be customised, so that the tool can be used for any CUCM cluster. These settings are stored in dialplan.json in JSON format, for example:
[
{
    "description": "ANZ - Sydney - 2XXXX",
    "range_start": "20000",
    "range_end": "29999",
    "partition": "INTERNAL"
},
{
    "description": "ANZ - Adelaide - 30[23]XX",
    "range_start": "30200",
    "range_end": "30399",
    "partition": "INTERNAL"
}
]


  • The JSON file starts with [ and ends with ].
  • Each direct dial range is enclosed within { } and contains parameters for the description, range start, range end and partition. The field headings and values must be enclosed within “”.
  • The range end must be greater than the range start.
  • The direct dial ranges must have a comma after each, except for the last one.

So to add another range to the above example:
[
{
    "description": "ANZ - Sydney - 2XXXX",
    "range_start": "20000",
    "range_end": "29999",
    "partition": "INTERNAL"
},
{
    "description": "ANZ - Adelaide - 30[23]XX",
    "range_start": "30200",
    "range_end": "30399",
    "partition": "INTERNAL"
},
{
    "description": "ANZ - Adelaide - 309XX",
    "range_start": "30900",
    "range_end": "30999",
    "partition": "INTERNAL"
},
{
    "description": "ANZ - Canberra - 33[1-3]XX",
    "range_start": "33100",
    "range_end": "33399",
    "partition": "INTERNAL"
}
]


The parameters for using AXL are also stored in JSON format:
[
{
    "fqdn": "cucm-emea-pub.somewhere.com",
    "username": "AppAdmin",
    "wsdl_file": "file://C://temp//AXLAPI.wsdl"
}
]


  • “fqdn” should be the FQDN or IP address of the target CUCM publisher.
  • “username” is an application or end user with the Standard AXL API Access role.
  • “wsdl_file” points to the location of the AXL WSDL file (AXLAPI.wsdl). Note the slightly different path syntax for Windows, as shown in the example above.
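
As a rough illustration of how these three settings are consumed, the sketch below checks that each one is present and derives the AXL endpoint address in the same form the source code below uses (https://<fqdn>:8443/axl/). The helper name load_axl_settings and the added "axl_url" key are assumptions made for this example only; the tool does not store the URL in the JSON data.

```python
import json

# Keys the AXL JSON file is expected to contain, per the list above
REQUIRED_KEYS = ("fqdn", "username", "wsdl_file")

def load_axl_settings(text):
    """Parse AXL settings from JSON text and add the derived AXL URL to each cluster."""
    settings = []
    for cluster in json.loads(text):
        for key in REQUIRED_KEYS:
            # Treat a missing key and an empty value the same way
            if not cluster.get(key):
                raise ValueError("{} must be specified".format(key))
        with_url = dict(cluster)
        # Hypothetical extra key holding the endpoint built from the FQDN
        with_url["axl_url"] = "https://{}:8443/axl/".format(cluster["fqdn"])
        settings.append(with_url)
    return settings

sample = """
[
{
    "fqdn": "cucm-emea-pub.somewhere.com",
    "username": "AppAdmin",
    "wsdl_file": "file://C://temp//AXLAPI.wsdl"
}
]
"""
for cluster in load_axl_settings(sample):
    print(cluster["username"], "->", cluster["axl_url"])
```

The password is deliberately not part of the file; the tool prompts for it each time a JSON file is loaded.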

Source Code

#!/usr/bin/env python3
# v1.4 - written by Chris Perkins in 2017 & 2018, excuse the spaghetti code it was my first Python program...
# Takes CUCM Route Plan Report exported as CSV or uses AXL, parses the regexes for the dial plan to find unused numbers in a given direct dial range
# Number range to match against is defined in JSON format in dialplan.json
# Won't parse dial plan entries with * or # as they're invalid for a direct dial range

# v1.4 - GUI adjustments & fixes some edge cases
# v1.3 - added AXL support
# v1.2 - bug fixes
# v1.1 - added GUI
# v1.0 - initial release with only CSV file support and CLI usage

# Original AXL SQL query code courtesy of Jonathan Els - https://afterthenumber.com/2018/04/27/serializing-thin-axl-sql-query-responses-with-python-zeep/

# To Do:
# Improve the GUI

import itertools, csv, sys, json
import tkinter as tk
import requests
from tkinter import ttk
from tkinter import filedialog, simpledialog, messagebox
from collections import OrderedDict
from zeep import Client
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.plugins import HistoryPlugin
from zeep.exceptions import Fault
from zeep.helpers import serialize_object
from requests import Session
from requests.auth import HTTPBasicAuth
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from lxml import etree

# Stores information about numbers in a range
class DirectoryNumbers:
    def __init__(self, start_num, end_num):
        """Constructor initialises attributes"""
        self.number = []
        self.is_used = []
        self.classification = []

        for num in range(int(start_num), int(end_num) + 1):
            # Conversion to int strips any preceding zeros, so pad with 0 to match the length of the source string
            num_str = str(num).zfill(len(end_num))
            self.number.append(num_str)
            self.is_used.append(False)
            self.classification.append(0)

# GUI and main code
class DialPlanAnalyserFrame(tk.Frame):
    def __init__(self, parent):
        """Constructor checks parameters and initialises variables"""
        self.range_descriptions = []
        self.numbers = []
        self.input_filename = None
        self.use_axl = False
        self.axl_password = ""

        try:
            with open("dialplan.json") as f:
                self.json_data = json.load(f)
                for range_data in self.json_data:
                    try:
                        if len(range_data['range_start']) != len(range_data['range_end']):
                            tk.messagebox.showerror(title="Error", message="The first and last numbers in range must be of equal length.")
                            sys.exit()
                        elif int(range_data['range_start']) >= int(range_data['range_end']):
                            tk.messagebox.showerror(title="Error", message="The last number in range must be greater than the first.")
                            sys.exit()
                    except (TypeError, ValueError, KeyError):
                        tk.messagebox.showerror(title="Error", message="Number range parameters incorrectly formatted.")
                        sys.exit()
                    try:
                        if not range_data['description']:
                            tk.messagebox.showerror(title="Error", message="Description must be specified.")
                            sys.exit()
                        # Uncomment to disallow DNs not in a partition
                        #elif not range_data['partition']:
                        #    tk.messagebox.showerror(title="Error", message="Partition must be specified.")
                        #    sys.exit()
                        self.range_descriptions.append(range_data['description'])
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="Description must be specified.")
                        sys.exit()
        except FileNotFoundError:
            messagebox.showerror(title="Error", message="Unable to open JSON file.")
            sys.exit()
        except json.decoder.JSONDecodeError:
            messagebox.showerror(title="Error", message="Unable to parse JSON file.")
            sys.exit()

        self.range_descriptions = sorted(self.range_descriptions)
        for item in self.json_data:
            if item['description'].upper() == self.range_descriptions[0].upper():
                self.range_description = item['description']
                self.range_start = int(item['range_start'])
                self.range_end = int(item['range_end'])
                self.range_partition = item['partition']
                self.directory_numbers = DirectoryNumbers(item['range_start'], item['range_end'])
                break

        tk.Frame.__init__(self, parent)
        parent.geometry("320x480")
        self.pack(fill=tk.BOTH, expand=True)
        menu_bar = tk.Menu(self)
        file_menu = tk.Menu(menu_bar, tearoff=0)
        file_menu.add_command(label="Load AXL", command=self.open_json_file_dialog)
        file_menu.add_command(label="Load CSV", command=self.open_csv_file_dialog)
        file_menu.add_separator()
        file_menu.add_command(label="Exit", command=self.quit)
        menu_bar.add_cascade(label="File", menu=file_menu)
        parent.config(menu=menu_bar)
        tk.Label(self, text="DN Range:").place(relx=0.4, rely=0.0, height=22, width=62)
        self.range_combobox = ttk.Combobox(self, values=self.range_descriptions, state="readonly")
        self.range_combobox.current(0)
        self.range_combobox.bind("<<ComboboxSelected>>", self.combobox_update)
        self.range_combobox.place(relx=0.02, rely=0.042, relheight=0.06, relwidth=0.96)
        tk.Button(self, text="Find Unused DNs", command=self.find_unused_dns).place(relx=0.35, rely=0.12, height=22, width=100)
        self.unused_label_text = tk.StringVar()
        self.unused_label_text.set("Unused DNs: ")
        tk.Label(self, textvariable=self.unused_label_text).place(relx=0.35, rely=0.18, height=22, width=110)
        list_box_frame = tk.Frame(self, bd=2, relief=tk.SUNKEN)
        list_box_scrollbar_y = tk.Scrollbar(list_box_frame)
        list_box_scrollbar_x = tk.Scrollbar(list_box_frame, orient=tk.HORIZONTAL)
        self.list_box = tk.Listbox(list_box_frame, xscrollcommand=list_box_scrollbar_x.set, yscrollcommand=list_box_scrollbar_y.set)
        list_box_frame.place(relx=0.02, rely=0.22, relheight=0.73, relwidth=0.96)
        list_box_scrollbar_y.place(relx=0.94, rely=0.0, relheight=1.0, relwidth=0.06)
        list_box_scrollbar_x.place(relx=0.0, rely=0.94, relheight=0.06, relwidth=0.94)
        self.list_box.place(relx=0.0, rely=0.0, relheight=0.94, relwidth=0.94)
        list_box_scrollbar_y.config(command=self.list_box.yview)
        list_box_scrollbar_x.config(command=self.list_box.xview)
        self.entries_label_text = tk.StringVar()
        self.entries_label_text.set("Dial Plan Entries Parsed: ")
        tk.Label(self, textvariable=self.entries_label_text).place(relx=0.21, rely=0.95, height=22, width=220)

    def combinations(self, terms, accum):
        """Recursively parse a jagged list of digits to generate list of combination strings"""
        # combinations(digits, '') would populate numbers_in_use with combination strings
        last = (len(terms) == 1)
        n = len(terms[0])
        for i in range(n):
            item = accum + terms[0][i]
            if last:
                self.numbers_in_use.append(item)
            else:
                self.combinations(terms[1:], item)

    def parse_regex(self, pattern, range_start, range_end):
        """Parse CUCM regex pattern and return list of the digit strings the regex matches within the number range specified"""
        is_slice = False
        is_range = False
        is_negate = False
        num_digits = 0
        digits = []
        numbers_in_use = []

        # Parse regex and store digits in jagged list
        for column in range(16):
            digits.append([])
        for char in pattern:
            if char == '[':
                is_slice = True
            elif char == '^' and is_slice == True:
                is_negate = True
            elif char == ']':
                is_slice = False
                if is_negate == True:
                    negate_slice = []
                    for range_char in ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']:
                        if range_char not in digits[num_digits]:
                            negate_slice.append(range_char)
                    digits[num_digits] = negate_slice[:]
                    is_negate = False
                num_digits += 1
            elif char in ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']:
                if is_range == False:
                    digits[num_digits].append(char)
                    if is_slice == False:
                        num_digits += 1
                else:
                    for range_char in range(int(digits[num_digits][-1]) + 1, int(char) + 1):
                        digits[num_digits].append(str(range_char))
                    is_range = False
            elif char == '-' and is_slice == True:
                is_range = True
            elif char == 'X':
                digits[num_digits] = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
                num_digits += 1
            elif char == '*' or char == '#':
                # Strings containing * or # can't be parsed as an integer so return empty list as also not a valid PSTN number
                return []

        # Strip empty lists
        digits2 = [x for x in digits if x != []]

        # Use itertools.product() to convert jagged list of digits to list of combination strings >= range_start & <= range_end
        # combo is used as the loop variable to avoid shadowing the built-in list
        for combo in itertools.product(*digits2):
            char_string = ''.join(combo)
            if char_string != '':
                number = int(char_string)
                if range_start <= number <= range_end:
                    numbers_in_use.append(char_string)

        return numbers_in_use

    def element_list_to_ordered_dict(self, elements):
        """Convert a list of rows of XML elements to a list of OrderedDicts keyed by element tag"""
        return [OrderedDict((element.tag, element.text) for element in row) for row in elements]


    def sql_query(self, service, sql_statement):
        """Execute SQL query via AXL and return results"""
        try:
            axl_resp = service.executeSQLQuery(sql=sql_statement)
            try:
                return self.element_list_to_ordered_dict(serialize_object(axl_resp)["return"]["rows"])
            except KeyError:
                # Single tuple response
                return self.element_list_to_ordered_dict(serialize_object(axl_resp)["return"]["row"])
            except TypeError:
                # No SQL tuples
                return serialize_object(axl_resp)["return"]
        except requests.exceptions.ConnectionError as e:
            tk.messagebox.showerror(title="Error", message=str(e))
            return None

    def read_axl(self):
        """Read and parse Route Plan via AXL"""
        try:
            self.list_box.delete(0, tk.END)
            with open(self.input_filename) as f:
                axl_json_data = json.load(f)
                for axl_json in axl_json_data:
                    try:
                        if not axl_json['fqdn']:
                            tk.messagebox.showerror(title="Error", message="FQDN must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="FQDN must be specified.")
                        return
                    try:
                        if not axl_json['username']:
                            tk.messagebox.showerror(title="Error", message="Username must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="Username must be specified.")
                        return
                    try:
                        if not axl_json['wsdl_file']:
                            tk.messagebox.showerror(title="Error", message="WSDL file must be specified.")
                            return
                    except KeyError:
                        tk.messagebox.showerror(title="Error", message="WSDL file must be specified.")
                        return
        except FileNotFoundError:
            messagebox.showerror(title="Error", message="Unable to open JSON file.")
            return
        except json.decoder.JSONDecodeError:
            messagebox.showerror(title="Error", message="Unable to parse JSON file.")
            return

        sql_statement = "SELECT n.dnorpattern, p.name FROM numplan n LEFT JOIN routepartition p ON n.fkroutepartition=p.pkid"
        axl_binding_name = "{http://www.cisco.com/AXLAPIService/}AXLAPIBinding"
        axl_address = "https://{fqdn}:8443/axl/".format(fqdn=axl_json['fqdn'])
        session = Session()
        session.verify = False
        session.auth = HTTPBasicAuth(axl_json['username'], self.axl_password)
        transport = Transport(cache=SqliteCache(), session=session, timeout=60)
        history = HistoryPlugin()
        try:
            client = Client(wsdl=axl_json['wsdl_file'], transport=transport, plugins=[history])
        except FileNotFoundError as e:
            tk.messagebox.showerror(title="Error", message=str(e))
            return
        axl = client.create_service(axl_binding_name, axl_address)

        try:
            raw_route_plan = []
            for row in self.sql_query(service=axl, sql_statement=sql_statement):
                # Ignore entries not in the correct partition and update directory_numbers with numbers found to be in use
                if row['name'] is None:
                    pname = ""
                else:
                    pname = row['name']
                if pname.upper() == self.range_partition.upper():
                    for char_string in self.parse_regex(row['dnorpattern'], self.range_start, self.range_end):
                        raw_route_plan.append(char_string)
                        try:
                            dn_index = self.directory_numbers.number.index(char_string)
                            self.directory_numbers.is_used[dn_index] = True
                        except (IndexError, ValueError):
                            continue
        except TypeError:
            return
        except Fault as thin_axl_error:
            tk.messagebox.showerror(title="Error", message=thin_axl_error.message)
            return

        # Update TKinter display objects with results
        self.entries_label_text.set("Dial Plan Entries Parsed: " + str(len(raw_route_plan)))
        cntr = 0
        for num in range(0, len(self.directory_numbers.number)):
            if self.directory_numbers.is_used[num] == False:
                cntr += 1
                self.list_box.insert(tk.END, self.directory_numbers.number[num] + " / " + self.range_partition)
        self.unused_label_text.set("Unused DNs: " + str(cntr))

    def read_csv_file(self):
        """Read and parse Route Plan Report CSV file"""
        column_index = []

        try:
            self.list_box.delete(0, tk.END)
            # encoding='utf-8-sig' is necessary for correct parsing of UTF-8 encoding of CUCM Route Plan Report CSV file
            with open(self.input_filename, encoding='utf-8-sig') as f:
                reader = csv.reader(f)
                header_row = next(reader)
                for index, column_header in enumerate(header_row):
                    if column_header == "Pattern or URI":
                        column_index.append(index)
                    elif column_header == "Pattern/Directory Number":
                        column_index.append(index)
                    elif column_header == "Partition":
                        column_index.append(index)
                if len(column_index) != 2:
                    tk.messagebox.showerror(title="Error", message="Unable to parse CSV file.")
                    return
                raw_route_plan = []
                for row in reader:
                    # Ignore entries not in the correct partition and update directory_numbers with numbers found to be in use
                    if row[column_index[1]].upper() == self.range_partition.upper():
                        for char_string in self.parse_regex(row[column_index[0]], self.range_start, self.range_end):
                            raw_route_plan.append(char_string)
                            try:
                                dn_index = self.directory_numbers.number.index(char_string)
                                self.directory_numbers.is_used[dn_index] = True
                            except (IndexError, ValueError):
                                pass
        except FileNotFoundError:
            tk.messagebox.showerror(title="Error", message="Unable to open CSV file.")
            return

        # Update TKinter display objects
        self.entries_label_text.set("Dial Plan Entries Parsed: " + str(len(raw_route_plan)))
        cntr = 0
        for num in range(0, len(self.directory_numbers.number)):
            if self.directory_numbers.is_used[num] == False:
                cntr += 1
                self.list_box.insert(tk.END, self.directory_numbers.number[num] + " / " + self.range_partition)
        self.unused_label_text.set("Unused DNs: " + str(cntr))

    def find_unused_dns(self):
        """Check AXL or CSV selected and hand over to correct method to handle"""
        if self.use_axl:
            if not self.input_filename:
                tk.messagebox.showerror(title="Error", message="No AXL file selected.")
                return
            else:
                self.read_axl()
        else:
            if not self.input_filename:
                tk.messagebox.showerror(title="Error", message="No CSV file selected.")
                return
            else:
                self.read_csv_file()

    def open_csv_file_dialog(self):
        """Dialogue to prompt for CSV file to open"""
        self.input_filename = tk.filedialog.askopenfilename(initialdir="/", filetypes=(("CSV files", "*.csv"),("All files", "*.*")))
        self.use_axl = False
        self.axl_password = ""

    def open_json_file_dialog(self):
        """Dialogue to prompt for JSON file to open and AXL password"""
        self.input_filename = tk.filedialog.askopenfilename(initialdir="/", filetypes=(("JSON files", "*.json"),("All files", "*.*")))
        self.use_axl = True
        self.axl_password = tk.simpledialog.askstring("Input", "AXL Password?", show='*')

    def combobox_update(self, event):
        """Populate range variables when Combobox item selected"""
        self.list_box.delete(0, tk.END)
        self.unused_label_text.set("Unused DNs: ")
        self.entries_label_text.set("Dial Plan Entries Parsed: ")
        value = self.range_combobox.get()
        for item in self.json_data:
            if item['description'].upper() == value.upper():
                self.range_description = item['description']
                self.range_start = int(item['range_start'])
                self.range_end = int(item['range_end'])
                self.range_partition = item['partition']
                self.directory_numbers = DirectoryNumbers(item['range_start'], item['range_end'])
                break

if __name__ == "__main__":
    disable_warnings(InsecureRequestWarning)
    # Initialise TKinter GUI objects
    root = tk.Tk()
    root.title("Dial Plan Analyser v1.4")
    DialPlanAnalyserFrame(root)
    root.mainloop()
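
To experiment with the pattern expansion outside the GUI, the idea behind parse_regex can be tried in isolation. The sketch below is a simplified stand-in rather than the tool's own code: expand_set and expand_pattern are hypothetical names, only digits, X and [ ] sets (with ranges and ^ negation) are handled, and any other character makes it return an empty list, which is stricter than the tool, where only * and # do so.

```python
import itertools

DIGITS = "0123456789"

def expand_set(body):
    """Expand the inside of a [ ] set, e.g. '23', '1-3' or '^0-8', to a string of digits."""
    negate = body.startswith("^")
    if negate:
        body = body[1:]
    chosen = set()
    j = 0
    while j < len(body):
        if j + 2 < len(body) and body[j + 1] == "-":
            # Range such as 1-3: include both end points
            chosen.update(DIGITS[int(body[j]):int(body[j + 2]) + 1])
            j += 3
        else:
            chosen.add(body[j])
            j += 1
    if negate:
        chosen = set(DIGITS) - chosen
    return "".join(sorted(chosen))

def expand_pattern(pattern):
    """Return every digit string matched by a simple CUCM-style pattern."""
    positions = []  # one string of allowed digits per position
    i = 0
    while i < len(pattern):
        char = pattern[i]
        if char == "X":
            positions.append(DIGITS)
        elif char in DIGITS:
            positions.append(char)
        elif char == "[":
            close = pattern.index("]", i)  # raises ValueError if the set is not closed
            positions.append(expand_set(pattern[i + 1:close]))
            i = close
        else:
            return []  # unsupported character in this sketch
        i += 1
    if not positions:
        return []
    return ["".join(combo) for combo in itertools.product(*positions)]

# Numbers covered by two patterns, then the unused numbers left in 30900-30999
in_use = set(expand_pattern("30[23]XX")) | set(expand_pattern("3090[0-4]"))
unused = [str(n) for n in range(30900, 31000) if str(n) not in in_use]
print(len(unused), "unused, first is", unused[0])
```

Keeping one string of allowed digits per position means itertools.product does the combinatorial work, which is the same approach parse_regex takes with its jagged list.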