Improved performance of initial sync

This commit is contained in:
2024-02-11 14:05:10 +08:00
parent e3d99fa178
commit d4306555df
7 changed files with 329 additions and 124 deletions

View File

@@ -0,0 +1,192 @@
package net.woggioni.jpacrepo.service;
import jakarta.annotation.Resource;
import jakarta.ejb.Asynchronous;
import jakarta.ejb.ConcurrencyManagement;
import jakarta.ejb.ConcurrencyManagementType;
import jakarta.ejb.Lock;
import jakarta.ejb.LockType;
import jakarta.ejb.Remote;
import jakarta.ejb.Schedule;
import jakarta.ejb.Singleton;
import jakarta.ejb.TransactionManagement;
import jakarta.ejb.TransactionManagementType;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.transaction.Status;
import jakarta.transaction.UserTransaction;
import lombok.SneakyThrows;
import net.woggioni.jpacrepo.api.model.PkgData;
import net.woggioni.jpacrepo.api.service.FileSystemSynchronizer;
import net.woggioni.jpacrepo.cache.PackageCache;
import net.woggioni.jpacrepo.config.AppConfig;
import net.woggioni.jpacrepo.impl.model.CompressionFormatImpl;
import net.woggioni.jpacrepo.impl.model.PkgDataParser;
import net.woggioni.jpacrepo.service.jpa.Queries;
import net.woggioni.jwo.Con;
import net.woggioni.jwo.Sup;
import org.slf4j.Logger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Singleton bean that aligns the package database with the content of the
 * repository folder.
 *
 * <p>The bean uses bean-managed transactions: {@link #syncDb()} opens, commits
 * and (on failure) rolls back a single {@link UserTransaction} that spans the
 * whole synchronization run.</p>
 */
@Singleton
@Lock(LockType.READ)
@TransactionManagement(TransactionManagementType.BEAN)
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
@Remote({FileSystemSynchronizer.class})
public class PackageSynchronizerEJB implements FileSystemSynchronizer {

    @PersistenceContext(name = "jpacrepo_pu")
    private EntityManager em;

    @Resource
    private UserTransaction tx;

    @Inject
    private AppConfig ctx;

    @Inject
    private Logger logger;

    @Resource(name = "DefaultManagedExecutorService")
    private ManagedExecutorService executor;

    @Inject
    private PackageCache packageCache;

    /**
     * Removes a package entity through the given entity manager.
     *
     * @param em      the entity manager used for the removal
     * @param pkgData the managed entity to remove
     */
    private void deletePkgData(EntityManager em, PkgData pkgData) {
        em.remove(pkgData);
    }

    /**
     * Opens a stream over the package archives ({@code .pkg.tar.xz},
     * {@code .pkg.tar.zst}, {@code .pkg.tar.gz}) found directly inside the
     * repository folder.
     *
     * <p>The returned stream wraps an open directory handle, hence the caller
     * must close it (try-with-resources).</p>
     *
     * @return a stream of package files that must be closed by the caller
     */
    @SneakyThrows
    private Stream<Path> listPackageFiles() {
        return Files.list(ctx.getRepoFolder()).filter((Path file) -> {
            String name = file.getFileName().toString();
            return name.endsWith(".pkg.tar.xz") || name.endsWith(".pkg.tar.zst") || name.endsWith(".pkg.tar.gz");
        });
    }

    /**
     * Counts the package archives currently present in the repository folder.
     *
     * @return the number of files matched by {@link #listPackageFiles()}
     */
    private long countPackageFiles() {
        try (Stream<Path> files = listPackageFiles()) {
            return files.count();
        }
    }

    /**
     * Synchronizes the database with the repository folder inside a single
     * user transaction.
     *
     * <ol>
     *     <li>removes from the database the packages whose file no longer exists;</li>
     *     <li>parses, on the managed executor, every package file that is unknown
     *     to the database or whose last-modified time is newer than the stored
     *     update timestamp, and persists the parsed data on the calling thread;</li>
     *     <li>deletes obsolete packages (see {@link #deleteOld(EntityManager)});</li>
     *     <li>commits and invalidates the package cache.</li>
     * </ol>
     *
     * <p>Any exception raised after the transaction has begun is logged and the
     * transaction is rolled back; the exception is not propagated to the caller.</p>
     */
    @Override
    @SneakyThrows
    @Asynchronous
    @Schedule(hour = "4", minute = "00", persistent = false)
    public void syncDb() {
        // Timeout is expressed in seconds: allow up to 12 hours for a full sync
        tx.setTransactionTimeout(3600 * 12);
        tx.begin();
        try {
            logger.info("Starting repository cleanup");
            //Removes from DB the packages that have been deleted from filesystem
            logger.info("Searching for packages that are no more in the filesystem");
            List<String> resultList = Queries.listAllPackageFiles(em)
                    .getResultList();
            logger.info("Got list of filenames from db");
            Set<String> knownPkg = resultList.stream().filter(fileName -> {
                Path file = ctx.getFile(fileName);
                boolean result = Files.exists(file);
                if (!result) {
                    logger.info("Removing package {} which was not found in filesystem", file.getFileName());
                    Queries.getPackageByFileName(em, file.getFileName().toString())
                            .getResultList()
                            .forEach(pkgData -> deletePkgData(em, pkgData));
                }
                return result;
            }).collect(Collectors.toUnmodifiableSet());
            logger.info("Searching for new packages or packages that were modified after being added to the database");
            CompletionService<PkgData> completionService = new ExecutorCompletionService<>(executor);
            final Set<Future<PkgData>> inProgress = new HashSet<>();
            // Upper bound on the number of parse tasks submitted but not yet persisted
            final int maxInProgress = Runtime.getRuntime().availableProcessors() * 5;
            long[] count = new long[]{0};
            long totalPackages = countPackageFiles();
            var parser = new PkgDataParser(em);
            // Persists completed parse results; with drain == true it keeps going
            // until no task is left, otherwise only until the backlog is within bounds
            Con<Boolean> persistPackages = (Boolean drain) -> {
                while (inProgress.size() > maxInProgress || (drain && !inProgress.isEmpty())) {
                    Optional.ofNullable(completionService.poll(1, TimeUnit.SECONDS))
                            .ifPresent((Con<Future<PkgData>>) future -> {
                                inProgress.remove(future);
                                PkgData pkgData;
                                try {
                                    pkgData = future.get();
                                } catch (ExecutionException ee) {
                                    // Surface the parser failure rather than the wrapper
                                    throw ee.getCause();
                                }
                                persistPackage(em, parser, pkgData, ++count[0], totalPackages);
                            });
                }
            };
            // The directory stream is closed as soon as every file has been submitted
            try (Stream<Path> files = listPackageFiles()) {
                files.forEach((Con<Path>) file -> {
                    if (!knownPkg.contains(file.getFileName().toString()) || ((Sup<Boolean>) () -> {
                        Instant result = Queries.getUpdateTimestampByFileName(em, file.getFileName().toString()).getSingleResult();
                        return Files.getLastModifiedTime(file).toMillis() > result.toEpochMilli();
                    }).get()) {
                        inProgress.add(completionService.submit(() -> {
                            try {
                                var pkgData = PkgDataParser.parseFile(file, CompressionFormatImpl.guess(file));
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Parsed package file {}", file);
                                }
                                return pkgData;
                            } catch (Exception ex) {
                                logger.error(String.format("Error parsing '%s'", file.toAbsolutePath()), ex);
                                throw ex;
                            }
                        }));
                    }
                    persistPackages.accept(false);
                });
            }
            persistPackages.accept(true);
            logger.info("Removing obsolete packages");
            deleteOld(em);
            tx.commit();
            logger.info("Repository cleanup completed successfully");
            packageCache.invalidateCache();
        } catch (Exception ex) {
            // Never fail silently: record the cause before attempting the rollback
            logger.error("Repository cleanup failed", ex);
            int status = tx.getStatus();
            // A transaction marked rollback-only is still bound to this thread and
            // must be rolled back explicitly, exactly like an active one
            if (status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
                tx.rollback();
            }
        }
    }

    /**
     * Persists a parsed package unless a package with the same MD5 sum is
     * already stored; entries with the same file name are removed first.
     *
     * @param em            the entity manager to operate on
     * @param parser        the parser used to turn the parsed data into a JPA-ready entity
     * @param pkgData       the parsed package data
     * @param count         progressive number of this package, used for logging only
     * @param totalPackages total number of package files, used for logging only
     */
    private void persistPackage(EntityManager em, PkgDataParser parser, PkgData pkgData, long count, long totalPackages) {
        if (Queries.countPackagesByHash(em, pkgData.getMd5sum()).getSingleResult() == 0) {
            Queries.getPkgDataIdByFileName(em, pkgData.getFileName())
                    .getResultStream()
                    .map(id -> em.find(PkgData.class, id))
                    .forEach(p -> deletePkgData(em, p));
            em.persist(parser.hydrateJPA(pkgData));
            logger.info("({}/{}) Persisting package {}", count, totalPackages, pkgData.getFileName());
        }
    }

    /**
     * Deletes the packages returned by {@code Queries.getOldPackages2Delete}
     * for a cutoff placed 3 * 365 days in the past.
     *
     * @param em the entity manager to operate on
     */
    private void deleteOld(EntityManager em) {
        Instant cutoff = Instant.now().minus(365 * 3, ChronoUnit.DAYS);
        // NOTE(review): the meaning of the 3L argument is defined by the query — confirm
        Queries.getOldPackages2Delete(em, cutoff, 3L)
                .getResultStream()
                .forEach(filename -> {
                    PacmanServiceEJB.deletePackage(ctx, em, filename);
                    logger.info("Package {} has been deleted", filename);
                });
    }
}

