summaryrefslogtreecommitdiffstats
path: root/java/com/google/gerrit/server/schema/SchemaVersion.java
blob: da87c506912113b879269a35b2df0bc0447e5f85 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.gerrit.server.schema;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.gerrit.reviewdb.client.CurrentSchemaVersion;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.reviewdb.server.ReviewDbUtil;
import com.google.gerrit.server.UsedAt;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gwtorm.jdbc.JdbcExecutor;
import com.google.gwtorm.jdbc.JdbcSchema;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.gwtorm.server.StatementExecutor;
import com.google.inject.Provider;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jgit.internal.storage.file.FileRepository;
import org.eclipse.jgit.internal.storage.file.GC;
import org.eclipse.jgit.lib.CommitBuilder;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.storage.pack.PackConfig;

/** A version of the database schema. */
public abstract class SchemaVersion {
  /** The current schema version. */
  public static final Class<Schema_170> C = Schema_170.class;

  public static int getBinaryVersion() {
    return guessVersion(C);
  }

  private static final ExecutorService backgroundGcThread = Executors.newFixedThreadPool(1);
  private final Provider<? extends SchemaVersion> prior;
  private final int versionNbr;
  private static SortedSet<Project.NameKey> projects;

  protected SchemaVersion(Provider<? extends SchemaVersion> prior) {
    this.prior = prior;
    this.versionNbr = guessVersion(getClass());
  }

  @UsedAt(UsedAt.Project.PLUGIN_DELETE_PROJECT)
  public static int guessVersion(Class<?> c) {
    String n = c.getName();
    n = n.substring(n.lastIndexOf('_') + 1);
    while (n.startsWith("0")) {
      n = n.substring(1);
    }
    return Integer.parseInt(n);
  }

  /** @return the {@link CurrentSchemaVersion#versionNbr} this step targets. */
  public final int getVersionNbr() {
    return versionNbr;
  }

  @VisibleForTesting
  public final SchemaVersion getPrior() {
    return prior.get();
  }

  public final void check(UpdateUI ui, CurrentSchemaVersion curr, SchemaFactory<ReviewDb> schema)
      throws OrmException, SQLException {
    if (curr.versionNbr == versionNbr) {
      // Nothing to do, we are at the correct schema.
    } else if (curr.versionNbr > versionNbr) {
      throw new OrmException(
          "Cannot downgrade database schema from version "
              + curr.versionNbr
              + " to "
              + versionNbr
              + ".");
    } else {
      upgradeFrom(ui, curr, schema);
    }
  }

  /** Runs check on the prior schema version, and then upgrades. */
  private void upgradeFrom(UpdateUI ui, CurrentSchemaVersion curr, SchemaFactory<ReviewDb> schema)
      throws OrmException, SQLException {
    List<SchemaVersion> pending = pending(curr.versionNbr);

    try (ReviewDb db = ReviewDbUtil.unwrapDb(schema.open())) {
      updateSchema(pending, ui, db);
    }

    migrateData(pending, ui, curr, schema);

    try (ReviewDb db = ReviewDbUtil.unwrapDb(schema.open())) {
      JdbcSchema s = (JdbcSchema) db;
      List<String> pruneList = new ArrayList<>();
      s.pruneSchema(
          new StatementExecutor() {
            @Override
            public void execute(String sql) {
              pruneList.add(sql);
            }

            @Override
            public void close() {
              // Do nothing.
            }
          });

      try (JdbcExecutor e = new JdbcExecutor(s)) {
        if (!pruneList.isEmpty()) {
          ui.pruneSchema(e, pruneList);
        }
      }
    }
  }

  private List<SchemaVersion> pending(int curr) {
    List<SchemaVersion> r = Lists.newArrayListWithCapacity(versionNbr - curr);
    for (SchemaVersion v = this; curr < v.getVersionNbr(); v = v.prior.get()) {
      r.add(v);
    }
    Collections.reverse(r);
    return r;
  }

  private void updateSchema(List<SchemaVersion> pending, UpdateUI ui, ReviewDb db)
      throws OrmException, SQLException {
    for (SchemaVersion v : pending) {
      ui.message(String.format("Upgrading schema to %d ...", v.getVersionNbr()));
      v.preUpdateSchema(db);
    }

    JdbcSchema s = (JdbcSchema) db;
    try (JdbcExecutor e = new JdbcExecutor(s)) {
      s.updateSchema(e);
    }
  }

