Coverage for portality / bll / services / shorturl.py: 95%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import random 

2import string 

3from typing import Optional 

4from urllib.parse import urlparse 

5 

6from portality import models 

7from portality.core import app 

8from portality.models.shortened_url import AliasQuery, UrlQuery, CountWithinDaysQuery 

9 

class UrlShortenerLimitExceeded(Exception):
    """Raised when the count of recently created short URLs reaches the limit."""

12 

class InvalidURL(Exception):
    """Raised when a URL submitted for shortening fails validation."""

15 

16 

class ShortUrlService:
    """Create and look up shortened URLs stored as ``models.ShortenedUrl`` records."""

    # Alphabet that alias characters are drawn from: ASCII letters and digits.
    ALIAS_CHARS = string.ascii_letters + string.digits

    def check_url_count(self, days=7, limit=100_000):
        """
        raise if the number of recently created short urls has reached the limit

        Parameters
        ----------
        days
            fallback time window, used only when ``URLSHORT_LIMIT_WITHIN_DAYS``
            is not set in the app config
        limit
            fallback maximum, used only when ``URLSHORT_LIMIT`` is not set in
            the app config

        Raises
        ------
        UrlShortenerLimitExceeded
            when the counted records are greater than or equal to the limit
        """
        n_created = models.ShortenedUrl.hit_count(CountWithinDaysQuery(
            app.config.get("URLSHORT_LIMIT_WITHIN_DAYS", days)
        ).query())
        n_created_limit = app.config.get("URLSHORT_LIMIT", limit)

        if n_created >= n_created_limit:
            msg = f"Url shortener limit reached: [{n_created=}] >= [{n_created_limit=}]"
            app.logger.warning(msg)
            raise UrlShortenerLimitExceeded(msg)

    def validate_url(self, url):
        """
        check that the url's hostname is one of the allowed superdomains, or
        a subdomain of one of them

        Parameters
        ----------
        url
            the url requested to be shortened

        Raises
        ------
        InvalidURL
            when the url cannot be parsed, has no hostname, or its hostname
            does not match any entry of ``URLSHORT_ALLOWED_SUPERDOMAINS``
        """
        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. unbalanced IPv6
            # brackets); report them as an invalid url, not a bare ValueError
            hostname = None

        allowed_domains = app.config.get("URLSHORT_ALLOWED_SUPERDOMAINS", [])

        # hostname is None for relative or scheme-less urls; guard before
        # calling str methods on it so the caller gets InvalidURL instead of
        # an AttributeError
        is_allowed = hostname is not None and any(
            hostname == d or hostname.endswith(f".{d}")
            for d in allowed_domains
        )
        if not is_allowed:
            msg = f"Invalid url shorten request: {url}"
            app.logger.warning(msg)
            raise InvalidURL(msg)

    def get_short_url(self, url: str, throttled=True, validate=True) -> models.ShortenedUrl:
        """
        create or find a shortened url from the given url

        Parameters
        ----------
        url
            the original url to be shortened
        throttled
            when True, call ``check_url_count`` first
        validate
            when True, call ``validate_url`` on the url first

        Returns
        -------
        the existing record for the url if one is found, otherwise a newly
        saved record with a fresh alias

        Raises
        ------
        UrlShortenerLimitExceeded
            from ``check_url_count`` when throttled
        InvalidURL
            from ``validate_url`` when validate is set
        ValueError
            from ``create_new_alias`` when no unused alias could be generated
        """
        if throttled:
            self.check_url_count()

        if validate:
            self.validate_url(url)

        # reuse the existing record so one url does not collect many aliases
        shortened_url = self.find_shortened_url(url)
        if shortened_url:
            return shortened_url

        alias = self.create_new_alias()
        record = models.ShortenedUrl(url=url, alias=alias)
        record.save()
        return record

    def create_new_alias(self, n_retry=5) -> str:
        """
        generate a random alias that no existing record uses

        Parameters
        ----------
        n_retry
            maximum number of candidate aliases to try

        Returns
        -------
        an alias of ``URLSHORT_ALIAS_LENGTH`` characters (default 6) drawn
        from ``ALIAS_CHARS``

        Raises
        ------
        ValueError
            when every candidate was already in use
        """
        alias_len = app.config.get("URLSHORT_ALIAS_LENGTH", 6)
        for _ in range(n_retry):
            # choices() draws with replacement: characters may repeat, so the
            # whole alias space is usable and the configured length is not
            # capped by the size of the alphabet
            alias = ''.join(random.choices(self.ALIAS_CHARS, k=alias_len))
            cnt = models.ShortenedUrl.hit_count(UrlQuery(alias).query())
            if cnt == 0:
                return alias

        raise ValueError('Could not create a unique alias')

    def find_shortened_url(self, url: str) -> Optional[models.ShortenedUrl]:
        """
        find the shortened url record for the given url

        Returns
        -------
        the first matching record, or None when there is no match; a warning
        is logged when more than one record matches
        """
        aliases = models.ShortenedUrl.q2obj(q=AliasQuery(url).query())

        if len(aliases) == 0:
            return None

        if len(aliases) > 1:
            app.logger.warning(f'More than one alias found for url[{url}] n[{len(aliases)}]')

        return aliases[0]

    def find_url_by_alias(self, alias: str) -> Optional[str]:
        """
        find the original url from the given alias

        Returns
        -------
        the first query hit for the alias, or None when there is no match; a
        warning is logged when more than one record matches
        """
        # NOTE(review): the annotation says Optional[str], but the first hit
        # is returned unchanged -- confirm whether callers expect the record
        # or its url string
        urls = models.ShortenedUrl.q2obj(q=UrlQuery(alias).query())
        n_url = len(urls)
        if n_url == 0:
            return None
        if n_url > 1:
            app.logger.warning(f'More than one URL found for alias[{alias}] n[{n_url}]')

        return urls[0]