seafdav/tests/test_lock_manager.py
Martin Wendt bd11268a1e 2023
2023-02-18 16:38:22 +01:00

439 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# (c) 2009-2023 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""Unit test for lock_manager.py"""
import os
import sys
import unittest
from tempfile import gettempdir
from time import sleep
from wsgidav.dav_error import DAVError
from wsgidav.lock_man import lock_manager, lock_storage
try:
from wsgidav.lock_man.lock_storage_redis import LockStorageRedis
except ImportError:
LockStorageRedis = None
# ========================================================================
# BasicTest
# ========================================================================
class BasicTest(unittest.TestCase):
"""Test lock_manager.LockManager()."""
principal = "Joe Tester"
owner = b"joe.tester@example.com"
root = "/dav/res"
timeout = 10 * 60 # Default lock timeout 10 minutes
# @classmethod
# def suite(cls):
# """Return test case suite (so we can control the order)."""
# suite = TestSuite()
# suite.addTest(cls("testPreconditions"))
# # suite.addTest(cls("testOpen"))
# suite.addTest(cls("testValidation"))
# suite.addTest(cls("testLock"))
# suite.addTest(cls("testTimeout"))
# suite.addTest(cls("testConflict"))
# return suite
def setUp(self):
storage = lock_storage.LockStorageDict()
self.lm = lock_manager.LockManager(storage)
self.lm._verbose = 1
def tearDown(self):
del self.lm
def _acquire(
self,
url,
lock_type,
lock_scope,
lock_depth,
lock_owner,
timeout,
principal,
token_list,
):
"""Wrapper for lm.acquire, that returns None instead of raising DAVError."""
try:
return self.lm.acquire(
url=url,
lock_type=lock_type,
lock_scope=lock_scope,
lock_depth=lock_depth,
lock_owner=lock_owner,
timeout=timeout,
principal=principal,
token_list=token_list,
)
except DAVError:
return None
def _isLockDict(self, o):
try:
_ = o["root"] # noqa F841
except Exception:
return False
return True
def _isLockResultOK(self, resultTupleList):
"""Return True, if result is [ (lock_dict, None) ]."""
try:
return (
len(resultTupleList) == 1
and len(resultTupleList) == 2
and self._isLockDict(resultTupleList[0][0])
and resultTupleList[0][1] is None
)
except Exception:
return False
def _isLockResultFault(self, lock, conflictList, status=None):
"""Return True, if it is a valid result tuple containing a DAVError."""
try:
if lock is not None:
return False
if len(conflictList) < 1:
return False
resultTuple = conflictList[0]
if (
len(resultTuple) != 2
or not self._isLockDict(resultTuple[0])
or not isinstance(resultTuple[1], DAVError)
):
return False
elif status and status != DAVError.value:
return False
return True
except Exception:
return False
def testPreconditions(self):
"""Environment must be set."""
self.assertTrue(
__debug__, "__debug__ must be True, otherwise asserts are ignored"
)
# def testOpen(self):
# """Lock manager should be lazy opening on first access."""
# lm = self.lm
# # assert not lm._loaded, "LM must only be opened after first access"
# lm._generate_lock(self.principal, "write", "exclusive", "infinity",
# self.owner,
# "/dav",
# 10)
# assert lm._loaded, "LM must be opened after first access"
def testValidation(self):
"""Lock manager should raise errors on bad args."""
lm = self.lm
self.assertRaises(
AssertionError,
lm._generate_lock,
lm,
"writeX",
"exclusive",
"infinity",
self.owner,
self.root,
self.timeout,
)
self.assertRaises(
AssertionError,
lm._generate_lock,
lm,
"write",
"exclusiveX",
"infinity",
self.owner,
self.root,
self.timeout,
)
self.assertRaises(
AssertionError,
lm._generate_lock,
lm,
"write",
"exclusive",
"infinityX",
self.owner,
self.root,
self.timeout,
)
self.assertRaises(
AssertionError,
lm._generate_lock,
lm,
"write",
"exclusive",
"infinity",
None,
self.root,
self.timeout,
)
self.assertRaises(
AssertionError,
lm._generate_lock,
lm,
"write",
"exclusive",
"infinity",
self.owner,
None,
self.timeout,
)
# assert lm._dict is None, "No locks should have been created by this test"
def testLock(self):
"""Lock manager should create and find locks."""
lm = self.lm
url = "/dav/res"
# Create a new lock
lock_dict = lm._generate_lock(
self.principal,
"write",
"exclusive",
"infinity",
self.owner,
url,
self.timeout,
)
# Check returned dictionary
assert lock_dict is not None
assert lock_dict["root"] == url
assert lock_dict["type"] == "write"
assert lock_dict["scope"] == "exclusive"
assert lock_dict["depth"] == "infinity"
assert lock_dict["owner"] == self.owner
assert lock_dict["principal"] == self.principal
# Test lookup
tok = lock_dict.get("token")
assert lm.get_lock(tok, key="root") == url
lock_dict = lm.get_lock(tok)
assert lock_dict is not None
assert lock_dict["root"] == url
assert lock_dict["type"] == "write"
assert lock_dict["scope"] == "exclusive"
assert lock_dict["depth"] == "infinity"
assert lock_dict["owner"] == self.owner
assert lock_dict["principal"] == self.principal
# We locked "/dav/res", did we?
assert lm.is_token_locked_by_user(tok, self.principal)
# res = lm.get_url_lock_list(url, self.principal)
res = lm.get_url_lock_list(url)
self.assertEqual(len(res), 1)
# res = lm.get_url_lock_list(url, "another user")
# assert len(res) == 0
assert lm.is_url_locked_by_token(
"/dav/res", tok
), "url not directly locked by lock_token."
assert lm.is_url_locked_by_token(
"/dav/res/", tok
), "url not directly locked by lock_token."
assert lm.is_url_locked_by_token(
"/dav/res/sub", tok
), "child url not indirectly locked"
assert not lm.is_url_locked_by_token(
"/dav/ressub", tok
), "non-child url reported as locked"
assert not lm.is_url_locked_by_token(
"/dav", tok
), "parent url reported as locked"
assert not lm.is_url_locked_by_token(
"/dav/", tok
), "parent url reported as locked"
def testTimeout(self):
"""Locks should be purged after expiration date."""
lm = self.lm
timeout = 1
lock_dict = lm._generate_lock(
self.principal,
"write",
"exclusive",
"infinity",
self.owner,
self.root,
timeout,
)
assert lock_dict is not None
tok = lock_dict.get("token")
assert lm.get_lock(tok, key="root") == self.root
sleep(timeout - 0.5)
lock_dict = lm.get_lock(tok)
assert lock_dict is not None, "Lock expired too early"
sleep(1)
lock_dict = lm.get_lock(tok)
assert lock_dict is None, "Lock has not expired"
def testConflict(self):
"""Locks should prevent conflicts."""
token_list = []
# Create a lock for '/dav/res/'
lock = self._acquire(
"/dav/res/",
"write",
"exclusive",
"infinity",
self.owner,
self.timeout,
self.principal,
token_list,
)
assert lock, "Could not acquire lock"
# Try to lock with a slightly different URL (without trailing '/')
lock = self._acquire(
"/dav/res",
"write",
"exclusive",
"infinity",
self.owner,
self.timeout,
"another principal",
token_list,
)
assert lock is None, "Could acquire a conflicting lock"
# Try to lock with another principal
lock = self._acquire(
"/dav/res/",
"write",
"exclusive",
"infinity",
self.owner,
self.timeout,
"another principal",
token_list,
)
assert lock is None, "Could acquire a conflicting lock"
# Try to lock child with another principal
lock = self._acquire(
"/dav/res/sub",
"write",
"exclusive",
"infinity",
self.owner,
self.timeout,
"another principal",
token_list,
)
assert lock is None, "Could acquire a conflicting child lock"
# Try to lock parent with same principal
lock = self._acquire(
"/dav/",
"write",
"exclusive",
"infinity",
self.owner,
self.timeout,
self.principal,
token_list,
)
assert lock is None, "Could acquire a conflicting parent lock"
# Try to lock child with same principal
lock = self._acquire(
"/dav/res/sub",
"write",
"exclusive",
"infinity",
self.owner,
self.timeout,
self.principal,
token_list,
)
assert lock is None, "Could acquire a conflicting child lock (same principal)"
# ========================================================================
# ShelveTest
# ========================================================================
class ShelveTest(BasicTest):
"""Test lock_manager.ShelveLockManager()."""
def setUp(self):
if sys.version_info < (3, 0):
modifier = "-py2" # shelve formats are incompatible
else:
modifier = "-py3"
self.path = os.path.join(
gettempdir(), "wsgidav-locks{}.shelve".format(modifier)
)
storage = lock_storage.LockStorageShelve(self.path)
self.lm = lock_manager.LockManager(storage)
self.lm._verbose = 2
def tearDown(self):
self.lm.storage.clear()
self.lm = None
# Note: os.remove(self.path) does not work, because Shelve may append
# a file extension.
# if os.path.exists(self.path):
# os.remove(self.path)
class RedisTest(BasicTest):
_redis_connect_failed = None
def setUp(self):
if LockStorageRedis is None:
raise unittest.SkipTest("Test redis installed")
if RedisTest._redis_connect_failed:
raise unittest.SkipTest("Test requires a running redis instance (again)")
try:
import redis
r = redis.Redis()
r.ping()
except redis.exceptions.ConnectionError:
RedisTest._redis_connect_failed = True
raise unittest.SkipTest("Test requires a running redis instance")
storage = LockStorageRedis()
self.lm = lock_manager.LockManager(storage)
self.lm._verbose = 2
def tearDown(self):
self.lm.storage.clear()
self.lm = None
# ========================================================================
# suite
# ========================================================================
# def suite():
# """Return suites of all test cases."""
# return TestSuite([BasicTest.suite(),
# ShelveTest.suite(),
# ])
if __name__ == "__main__":
unittest.main()
# suite = suite()
# TextTestRunner(descriptions=0, verbosity=2).run(suite)