diff options
5 files changed, 43 insertions, 42 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index e2f9515..8054eb4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -26,9 +26,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Lists; -import com.google.gerrit.common.data.GroupReference; import com.google.gerrit.entities.AccountGroup; import com.google.gerrit.entities.BranchNameKey; +import com.google.gerrit.entities.GroupReference; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; import com.google.gerrit.exceptions.StorageException; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java index c3556af..431c7d2 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java @@ -28,10 +28,4 @@ public class RemoteSiteUser extends CurrentUser { public GroupMembership getEffectiveGroups() { return effectiveGroups; } - - @Override - public Object getCacheKey() { - // Never cache a remote user - return new Object(); - } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java index 5b4204e..28f2fba 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java @@ -14,7 +14,7 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.gerrit.common.data.AccessSection; +import com.google.gerrit.entities.AccessSection; import com.google.gerrit.entities.Project; import java.util.Collections; import java.util.List; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 5a4d56c..94b3f31 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -203,10 +203,6 @@ public class ReplicationQueue try { replaying = true; for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { - if (t == null) { - repLog.atWarning().log("Encountered null replication event in ReplicationTasksStorage"); - continue; - } try { fire(new URIish(t.uri), Project.nameKey(t.project), t.ref); } catch (URISyntaxException e) { diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index e1b1c0a..fe5e7cb 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java @@ -24,14 +24,15 @@ import com.google.inject.Inject; import com.google.inject.ProvisionException; import com.google.inject.Singleton; import java.io.IOException; -import java.nio.file.DirectoryIteratorException; -import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.transport.URIish; @@ -64,6 +65,24 @@ public class ReplicationTasksStorage { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); public static class ReplicateRefUpdate { + public static Optional<ReplicateRefUpdate> createOptionally(Path file) { + try { + return Optional.of(create(file)); + } catch (NoSuchFileException e) { + logger.atFine().log("File %s not found while reading task", file); + } catch (IOException e) { + if (!e.getMessage().contains("not a regular file")) { + logger.atSevere().withCause(e).log("Error while reading task %s", file); + } + } + return Optional.empty(); + } + + public static ReplicateRefUpdate create(Path file) throws IOException { + String json = new String(Files.readAllBytes(file), UTF_8); + return GSON.fromJson(json, ReplicateRefUpdate.class); + } + public final String project; public final String ref; public final String uri; @@ -142,34 +161,26 @@ public class ReplicationTasksStorage { return list(createDir(runningUpdates)); } - private List<ReplicateRefUpdate> list(Path tasks) { - List<ReplicateRefUpdate> results = new ArrayList<>(); - try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) { - for (Path path : events) { - if (Files.isRegularFile(path)) { - try { - String json = new String(Files.readAllBytes(path), UTF_8); - results.add(GSON.fromJson(json, ReplicateRefUpdate.class)); - } catch (NoSuchFileException ex) { - logger.atFine().log( - "File %s not found while listing waiting tasks (likely in-flight or completed by another node)", - path); - } catch (IOException e) { - logger.atSevere().withCause(e).log("Error when firing pending event %s", path); - } - } else if (Files.isDirectory(path)) { - try { - results.addAll(list(path)); - } catch (DirectoryIteratorException d) { - // iterating over the sub-directories is expected to have dirs disappear - Nfs.throwIfNotStaleFileHandle(d.getCause()); - } - } - } - } catch (IOException e) { - logger.atSevere().withCause(e).log("Error while listing tasks"); + private List<ReplicateRefUpdate> list(Path taskDir) { + return streamRecursive(taskDir).collect(Collectors.toList()); + } + + private Stream<ReplicateRefUpdate> streamRecursive(Path dir) { + return walk(dir) + .map(path -> ReplicateRefUpdate.createOptionally(path)) + .filter(Optional::isPresent) + .map(Optional::get); + } + + private static Stream<Path> walk(Path path) { + try { + return Stream.concat(Stream.of(path), Files.list(path).flatMap(sub -> walk(sub))); + } catch (NotDirectoryException e) { + return Stream.of(path); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error while walking directory %s", path); + return Stream.empty(); } - return results; } @SuppressWarnings("deprecation") |