comparison mercurial/keepalive.py @ 43076:2372284d9457

formatting: blacken the codebase

This is using my patch to black (https://github.com/psf/black/pull/826) so we don't un-wrap collection literals.

Done with:

  hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S

# skip-blame mass-reformatting only

# no-check-commit reformats foo_bar functions

Differential Revision: https://phab.mercurial-scm.org/D6971
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:45:02 -0400
parents 44d752efdbce
children 687b865b95ad
comparison
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
96 node, 96 node,
97 pycompat, 97 pycompat,
98 urllibcompat, 98 urllibcompat,
99 util, 99 util,
100 ) 100 )
101 from .utils import ( 101 from .utils import procutil
102 procutil,
103 )
104 102
105 httplib = util.httplib 103 httplib = util.httplib
106 urlerr = util.urlerr 104 urlerr = util.urlerr
107 urlreq = util.urlreq 105 urlreq = util.urlreq
108 106
109 DEBUG = None 107 DEBUG = None
108
110 109
111 class ConnectionManager(object): 110 class ConnectionManager(object):
112 """ 111 """
113 The connection manager must be able to: 112 The connection manager must be able to:
114 * keep track of all existing 113 * keep track of all existing
115 """ 114 """
115
116 def __init__(self): 116 def __init__(self):
117 self._lock = threading.Lock() 117 self._lock = threading.Lock()
118 self._hostmap = collections.defaultdict(list) # host -> [connection] 118 self._hostmap = collections.defaultdict(list) # host -> [connection]
119 self._connmap = {} # map connections to host 119 self._connmap = {} # map connections to host
120 self._readymap = {} # map connection to ready state 120 self._readymap = {} # map connection to ready state
121 121
122 def add(self, host, connection, ready): 122 def add(self, host, connection, ready):
123 self._lock.acquire() 123 self._lock.acquire()
124 try: 124 try:
125 self._hostmap[host].append(connection) 125 self._hostmap[host].append(connection)
167 if host: 167 if host:
168 return list(self._hostmap[host]) 168 return list(self._hostmap[host])
169 else: 169 else:
170 return dict(self._hostmap) 170 return dict(self._hostmap)
171 171
172
172 class KeepAliveHandler(object): 173 class KeepAliveHandler(object):
173 def __init__(self, timeout=None): 174 def __init__(self, timeout=None):
174 self._cm = ConnectionManager() 175 self._cm = ConnectionManager()
175 self._timeout = timeout 176 self._timeout = timeout
176 self.requestscount = 0 177 self.requestscount = 0
233 h = self._cm.get_ready_conn(host) 234 h = self._cm.get_ready_conn(host)
234 else: 235 else:
235 # no (working) free connections were found. Create a new one. 236 # no (working) free connections were found. Create a new one.
236 h = http_class(host, timeout=self._timeout) 237 h = http_class(host, timeout=self._timeout)
237 if DEBUG: 238 if DEBUG:
238 DEBUG.info("creating new connection to %s (%d)", 239 DEBUG.info(
239 host, id(h)) 240 "creating new connection to %s (%d)", host, id(h)
241 )
240 self._cm.add(host, h, False) 242 self._cm.add(host, h, False)
241 self._start_transaction(h, req) 243 self._start_transaction(h, req)
242 r = h.getresponse() 244 r = h.getresponse()
243 # The string form of BadStatusLine is the status line. Add some context 245 # The string form of BadStatusLine is the status line. Add some context
244 # to make the error message slightly more useful. 246 # to make the error message slightly more useful.
245 except httplib.BadStatusLine as err: 247 except httplib.BadStatusLine as err:
246 raise urlerr.urlerror( 248 raise urlerr.urlerror(
247 _('bad HTTP status line: %s') % pycompat.sysbytes(err.line)) 249 _('bad HTTP status line: %s') % pycompat.sysbytes(err.line)
250 )
248 except (socket.error, httplib.HTTPException) as err: 251 except (socket.error, httplib.HTTPException) as err:
249 raise urlerr.urlerror(err) 252 raise urlerr.urlerror(err)
250 253
251 # If not a persistent connection, don't try to reuse it. Look 254 # If not a persistent connection, don't try to reuse it. Look
252 # for this using getattr() since vcr doesn't define this 255 # for this using getattr() since vcr doesn't define this
278 r = h.getresponse() 281 r = h.getresponse()
279 # note: just because we got something back doesn't mean it 282 # note: just because we got something back doesn't mean it
280 # worked. We'll check the version below, too. 283 # worked. We'll check the version below, too.
281 except (socket.error, httplib.HTTPException): 284 except (socket.error, httplib.HTTPException):
282 r = None 285 r = None
283 except: # re-raises 286 except: # re-raises
284 # adding this block just in case we've missed 287 # adding this block just in case we've missed
285 # something we will still raise the exception, but 288 # something we will still raise the exception, but
286 # lets try and close the connection and remove it 289 # lets try and close the connection and remove it
287 # first. We previously got into a nasty loop 290 # first. We previously got into a nasty loop
288 # where an exception was uncaught, and so the 291 # where an exception was uncaught, and so the
289 # connection stayed open. On the next try, the 292 # connection stayed open. On the next try, the
290 # same exception was raised, etc. The trade-off is 293 # same exception was raised, etc. The trade-off is
291 # that it's now possible this call will raise 294 # that it's now possible this call will raise
292 # a DIFFERENT exception 295 # a DIFFERENT exception
293 if DEBUG: 296 if DEBUG:
294 DEBUG.error("unexpected exception - closing " 297 DEBUG.error(
295 "connection to %s (%d)", host, id(h)) 298 "unexpected exception - closing " "connection to %s (%d)",
299 host,
300 id(h),
301 )
296 self._cm.remove(h) 302 self._cm.remove(h)
297 h.close() 303 h.close()
298 raise 304 raise
299 305
300 if r is None or r.version == 9: 306 if r is None or r.version == 9:
301 # httplib falls back to assuming HTTP 0.9 if it gets a 307 # httplib falls back to assuming HTTP 0.9 if it gets a
302 # bad header back. This is most likely to happen if 308 # bad header back. This is most likely to happen if
303 # the socket has been closed by the server since we 309 # the socket has been closed by the server since we
304 # last used the connection. 310 # last used the connection.
305 if DEBUG: 311 if DEBUG:
306 DEBUG.info("failed to re-use connection to %s (%d)", 312 DEBUG.info(
307 host, id(h)) 313 "failed to re-use connection to %s (%d)", host, id(h)
314 )
308 r = None 315 r = None
309 else: 316 else:
310 if DEBUG: 317 if DEBUG:
311 DEBUG.info("re-using connection to %s (%d)", host, id(h)) 318 DEBUG.info("re-using connection to %s (%d)", host, id(h))
312 319
328 skipheaders[r'skip_' + n.replace(r'-', r'_')] = 1 335 skipheaders[r'skip_' + n.replace(r'-', r'_')] = 1
329 try: 336 try:
330 if urllibcompat.hasdata(req): 337 if urllibcompat.hasdata(req):
331 data = urllibcompat.getdata(req) 338 data = urllibcompat.getdata(req)
332 h.putrequest( 339 h.putrequest(
333 req.get_method(), urllibcompat.getselector(req), 340 req.get_method(),
334 **skipheaders) 341 urllibcompat.getselector(req),
342 **skipheaders
343 )
335 if r'content-type' not in headers: 344 if r'content-type' not in headers:
336 h.putheader(r'Content-type', 345 h.putheader(
337 r'application/x-www-form-urlencoded') 346 r'Content-type', r'application/x-www-form-urlencoded'
347 )
338 if r'content-length' not in headers: 348 if r'content-length' not in headers:
339 h.putheader(r'Content-length', r'%d' % len(data)) 349 h.putheader(r'Content-length', r'%d' % len(data))
340 else: 350 else:
341 h.putrequest( 351 h.putrequest(
342 req.get_method(), urllibcompat.getselector(req), 352 req.get_method(),
343 **skipheaders) 353 urllibcompat.getselector(req),
354 **skipheaders
355 )
344 except socket.error as err: 356 except socket.error as err:
345 raise urlerr.urlerror(err) 357 raise urlerr.urlerror(err)
346 for k, v in headers.items(): 358 for k, v in headers.items():
347 h.putheader(k, v) 359 h.putheader(k, v)
348 h.endheaders() 360 h.endheaders()
354 self.sentbytescount += getattr(h, 'sentbytescount', 0) - oldbytescount 366 self.sentbytescount += getattr(h, 'sentbytescount', 0) - oldbytescount
355 367
356 try: 368 try:
357 self.parent.requestscount += 1 369 self.parent.requestscount += 1
358 self.parent.sentbytescount += ( 370 self.parent.sentbytescount += (
359 getattr(h, 'sentbytescount', 0) - oldbytescount) 371 getattr(h, 'sentbytescount', 0) - oldbytescount
372 )
360 except AttributeError: 373 except AttributeError:
361 pass 374 pass
362 375
376
363 class HTTPHandler(KeepAliveHandler, urlreq.httphandler): 377 class HTTPHandler(KeepAliveHandler, urlreq.httphandler):
364 pass 378 pass
379
365 380
366 class HTTPResponse(httplib.HTTPResponse): 381 class HTTPResponse(httplib.HTTPResponse):
367 # we need to subclass HTTPResponse in order to 382 # we need to subclass HTTPResponse in order to
368 # 1) add readline(), readlines(), and readinto() methods 383 # 1) add readline(), readlines(), and readinto() methods
369 # 2) add close_connection() methods 384 # 2) add close_connection() methods
380 # the read method wraps the original to accommodate buffering, 395 # the read method wraps the original to accommodate buffering,
381 # although read() never adds to the buffer. 396 # although read() never adds to the buffer.
382 # Both readline and readlines have been stolen with almost no 397 # Both readline and readlines have been stolen with almost no
383 # modification from socket.py 398 # modification from socket.py
384 399
385
386 def __init__(self, sock, debuglevel=0, strict=0, method=None): 400 def __init__(self, sock, debuglevel=0, strict=0, method=None):
387 extrakw = {} 401 extrakw = {}
388 if not pycompat.ispy3: 402 if not pycompat.ispy3:
389 extrakw[r'strict'] = True 403 extrakw[r'strict'] = True
390 extrakw[r'buffering'] = True 404 extrakw[r'buffering'] = True
391 httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, 405 httplib.HTTPResponse.__init__(
392 method=method, **extrakw) 406 self, sock, debuglevel=debuglevel, method=method, **extrakw
407 )
393 self.fileno = sock.fileno 408 self.fileno = sock.fileno
394 self.code = None 409 self.code = None
395 self.receivedbytescount = 0 410 self.receivedbytescount = 0
396 self._rbuf = '' 411 self._rbuf = ''
397 self._rbufsize = 8096 412 self._rbufsize = 8096
398 self._handler = None # inserted by the handler later 413 self._handler = None # inserted by the handler later
399 self._host = None # (same) 414 self._host = None # (same)
400 self._url = None # (same) 415 self._url = None # (same)
401 self._connection = None # (same) 416 self._connection = None # (same)
402 417
403 _raw_read = httplib.HTTPResponse.read 418 _raw_read = httplib.HTTPResponse.read
404 _raw_readinto = getattr(httplib.HTTPResponse, 'readinto', None) 419 _raw_readinto = getattr(httplib.HTTPResponse, 'readinto', None)
405 420
406 # Python 2.7 has a single close() which closes the socket handle. 421 # Python 2.7 has a single close() which closes the socket handle.
411 def close(self): 426 def close(self):
412 if self.fp: 427 if self.fp:
413 self.fp.close() 428 self.fp.close()
414 self.fp = None 429 self.fp = None
415 if self._handler: 430 if self._handler:
416 self._handler._request_closed(self, self._host, 431 self._handler._request_closed(
417 self._connection) 432 self, self._host, self._connection
433 )
418 434
419 def _close_conn(self): 435 def _close_conn(self):
420 self.close() 436 self.close()
421 437
422 def close_connection(self): 438 def close_connection(self):
468 while True: 484 while True:
469 if chunk_left is None: 485 if chunk_left is None:
470 line = self.fp.readline() 486 line = self.fp.readline()
471 i = line.find(';') 487 i = line.find(';')
472 if i >= 0: 488 if i >= 0:
473 line = line[:i] # strip chunk-extensions 489 line = line[:i] # strip chunk-extensions
474 try: 490 try:
475 chunk_left = int(line, 16) 491 chunk_left = int(line, 16)
476 except ValueError: 492 except ValueError:
477 # close the connection as protocol synchronization is 493 # close the connection as protocol synchronization is
478 # probably lost 494 # probably lost
494 else: 510 else:
495 parts.append(self._safe_read(chunk_left)) 511 parts.append(self._safe_read(chunk_left))
496 amt -= chunk_left 512 amt -= chunk_left
497 513
498 # we read the whole chunk, get another 514 # we read the whole chunk, get another
499 self._safe_read(2) # toss the CRLF at the end of the chunk 515 self._safe_read(2) # toss the CRLF at the end of the chunk
500 chunk_left = None 516 chunk_left = None
501 517
502 # read and discard trailer up to the CRLF terminator 518 # read and discard trailer up to the CRLF terminator
503 ### note: we shouldn't have any trailers! 519 ### note: we shouldn't have any trailers!
504 while True: 520 while True:
573 def readinto(self, dest): 589 def readinto(self, dest):
574 if self._raw_readinto is None: 590 if self._raw_readinto is None:
575 res = self.read(len(dest)) 591 res = self.read(len(dest))
576 if not res: 592 if not res:
577 return 0 593 return 0
578 dest[0:len(res)] = res 594 dest[0 : len(res)] = res
579 return len(res) 595 return len(res)
580 total = len(dest) 596 total = len(dest)
581 have = len(self._rbuf) 597 have = len(self._rbuf)
582 if have >= total: 598 if have >= total:
583 dest[0:total] = self._rbuf[:total] 599 dest[0:total] = self._rbuf[:total]
595 611
596 dest[0:have] = self._rbuf 612 dest[0:have] = self._rbuf
597 got += len(self._rbuf) 613 got += len(self._rbuf)
598 self._rbuf = '' 614 self._rbuf = ''
599 return got 615 return got
616
600 617
601 def safesend(self, str): 618 def safesend(self, str):
602 """Send `str' to the server. 619 """Send `str' to the server.
603 620
604 Shamelessly ripped off from httplib to patch a bad behavior. 621 Shamelessly ripped off from httplib to patch a bad behavior.
640 else: 657 else:
641 self.sock.sendall(str) 658 self.sock.sendall(str)
642 self.sentbytescount += len(str) 659 self.sentbytescount += len(str)
643 except socket.error as v: 660 except socket.error as v:
644 reraise = True 661 reraise = True
645 if v.args[0] == errno.EPIPE: # Broken pipe 662 if v.args[0] == errno.EPIPE: # Broken pipe
646 if self._HTTPConnection__state == httplib._CS_REQ_SENT: 663 if self._HTTPConnection__state == httplib._CS_REQ_SENT:
647 self._broken_pipe_resp = None 664 self._broken_pipe_resp = None
648 self._broken_pipe_resp = self.getresponse() 665 self._broken_pipe_resp = self.getresponse()
649 reraise = False 666 reraise = False
650 self.close() 667 self.close()
651 if reraise: 668 if reraise:
652 raise 669 raise
653 670
671
654 def wrapgetresponse(cls): 672 def wrapgetresponse(cls):
655 """Wraps getresponse in cls with a broken-pipe sane version. 673 """Wraps getresponse in cls with a broken-pipe sane version.
656 """ 674 """
675
657 def safegetresponse(self): 676 def safegetresponse(self):
658 # In safesend() we might set the _broken_pipe_resp 677 # In safesend() we might set the _broken_pipe_resp
659 # attribute, in which case the socket has already 678 # attribute, in which case the socket has already
660 # been closed and we just need to give them the response 679 # been closed and we just need to give them the response
661 # back. Otherwise, we use the normal response path. 680 # back. Otherwise, we use the normal response path.
662 r = getattr(self, '_broken_pipe_resp', None) 681 r = getattr(self, '_broken_pipe_resp', None)
663 if r is not None: 682 if r is not None:
664 return r 683 return r
665 return cls.getresponse(self) 684 return cls.getresponse(self)
685
666 safegetresponse.__doc__ = cls.getresponse.__doc__ 686 safegetresponse.__doc__ = cls.getresponse.__doc__
667 return safegetresponse 687 return safegetresponse
688
668 689
669 class HTTPConnection(httplib.HTTPConnection): 690 class HTTPConnection(httplib.HTTPConnection):
670 # url.httpsconnection inherits from this. So when adding/removing 691 # url.httpsconnection inherits from this. So when adding/removing
671 # attributes, be sure to audit httpsconnection() for unintended 692 # attributes, be sure to audit httpsconnection() for unintended
672 # consequences. 693 # consequences.
678 699
679 def __init__(self, *args, **kwargs): 700 def __init__(self, *args, **kwargs):
680 httplib.HTTPConnection.__init__(self, *args, **kwargs) 701 httplib.HTTPConnection.__init__(self, *args, **kwargs)
681 self.sentbytescount = 0 702 self.sentbytescount = 0
682 self.receivedbytescount = 0 703 self.receivedbytescount = 0
704
683 705
684 ######################################################################### 706 #########################################################################
685 ##### TEST FUNCTIONS 707 ##### TEST FUNCTIONS
686 ######################################################################### 708 #########################################################################
687 709
719 break 741 break
720 fo.close() 742 fo.close()
721 m = md5(foo) 743 m = md5(foo)
722 print(format % ('keepalive readline', node.hex(m.digest()))) 744 print(format % ('keepalive readline', node.hex(m.digest())))
723 745
746
724 def comp(N, url): 747 def comp(N, url):
725 print(' making %i connections to:\n %s' % (N, url)) 748 print(' making %i connections to:\n %s' % (N, url))
726 749
727 procutil.stdout.write(' first using the normal urllib handlers') 750 procutil.stdout.write(' first using the normal urllib handlers')
728 # first use normal opener 751 # first use normal opener
737 urlreq.installopener(opener) 760 urlreq.installopener(opener)
738 t2 = fetch(N, url) 761 t2 = fetch(N, url)
739 print(' TIME: %.3f s' % t2) 762 print(' TIME: %.3f s' % t2)
740 print(' improvement factor: %.2f' % (t1 / t2)) 763 print(' improvement factor: %.2f' % (t1 / t2))
741 764
765
742 def fetch(N, url, delay=0): 766 def fetch(N, url, delay=0):
743 import time 767 import time
768
744 lens = [] 769 lens = []
745 starttime = time.time() 770 starttime = time.time()
746 for i in range(N): 771 for i in range(N):
747 if delay and i > 0: 772 if delay and i > 0:
748 time.sleep(delay) 773 time.sleep(delay)
758 if not i == lens[0]: 783 if not i == lens[0]:
759 print("WARNING: inconsistent length on read %i: %i" % (j, i)) 784 print("WARNING: inconsistent length on read %i: %i" % (j, i))
760 785
761 return diff 786 return diff
762 787
788
763 def test_timeout(url): 789 def test_timeout(url):
764 global DEBUG 790 global DEBUG
765 dbbackup = DEBUG 791 dbbackup = DEBUG
792
766 class FakeLogger(object): 793 class FakeLogger(object):
767 def debug(self, msg, *args): 794 def debug(self, msg, *args):
768 print(msg % args) 795 print(msg % args)
796
769 info = warning = error = debug 797 info = warning = error = debug
798
770 DEBUG = FakeLogger() 799 DEBUG = FakeLogger()
771 print(" fetching the file to establish a connection") 800 print(" fetching the file to establish a connection")
772 fo = urlreq.urlopen(url) 801 fo = urlreq.urlopen(url)
773 data1 = fo.read() 802 data1 = fo.read()
774 fo.close() 803 fo.close()
803 comp(N, url) 832 comp(N, url)
804 print('') 833 print('')
805 print("performing dropped-connection check") 834 print("performing dropped-connection check")
806 test_timeout(url) 835 test_timeout(url)
807 836
837
808 if __name__ == '__main__': 838 if __name__ == '__main__':
809 import time 839 import time
840
810 try: 841 try:
811 N = int(sys.argv[1]) 842 N = int(sys.argv[1])
812 url = sys.argv[2] 843 url = sys.argv[2]
813 except (IndexError, ValueError): 844 except (IndexError, ValueError):
814 print("%s <integer> <url>" % sys.argv[0]) 845 print("%s <integer> <url>" % sys.argv[0])