diff options
3 files changed, 43 insertions, 4 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 990e387..cf83df8 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -27,6 +27,7 @@ import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.WorkQueue; +import com.google.gerrit.server.project.ProjectCache; import com.google.gerrit.util.logging.NamedFluentLogger; import com.google.inject.Inject; import com.google.inject.Provider; @@ -59,6 +60,7 @@ public class ReplicationQueue private final DynamicItem<EventDispatcher> dispatcher; private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency private final ReplicationTasksStorage replicationTasksStorage; + private final ProjectCache projectCache; private volatile boolean running; private volatile boolean replaying; private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; @@ -71,13 +73,15 @@ public class ReplicationQueue Provider<ReplicationDestinations> rd, DynamicItem<EventDispatcher> dis, ReplicationStateListeners sl, - ReplicationTasksStorage rts) { + ReplicationTasksStorage rts, + ProjectCache pc) { replConfig = rc; workQueue = wq; dispatcher = dis; destinations = rd; stateLog = sl; replicationTasksStorage = rts; + projectCache = pc; beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); } @@ -194,7 +198,12 @@ public class ReplicationQueue replaying = true; for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { try { - fire(new URIish(t.uri), Project.nameKey(t.project), t.ref); + Project.NameKey projectName = Project.nameKey(t.project); + if (!projectCache.get(projectName).isPresent()) { + repLog.atSevere().log("Cannot replicate %s; Local repository does not exist"); + replicationTasksStorage.cancelWaiting(t); + } + fire(new URIish(t.uri), projectName, t.ref); } catch (URISyntaxException e) { repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t); } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 3947ebc..a0e9094 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java @@ -148,6 +148,10 @@ public class ReplicationTasksStorage { } } + public void cancelWaiting(ReplicateRefUpdate update) { + new Task(update).cancelWaiting(); + } + public List<ReplicateRefUpdate> listWaiting() { return list(createDir(waitingUpdates)); } @@ -237,9 +241,22 @@ public class ReplicationTasksStorage { } public void finish() { + logger.atFine().log("DELETE %s %s", running, updateLog()); + deleteTask(running); + } + + /** + * Should only be used when it's impossible for the waiting task to become running, such as when + * the project no longer exists + */ + public void cancelWaiting() { + logger.atFine().log("DELETE %s %s", waiting, updateLog()); + deleteTask(waiting); + } + + private void deleteTask(Path p) { try { - logger.atFine().log("DELETE %s %s", running, updateLog()); - Files.delete(running); + Files.delete(p); } catch (IOException e) { logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java index d9fbbe5..46ce31f 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java @@ -281,6 +281,19 @@ public class ReplicationTasksStorageTaskTest { } @Test + public void canCancelWaitingTask() throws Exception { + Task task = tasksStorage.new Task(REF_UPDATE); + + task.create(); + assertIsWaiting(task); + + task.cancelWaiting(); + + assertNotWaiting(task); + assertNotRunning(task); + } + + @Test public void illegalFinishNonRunningTaskIsGraceful() throws Exception { Task task = tasksStorage.new Task(REF_UPDATE); task.finish(); |