Coverage for portality / bll / services / shorturl.py: 95%
61 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import random
2import string
3from typing import Optional
4from urllib.parse import urlparse
6from portality import models
7from portality.core import app
8from portality.models.shortened_url import AliasQuery, UrlQuery, CountWithinDaysQuery
10class UrlShortenerLimitExceeded(Exception):
11 pass
13class InvalidURL(Exception):
14 pass
17class ShortUrlService:
18 ALIAS_CHARS = string.ascii_letters + string.digits
20 def check_url_count(self, days=7, limit=100_000):
21 n_created = models.ShortenedUrl.hit_count(CountWithinDaysQuery(
22 app.config.get("URLSHORT_LIMIT_WITHIN_DAYS", days)
23 ).query())
24 n_created_limit = app.config.get("URLSHORT_LIMIT", limit)
26 if n_created >= n_created_limit:
27 msg = f"Url shortener limit reached: [{n_created=}] >= [{n_created_limit=}]"
28 app.logger.warning(msg)
29 raise UrlShortenerLimitExceeded(msg)
31 def validate_url(self, url):
32 # validate url
33 hostname = urlparse(url).hostname
34 if not any((hostname == d or hostname.endswith(f".{d}"))
35 for d in app.config.get("URLSHORT_ALLOWED_SUPERDOMAINS", [])):
36 msg = f"Invalid url shorten request: {url}"
37 app.logger.warning(msg)
38 raise InvalidURL(msg)
40 def get_short_url(self, url: str, throttled=True, validate=True) -> models.ShortenedUrl:
41 """
42 create or find a shorted url from the given url
44 Parameters
45 ----------
46 url
48 Returns
49 -------
50 shortened URL
51 """
52 if throttled:
53 self.check_url_count()
55 if validate:
56 self.validate_url(url)
58 shortened_url = self.find_shortened_url(url)
59 if shortened_url:
60 return shortened_url
62 alias = self.create_new_alias()
63 record = models.ShortenedUrl(url=url, alias=alias)
64 record.save()
65 return record
67 def create_new_alias(self, n_retry=5) -> str:
68 alias_len = app.config.get("URLSHORT_ALIAS_LENGTH", 6)
69 for _ in range(n_retry):
70 alias = ''.join(random.sample(self.ALIAS_CHARS, alias_len))
71 cnt = models.ShortenedUrl.hit_count(UrlQuery(alias).query())
72 if cnt == 0:
73 return alias
75 raise ValueError('Could not create a unique alias')
77 def find_shortened_url(self, url: str) -> Optional[models.ShortenedUrl]:
78 """ find the shorted url from the given url """
80 aliases = models.ShortenedUrl.q2obj(q=AliasQuery(url).query())
82 if len(aliases) == 0:
83 return None
85 if len(aliases) > 1:
86 app.logger.warning(f'More than one alias found for url[{url}] n[{len(aliases)}]')
88 return aliases[0]
90 def find_url_by_alias(self, alias: str) -> Optional[str]:
91 """ find the original url from the given alias """
93 urls = models.ShortenedUrl.q2obj(q=UrlQuery(alias).query())
94 n_url = len(urls)
95 if n_url == 0:
96 return None
97 if n_url > 1:
98 app.logger.warning(f'More than one URL found for alias[{alias}] n[{n_url}]')
100 return urls[0]