hgext/infinitepush/sqlindexapi.py
changeset 37189 03ff17a4bf53
child 37236 c1fac3878196
equal deleted inserted replaced
37188:6d43b39fbaa0 37189:03ff17a4bf53
       
     1 # Infinite push
       
     2 #
       
     3 # Copyright 2016 Facebook, Inc.
       
     4 #
       
     5 # This software may be used and distributed according to the terms of the
       
     6 # GNU General Public License version 2 or any later version.
       
     7 
       
     8 from __future__ import absolute_import
       
     9 
       
    10 import logging
       
    11 import os
       
    12 import time
       
    13 
       
    14 import warnings
       
    15 import mysql.connector
       
    16 
       
    17 from . import indexapi
       
    18 
       
    19 def _convertbookmarkpattern(pattern):
       
    20     pattern = pattern.replace('_', '\\_')
       
    21     pattern = pattern.replace('%', '\\%')
       
    22     if pattern.endswith('*'):
       
    23         pattern = pattern[:-1] + '%'
       
    24     return pattern
       
    25 
       
    26 class sqlindexapi(indexapi.indexapi):
       
    27     '''
       
    28     Sql backend for infinitepush index. See schema.sql
       
    29     '''
       
    30 
       
    31     def __init__(self, reponame, host, port,
       
    32                  database, user, password, logfile, loglevel,
       
    33                  waittimeout=300, locktimeout=120):
       
    34         super(sqlindexapi, self).__init__()
       
    35         self.reponame = reponame
       
    36         self.sqlargs = {
       
    37             'host': host,
       
    38             'port': port,
       
    39             'database': database,
       
    40             'user': user,
       
    41             'password': password,
       
    42         }
       
    43         self.sqlconn = None
       
    44         self.sqlcursor = None
       
    45         if not logfile:
       
    46             logfile = os.devnull
       
    47         logging.basicConfig(filename=logfile)
       
    48         self.log = logging.getLogger()
       
    49         self.log.setLevel(loglevel)
       
    50         self._connected = False
       
    51         self._waittimeout = waittimeout
       
    52         self._locktimeout = locktimeout
       
    53 
       
    54     def sqlconnect(self):
       
    55         if self.sqlconn:
       
    56             raise indexapi.indexexception("SQL connection already open")
       
    57         if self.sqlcursor:
       
    58             raise indexapi.indexexception("SQL cursor already open without"
       
    59                                           " connection")
       
    60         retry = 3
       
    61         while True:
       
    62             try:
       
    63                 self.sqlconn = mysql.connector.connect(
       
    64                     force_ipv6=True, **self.sqlargs)
       
    65 
       
    66                 # Code is copy-pasted from hgsql. Bug fixes need to be
       
    67                 # back-ported!
       
    68                 # The default behavior is to return byte arrays, when we
       
    69                 # need strings. This custom convert returns strings.
       
    70                 self.sqlconn.set_converter_class(CustomConverter)
       
    71                 self.sqlconn.autocommit = False
       
    72                 break
       
    73             except mysql.connector.errors.Error:
       
    74                 # mysql can be flakey occasionally, so do some minimal
       
    75                 # retrying.
       
    76                 retry -= 1
       
    77                 if retry == 0:
       
    78                     raise
       
    79                 time.sleep(0.2)
       
    80 
       
    81         waittimeout = self.sqlconn.converter.escape('%s' % self._waittimeout)
       
    82 
       
    83         self.sqlcursor = self.sqlconn.cursor()
       
    84         self.sqlcursor.execute("SET wait_timeout=%s" % waittimeout)
       
    85         self.sqlcursor.execute("SET innodb_lock_wait_timeout=%s" %
       
    86                                self._locktimeout)
       
    87         self._connected = True
       
    88 
       
    89     def close(self):
       
    90         """Cleans up the metadata store connection."""
       
    91         with warnings.catch_warnings():
       
    92             warnings.simplefilter("ignore")
       
    93             self.sqlcursor.close()
       
    94             self.sqlconn.close()
       
    95         self.sqlcursor = None
       
    96         self.sqlconn = None
       
    97 
       
    98     def __enter__(self):
       
    99         if not self._connected:
       
   100             self.sqlconnect()
       
   101         return self
       
   102 
       
   103     def __exit__(self, exc_type, exc_val, exc_tb):
       
   104         if exc_type is None:
       
   105             self.sqlconn.commit()
       
   106         else:
       
   107             self.sqlconn.rollback()
       
   108 
       
   109     def addbundle(self, bundleid, nodesctx):
       
   110         if not self._connected:
       
   111             self.sqlconnect()
       
   112         self.log.info("ADD BUNDLE %r %r" % (self.reponame, bundleid))
       
   113         self.sqlcursor.execute(
       
   114             "INSERT INTO bundles(bundle, reponame) VALUES "
       
   115             "(%s, %s)", params=(bundleid, self.reponame))
       
   116         for ctx in nodesctx:
       
   117             self.sqlcursor.execute(
       
   118                 "INSERT INTO nodestobundle(node, bundle, reponame) "
       
   119                 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE "
       
   120                 "bundle=VALUES(bundle)",
       
   121                 params=(ctx.hex(), bundleid, self.reponame))
       
   122 
       
   123             extra = ctx.extra()
       
   124             author_name = ctx.user()
       
   125             committer_name = extra.get('committer', ctx.user())
       
   126             author_date = int(ctx.date()[0])
       
   127             committer_date = int(extra.get('committer_date', author_date))
       
   128             self.sqlcursor.execute(
       
   129                 "INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, "
       
   130                 "author, committer, author_date, committer_date, "
       
   131                 "reponame) VALUES "
       
   132                 "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
       
   133                 params=(ctx.hex(), ctx.description(),
       
   134                         ctx.p1().hex(), ctx.p2().hex(), author_name,
       
   135                         committer_name, author_date, committer_date,
       
   136                         self.reponame)
       
   137             )
       
   138 
       
   139     def addbookmark(self, bookmark, node):
       
   140         """Takes a bookmark name and hash, and records mapping in the metadata
       
   141         store."""
       
   142         if not self._connected:
       
   143             self.sqlconnect()
       
   144         self.log.info(
       
   145             "ADD BOOKMARKS %r bookmark: %r node: %r" %
       
   146             (self.reponame, bookmark, node))
       
   147         self.sqlcursor.execute(
       
   148             "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
       
   149             "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)",
       
   150             params=(bookmark, node, self.reponame))
       
   151 
       
   152     def addmanybookmarks(self, bookmarks):
       
   153         if not self._connected:
       
   154             self.sqlconnect()
       
   155         args = []
       
   156         values = []
       
   157         for bookmark, node in bookmarks.iteritems():
       
   158             args.append('(%s, %s, %s)')
       
   159             values.extend((bookmark, node, self.reponame))
       
   160         args = ','.join(args)
       
   161 
       
   162         self.sqlcursor.execute(
       
   163             "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
       
   164             "VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)" % args,
       
   165             params=values)
       
   166 
       
   167     def deletebookmarks(self, patterns):
       
   168         """Accepts list of bookmark patterns and deletes them.
       
   169         If `commit` is set then bookmark will actually be deleted. Otherwise
       
   170         deletion will be delayed until the end of transaction.
       
   171         """
       
   172         if not self._connected:
       
   173             self.sqlconnect()
       
   174         self.log.info("DELETE BOOKMARKS: %s" % patterns)
       
   175         for pattern in patterns:
       
   176             pattern = _convertbookmarkpattern(pattern)
       
   177             self.sqlcursor.execute(
       
   178                 "DELETE from bookmarkstonode WHERE bookmark LIKE (%s) "
       
   179                 "and reponame = %s",
       
   180                 params=(pattern, self.reponame))
       
   181 
       
   182     def getbundle(self, node):
       
   183         """Returns the bundleid for the bundle that contains the given node."""
       
   184         if not self._connected:
       
   185             self.sqlconnect()
       
   186         self.log.info("GET BUNDLE %r %r" % (self.reponame, node))
       
   187         self.sqlcursor.execute(
       
   188             "SELECT bundle from nodestobundle "
       
   189             "WHERE node = %s AND reponame = %s", params=(node, self.reponame))
       
   190         result = self.sqlcursor.fetchall()
       
   191         if len(result) != 1 or len(result[0]) != 1:
       
   192             self.log.info("No matching node")
       
   193             return None
       
   194         bundle = result[0][0]
       
   195         self.log.info("Found bundle %r" % bundle)
       
   196         return bundle
       
   197 
       
   198     def getnode(self, bookmark):
       
   199         """Returns the node for the given bookmark. None if it doesn't exist."""
       
   200         if not self._connected:
       
   201             self.sqlconnect()
       
   202         self.log.info(
       
   203             "GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark))
       
   204         self.sqlcursor.execute(
       
   205             "SELECT node from bookmarkstonode WHERE "
       
   206             "bookmark = %s AND reponame = %s", params=(bookmark, self.reponame))
       
   207         result = self.sqlcursor.fetchall()
       
   208         if len(result) != 1 or len(result[0]) != 1:
       
   209             self.log.info("No matching bookmark")
       
   210             return None
       
   211         node = result[0][0]
       
   212         self.log.info("Found node %r" % node)
       
   213         return node
       
   214 
       
   215     def getbookmarks(self, query):
       
   216         if not self._connected:
       
   217             self.sqlconnect()
       
   218         self.log.info(
       
   219             "QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query))
       
   220         query = _convertbookmarkpattern(query)
       
   221         self.sqlcursor.execute(
       
   222             "SELECT bookmark, node from bookmarkstonode WHERE "
       
   223             "reponame = %s AND bookmark LIKE %s",
       
   224             params=(self.reponame, query))
       
   225         result = self.sqlcursor.fetchall()
       
   226         bookmarks = {}
       
   227         for row in result:
       
   228             if len(row) != 2:
       
   229                 self.log.info("Bad row returned: %s" % row)
       
   230                 continue
       
   231             bookmarks[row[0]] = row[1]
       
   232         return bookmarks
       
   233 
       
   234     def saveoptionaljsonmetadata(self, node, jsonmetadata):
       
   235         if not self._connected:
       
   236             self.sqlconnect()
       
   237         self.log.info(
       
   238             ("INSERT METADATA, QUERY BOOKMARKS reponame: %r " +
       
   239              "node: %r, jsonmetadata: %s") %
       
   240             (self.reponame, node, jsonmetadata))
       
   241 
       
   242         self.sqlcursor.execute(
       
   243             "UPDATE nodesmetadata SET optional_json_metadata=%s WHERE "
       
   244             "reponame=%s AND node=%s",
       
   245             params=(jsonmetadata, self.reponame, node))
       
   246 
       
   247 class CustomConverter(mysql.connector.conversion.MySQLConverter):
       
   248     """Ensure that all values being returned are returned as python string
       
   249     (versus the default byte arrays)."""
       
   250     def _STRING_to_python(self, value, dsc=None):
       
   251         return str(value)
       
   252 
       
   253     def _VAR_STRING_to_python(self, value, dsc=None):
       
   254         return str(value)
       
   255 
       
   256     def _BLOB_to_python(self, value, dsc=None):
       
   257         return str(value)