Coverage for watcher/api/middleware/parsable_error.py: 80%

38 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-17 12:22 +0000

1# -*- encoding: utf-8 -*- 

2# 

3# Copyright © 2012 New Dream Network, LLC (DreamHost) 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16""" 

17Middleware to replace the plain text message body of an error 

18response with one formatted so the client can parse it. 

19 

20Based on pecan.middleware.errordocument 

21""" 

22 

23from xml import etree as et 

24 

25from oslo_log import log 

26from oslo_serialization import jsonutils 

27import webob 

28 

29from watcher._i18n import _ 

30 

31LOG = log.getLogger(__name__) 

32 

33 

class ParsableErrorMiddleware(object):
    """Replace error body with something the client can parse.

    WSGI middleware wrapping ``app``. Responses whose status is 2xx or 3xx
    pass through untouched. For any other status the body produced by the
    wrapped application is consumed and re-emitted inside an
    ``error_message`` envelope: as XML when the request's ``Accept`` header
    prefers ``application/xml``, and as JSON otherwise.
    """

    def __init__(self, app):
        """Store the wrapped WSGI application.

        :param app: the WSGI callable this middleware delegates to.
        """
        self.app = app

    @staticmethod
    def _strip_body_headers(headers):
        """Return ``headers`` without Content-Length and Content-Type.

        Header names are compared case-insensitively, because HTTP header
        names are case-insensitive and the application may emit them in any
        casing.

        :param headers: iterable of ``(name, value)`` tuples.
        :returns: a new list of the remaining ``(name, value)`` tuples.
        """
        return [(name, value)
                for (name, value) in headers
                if name.lower() not in ('content-length', 'content-type')]

    @staticmethod
    def _decode_chunks(app_iter):
        """Join the chunks of a response body into one text message.

        Byte chunks are decoded as UTF-8; chunks that are already text are
        used as they are. Chunks are joined with a newline.

        :param app_iter: iterable of body chunks.
        :returns: the joined message as ``str``.
        :raises UnicodeDecodeError: if a byte chunk is not valid UTF-8.
        """
        return '\n'.join(
            chunk.decode('utf-8') if isinstance(chunk, bytes) else chunk
            for chunk in app_iter)

    @staticmethod
    def _xml_error_body(message, status_code):
        """Build the XML error body as a one-element list of bytes.

        :param message: error text to place inside ``<error_message>``.
        :param status_code: numeric HTTP status, used only by the fallback.
        :returns: ``[bytes]`` holding the serialized element.
        """
        # Imported here so the submodule is guaranteed to be loaded;
        # ``from xml import etree`` alone does not import ElementTree.
        from xml.etree import ElementTree

        try:
            element = ElementTree.Element('error_message')
            # Assign the text explicitly: keyword arguments passed to
            # Element() become XML attributes, not the element's content.
            element.text = message
            return [ElementTree.tostring(element)]
        except ElementTree.ParseError as err:
            # NOTE(review): serialization does not parse anything, so this
            # is not expected to trigger; kept as a defensive fallback.
            LOG.error('Error parsing HTTP response: %s', err)
            fallback = '<error_message>%s</error_message>' % status_code
            return [fallback.encode('utf-8')]

    def __call__(self, environ, start_response):
        """Handle one WSGI request, rewriting the body of error responses.

        :param environ: the WSGI environment of the request.
        :param start_response: the WSGI ``start_response`` callable.
        :returns: the application's own iterable for 2xx/3xx responses,
                  otherwise a one-element list with the encoded error body.
        :raises Exception: if the application reports a status line whose
                           first token is not an integer.
        """
        # State for this request, modified by replacement_start_response()
        # and used when an error is being reported.
        state = {}

        def replacement_start_response(status, headers, exc_info=None):
            """Overrides the default response to make errors parsable."""
            try:
                status_code = int(status.split(' ')[0])
                state['status_code'] = status_code
            except (ValueError, TypeError):  # pragma: nocover
                raise Exception(_(
                    'ErrorDocumentMiddleware received an invalid '
                    'status %s') % status)
            else:
                if (state['status_code'] // 100) not in (2, 3):
                    # Remove some headers so we can replace them later
                    # when we have the full error message and can
                    # compute the length.
                    headers = self._strip_body_headers(headers)
                # Save the headers in case we need to modify them. The very
                # same list object is handed to start_response() and is
                # appended to afterwards; this presumably relies on the
                # server reading the list only once the body is sent.
                state['headers'] = headers
                return start_response(status, headers, exc_info)

        # The status is read right after this call returns, so the wrapped
        # application must call start_response() before returning.
        app_iter = self.app(environ, replacement_start_response)
        if (state['status_code'] // 100) in (2, 3):
            return app_iter

        try:
            message = self._decode_chunks(app_iter)
        finally:
            # The original iterable is consumed here and never reaches the
            # server, so closing it is this middleware's job.
            close = getattr(app_iter, 'close', None)
            if close is not None:
                close()

        req = webob.Request(environ)
        preferred = req.accept.best_match(
            ['application/json', 'application/xml'])
        if preferred == 'application/xml':
            body = self._xml_error_body(message, state['status_code'])
            content_type = 'application/xml'
        else:
            payload = jsonutils.dumps({'error_message': message})
            body = [payload.encode('utf-8')]
            content_type = 'application/json'

        state['headers'].append(('Content-Type', content_type))
        # body[0] is bytes, so this is the length in octets.
        state['headers'].append(('Content-Length', str(len(body[0]))))
        return body