  /**
   * Invoked before updateSchema adds new columns/tables.
   *
   * @param db open database handle.
   * @throws OrmException if a Gerrit-specific exception occurred.
   * @throws SQLException if an underlying SQL exception occurred.
   */
  protected void preUpdateSchema(ReviewDb db) throws OrmException, SQLException {}

  private void migrateData(
      List<SchemaVersion> pending,
      UpdateUI ui,
      CurrentSchemaVersion curr,
      SchemaFactory<ReviewDb> schema)
      throws OrmException, SQLException {
    for (SchemaVersion v : pending) {
      Stopwatch sw = Stopwatch.createStarted();
      ui.message(String.format("Migrating data to schema %d ...", v.getVersionNbr()));
      try (ReviewDb db = ReviewDbUtil.unwrapDb(schema.open())) {
        v.migrateData(db, ui);
      }
      try (ReviewDb db = ReviewDbUtil.unwrapDb(schema.open())) {
        v.finish(curr, db);
      }
      ui.message(String.format("\t> Done (%.3f s)", sw.elapsed(TimeUnit.MILLISECONDS) / 1000d));
    }
  }

  /**
   * Invoked between updateSchema (adds new columns/tables) and pruneSchema (removes deleted
   * columns/tables).
   *
   * @param db open database handle.
   * @param ui interface for interacting with the user.
   * @throws OrmException if a Gerrit-specific exception occurred.
   * @throws SQLException if an underlying SQL exception occurred.
   */
  protected void migrateData(ReviewDb db, UpdateUI ui) throws OrmException, SQLException {}

  /** Mark the current schema version. */
  protected void finish(CurrentSchemaVersion curr, ReviewDb db) throws OrmException {
    curr.versionNbr = versionNbr;
    db.schemaVersion().update(Collections.singleton(curr));
  }

  /** Rename an existing table. */
  protected static void renameTable(ReviewDb db, String from, String to) throws OrmException {
    JdbcSchema s = (JdbcSchema) db;
    try (JdbcExecutor e = new JdbcExecutor(s)) {
      s.renameTable(e, from, to);
    }
  }

  /** Rename an existing column. */
  protected static void renameColumn(ReviewDb db, String table, String from, String to)
      throws OrmException {
    JdbcSchema s = (JdbcSchema) db;
    try (JdbcExecutor e = new JdbcExecutor(s)) {
      s.renameColumn(e, table, from, to);
    }
  }

  /** Execute an SQL statement. */
  protected static void execute(ReviewDb db, String sql) throws SQLException {
    try (Statement s = newStatement(db)) {
      s.execute(sql);
    }
  }

  /** Open a new single statement. */
  protected static Statement newStatement(ReviewDb db) throws SQLException {
    return ((JdbcSchema) db).getConnection().createStatement();
  }

  /** Open a new prepared statement. */
  protected static PreparedStatement prepareStatement(ReviewDb db, String sql) throws SQLException {
    return ((JdbcSchema) db).getConnection().prepareStatement(sql);
  }

  /** Open a new statement executor. */
  protected static JdbcExecutor newExecutor(ReviewDb db) throws OrmException {
    return new JdbcExecutor(((JdbcSchema) db).getConnection());
  }

  protected static SortedSet<Project.NameKey> getSortedProjectsFromCache(
      GitRepositoryManager repoManager) {
    // TODO: Use Injection/Supplier pattern
    if (projects == null) {
      projects = repoManager.list();
    }
    return projects;
  }

  protected int getThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  protected ExecutorService createExecutor(UpdateUI ui) {
    int threads = getThreads();
    ui.message(String.format("... using %d threads ...", threads));
    return Executors.newFixedThreadPool(threads);
  }

  @FunctionalInterface
  protected interface ThrowingFunction<I, T> {
    default T accept(I input) {
      try {
        return acceptWithThrow(input);
      } catch (OrmException | IOException e) {
        throw new RuntimeException(e);
      }
    }

    T acceptWithThrow(I input) throws OrmException, IOException;
  }

