diff --git a/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/FileSystemSynchronizer.java b/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/FileSystemSynchronizer.java new file mode 100644 index 0000000..3a64170 --- /dev/null +++ b/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/FileSystemSynchronizer.java @@ -0,0 +1,9 @@ +package net.woggioni.jpacrepo.api.service; + +import jakarta.annotation.security.PermitAll; +import jakarta.ejb.Remote; +@Remote +@PermitAll +public interface FileSystemSynchronizer { + void syncDb(); +} diff --git a/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/PacmanServiceRemote.java b/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/PacmanServiceRemote.java index 984500f..f542783 100644 --- a/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/PacmanServiceRemote.java +++ b/jpacrepo-api/src/main/java/net/woggioni/jpacrepo/api/service/PacmanServiceRemote.java @@ -21,8 +21,6 @@ import java.util.Set; @Remote @PermitAll public interface PacmanServiceRemote { - void syncDB(); - void deletePackage(String filename); List searchName(@Nonnull String name); diff --git a/jpacrepo-impl/src/main/java/net/woggioni/jpacrepo/impl/model/PkgDataParser.java b/jpacrepo-impl/src/main/java/net/woggioni/jpacrepo/impl/model/PkgDataParser.java index f968797..b8beeed 100644 --- a/jpacrepo-impl/src/main/java/net/woggioni/jpacrepo/impl/model/PkgDataParser.java +++ b/jpacrepo-impl/src/main/java/net/woggioni/jpacrepo/impl/model/PkgDataParser.java @@ -32,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.text.ParseException; import java.time.Instant; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -45,6 +46,9 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; +import static net.woggioni.jwo.JWO.iterable2Stream; +import static net.woggioni.jwo.JWO.not; + @Slf4j @RequiredArgsConstructor(access = AccessLevel.PRIVATE) @@ -78,10 +82,22 @@ class NamedEntityFinder { var query = em.createQuery(tq); query.setMaxResults(1); return Optional.of(query.getResultList()) - .filter(JWO.not(List::isEmpty)) + .filter(not(List::isEmpty)) .map(it -> it.get(0)); } + public Set queryNamedEntities(Collection names) { + if(log.isDebugEnabled()) { + log.debug("Querying named entities '{}' with names {}", cls.getName(), String.join(", ", names)); + } + var cb = em.getCriteriaBuilder(); + var tq = cb.createQuery(cls); + var root = tq.from(cls); + tq.select(root).where(root.get(NamedEntity_.NAME).in(names)); + var query = em.createQuery(tq); + return query.getResultStream().collect(CollectionUtils.toUnmodifiableSet()); + } + public T getByName(String name) { return cache.computeIfAbsent( @@ -94,6 +110,13 @@ class NamedEntityFinder { }) ); } + + public void persistNewEntities(Iterable entities) { + for(T entity : entities) { + em.persist(entity); + cache.put(entity.getName(), entity); + } + } } public class PkgDataParser { @@ -127,6 +150,94 @@ public class PkgDataParser { .collect(Collectors.toUnmodifiableSet()); } + public List getNewPkgBases(Iterable pkgDatas) { + Set allPackagers = iterable2Stream(pkgDatas) + .map(PkgData::getBase).collect(Collectors.toUnmodifiableSet()); + + Set existingPackagers = pkgBaseFinder.queryNamedEntities( + allPackagers.stream() + .map(PkgBase::getName) + .collect(CollectionUtils.toUnmodifiableList()) + ); + + return allPackagers + .stream() + .filter(not(existingPackagers::contains)) + .toList(); + } + + public List getNewLicenses(Iterable pkgDatas) { + Set allLicenses = iterable2Stream(pkgDatas) + .map(PkgData::getLicense) + .flatMap(Set::stream) + .collect(Collectors.toUnmodifiableSet()); + + Set existingLicenses = licenseFinder.queryNamedEntities( + allLicenses.stream() + .map(License::getName) + .collect(CollectionUtils.toUnmodifiableList()) + ); + + return allLicenses + .stream() + .filter(not(existingLicenses::contains)) + .toList(); + } + public List getNewPackagers(Iterable pkgDatas) { + Set allPackagers = iterable2Stream(pkgDatas) + .map(PkgData::getPackager).collect(Collectors.toUnmodifiableSet()); + + Set existingPackagers = packagerFinder.queryNamedEntities( + allPackagers.stream() + .map(Packager::getName) + .collect(CollectionUtils.toUnmodifiableList()) + ); + + return allPackagers + .stream() + .filter(not(existingPackagers::contains)) + .toList(); + } + + public List getNewDependencies(Iterable pkgDatas) { + Set allDependencies = iterable2Stream(pkgDatas) + .flatMap(pkgData -> JWO.streamCat( + pkgData.getDepend().stream(), + pkgData.getMakedepend().stream(), + pkgData.getMakepkgopt().stream(), + pkgData.getOptdepend().stream(), + pkgData.getBackup().stream(), + pkgData.getConflict().stream(), + pkgData.getReplaces().stream(), + pkgData.getProvides().stream() + ) + ).collect(Collectors.toUnmodifiableSet()); + + Set existingDeps = dependencyFinder.queryNamedEntities( + allDependencies.stream() + .map(Dependency::getName) + .collect(CollectionUtils.toUnmodifiableList()) + ); + + return allDependencies + .stream() + .filter(not(existingDeps::contains)) + .toList(); + } + + public void addNewDependencies(Iterable dependencies) { + dependencyFinder.persistNewEntities(dependencies); + } + public void addNewPackagers(Iterable packagers) { + packagerFinder.persistNewEntities(packagers); + } + public void addNewPkgBases(Iterable pkgBases) { + pkgBaseFinder.persistNewEntities(pkgBases); + } + public void addNewLicenses(Iterable licenses) { + licenseFinder.persistNewEntities(licenses); + } + public PkgData hydrateJPA(PkgData pkgData) { pkgData.setBase(hydrate(pkgBaseFinder, pkgData.getBase())); pkgData.setDepend(hydrate(dependencyFinder, pkgData.getDepend())); diff --git a/src/main/java/net/woggioni/jpacrepo/service/PackageSynchronizerEJB.java b/src/main/java/net/woggioni/jpacrepo/service/PackageSynchronizerEJB.java new file mode 100644 index 0000000..952745d --- /dev/null +++ b/src/main/java/net/woggioni/jpacrepo/service/PackageSynchronizerEJB.java @@ -0,0 +1,192 @@ +package net.woggioni.jpacrepo.service; + +import jakarta.annotation.Resource; +import jakarta.ejb.Asynchronous; +import jakarta.ejb.ConcurrencyManagement; +import jakarta.ejb.ConcurrencyManagementType; +import jakarta.ejb.Lock; +import jakarta.ejb.LockType; +import jakarta.ejb.Remote; +import jakarta.ejb.Schedule; +import jakarta.ejb.Singleton; +import jakarta.ejb.TransactionManagement; +import jakarta.ejb.TransactionManagementType; +import jakarta.enterprise.concurrent.ManagedExecutorService; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import jakarta.transaction.Status; +import jakarta.transaction.UserTransaction; +import lombok.SneakyThrows; +import net.woggioni.jpacrepo.api.model.PkgData; +import net.woggioni.jpacrepo.api.service.FileSystemSynchronizer; +import net.woggioni.jpacrepo.cache.PackageCache; +import net.woggioni.jpacrepo.config.AppConfig; +import net.woggioni.jpacrepo.impl.model.CompressionFormatImpl; +import net.woggioni.jpacrepo.impl.model.PkgDataParser; +import net.woggioni.jpacrepo.service.jpa.Queries; +import net.woggioni.jwo.Con; +import net.woggioni.jwo.Sup; +import org.slf4j.Logger; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Singleton +@Lock(LockType.READ) +@TransactionManagement(TransactionManagementType.BEAN) +@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER) +@Remote({FileSystemSynchronizer.class}) +public class PackageSynchronizerEJB implements FileSystemSynchronizer { + + @PersistenceContext(name = "jpacrepo_pu") + private EntityManager em; + + @Resource + private UserTransaction tx; + + @Inject + private AppConfig ctx; + + @Inject + private Logger logger; + + @Resource(name = "DefaultManagedExecutorService") + private ManagedExecutorService executor; + + @Inject + private PackageCache packageCache; + + private void deletePkgData(EntityManager em, PkgData pkgData) { + em.remove(pkgData); + } + + @Override + @SneakyThrows + @Asynchronous + @Schedule(hour = "4", minute = "00", persistent = false) + public void syncDb() { + tx.setTransactionTimeout(3600 * 12); + tx.begin(); + try { + logger.info("Starting repository cleanup"); + //Removes from DB the packages that have been deleted from filesystem + logger.info("Searching for packages that are no more in the filesystem"); + List resultList = Queries.listAllPackageFiles(em) + .getResultList(); + logger.info("Got list of filenames from db"); + Set knownPkg = resultList.stream().filter(fileName -> { + Path file = ctx.getFile(fileName); + boolean result = Files.exists(file); + if (!result) { + logger.info("Removing package {} which was not found in filesystem", file.getFileName()); + Queries.getPackageByFileName(em, file.getFileName().toString()) + .getResultList() + .forEach(pkgData -> deletePkgData(em, pkgData)); + } + return result; + }).collect(Collectors.toUnmodifiableSet()); + logger.info("Searching for new packages or packages that were modified after being added to the database"); + CompletionService completionService = new ExecutorCompletionService<>(executor); + final Set> inProgress = new HashSet<>(); + final int maxInProgress = Runtime.getRuntime().availableProcessors() * 5; + Sup> fileListStreamSupplier = () -> Files.list(ctx.getRepoFolder()).filter((Path file) -> { + String name = file.getFileName().toString(); + return name.endsWith(".pkg.tar.xz") || name.endsWith(".pkg.tar.zst") || name.endsWith(".pkg.tar.gz"); + }); + long[] count = new long[]{0}; + long totalPackages = fileListStreamSupplier.get().count(); + var parser = new PkgDataParser(em); +// List stack = new ArrayList<>(); + Con persistPackages = (Boolean drain) -> { + while (inProgress.size() > maxInProgress || (drain && !inProgress.isEmpty())) { + Optional.ofNullable(completionService.poll(1, TimeUnit.SECONDS)) + .ifPresent((Con>) future -> { + inProgress.remove(future); + PkgData pkgData; + try { + pkgData = future.get(); + } catch (ExecutionException ee) { + throw ee.getCause(); + } + persistPackage(em, parser, pkgData, ++count[0], totalPackages); +// stack.add(pkgData); +// if(stack.size() >= 1000 || drain) { +// parser.addNewDependencies(parser.getNewDependencies(stack)); +// parser.addNewPackagers(parser.getNewPackagers(stack)); +// parser.addNewPkgBases(parser.getNewPkgBases(stack)); +// parser.addNewLicenses(parser.getNewLicenses(stack)); +// while(!stack.isEmpty()) { +// persistPackage(em, parser, stack.removeLast(), ++count[0], totalPackages); +// } +// } + }); + } + }; + fileListStreamSupplier.get().forEach((Con) file -> { + if (!knownPkg.contains(file.getFileName().toString()) || ((Sup) () -> { + Instant result = Queries.getUpdateTimestampByFileName(em, file.getFileName().toString()).getSingleResult(); + return Files.getLastModifiedTime(file).toMillis() > result.toEpochMilli(); + }).get()) { + inProgress.add(completionService.submit(() -> { + try { + var pkgData = PkgDataParser.parseFile(file, CompressionFormatImpl.guess(file)); + if (logger.isDebugEnabled()) { + logger.debug("Parsed package file {}", file); + } + return pkgData; + } catch (Exception ex) { + logger.error(String.format("Error parsing '%s'", file.toAbsolutePath()), ex); + throw ex; + } + })); + } + persistPackages.accept(false); + }); + persistPackages.accept(true); + logger.info("Removing obsolete packages"); + deleteOld(em); + tx.commit(); + logger.info("Repository cleanup completed successfully"); + packageCache.invalidateCache(); + } catch (Exception ex) { + if(tx.getStatus() == Status.STATUS_ACTIVE) { + tx.rollback(); + } + } + } + + private void persistPackage(EntityManager em, PkgDataParser parser, PkgData pkgData, long count, long totalPackages) { + if (Queries.countPackagesByHash(em, pkgData.getMd5sum()).getSingleResult() == 0) { + Queries.getPkgDataIdByFileName(em, pkgData.getFileName()) + .getResultStream() + .map(id -> em.find(PkgData.class, id)) + .forEach(p -> deletePkgData(em, p)); + em.persist(parser.hydrateJPA(pkgData)); + logger.info("({}/{}) Persisting package {}", count, totalPackages, pkgData.getFileName()); + } + } + + private void deleteOld(EntityManager em) { + Instant cutoff = Instant.now().minus(365 * 3, ChronoUnit.DAYS); + Queries.getOldPackages2Delete(em, cutoff, 3L) + .getResultStream() + .forEach(filename -> { + PacmanServiceEJB.deletePackage(ctx, em, filename); + logger.info("Package {} has been deleted", filename); + }); + } +} diff --git a/src/main/java/net/woggioni/jpacrepo/service/PacmanServiceEJB.java b/src/main/java/net/woggioni/jpacrepo/service/PacmanServiceEJB.java index b1b88cb..4648a4c 100644 --- a/src/main/java/net/woggioni/jpacrepo/service/PacmanServiceEJB.java +++ b/src/main/java/net/woggioni/jpacrepo/service/PacmanServiceEJB.java @@ -2,21 +2,17 @@ package net.woggioni.jpacrepo.service; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import jakarta.annotation.Resource; -import jakarta.ejb.Asynchronous; import jakarta.ejb.ConcurrencyManagement; import jakarta.ejb.ConcurrencyManagementType; import jakarta.ejb.Local; import jakarta.ejb.Lock; import jakarta.ejb.LockType; import jakarta.ejb.Remote; -import jakarta.ejb.Schedule; import jakarta.ejb.Singleton; import jakarta.ejb.TransactionAttribute; import jakarta.ejb.TransactionAttributeType; import jakarta.ejb.TransactionManagement; import jakarta.ejb.TransactionManagementType; -import jakarta.enterprise.concurrent.ManagedExecutorService; import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; @@ -38,30 +34,19 @@ import net.woggioni.jwo.CollectionUtils; import net.woggioni.jwo.Con; import net.woggioni.jwo.Hash; import net.woggioni.jwo.JWO; -import net.woggioni.jwo.Sup; import net.woggioni.jwo.Tuple2; import org.slf4j.Logger; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; -import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.NavigableMap; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.function.Predicate.not; @@ -83,129 +68,27 @@ public class PacmanServiceEJB implements PacmanServiceLocal { @Inject private Logger logger; - @Resource(name = "DefaultManagedExecutorService") - private ManagedExecutorService executor; - @Inject private PackageCache packageCache; - private void deletePkgData(EntityManager em, PkgData pkgData) { - em.remove(pkgData); - } - - @Override - @SneakyThrows - @Asynchronous - @TransactionAttribute(TransactionAttributeType.REQUIRED) - @Schedule(hour = "4", minute = "00", persistent = false) - public void syncDB() { - logger.info("Starting repository cleanup"); - //Removes from DB the packages that have been deleted from filesystem - logger.info("Searching for packages that are no more in the filesystem"); - List resultList = Queries.listAllPackageFiles(em) - .getResultList(); - logger.info("Got list of filenames from db"); - Set knownPkg = resultList.stream().filter(fileName -> { - Path file = ctx.getFile(fileName); - boolean result = Files.exists(file); - if (!result) { - logger.info("Removing package {} which was not found in filesystem", file.getFileName()); - Queries.getPackageByFileName(em, file.getFileName().toString()) - .getResultList() - .forEach(pkgData -> deletePkgData(em, pkgData)); - } - return result; - }).collect(Collectors.toUnmodifiableSet()); - logger.info("Searching for new packages or packages that were modified after being added to the database"); - CompletionService completionService = new ExecutorCompletionService<>(executor); - final Set> inProgress = new HashSet<>(); - final int maxInProgress = Runtime.getRuntime().availableProcessors() * 5; - Sup> fileListStreamSupplier = () -> Files.list(ctx.getRepoFolder()).filter((Path file) -> { - String name = file.getFileName().toString(); - return name.endsWith(".pkg.tar.xz") || name.endsWith(".pkg.tar.zst") || name.endsWith(".pkg.tar.gz"); - }); - long[] count = new long[]{0}; - long totalPackages = fileListStreamSupplier.get().count(); - var parser = new PkgDataParser(em); - - Con persistPackages = (Boolean drain) -> { - while ((drain && !inProgress.isEmpty()) || inProgress.size() > maxInProgress) { - Optional.ofNullable(completionService.poll(1, TimeUnit.SECONDS)) - .ifPresent((Con>) future -> { - inProgress.remove(future); - PkgData pkgData; - try { - pkgData = future.get(); - } catch (ExecutionException ee) { - throw ee.getCause(); - } - persistPackage(em, parser, pkgData, ++count[0], totalPackages); - }); - } - }; - fileListStreamSupplier.get().forEach((Con) file -> { - if (!knownPkg.contains(file.getFileName().toString()) || ((Sup) () -> { - Instant result = Queries.getUpdateTimestampByFileName(em, file.getFileName().toString()).getSingleResult(); - return Files.getLastModifiedTime(file).toMillis() > result.toEpochMilli(); - }).get()) { - inProgress.add(completionService.submit(() -> { - try { - var pkgData = PkgDataParser.parseFile(file, CompressionFormatImpl.guess(file)); - if(logger.isDebugEnabled()) { - logger.debug("Parsed package file {}", file); - } - return pkgData; - } catch (Exception ex) { - logger.error(String.format("Error parsing '%s'", file.toAbsolutePath()), ex); - throw ex; - } - })); - } - persistPackages.accept(false); - }); - persistPackages.accept(true); - logger.info("Removing obsolete packages"); - deleteOld(em); - logger.info("Repository cleanup completed successfully"); - packageCache.invalidateCache(); - } - - private void persistPackage(EntityManager em, PkgDataParser parser, PkgData pkgData, long count, long totalPackages) { - if (Queries.countPackagesByHash(em, pkgData.getMd5sum()).getSingleResult() == 0) { - Queries.getPackageByFileName(em, pkgData.getFileName()) - .getResultList() - .forEach(p -> deletePkgData(em, p)); - em.persist(parser.hydrateJPA(pkgData)); - logger.info("({}/{}) Persisting package {}", count, totalPackages, pkgData.getFileName()); - } - } - - @Override @TransactionAttribute(TransactionAttributeType.REQUIRED) public void deletePackage(String filename) { - deletePackage(em, filename); + deletePackage(ctx, em, filename); logger.info("Package {} has been deleted", filename); } @SneakyThrows - private void deletePackage(EntityManager em, String filename) { + static void deletePackage(AppConfig ctx, EntityManager em, String filename) { List savedFiles = Queries.getPackageByFileName(em, filename).getResultList(); - if (savedFiles.size() == 0) { + if (savedFiles.isEmpty()) { throw JWO.newThrowable(IllegalArgumentException.class, "Package with name %s not found", filename); } - PkgData pkg = savedFiles.get(0); + PkgData pkg = savedFiles.getFirst(); em.remove(pkg); Files.delete(ctx.getFile(pkg)); } - private void deleteOld(EntityManager em) { - Instant cutoff = Instant.now().minus(365 * 3, ChronoUnit.DAYS); - Queries.getOldPackages2Delete(em, cutoff, 3L) - .getResultStream() - .forEach(this::deletePackage); - } - @Override @TransactionAttribute(TransactionAttributeType.SUPPORTS) public long countResults(String name, String version, String arch) { diff --git a/src/main/java/net/woggioni/jpacrepo/service/jpa/Queries.java b/src/main/java/net/woggioni/jpacrepo/service/jpa/Queries.java index d29e8fe..2b19338 100644 --- a/src/main/java/net/woggioni/jpacrepo/service/jpa/Queries.java +++ b/src/main/java/net/woggioni/jpacrepo/service/jpa/Queries.java @@ -54,6 +54,17 @@ public class Queries { return query; } + public static TypedQuery getPkgDataIdByFileName(EntityManager em, String fileName) { + CriteriaBuilder cb = em.getCriteriaBuilder(); + Metamodel metamodel = em.getMetamodel(); + EntityType entity = metamodel.entity(PkgData.class); + CriteriaQuery criteriaQuery = cb.createQuery(Long.class); + Root root = criteriaQuery.from(entity); + Predicate predicate = cb.equal(root.get(PkgData_.fileName), fileName); + criteriaQuery.select(root.get(PkgData_.id)).where(predicate); + return em.createQuery(criteriaQuery); + } + public static TypedQuery getUpdateTimestampByFileName(EntityManager em, String fileName) { CriteriaBuilder cb = em.getCriteriaBuilder(); Metamodel metamodel = em.getMetamodel(); diff --git a/src/main/resources/META-INF/persistence.xml b/src/main/resources/META-INF/persistence.xml index cac01cb..7831e98 100644 --- a/src/main/resources/META-INF/persistence.xml +++ b/src/main/resources/META-INF/persistence.xml @@ -24,6 +24,7 @@ +