View File

@@ -2,21 +2,17 @@ package net.woggioni.jpacrepo.service;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.annotation.Resource;
import jakarta.ejb.Asynchronous;
import jakarta.ejb.ConcurrencyManagement;
import jakarta.ejb.ConcurrencyManagementType;
import jakarta.ejb.Local;
import jakarta.ejb.Lock;
import jakarta.ejb.LockType;
import jakarta.ejb.Remote;
import jakarta.ejb.Schedule;
import jakarta.ejb.Singleton;
import jakarta.ejb.TransactionAttribute;
import jakarta.ejb.TransactionAttributeType;
import jakarta.ejb.TransactionManagement;
import jakarta.ejb.TransactionManagementType;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
@@ -38,30 +34,19 @@ import net.woggioni.jwo.CollectionUtils;
import net.woggioni.jwo.Con;
import net.woggioni.jwo.Hash;
import net.woggioni.jwo.JWO;
import net.woggioni.jwo.Sup;
import net.woggioni.jwo.Tuple2;
import org.slf4j.Logger;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.function.Predicate.not;
@@ -83,129 +68,27 @@ public class PacmanServiceEJB implements PacmanServiceLocal {
@Inject
private Logger logger;
@Resource(name = "DefaultManagedExecutorService")
private ManagedExecutorService executor;
@Inject
private PackageCache packageCache;
/**
 * Removes the given package entity by delegating to the supplied entity manager.
 *
 * @param em      the entity manager performing the removal
 * @param pkgData the entity to remove
 */
private void deletePkgData(final EntityManager em, final PkgData pkgData) {
    em.remove(pkgData);
}
/**
 * Synchronizes the database with the content of the repository folder.
 *
 * <ol>
 *     <li>removes from the database the packages whose file no longer exists;</li>
 *     <li>parses, on the managed executor, every package file that is unknown
 *     to the database or whose last-modified time is newer than the stored
 *     update timestamp, and persists the parsed data on the calling thread;</li>
 *     <li>deletes obsolete packages and invalidates the package cache.</li>
 * </ol>
 */
@Override
@SneakyThrows
@Asynchronous
@TransactionAttribute(TransactionAttributeType.REQUIRED)
@Schedule(hour = "4", minute = "00", persistent = false)
public void syncDB() {
    logger.info("Starting repository cleanup");
    //Removes from DB the packages that have been deleted from filesystem
    logger.info("Searching for packages that are no more in the filesystem");
    List<String> resultList = Queries.listAllPackageFiles(em)
            .getResultList();
    logger.info("Got list of filenames from db");
    Set<String> knownPkg = resultList.stream().filter(fileName -> {
        Path file = ctx.getFile(fileName);
        boolean result = Files.exists(file);
        if (!result) {
            logger.info("Removing package {} which was not found in filesystem", file.getFileName());
            Queries.getPackageByFileName(em, file.getFileName().toString())
                    .getResultList()
                    .forEach(pkgData -> deletePkgData(em, pkgData));
        }
        return result;
    }).collect(Collectors.toUnmodifiableSet());
    logger.info("Searching for new packages or packages that were modified after being added to the database");
    CompletionService<PkgData> completionService = new ExecutorCompletionService<>(executor);
    final Set<Future<PkgData>> inProgress = new HashSet<>();
    // Upper bound on the number of parse tasks submitted but not yet persisted
    final int maxInProgress = Runtime.getRuntime().availableProcessors() * 5;
    Sup<Stream<Path>> fileListStreamSupplier = () -> Files.list(ctx.getRepoFolder()).filter((Path file) -> {
        String name = file.getFileName().toString();
        return name.endsWith(".pkg.tar.xz") || name.endsWith(".pkg.tar.zst") || name.endsWith(".pkg.tar.gz");
    });
    long[] count = new long[]{0};
    // Files.list keeps a directory handle open until the stream is closed
    long totalPackages;
    try (Stream<Path> files = fileListStreamSupplier.get()) {
        totalPackages = files.count();
    }
    var parser = new PkgDataParser(em);
    // Persists completed parse results; with drain == true it keeps going
    // until no task is left, otherwise only until the backlog is within bounds
    Con<Boolean> persistPackages = (Boolean drain) -> {
        while ((drain && !inProgress.isEmpty()) || inProgress.size() > maxInProgress) {
            Optional.ofNullable(completionService.poll(1, TimeUnit.SECONDS))
                    .ifPresent((Con<Future<PkgData>>) future -> {
                        inProgress.remove(future);
                        PkgData pkgData;
                        try {
                            pkgData = future.get();
                        } catch (ExecutionException ee) {
                            // Surface the parser failure rather than the wrapper
                            throw ee.getCause();
                        }
                        persistPackage(em, parser, pkgData, ++count[0], totalPackages);
                    });
        }
    };
    // The directory stream is closed as soon as every file has been submitted
    try (Stream<Path> files = fileListStreamSupplier.get()) {
        files.forEach((Con<Path>) file -> {
            if (!knownPkg.contains(file.getFileName().toString()) || ((Sup<Boolean>) () -> {
                Instant result = Queries.getUpdateTimestampByFileName(em, file.getFileName().toString()).getSingleResult();
                return Files.getLastModifiedTime(file).toMillis() > result.toEpochMilli();
            }).get()) {
                inProgress.add(completionService.submit(() -> {
                    try {
                        var pkgData = PkgDataParser.parseFile(file, CompressionFormatImpl.guess(file));
                        if(logger.isDebugEnabled()) {
                            logger.debug("Parsed package file {}", file);
                        }
                        return pkgData;
                    } catch (Exception ex) {
                        logger.error(String.format("Error parsing '%s'", file.toAbsolutePath()), ex);
                        throw ex;
                    }
                }));
            }
            persistPackages.accept(false);
        });
    }
    persistPackages.accept(true);
    logger.info("Removing obsolete packages");
    deleteOld(em);
    logger.info("Repository cleanup completed successfully");
    packageCache.invalidateCache();
}
/**
 * Stores a parsed package unless one with the same MD5 sum is already in the
 * database; entries sharing the same file name are removed beforehand.
 *
 * @param em            the entity manager to operate on
 * @param parser        the parser used to turn the parsed data into a JPA-ready entity
 * @param pkgData       the parsed package data
 * @param count         progressive number of this package, used for logging only
 * @param totalPackages total number of package files, used for logging only
 */
private void persistPackage(EntityManager em, PkgDataParser parser, PkgData pkgData, long count, long totalPackages) {
    // Nothing to do when a package with this checksum is already stored
    if (Queries.countPackagesByHash(em, pkgData.getMd5sum()).getSingleResult() != 0) {
        return;
    }
    for (PkgData stale : Queries.getPackageByFileName(em, pkgData.getFileName()).getResultList()) {
        deletePkgData(em, stale);
    }
    em.persist(parser.hydrateJPA(pkgData));
    logger.info("({}/{}) Persisting package {}", count, totalPackages, pkgData.getFileName());
}
/**
 * Deletes the package identified by {@code filename} and logs the deletion.
 *
 * NOTE(review): the two consecutive deletePackage calls below look like the
 * before/after lines of a diff rendered without +/- markers; presumably only
 * one of them belongs to the actual file — confirm.
 *
 * @param filename the file name of the package to delete
 */
@Override
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void deletePackage(String filename) {
deletePackage(em, filename);
deletePackage(ctx, em, filename);
logger.info("Package {} has been deleted", filename);
}
/**
 * Looks up the packages stored under {@code filename}, removes the first
 * match through the entity manager and deletes its file from disk.
 *
 * NOTE(review): this span appears to interleave the old and the new version
 * of the method (two signatures, two emptiness checks, two ways of taking the
 * first element), as in a diff rendered without +/- markers — confirm which
 * lines belong to the actual file.
 *
 * An IllegalArgumentException is raised through JWO.newThrowable when no
 * package with the given file name is found.
 */
@SneakyThrows
private void deletePackage(EntityManager em, String filename) {
static void deletePackage(AppConfig ctx, EntityManager em, String filename) {
List<PkgData> savedFiles = Queries.getPackageByFileName(em, filename).getResultList();
if (savedFiles.size() == 0) {
if (savedFiles.isEmpty()) {
throw JWO.newThrowable(IllegalArgumentException.class, "Package with name %s not found", filename);
}
PkgData pkg = savedFiles.get(0);
PkgData pkg = savedFiles.getFirst();
// Remove the database entry first, then the file it points to
em.remove(pkg);
Files.delete(ctx.getFile(pkg));
}
/**
 * Deletes every package returned by {@code Queries.getOldPackages2Delete}
 * for a cutoff placed 3 * 365 days in the past.
 *
 * @param em the entity manager used to run the query
 */
private void deleteOld(EntityManager em) {
    final Instant threshold = Instant.now().minus(365 * 3, ChronoUnit.DAYS);
    // NOTE(review): the meaning of the 3L argument is defined by the query — confirm
    Queries.getOldPackages2Delete(em, threshold, 3L)
            .getResultStream()
            .forEach(fileName -> deletePackage(fileName));
}
@Override
@TransactionAttribute(TransactionAttributeType.SUPPORTS)
public long countResults(String name, String version, String arch) {

View File

@@ -54,6 +54,17 @@ public class Queries {
return query;
}
/**
 * Builds a criteria query selecting the ids of the {@link PkgData} entities
 * whose file name equals the given one.
 *
 * @param em       the entity manager used to build and create the query
 * @param fileName the file name to match
 * @return a typed query yielding the matching entity ids
 */
public static TypedQuery<Long> getPkgDataIdByFileName(EntityManager em, String fileName) {
    final CriteriaBuilder builder = em.getCriteriaBuilder();
    final EntityType<PkgData> pkgDataType = em.getMetamodel().entity(PkgData.class);
    final CriteriaQuery<Long> idQuery = builder.createQuery(Long.class);
    final Root<PkgData> pkgData = idQuery.from(pkgDataType);
    final Predicate sameFileName = builder.equal(pkgData.get(PkgData_.fileName), fileName);
    idQuery.select(pkgData.get(PkgData_.id));
    idQuery.where(sameFileName);
    return em.createQuery(idQuery);
}
public static TypedQuery<Instant> getUpdateTimestampByFileName(EntityManager em, String fileName) {
CriteriaBuilder cb = em.getCriteriaBuilder();
Metamodel metamodel = em.getMetamodel();

View File

@@ -24,6 +24,7 @@
<property name="jakarta.persistence.schema-generation.drop-script-source"
value="META-INF/sql/DropSchema.sql"/>
<property name="hibernate.default_schema" value="jpacrepo"/>
<property name="hibernate.show_sql" value="false"/>
</properties>
</persistence-unit>