From db786b73c951aa372072dad6bb2c43a7f65d54b9 Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Sun, 12 Apr 2015 11:46:24 +0200 Subject: [PATCH] introduced multithreading in initial database creation --- build.gradle | 1 + jpacrepo.iml | 2 +- src/main/java/org/jpacrepo/model/PkgData.java | 10 ++ .../java/org/jpacrepo/pacbase/Hasher.java | 27 ++- .../java/org/jpacrepo/pacbase/Parser.java | 8 +- .../org/jpacrepo/service/PacmanService.java | 2 +- .../jpacrepo/service/PacmanServiceEJB.java | 157 +++++++++++++----- .../jpacrepo/service/PacmanWebService.java | 41 ++--- src/main/resources/META-INF/persistence.xml | 4 +- src/test/java/ParseTest.java | 9 +- 10 files changed, 171 insertions(+), 90 deletions(-) diff --git a/build.gradle b/build.gradle index d3bf562..04e0142 100644 --- a/build.gradle +++ b/build.gradle @@ -25,6 +25,7 @@ dependencies { compile 'org.tukaani:xz:1.5' compile 'javax:javaee-api:7.0' compile 'commons-io:commons-io:2.4' + compile 'commons-codec:commons-codec:1.10' testCompile 'com.thoughtworks.xstream:xstream:1.4.8' testCompile 'org.jboss.resteasy:resteasy-jaxrs:3.0.11.Final' testCompile 'org.jboss.resteasy:resteasy-client:3.0.11.Final' diff --git a/jpacrepo.iml b/jpacrepo.iml index f322343..22f3292 100644 --- a/jpacrepo.iml +++ b/jpacrepo.iml @@ -50,7 +50,6 @@ - @@ -58,5 +57,6 @@ + \ No newline at end of file diff --git a/src/main/java/org/jpacrepo/model/PkgData.java b/src/main/java/org/jpacrepo/model/PkgData.java index fe53135..7a28870 100644 --- a/src/main/java/org/jpacrepo/model/PkgData.java +++ b/src/main/java/org/jpacrepo/model/PkgData.java @@ -69,4 +69,14 @@ public class PkgData @ElementCollection(fetch = FetchType.EAGER) public List backup; + + @Temporal(TemporalType.TIMESTAMP) + public Date updTimestamp; + + @PreUpdate + @PrePersist + private void writeTimestamp() + { + updTimestamp = new Date(); + } } diff --git a/src/main/java/org/jpacrepo/pacbase/Hasher.java b/src/main/java/org/jpacrepo/pacbase/Hasher.java index 8cd85d3..ce9c800 100644 --- a/src/main/java/org/jpacrepo/pacbase/Hasher.java +++ b/src/main/java/org/jpacrepo/pacbase/Hasher.java @@ -1,19 +1,11 @@ package org.jpacrepo.pacbase; -import org.apache.commons.compress.archivers.ArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; -import org.jpacrepo.model.PkgData; -import org.jpacrepo.model.PkgName; +import org.apache.commons.codec.binary.Hex; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.*; /** * Created by walter on 05/04/15. @@ -27,8 +19,7 @@ public class Hasher try { md5 = MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) + } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } @@ -37,11 +28,11 @@ public class Hasher public static String computeMD5(InputStream is) throws IOException { md5.reset(); - while(is.available()>0) + byte[] buffer2 = new byte[1000000]; + while (is.available() > 0) { - byte[] buffer2 = new byte[1000000]; - is.read(buffer2, 0, 1000000); - md5.update(buffer2); + int read = is.read(buffer2, 0, 1000000); + md5.update(buffer2,0,read); } is.close(); return bytesToHex(md5.digest()); @@ -49,9 +40,11 @@ public class Hasher final protected static char[] hexArray = "0123456789ABCDEF".toCharArray(); - public static String bytesToHex(byte[] bytes) { + public static String bytesToHex(byte[] bytes) + { char[] hexChars = new char[bytes.length * 2]; - for ( int j = 0; j < bytes.length; j++ ) { + for (int j = 0; j < bytes.length; j++) + { int v = bytes[j] & 0xFF; hexChars[j * 2] = hexArray[v >>> 4]; hexChars[j * 2 + 1] = hexArray[v & 0x0F]; diff --git a/src/main/java/org/jpacrepo/pacbase/Parser.java b/src/main/java/org/jpacrepo/pacbase/Parser.java index 3d6e524..d084682 100644 --- a/src/main/java/org/jpacrepo/pacbase/Parser.java +++ b/src/main/java/org/jpacrepo/pacbase/Parser.java @@ -1,18 +1,14 @@ package org.jpacrepo.pacbase; -import org.jpacrepo.model.PkgData; -import org.jpacrepo.model.PkgName; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; +import org.jpacrepo.model.PkgData; +import org.jpacrepo.model.PkgName; import java.io.File; import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; import java.nio.charset.Charset; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.*; /** diff --git a/src/main/java/org/jpacrepo/service/PacmanService.java b/src/main/java/org/jpacrepo/service/PacmanService.java index 287f13b..ac589ed 100644 --- a/src/main/java/org/jpacrepo/service/PacmanService.java +++ b/src/main/java/org/jpacrepo/service/PacmanService.java @@ -9,6 +9,6 @@ import javax.ejb.Remote; @Remote public interface PacmanService { - public void syncDB(); + public void deletePackage(String filename) throws Exception; } diff --git a/src/main/java/org/jpacrepo/service/PacmanServiceEJB.java b/src/main/java/org/jpacrepo/service/PacmanServiceEJB.java index 9d648c4..037ff56 100644 --- a/src/main/java/org/jpacrepo/service/PacmanServiceEJB.java +++ b/src/main/java/org/jpacrepo/service/PacmanServiceEJB.java @@ -1,12 +1,12 @@ package org.jpacrepo.service; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.DirectoryFileFilter; +import org.apache.commons.io.filefilter.RegexFileFilter; import org.jpacrepo.context.ApplicationContext; import org.jpacrepo.context.DefaultConfiguration; import org.jpacrepo.model.PkgData; import org.jpacrepo.model.PkgName; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.filefilter.DirectoryFileFilter; -import org.apache.commons.io.filefilter.RegexFileFilter; import org.jpacrepo.pacbase.Parser; import javax.annotation.PostConstruct; @@ -15,16 +15,17 @@ import javax.ejb.Singleton; import javax.ejb.Startup; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; +import javax.enterprise.concurrent.ManagedThreadFactory; import javax.inject.Inject; +import javax.naming.InitialContext; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; import javax.persistence.TypedQuery; import javax.transaction.*; import java.io.File; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.io.IOException; +import java.nio.file.Files; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,16 +41,24 @@ public class PacmanServiceEJB implements PacmanService @DefaultConfiguration private ApplicationContext ctx; + @Resource + private UserTransaction ut; + + @Resource(lookup = "java:jboss/ee/concurrency/factory/default") + private ManagedThreadFactory mtf; + private Logger logger = Logger.getLogger(PacmanServiceEJB.class.getName()); private String nameQuery = "SELECT pname FROM PkgName pname WHERE id = :name"; - private String fileQuery = "SELECT pdata FROM PkgData pdata WHERE name.id = :name AND version = :version AND arch = :arch"; + private String hashQuery = "SELECT pdata FROM PkgData pdata WHERE md5sum = :md5sum"; + + private Map knownPkg; @PostConstruct public void syncDB() { - Set knownPkg = new HashSet<>(); + knownPkg = new HashMap<>(); //Elimina i pacchetti sul DB che non esistono più nel filesystem try { @@ -58,7 +67,7 @@ public class PacmanServiceEJB implements PacmanService List listaDB = em.createQuery("SELECT p FROM PkgData p", PkgData.class).getResultList(); for (PkgData p : listaDB) { - knownPkg.add(p.fileName); + knownPkg.put(p.fileName, p); File file = ctx.getFile(p); if (!file.exists()) { @@ -73,56 +82,54 @@ public class PacmanServiceEJB implements PacmanService } //Aggiunge sul DB i pacchetti presenti nel filesystem + int cores = Runtime.getRuntime().availableProcessors(); + Collection ls = FileUtils.listFiles(new File(ctx.getSystemProperties().getProperty("RepoFolder")), new RegexFileFilter(".*\\.pkg\\.tar\\.xz"), DirectoryFileFilter.DIRECTORY); - File f = null; - try + Deque stack = new LinkedList<>(ls); + Thread[] works = new Thread[cores]; + for(int i=0; i fquery = em.createQuery(fileQuery, PkgData.class); TypedQuery nquery = em.createQuery(nameQuery, PkgName.class); + TypedQuery hquery = em.createQuery(hashQuery, PkgData.class); PkgData data = Parser.parseFile(file); - - fquery.setParameter("name", data.name.id); - fquery.setParameter("version", data.version); - fquery.setParameter("arch", data.arch); - - List savedFiles = fquery.getResultList(); - + hquery.setParameter("md5sum", data.md5sum); + List savedFiles = hquery.getResultList(); if (savedFiles.size() > 0) { return; } + else + { + TypedQuery fquery = em.createQuery("SELECT p FROM PkgData p WHERE fileName = :fileName", PkgData.class); + fquery.setParameter("fileName", file.getName()); + savedFiles = fquery.getResultList(); + if(savedFiles.size()>0) + { + em.remove(savedFiles.get(0)); + } + } nquery.setParameter("name", data.name.id); List savedName = nquery.getResultList(); @@ -134,4 +141,76 @@ public class PacmanServiceEJB implements PacmanService logger.log(Level.INFO, String.format("Persisting package %s", file.getName())); } + @Override + public void deletePackage(String filename) throws Exception + { + ut.begin(); + TypedQuery fquery = em.createQuery("SELECT p FROM PkgData p WHERE fileName = :fileName", PkgData.class); + fquery.setParameter("fileName", filename); + List savedFiles = fquery.getResultList(); + if(savedFiles.size()==0) + { + ut.rollback(); + throw new RuntimeException(String.format("Package with name %s not found", filename)); + } + PkgData pkg = fquery.getResultList().get(0); + try + { + Files.delete(ctx.getFile(pkg).toPath()); + } + catch (IOException e) + { + ut.rollback(); + throw new RuntimeException(e); + } + + em.remove(pkg); + ut.commit(); + } + + private class SyncWorker implements Runnable + { + Deque ls; + + public SyncWorker(Deque ls) + { + this.ls = ls; + } + + @Override + public void run() + { + while(ls.size() > 0) + { + File file; + synchronized (ls) + { + file = ls.pop(); + } + try + { + PkgData p = knownPkg.get(file.getName()); + if (p == null || file.lastModified() > p.updTimestamp.getTime()) + { + ut.begin(); + parseFile(file); + ut.commit(); + } + } + catch (Exception e) + { + logger.log(Level.INFO, String.format("Error parsing %s", file.getAbsolutePath())); + try + { + ut.rollback(); + } + catch (SystemException e1) + { + throw new RuntimeException(e1); + } + throw new RuntimeException(e); + } + } + } + } } \ No newline at end of file diff --git a/src/main/java/org/jpacrepo/service/PacmanWebService.java b/src/main/java/org/jpacrepo/service/PacmanWebService.java index f500768..67be12e 100644 --- a/src/main/java/org/jpacrepo/service/PacmanWebService.java +++ b/src/main/java/org/jpacrepo/service/PacmanWebService.java @@ -120,8 +120,8 @@ public class PacmanWebService FileInputStream input = new FileInputStream(ctx.getFile(pkg)); try { - int bytes; - while ((bytes = input.read()) != -1) + byte[] bytes = new byte[128000]; + while ((input.read(bytes)) != -1) { output.write(bytes); } @@ -161,15 +161,21 @@ public class PacmanWebService throw new BadRequestException(); File file = new File(ctx.getRepoFolder(), filename); - FileOutputStream fos = new FileOutputStream(file); - IOUtils.copy(input, fos); - fos.close(); - FileInputStream fis = new FileInputStream(file); - String hash = Hasher.computeMD5(fis); - TypedQuery hquery = em.createQuery(hashQuery, PkgData.class); - hquery.setParameter("md5sum", hash); + try + { + FileOutputStream fos = new FileOutputStream(file); + IOUtils.copy(input, fos); + fos.close(); + } + catch (Exception e) + { + Files.delete(file.toPath()); + throw e; + } - List savedFiles = hquery.getResultList(); + TypedQuery fquery = em.createQuery(fileNameQuery, PkgData.class); + fquery.setParameter("fileName", filename); + List savedFiles = fquery.getResultList(); if (savedFiles.size() > 0) { @@ -179,6 +185,11 @@ public class PacmanWebService else { PkgData pkg = Parser.parseFile(file); + + TypedQuery hquery = em.createQuery(hashQuery, PkgData.class); + hquery.setParameter("md5sum", pkg.md5sum); + savedFiles = hquery.getResultList(); + TypedQuery nquery = em.createQuery(nameQuery, PkgName.class); nquery.setParameter("name", pkg.name.id); List savedName = nquery.getResultList(); @@ -186,16 +197,6 @@ public class PacmanWebService { pkg.name = savedName.get(0); } - File destinationFile = new File(ctx.getSystemProperties().getProperty("RepoFolder"),filename); - try - { - Files.move(file.toPath(), destinationFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - } - catch (Exception e) - { - file.delete(); - throw e; - } em.persist(pkg); log.log(Level.INFO, String.format("Persisiting package %s", pkg.fileName)); diff --git a/src/main/resources/META-INF/persistence.xml b/src/main/resources/META-INF/persistence.xml index f5aae31..33c0a11 100644 --- a/src/main/resources/META-INF/persistence.xml +++ b/src/main/resources/META-INF/persistence.xml @@ -7,8 +7,8 @@ java:/ejb/postgres/wildfly - - + + diff --git a/src/test/java/ParseTest.java b/src/test/java/ParseTest.java index cb92b6c..84efa3a 100644 --- a/src/test/java/ParseTest.java +++ b/src/test/java/ParseTest.java @@ -35,7 +35,7 @@ public class ParseTest } @Test - public void invokeStatelessBean() throws NamingException, IOException + public void invokeStatelessBean() throws Exception { // Let's lookup the remote stateless calculator @@ -45,7 +45,8 @@ public class ParseTest prop.put(Context.URL_PKG_PREFIXES, "org.jboss.ejb.client.naming"); prop.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory"); - prop.put(Context.PROVIDER_URL, "http-remoting://odroid-u3:8080"); + prop.put(Context.PROVIDER_URL, "http-remoting://localhost:8080"); +// prop.put(Context.PROVIDER_URL, "http-remoting://odroid-u3:8080"); // prop.put(Context.PROVIDER_URL, "remote://odroid-u3:4447"); prop.put(Context.SECURITY_PRINCIPAL, "jpacrepo"); prop.put(Context.SECURITY_CREDENTIALS, "password01."); @@ -54,8 +55,8 @@ public class ParseTest Context ctx = new InitialContext(prop); traverseJndiNode("/", context); // final PacmanService stateService = (PacmanService) ctx.lookup("/jpacrepo-1.0/remote/PacmanServiceEJB!service.PacmanService"); - final PacmanService stateService = (PacmanService) ctx.lookup("/jpacrepo-1.0/PacmanServiceEJB!service.PacmanService"); - stateService.syncDB(); + final PacmanService stateService = (PacmanService) ctx.lookup("/jpacrepo-1.0/PacmanServiceEJB!org.jpacrepo.service.PacmanService"); + stateService.deletePackage("linux-3.19.3-3-x86_64.pkg.tar.xz"); } private static void traverseJndiNode(String nodeName, Context context) {