Coverage for portality / lib / es_snapshot.py: 86%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1""" Library for managing ElasticSearch snapshots - ported from esprit and modified to use the Elasticsearch bindings 

2""" 

3 

4from datetime import datetime, timedelta 

5from elasticsearch import Elasticsearch, ElasticsearchException 

6 

7 

class BadSnapshotMetaException(Exception):
    """Raised when snapshot metadata returned by ES cannot be turned into an ESSnapshot."""

10 

11 

class TodaySnapshotMissingException(Exception):
    """Raised when the most recent snapshot was not started on today's (UTC) date."""

14 

15 

class FailedSnapshotException(Exception):
    """Raised when today's snapshot exists but its state is not 'SUCCESS'."""

18 

19 

class SnapshotDeleteException(Exception):
    """Raised after pruning when at least one snapshot delete did not succeed."""

22 

23 

class ESSnapshot(object):
    """ Representation of an ES Snapshot

    Attributes:
        data: the raw snapshot metadata dict this object was built from
        name: value of the 'snapshot' key
        state: value of the 'state' key
        datetime: naive datetime (UTC) derived from 'start_time_in_millis'
    """

    def __init__(self, snapshot_json: dict):
        """
        Build the snapshot from its metadata
        :param snapshot_json: dict containing the keys 'snapshot', 'state' and 'start_time_in_millis'
        :raises KeyError: if any of the required keys is absent
        """
        self.data = snapshot_json
        self.name = snapshot_json['snapshot']
        self.state = snapshot_json['state']
        # The start time is given in milliseconds; utcfromtimestamp expects seconds
        self.datetime = datetime.utcfromtimestamp(snapshot_json['start_time_in_millis'] / 1000)

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        # Comparing with anything that is not a snapshot (None, int, ...) must not blow up with
        # AttributeError on a missing __dict__; let Python fall back to its default comparison instead.
        if not isinstance(other, ESSnapshot):
            return NotImplemented
        return self.__dict__ == other.__dict__

40 

41 

class ESSnapshotsClient(object):
    """ Client for performing operations on the ES Snapshots """

    def __init__(self, connection: Elasticsearch, snapshot_repository: str):
        """
        Initialise the Client with a connection to ES
        :param connection: Elasticsearch connection object (elasticsearch.Elasticsearch)
        :param snapshot_repository: the S3 repo identifier defined in the snapshot settings
        """
        self.conn = connection
        self.repo = snapshot_repository
        # Cache filled by list_snapshots(); an empty list means "ask ES again"
        self.snapshots = []

    def request_snapshot(self, snapshot_name: str = None):
        """
        Request the elasticsearch snapshot plugin to create a snapshot
        :param snapshot_name: a string to name the snapshot. Defaults to UTC timestamp e.g. 2019-01-26_1602z
        :return: Tuple of (result, success). On success the result is the ES response and success is its
            'accepted' value; if the request raised ElasticsearchException the result is the error text
            and success is False.
        """
        name = snapshot_name if snapshot_name is not None else datetime.utcnow().strftime("%Y-%m-%d_%H%Mz")
        try:
            resp = self.conn.snapshot.create(repository=self.repo, snapshot=name, master_timeout='600s')
        except ElasticsearchException as e:
            return str(e), False
        return resp, resp['accepted']

    def delete_snapshot(self, snapshot: ESSnapshot):
        """
        Delete a snapshot from S3 storage
        :param snapshot: An ESSnapshot object
        :return: Tuple of (result, success). On success the result is the ES response and success is its
            'acknowledged' value; if the request raised ElasticsearchException the result is the error text
            and success is False.
        """
        try:
            resp = self.conn.snapshot.delete(self.repo, snapshot.name, master_timeout='600s', request_timeout=90)
        except ElasticsearchException as e:
            return str(e), False
        return resp, resp['acknowledged']

    def list_snapshots(self):
        """
        Return a list of all snapshots in the S3 repository
        :return: list of ESSnapshot objects, oldest to newest
        :raises BadSnapshotMetaException: if a snapshot entry in the response cannot be parsed
        """

        # If the client doesn't have the snapshots, ask ES for them
        if not self.snapshots:
            resp = self.conn.snapshot.get(self.repo, '_all', master_timeout='600s', request_timeout=60)

            if 'snapshots' in resp:
                try:
                    snap_objs = [ESSnapshot(s) for s in resp['snapshots']]
                except Exception as e:
                    # Include the cause in the message so it is still useful when only the text is logged
                    raise BadSnapshotMetaException("Error creating snapshot object: {}".format(e)) from e

                # Sort the snapshots old to new
                self.snapshots = sorted(snap_objs, key=lambda x: x.datetime)

        return self.snapshots

    def check_today_snapshot(self):
        """
        Check we have a successful snapshot for today
        :raises TodaySnapshotMissingException: if there are no snapshots at all, or the newest one was not
            started today (UTC)
        :raises FailedSnapshotException: if today's snapshot is not in state 'SUCCESS'
        """
        snapshots = self.list_snapshots()

        # Take "today" once so the comparison and the message agree even if we run across midnight
        today = datetime.utcnow().date()

        # An empty repository has no snapshot for today either; report it as missing, not as IndexError
        if not snapshots or snapshots[-1].datetime.date() != today:
            raise TodaySnapshotMissingException('Snapshot appears to be missing for {}'.format(today))
        if snapshots[-1].state != 'SUCCESS':
            raise FailedSnapshotException('Snapshot for {} has failed'.format(today))

    def prune_snapshots(self, ttl_days: int, delete_callback=None):
        """
        Delete all snapshots outwith our TTL (Time To Live) period based on today's date.
        :param ttl_days: integer number of days a snapshot should be retained
        :param delete_callback: callback to run after each delete has occurred. It is invoked with three
            positional arguments: f(snapshot, succeeded, succeeded) - the ESSnapshot followed by the
            boolean success / fail of its delete, passed twice.
            NOTE(review): confirm against the callers before reducing this to two arguments.
        :return: nothing, but throws SnapshotDeleteException if not all were successful.
        """
        snapshots = self.list_snapshots()

        # Anything started before this moment has outlived its TTL
        cutoff = datetime.utcnow() - timedelta(days=ttl_days)

        # Keep a list of boolean success / failures of our deletes
        results = []
        try:
            for snapshot in snapshots:
                if snapshot.datetime < cutoff:
                    _, status = self.delete_snapshot(snapshot)

                    # Record whether ES acknowledged the delete
                    results.append(status)

                    # Run the callback if there is one
                    if delete_callback:
                        delete_callback(snapshot, status, results[-1])
        finally:
            # Our snapshots list is outdated, invalidate it (even if a callback raised part way through)
            self.snapshots = []

        print("snapshots prune results: {}".format(results))
        if not all(results):
            raise SnapshotDeleteException('Not all snapshots were deleted successfully.')