348 lines
12 KiB
Python
Executable file
348 lines
12 KiB
Python
Executable file
#!/usr/bin/python2
|
|
# -*-coding:Utf-8 -*
|
|
|
|
|
|
# Flexisip, a flexible SIP proxy server with media capabilities.
|
|
# Copyright (C) 2010-2015 Belledonne Communications SARL.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import md5
|
|
import socket
|
|
import errno
|
|
import threading
|
|
import linphone
|
|
|
|
|
|
class CoreManager:
|
|
def __init__(self, user_id, proxy_uri):
|
|
config = linphone.LpConfig.new(None)
|
|
config.set_int("sip", "sip_port", -1)
|
|
config.set_int("sip", "sip_tcp_port", -1)
|
|
config.set_int("sip", "sip_tls_port", -1)
|
|
|
|
vtable = {
|
|
"call_state_changed": CoreManager._call_state_callback,
|
|
"call_stats_updated": CoreManager._call_stats_updated_callback
|
|
}
|
|
|
|
self.core = linphone.Core.new_with_config(vtable, config)
|
|
self.core.user_data = self
|
|
self.core.echo_cancellation_enabled = False
|
|
self.core.video_capture_enabled = False
|
|
self.core.video_display_enabled = False
|
|
self.use_files = True
|
|
self.play_file = "/usr/lib/python2.7/site-packages/linphone/share/sounds/linphone/hello8000.wav"
|
|
|
|
proxy_config = self.core.create_proxy_config()
|
|
proxy_config.identity = user_id
|
|
proxy_config.server_addr = proxy_uri
|
|
proxy_config.register_enabled = False
|
|
|
|
address = self.core.create_address(user_id)
|
|
|
|
auth_info = self.core.create_auth_info(address.username,
|
|
None,
|
|
address.password,
|
|
None,
|
|
None,
|
|
address.domain)
|
|
self.core.add_proxy_config(proxy_config)
|
|
self.core.default_proxy_config = proxy_config
|
|
self.core.add_auth_info(auth_info)
|
|
|
|
def register(self, timeout=5):
|
|
self.core.default_proxy_config.edit()
|
|
self.core.default_proxy_config.register_enabled = True
|
|
self.core.default_proxy_config.done()
|
|
success = self.wait_for_until(lambda m: m.core.default_proxy_config.state == linphone.RegistrationState.RegistrationOk, timeout)
|
|
if(not success):
|
|
raise CoreManager.RegistrationFailError(self)
|
|
|
|
def unregister(self, timeout=5):
|
|
self.core.default_proxy_config.edit()
|
|
self.core.default_proxy_config.register_enabled = False
|
|
self.core.default_proxy_config.done()
|
|
success = self.wait_for_until(lambda m: m.core.default_proxy_config.state == linphone.RegistrationState.RegistrationCleared, timeout)
|
|
if(not success):
|
|
raise CoreManager.UnregistrationFailError(self)
|
|
|
|
def wait_for_until(self, test_func, timeout):
|
|
start_time = time.time()
|
|
delta = 0
|
|
while delta < timeout and not test_func(self):
|
|
self.core.iterate()
|
|
time.sleep(0.1)
|
|
delta = time.time() - start_time
|
|
return delta < timeout
|
|
|
|
def _call_state_callback(lc, call, state, msg):
|
|
if state == linphone.CallState.CallIncomingReceived:
|
|
lc.accept_call(call)
|
|
|
|
_call_state_callback = staticmethod(_call_state_callback)
|
|
|
|
def _call_stats_updated_callback(lc, call, stats):
|
|
logging.info("Call stats: D={0}kbit/s U={1}kbit/s".format(stats.download_bandwidth, stats.upload_bandwidth))
|
|
|
|
_call_stats_updated_callback = staticmethod(_call_stats_updated_callback)
|
|
|
|
class RegistrationFailError(Exception):
|
|
def __init__(self, manager):
|
|
self.manager = manager
|
|
|
|
class UnregistrationFailError(Exception):
|
|
def __init__(self, manager):
|
|
self.manager = manager
|
|
|
|
|
|
class AbstractTest:
|
|
def __init__(self):
|
|
self.listeners = []
|
|
self.test_succeed = False
|
|
|
|
def run(self):
|
|
self.test_succeed = self._run()
|
|
for listener in self.listeners:
|
|
listener.notify(self)
|
|
|
|
def _run(self):
|
|
pass
|
|
|
|
|
|
class CallTest(AbstractTest):
|
|
def __init__(self, caller_manager, callee_uris, timeout=5):
|
|
AbstractTest.__init__(self)
|
|
self.caller = caller_manager
|
|
self.callee_uris = callee_uris
|
|
self.test_count = 0
|
|
self.timeout = timeout
|
|
|
|
def _run(self):
|
|
self.test_count += 1
|
|
try:
|
|
logging.info("Starting test #{0}".format(self.test_count))
|
|
for uri in self.callee_uris:
|
|
logging.info("Calling {0}".format(uri))
|
|
self.test_call(uri)
|
|
logging.info("Call has successfuly terminated")
|
|
return True
|
|
except CallTest.TestFailException as e:
|
|
logging.error(e)
|
|
return False
|
|
|
|
def test_call(self, callee_uri, timeout=5):
|
|
call = self.caller.core.invite(callee_uri)
|
|
if call is None:
|
|
raise CallTest.InviteFailedException(callee_uri)
|
|
|
|
result = self.caller.wait_for_until(
|
|
lambda m: (m.core.current_call is not None) and (m.core.current_call.state == linphone.CallState.CallStreamsRunning),
|
|
timeout)
|
|
|
|
if not result:
|
|
raise CallTest.CallStreamNotRunException(callee_uri)
|
|
|
|
result = self.caller.wait_for_until(
|
|
lambda m: m.core.current_call.audio_stats.download_bandwidth > 0 and
|
|
m.core.current_call.audio_stats.upload_bandwidth > 0,
|
|
timeout)
|
|
|
|
self.caller.core.terminate_call(call)
|
|
if not result:
|
|
raise CallTest.NoDataException(callee_uri)
|
|
|
|
class TestFailException(Exception):
|
|
def __init__(self, callee_uri):
|
|
self.uri = callee_uri
|
|
|
|
def __str__(self):
|
|
return "Call with {0} failed".format(self.uri)
|
|
|
|
class CallStreamNotRunException(TestFailException):
|
|
def __str__(self):
|
|
return CallTest.TestFailException.__str__(self) + ". Could not reach Call state CallStreamsRunning"
|
|
|
|
class NoDataException(TestFailException):
|
|
def __str__(self):
|
|
return CallTest.TestFailException.__str__(self) + ". No rtp packet received or send"
|
|
|
|
class InviteFailedException(TestFailException):
|
|
def __str__(self):
|
|
return CallTest.TestFailException.__str__(self) + "Could not send the INVITE request"
|
|
|
|
|
|
class TcpPortAction(threading.Thread):
|
|
def __init__(self, port):
|
|
threading.Thread.__init__(self)
|
|
self._port = port
|
|
self._socket = None
|
|
self._lock_socket = threading.RLock()
|
|
self.daemon = True
|
|
self.start()
|
|
|
|
def run(self):
|
|
while not True:
|
|
self._lock_socket.acquire()
|
|
if self._socket:
|
|
self._lock_socket.release()
|
|
conn = self._socket.accept()
|
|
else:
|
|
self._lock_socket.release()
|
|
time.sleep(0.1)
|
|
|
|
def notify(self, test):
|
|
if test.test_succeed:
|
|
self._open()
|
|
else:
|
|
self._close()
|
|
|
|
def _get_port(self):
|
|
return self._port
|
|
|
|
port = property(_get_port)
|
|
|
|
def _open(self):
|
|
if not self._socket:
|
|
self._lock_socket.acquire()
|
|
self._socket = socket.socket()
|
|
self._socket.bind(('0.0.0.0', self._port))
|
|
self._socket.listen(1)
|
|
self._lock_socket.release()
|
|
|
|
def _close(self):
|
|
if self._socket:
|
|
self._lock_socket.acquire()
|
|
self._socket.close()
|
|
self._socket = None
|
|
self._lock_socket.release()
|
|
|
|
|
|
def md5sum(string):
|
|
ctx = md5.new()
|
|
ctx.update(string)
|
|
return ctx.hexdigest()
|
|
|
|
|
|
def generate_username(prefix, host):
|
|
return "{0}-{1}".format(prefix, md5sum(host))
|
|
|
|
|
|
def generate_password(host, salt):
|
|
return md5sum(host + salt)
|
|
|
|
|
|
def generate_proxy_config(host, salt, prefix, domain, transport="udp"):
|
|
username = generate_username(prefix, host)
|
|
password = generate_password(host, salt)
|
|
uid = "sip:{0}:{1}@{2}".format(username, password, domain)
|
|
proxy = "sip:{0};transport={1}".format(host, transport)
|
|
return (uid, proxy)
|
|
|
|
|
|
def find_local_address(nodes):
|
|
s = socket.socket(socket.AF_INET)
|
|
for node in nodes:
|
|
try:
|
|
s.bind((node, 0))
|
|
return node
|
|
except socket.error as e:
|
|
if e.errno != errno.EADDRNOTAVAIL:
|
|
raise
|
|
return None
|
|
|
|
|
|
class CalleeThread(threading.Thread):
|
|
def __init__(self, core_manager):
|
|
threading.Thread.__init__(self)
|
|
self.core_manager = core_manager
|
|
self._lock = threading.RLock()
|
|
self._terminate=False
|
|
|
|
def terminate(self):
|
|
self._lock.acquire()
|
|
self._terminate=True
|
|
self._lock.release()
|
|
|
|
def run(self):
|
|
self._lock.acquire()
|
|
while not self._terminate:
|
|
self._lock.release()
|
|
self.core_manager.core.iterate()
|
|
time.sleep(0.1)
|
|
self._lock.acquire()
|
|
self._lock.release()
|
|
|
|
|
|
parser = argparse.ArgumentParser(description="daemon for testing availability of each server of a Flexisip cluster")
|
|
parser.add_argument("domain", help="domain handle by the cluster")
|
|
parser.add_argument("salt", help="salt used to generate passwords")
|
|
parser.add_argument("nodes", nargs='+', help="list of nodes to test")
|
|
parser.add_argument("--interval", type=int, help="set time interval in seconds between successive tests", dest="test_interval", default=30)
|
|
parser.add_argument("--log", help="log file path", dest="log_file", default="./flexisip_monitor.log")
|
|
parser.add_argument("--port", "-p", help="port to switch off when test fails", dest="port", type=int, default=12345)
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(level=logging.INFO, filename=args.log_file)
|
|
logging.info("Starting Flexisip monitior")
|
|
|
|
local_ip = find_local_address(args.nodes)
|
|
if local_ip is None:
|
|
logging.fatal("No node address matches with any local addresse")
|
|
exit(1)
|
|
|
|
args.nodes.remove(local_ip)
|
|
|
|
caller_config = generate_proxy_config(local_ip, args.salt, "monitor-caller", args.domain, transport="tcp")
|
|
callee_config = generate_proxy_config(local_ip, args.salt, "monitor-callee", args.domain, transport="tcp")
|
|
|
|
callee_uris = []
|
|
for node in args.nodes:
|
|
username = generate_username("monitor-callee", node)
|
|
uri = "sip:{0}@{1}".format(username, args.domain)
|
|
callee_uris.append(uri)
|
|
|
|
caller = CoreManager(*caller_config)
|
|
callee = CoreManager(*callee_config)
|
|
|
|
try:
|
|
caller.register()
|
|
callee.register()
|
|
except CoreManager.RegistrationFailError as e:
|
|
proxy_config = e.manager.core.default_proxy_config
|
|
identity = proxy_config.identity
|
|
proxy = proxy_config.server_addr
|
|
logging.fatal("One UA could not register. identity={0}, proxy={1}".format(identity, proxy))
|
|
exit(1)
|
|
|
|
callee_thread = CalleeThread(callee)
|
|
callee_thread.start()
|
|
|
|
test_ = CallTest(caller, callee_uris)
|
|
action = TcpPortAction(args.port)
|
|
test_.listeners.append(action)
|
|
|
|
try:
|
|
while True:
|
|
test_.run()
|
|
logging.info("sleeping for {0} seconds".format(args.test_interval))
|
|
time.sleep(args.test_interval)
|
|
except KeyboardInterrupt:
|
|
logging.info("Stopping Flexisip monitor")
|
|
|
|
callee_thread.terminate()
|