Removed .close()
call from container-managed entity manager
This commit is contained in:
@@ -96,74 +96,70 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
|
||||
@TransactionAttribute(TransactionAttributeType.REQUIRED)
|
||||
@Schedule(hour = "4", minute = "00", persistent = false)
|
||||
public void syncDB() {
|
||||
try {
|
||||
logger.info("Starting repository cleanup");
|
||||
//Removes from DB the packages that have been deleted from filesystem
|
||||
logger.info("Searching for packages that are no more in the filesystem");
|
||||
List<String> resultList = Queries.listAllPackageFiles(em)
|
||||
.getResultList();
|
||||
logger.info("Got list of filenames from db");
|
||||
Set<String> knownPkg = resultList.stream().filter(fileName -> {
|
||||
Path file = ctx.getFile(fileName);
|
||||
boolean result = Files.exists(file);
|
||||
if (!result) {
|
||||
logger.info("Removing package {} which was not found in filesystem", file.getFileName());
|
||||
Queries.getPackageByFileName(em, file.getFileName().toString())
|
||||
.getResultList()
|
||||
.forEach(pkgData -> deletePkgData(em, pkgData));
|
||||
}
|
||||
return result;
|
||||
}).collect(Collectors.toUnmodifiableSet());
|
||||
logger.info("Searching for new packages or packages that were modified after being added to the database");
|
||||
CompletionService<PkgData> completionService = new ExecutorCompletionService<>(executor);
|
||||
final Set<Future<PkgData>> inProgress = new HashSet<>();
|
||||
final int maxInProgress = Runtime.getRuntime().availableProcessors() * 5;
|
||||
Sup<Stream<Path>> fileListStreamSupplier = () -> Files.list(ctx.getRepoFolder()).filter((Path file) -> {
|
||||
String name = file.getFileName().toString();
|
||||
return name.endsWith(".pkg.tar.xz") || name.endsWith(".pkg.tar.zst") || name.endsWith(".pkg.tar.gz");
|
||||
});
|
||||
long[] count = new long[] {0};
|
||||
long totalPackages = fileListStreamSupplier.get().count();
|
||||
logger.info("Starting repository cleanup");
|
||||
//Removes from DB the packages that have been deleted from filesystem
|
||||
logger.info("Searching for packages that are no more in the filesystem");
|
||||
List<String> resultList = Queries.listAllPackageFiles(em)
|
||||
.getResultList();
|
||||
logger.info("Got list of filenames from db");
|
||||
Set<String> knownPkg = resultList.stream().filter(fileName -> {
|
||||
Path file = ctx.getFile(fileName);
|
||||
boolean result = Files.exists(file);
|
||||
if (!result) {
|
||||
logger.info("Removing package {} which was not found in filesystem", file.getFileName());
|
||||
Queries.getPackageByFileName(em, file.getFileName().toString())
|
||||
.getResultList()
|
||||
.forEach(pkgData -> deletePkgData(em, pkgData));
|
||||
}
|
||||
return result;
|
||||
}).collect(Collectors.toUnmodifiableSet());
|
||||
logger.info("Searching for new packages or packages that were modified after being added to the database");
|
||||
CompletionService<PkgData> completionService = new ExecutorCompletionService<>(executor);
|
||||
final Set<Future<PkgData>> inProgress = new HashSet<>();
|
||||
final int maxInProgress = Runtime.getRuntime().availableProcessors() * 5;
|
||||
Sup<Stream<Path>> fileListStreamSupplier = () -> Files.list(ctx.getRepoFolder()).filter((Path file) -> {
|
||||
String name = file.getFileName().toString();
|
||||
return name.endsWith(".pkg.tar.xz") || name.endsWith(".pkg.tar.zst") || name.endsWith(".pkg.tar.gz");
|
||||
});
|
||||
long[] count = new long[]{0};
|
||||
long totalPackages = fileListStreamSupplier.get().count();
|
||||
|
||||
Con<Boolean> persistPackages = (Boolean drain) -> {
|
||||
while ((drain && inProgress.size() > 0) || inProgress.size() > maxInProgress) {
|
||||
Optional.ofNullable(completionService.poll(1, TimeUnit.SECONDS))
|
||||
.ifPresent((Con<Future<PkgData>>) future -> {
|
||||
inProgress.remove(future);
|
||||
PkgData pkgData;
|
||||
try {
|
||||
pkgData = future.get();
|
||||
} catch (ExecutionException ee) {
|
||||
throw ee.getCause();
|
||||
}
|
||||
persistPackage(em, pkgData, ++count[0], totalPackages);
|
||||
});
|
||||
}
|
||||
};
|
||||
fileListStreamSupplier.get().forEach((Con<Path>) file -> {
|
||||
if (!knownPkg.contains(file.getFileName().toString()) || ((Sup<Boolean>) () -> {
|
||||
Instant result = Queries.getUpdateTimestampByFileName(em, file.getFileName().toString()).getSingleResult();
|
||||
return Files.getLastModifiedTime(file).toMillis() > result.toEpochMilli();
|
||||
}).get()) {
|
||||
inProgress.add(completionService.submit(() -> {
|
||||
try {
|
||||
return PkgDataImpl.parseFile(file, CompressionFormatImpl.guess(file));
|
||||
} catch (Exception ex) {
|
||||
logger.error(String.format("Error parsing '%s'", file.toAbsolutePath()), ex);
|
||||
throw ex;
|
||||
}
|
||||
}));
|
||||
}
|
||||
persistPackages.accept(false);
|
||||
});
|
||||
persistPackages.accept(true);
|
||||
logger.info("Removing obsolete packages");
|
||||
deleteOld(em);
|
||||
logger.info("Repository cleanup completed successfully");
|
||||
packageCache.invalidateCache();
|
||||
} finally {
|
||||
em.close();
|
||||
}
|
||||
Con<Boolean> persistPackages = (Boolean drain) -> {
|
||||
while ((drain && inProgress.size() > 0) || inProgress.size() > maxInProgress) {
|
||||
Optional.ofNullable(completionService.poll(1, TimeUnit.SECONDS))
|
||||
.ifPresent((Con<Future<PkgData>>) future -> {
|
||||
inProgress.remove(future);
|
||||
PkgData pkgData;
|
||||
try {
|
||||
pkgData = future.get();
|
||||
} catch (ExecutionException ee) {
|
||||
throw ee.getCause();
|
||||
}
|
||||
persistPackage(em, pkgData, ++count[0], totalPackages);
|
||||
});
|
||||
}
|
||||
};
|
||||
fileListStreamSupplier.get().forEach((Con<Path>) file -> {
|
||||
if (!knownPkg.contains(file.getFileName().toString()) || ((Sup<Boolean>) () -> {
|
||||
Instant result = Queries.getUpdateTimestampByFileName(em, file.getFileName().toString()).getSingleResult();
|
||||
return Files.getLastModifiedTime(file).toMillis() > result.toEpochMilli();
|
||||
}).get()) {
|
||||
inProgress.add(completionService.submit(() -> {
|
||||
try {
|
||||
return PkgDataImpl.parseFile(file, CompressionFormatImpl.guess(file));
|
||||
} catch (Exception ex) {
|
||||
logger.error(String.format("Error parsing '%s'", file.toAbsolutePath()), ex);
|
||||
throw ex;
|
||||
}
|
||||
}));
|
||||
}
|
||||
persistPackages.accept(false);
|
||||
});
|
||||
persistPackages.accept(true);
|
||||
logger.info("Removing obsolete packages");
|
||||
deleteOld(em);
|
||||
logger.info("Repository cleanup completed successfully");
|
||||
packageCache.invalidateCache();
|
||||
}
|
||||
|
||||
private void persistPackage(EntityManager em, PkgData pkgData, long count, long totalPackages) {
|
||||
@@ -194,6 +190,7 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
|
||||
em.remove(pkg);
|
||||
Files.delete(ctx.getFile(pkg));
|
||||
}
|
||||
|
||||
private void deleteOld(EntityManager em) {
|
||||
Instant cutoff = Instant.now().minus(365 * 3, ChronoUnit.DAYS);
|
||||
Queries.getOldPackages2Delete(em, cutoff, 3L)
|
||||
@@ -266,7 +263,7 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
|
||||
public Set<String> missingFiles(Collection<String> fileNames) {
|
||||
Stream<String> result = fileNames.stream();
|
||||
Set<String> existing = Queries.getExistingFiles(em, fileNames)
|
||||
.getResultStream().collect(CollectionUtils.toUnmodifiableTreeSet());
|
||||
.getResultStream().collect(CollectionUtils.toUnmodifiableTreeSet());
|
||||
return result.filter(JWO.not(existing::contains))
|
||||
.collect(CollectionUtils.toUnmodifiableTreeSet());
|
||||
}
|
||||
@@ -281,18 +278,19 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
|
||||
PkgId pkgId = tuple.get(0, PkgId.class);
|
||||
String filename = tuple.get(1, String.class);
|
||||
long size = tuple.get(2, Long.class);
|
||||
String md5sum = tuple.get(3, String.class);;
|
||||
String md5sum = tuple.get(3, String.class);
|
||||
;
|
||||
PkgTuple pkgTuple = new PkgTuple();
|
||||
pkgTuple.setFileName(filename);
|
||||
pkgTuple.setSize(size);
|
||||
pkgTuple.setMd5sum(md5sum);
|
||||
return Tuple2.newInstance(pkgId, pkgTuple);
|
||||
}).collect(
|
||||
CollectionUtils.toUnmodifiableTreeMap(
|
||||
Tuple2<PkgId, PkgTuple>::get_1,
|
||||
Tuple2<PkgId, PkgTuple>::get_2,
|
||||
PkgIdComparator.getComparator()
|
||||
)
|
||||
CollectionUtils.toUnmodifiableTreeMap(
|
||||
Tuple2<PkgId, PkgTuple>::get_1,
|
||||
Tuple2<PkgId, PkgTuple>::get_2,
|
||||
PkgIdComparator.getComparator()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -308,12 +306,12 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
|
||||
List<PkgData> savedFiles = searchByFileName(fileName);
|
||||
if (savedFiles.size() > 0) return false;
|
||||
else {
|
||||
try(OutputStream output = Files.newOutputStream(file)) {
|
||||
try (OutputStream output = Files.newOutputStream(file)) {
|
||||
JWO.copy(input, output, 0x10000);
|
||||
PkgData pkg = PkgDataImpl.parseFile(file,
|
||||
CompressionFormatImpl.guess(Paths.get(fileName)));
|
||||
CompressionFormatImpl.guess(Paths.get(fileName)));
|
||||
pkg.setFileName(fileName);
|
||||
Optional.ofNullable(em.find(PkgData.class, pkg.getId())).ifPresent((Con<PkgData>) (pkgData -> {
|
||||
Optional.ofNullable(em.find(PkgData.class, pkg.getId())).ifPresent((Con<PkgData>) (pkgData -> {
|
||||
em.remove(pkgData);
|
||||
Files.delete(ctx.getRepoFolder().resolve(pkgData.getFileName()));
|
||||
}));
|
||||
@@ -324,7 +322,7 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
|
||||
return true;
|
||||
} catch (Throwable t) {
|
||||
Files.delete(file);
|
||||
throw t;
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user