Coverage for portality / lib / es_snapshot.py: 86%
69 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1""" Library for managing ElasticSearch snapshots - ported from esprit and modified to use the Elasticsearch bindings
2"""
4from datetime import datetime, timedelta
5from elasticsearch import Elasticsearch, ElasticsearchException
class BadSnapshotMetaException(Exception):
    """Raised when snapshot metadata from ES cannot be turned into an ESSnapshot object."""
class TodaySnapshotMissingException(Exception):
    """Raised when the most recent snapshot was not started on today's (UTC) date."""
class FailedSnapshotException(Exception):
    """Raised when today's snapshot exists but its state is not 'SUCCESS'."""
class SnapshotDeleteException(Exception):
    """Raised when at least one snapshot delete attempted during pruning did not succeed."""
class ESSnapshot(object):
    """Representation of an ES Snapshot, built from one entry of the snapshot metadata JSON.

    Attributes:
        data: the metadata dict exactly as supplied
        name: value of the 'snapshot' key
        state: value of the 'state' key (e.g. 'SUCCESS')
        datetime: naive UTC datetime derived from 'start_time_in_millis'
    """

    def __init__(self, snapshot_json: dict):
        """
        :param snapshot_json: dict with at least 'snapshot', 'state' and 'start_time_in_millis' keys
        :raises KeyError: if any of those keys is absent
        """
        self.data = snapshot_json
        self.name = snapshot_json['snapshot']
        self.state = snapshot_json['state']
        # The start time is given in epoch milliseconds; utcfromtimestamp() takes seconds
        self.datetime = datetime.utcfromtimestamp(snapshot_json['start_time_in_millis'] / 1000)

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        """ Two snapshots are equal when all of their attributes are equal """
        # Comparing against None or an unrelated type must yield "not equal" rather than
        # blow up with AttributeError on a missing __dict__.
        if not isinstance(other, ESSnapshot):
            return NotImplemented
        return self.__dict__ == other.__dict__
class ESSnapshotsClient(object):
    """ Client for performing operations on the ES Snapshots held in a single repository.

    The snapshot listing is cached on the instance (self.snapshots) once fetched, and the cache
    is cleared whenever prune_snapshots() attempts a delete.
    """

    def __init__(self, connection: "Elasticsearch", snapshot_repository: str):
        """
        Initialise the Client with a connection to ES
        :param connection: Elasticsearch connection object (elasticsearch.Elasticsearch)
        :param snapshot_repository: the S3 repo identifier defined in the snapshot settings
        """
        self.conn = connection
        self.repo = snapshot_repository
        self.snapshots = []

    def request_snapshot(self, snapshot_name: str = None):
        """
        Request the elasticsearch snapshot plugin to create a snapshot
        :param snapshot_name: a string to name the snapshot. Defaults to UTC timestamp e.g. 2019-01-26_1602z
        :return: Tuple of (response, accepted flag) on success, or (error text, False) if ES raised
        """
        if snapshot_name is not None:
            name = snapshot_name
        else:
            name = datetime.utcnow().strftime("%Y-%m-%d_%H%Mz")

        try:
            resp = self.conn.snapshot.create(repository=self.repo, snapshot=name, master_timeout='600s')
        except ElasticsearchException as e:
            return str(e), False
        return resp, resp['accepted']

    def delete_snapshot(self, snapshot: "ESSnapshot"):
        """
        Delete a snapshot from S3 storage
        :param snapshot: An ESSnapshot object
        :return: Tuple of (response, acknowledged flag) on success, or (error text, False) if ES raised
        """
        try:
            resp = self.conn.snapshot.delete(self.repo, snapshot.name, master_timeout='600s', request_timeout=90)
        except ElasticsearchException as e:
            return str(e), False
        return resp, resp['acknowledged']

    def list_snapshots(self):
        """
        Return a list of all snapshots in the S3 repository
        :return: list of ESSnapshot objects, oldest to newest
        :raises BadSnapshotMetaException: if any snapshot entry cannot be turned into an ESSnapshot
        """
        # If the client doesn't have the snapshots, ask ES for them
        if not self.snapshots:
            resp = self.conn.snapshot.get(self.repo, '_all', master_timeout='600s', request_timeout=60)

            if 'snapshots' in resp:
                try:
                    snap_objs = [ESSnapshot(s) for s in resp['snapshots']]
                except Exception as e:
                    raise BadSnapshotMetaException("Error creating snapshot object: ") from e

                # Sort the snapshots old to new
                self.snapshots = sorted(snap_objs, key=lambda x: x.datetime)

        return self.snapshots

    def check_today_snapshot(self):
        """
        Check we have a successful snapshot for today
        :raises TodaySnapshotMissingException: if there are no snapshots at all, or the newest is not from today (UTC)
        :raises FailedSnapshotException: if today's snapshot is not in the SUCCESS state
        """
        snapshots = self.list_snapshots()

        # Take the date once so the comparison and the message cannot disagree around midnight
        today = datetime.utcnow().date()

        # An empty repository means today's snapshot is missing - don't fail on snapshots[-1]
        if not snapshots or snapshots[-1].datetime.date() != today:
            raise TodaySnapshotMissingException('Snapshot appears to be missing for {}'.format(today))
        if snapshots[-1].state != 'SUCCESS':
            raise FailedSnapshotException('Snapshot for {} has failed'.format(today))

    def prune_snapshots(self, ttl_days: int, delete_callback=None):
        """
        Delete all snapshots outwith our TTL (Time To Live) period based on today's date.
        :param ttl_days: integer number of days a snapshot should be retained
        :param delete_callback: callback to run after each delete has been attempted. It is called with three
            positional arguments: f(snapshot, status, result) - the ESSnapshot followed by the boolean
            success / fail of the delete, which is passed twice.
        :return: nothing, but throws SnapshotDeleteException if not all were successful.
        """
        snapshots = self.list_snapshots()

        # Anything started before this moment has outlived its TTL; work it out once for the whole run
        cutoff = datetime.utcnow() - timedelta(days=ttl_days)

        # Keep a list of boolean success / failures of our deletes
        results = []
        for snapshot in snapshots:
            if snapshot.datetime >= cutoff:
                continue

            # status is the boolean success flag reported by delete_snapshot()
            _, status = self.delete_snapshot(snapshot)
            results.append(status)

            # Our snapshots list is outdated, invalidate it (before the callback, so a failing
            # callback can't leave a stale cache behind). The loop iterates the local list.
            self.snapshots = []

            # Run the callback if there is one
            if delete_callback:
                delete_callback(snapshot, status, results[-1])

        print("snapshots prune results: {}".format(results))
        if not all(results):
            raise SnapshotDeleteException('Not all snapshots were deleted successfully.')