  protected Collection<?> runParallelTasks(
      ExecutorService executor, Collection<?> lst, ThrowingFunction task, UpdateUI ui) {
    Collection<Object> returnSet = new HashSet<>();
    Set<Future> futures =
        lst.stream()
            .map(each -> executor.submit(() -> task.accept(each)))
            .collect(Collectors.toSet());
    for (Future each : futures) {
      try {
        Object rtn = each.get();
        if (Objects.nonNull(rtn)) {
          returnSet.add(rtn);
        }
      } catch (InterruptedException e) {
        ui.message(
            String.format(
                "Migration step was interrupted. Only %d of %d tasks done.",
                countDone(futures), lst.size()));
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        ui.message(e.getCause().getMessage());
      }
    }
    return returnSet;
  }

  protected CommitBuilder buildCommit(PersonIdent ident, ObjectId tree, String message) {
    CommitBuilder cb = new CommitBuilder();
    cb.setTreeId(tree);
    cb.setCommitter(ident);
    cb.setAuthor(ident);
    cb.setMessage(message);
    return cb;
  }

  /**
   * Add a task to GC a project to a single thread serving GC requests. If a GC task already exists
   * for that project, then the current request to GC it will be ignored.
   *
   * @param repoManager Git repository manager to open the git repository.
   * @param project Project name.
   * @param ui Interface for interacting with the user.
   */
  protected synchronized void runGcInBackground(
      GitRepositoryManager repoManager, Project.NameKey project, UpdateUI ui) {
    for (Runnable task : ((ThreadPoolExecutor) backgroundGcThread).getQueue()) {
      if (task instanceof GcTask) {
        if (((GcTask) task).project.equals(project)) {
          return;
        }
      }
    }
    backgroundGcThread.execute(new GcTask(project, repoManager, ui));
  }

  protected void gc(Repository repo, UpdateUI ui, ProgressMonitor pm, Stopwatch sw)
      throws IOException, ParseException {
    FileRepository r = (FileRepository) repo;
    GC gc = new GC(r);
    gc.setProgressMonitor(pm);
    pm.beginTask("gc", ProgressMonitor.UNKNOWN);
    // TODO(ms): Enable bitmap index when this JGit performance issue is fixed:
    // https://bugs.eclipse.org/bugs/show_bug.cgi?id=562740
    PackConfig pconfig = new PackConfig(repo);
    pconfig.setBuildBitmaps(false);
    gc.setPackConfig(pconfig);
    ui.message(
        String.format("... (%.3f s) gc --prune=now", sw.elapsed(TimeUnit.MILLISECONDS) / 1000d));
    gc.setExpire(new Date());
    gc.gc();
  }

  protected static ObjectId emptyTree(ObjectInserter oi) throws IOException {
    return oi.insert(Constants.OBJ_TREE, new byte[] {});
  }

  protected static ObjectInserter getPackInserterFirst(Repository repo) {
    if (repo instanceof FileRepository) {
      return ((FileRepository) repo).getObjectDatabase().newPackInserter();
    }
    return repo.getObjectDatabase().newInserter();
  }

  private static long countDone(Collection<Future> futures) {
    return futures.stream().filter(Future::isDone).count();
  }

  private class GcTask implements Runnable {
    final GitRepositoryManager repoManager;
    final Project.NameKey project;
    final UpdateUI ui;

    public GcTask(Project.NameKey project, GitRepositoryManager repoManager, UpdateUI ui) {
      this.project = project;
      this.repoManager = repoManager;
      this.ui = ui;
    }

    @Override
    public void run() {
      try (Repository repo = repoManager.openRepository(project)) {
        ProgressMonitor pm = new TextProgressMonitor();
        try {
          // Empty reference folders are only deleted if they have not been modified
          // in the last 30s. Add a delay so that their deletion is not skipped.
          Thread.sleep(30000);
          Stopwatch sw = Stopwatch.createStarted();
          gc(repo, ui, pm, sw);
        } catch (IOException | ParseException | InterruptedException ex) {
          ui.message("GC on " + project.get() + " failed with error: " + ex);
        } finally {
          pm.endTask();
        }
      } catch (IOException ex) {
        ui.message("GC on " + project.get() + " failed with error: " + ex);
      }
    }
  }
}