introduced multithreading in initial database creation

This commit is contained in:
2015-04-12 11:46:24 +02:00
parent 8a867831fb
commit db786b73c9
10 changed files with 171 additions and 90 deletions

View File

@@ -25,6 +25,7 @@ dependencies {
compile 'org.tukaani:xz:1.5'
compile 'javax:javaee-api:7.0'
compile 'commons-io:commons-io:2.4'
compile 'commons-codec:commons-codec:1.10'
testCompile 'com.thoughtworks.xstream:xstream:1.4.8'
testCompile 'org.jboss.resteasy:resteasy-jaxrs:3.0.11.Final'
testCompile 'org.jboss.resteasy:resteasy-client:3.0.11.Final'

View File

@@ -50,7 +50,6 @@
<orderEntry type="library" scope="TEST" name="Gradle: org.codehaus.jackson:jackson-xc:1.9.12" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: org.apache.httpcomponents:httpcore:4.2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: commons-logging:commons-logging:1.1.1" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: commons-codec:commons-codec:1.6" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: org.jboss.resteasy:resteasy-jaxb-provider:3.0.11.Final" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: com.sun.xml.bind:jaxb-core:2.2.7" level="project" />
@@ -58,5 +57,6 @@
<orderEntry type="library" scope="TEST" name="Gradle: javax.xml.bind:jaxb-api:2.2.7" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: com.sun.istack:istack-commons-runtime:2.16" level="project" />
<orderEntry type="library" scope="TEST" name="Gradle: javax.xml.bind:jsr173_api:1.0" level="project" />
<orderEntry type="library" exported="" name="Gradle: commons-codec:commons-codec:1.10" level="project" />
</component>
</module>

View File

@@ -69,4 +69,14 @@ public class PkgData
@ElementCollection(fetch = FetchType.EAGER)
public List<String> backup;
@Temporal(TemporalType.TIMESTAMP)
public Date updTimestamp;
@PreUpdate
@PrePersist
private void writeTimestamp()
{
updTimestamp = new Date();
}
}

View File

@@ -1,19 +1,11 @@
package org.jpacrepo.pacbase;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.jpacrepo.model.PkgData;
import org.jpacrepo.model.PkgName;
import org.apache.commons.codec.binary.Hex;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
/**
* Created by walter on 05/04/15.
@@ -27,8 +19,7 @@ public class Hasher
try
{
md5 = MessageDigest.getInstance("MD5");
}
catch (NoSuchAlgorithmException e)
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
@@ -37,11 +28,11 @@ public class Hasher
public static String computeMD5(InputStream is) throws IOException
{
md5.reset();
while(is.available()>0)
byte[] buffer2 = new byte[1000000];
while (is.available() > 0)
{
byte[] buffer2 = new byte[1000000];
is.read(buffer2, 0, 1000000);
md5.update(buffer2);
int read = is.read(buffer2, 0, 1000000);
md5.update(buffer2,0,read);
}
is.close();
return bytesToHex(md5.digest());
@@ -49,9 +40,11 @@ public class Hasher
final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();
public static String bytesToHex(byte[] bytes) {
public static String bytesToHex(byte[] bytes)
{
char[] hexChars = new char[bytes.length * 2];
for ( int j = 0; j < bytes.length; j++ ) {
for (int j = 0; j < bytes.length; j++)
{
int v = bytes[j] & 0xFF;
hexChars[j * 2] = hexArray[v >>> 4];
hexChars[j * 2 + 1] = hexArray[v & 0x0F];

View File

@@ -1,18 +1,14 @@
package org.jpacrepo.pacbase;
import org.jpacrepo.model.PkgData;
import org.jpacrepo.model.PkgName;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.jpacrepo.model.PkgData;
import org.jpacrepo.model.PkgName;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
/**

View File

@@ -9,6 +9,6 @@ import javax.ejb.Remote;
@Remote
public interface PacmanService
{
public void syncDB();
public void deletePackage(String filename) throws Exception;
}

View File

@@ -1,12 +1,12 @@
package org.jpacrepo.service;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.jpacrepo.context.ApplicationContext;
import org.jpacrepo.context.DefaultConfiguration;
import org.jpacrepo.model.PkgData;
import org.jpacrepo.model.PkgName;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.jpacrepo.pacbase.Parser;
import javax.annotation.PostConstruct;
@@ -15,16 +15,17 @@ import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.enterprise.concurrent.ManagedThreadFactory;
import javax.inject.Inject;
import javax.naming.InitialContext;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.TypedQuery;
import javax.transaction.*;
import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.io.IOException;
import java.nio.file.Files;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -40,16 +41,24 @@ public class PacmanServiceEJB implements PacmanService
@DefaultConfiguration
private ApplicationContext ctx;
@Resource
private UserTransaction ut;
@Resource(lookup = "java:jboss/ee/concurrency/factory/default")
private ManagedThreadFactory mtf;
private Logger logger = Logger.getLogger(PacmanServiceEJB.class.getName());
private String nameQuery = "SELECT pname FROM PkgName pname WHERE id = :name";
private String fileQuery = "SELECT pdata FROM PkgData pdata WHERE name.id = :name AND version = :version AND arch = :arch";
private String hashQuery = "SELECT pdata FROM PkgData pdata WHERE md5sum = :md5sum";
private Map<String, PkgData> knownPkg;
@PostConstruct
public void syncDB()
{
Set<String> knownPkg = new HashSet<>();
knownPkg = new HashMap<>();
//Elimina i pacchetti sul DB che non esistono più nel filesystem
try
{
@@ -58,7 +67,7 @@ public class PacmanServiceEJB implements PacmanService
List<PkgData> listaDB = em.createQuery("SELECT p FROM PkgData p", PkgData.class).getResultList();
for (PkgData p : listaDB)
{
knownPkg.add(p.fileName);
knownPkg.put(p.fileName, p);
File file = ctx.getFile(p);
if (!file.exists())
{
@@ -73,56 +82,54 @@ public class PacmanServiceEJB implements PacmanService
}
//Aggiunge sul DB i pacchetti presenti nel filesystem
int cores = Runtime.getRuntime().availableProcessors();
Collection<File> ls = FileUtils.listFiles(new File(ctx.getSystemProperties().getProperty("RepoFolder")), new RegexFileFilter(".*\\.pkg\\.tar\\.xz"), DirectoryFileFilter.DIRECTORY);
File f = null;
try
Deque<File> stack = new LinkedList<>(ls);
Thread[] works = new Thread[cores];
for(int i=0; i<cores; i++)
{
for (File file : ls)
{
f = file;
if (!knownPkg.contains(file.getAbsolutePath()))
{
ut.begin();
parseFile(file);
ut.commit();
}
}
works[i] = mtf.newThread(new SyncWorker(stack));
}
catch (Exception e)
for(Thread t : works)
{
t.start();
}
for(Thread t : works)
{
logger.log(Level.ALL, String.format("Error parsing %s", f.getAbsolutePath()));
try
{
ut.rollback();
t.join();
}
catch (SystemException e1)
catch (InterruptedException e)
{
throw new RuntimeException(e1);
e.printStackTrace();
}
throw new RuntimeException(e);
}
}
@Resource
private UserTransaction ut;
private void parseFile(File file) throws Exception
{
TypedQuery<PkgData> fquery = em.createQuery(fileQuery, PkgData.class);
TypedQuery<PkgName> nquery = em.createQuery(nameQuery, PkgName.class);
TypedQuery<PkgData> hquery = em.createQuery(hashQuery, PkgData.class);
PkgData data = Parser.parseFile(file);
fquery.setParameter("name", data.name.id);
fquery.setParameter("version", data.version);
fquery.setParameter("arch", data.arch);
List<PkgData> savedFiles = fquery.getResultList();
hquery.setParameter("md5sum", data.md5sum);
List<PkgData> savedFiles = hquery.getResultList();
if (savedFiles.size() > 0)
{
return;
}
else
{
TypedQuery<PkgData> fquery = em.createQuery("SELECT p FROM PkgData p WHERE fileName = :fileName", PkgData.class);
fquery.setParameter("fileName", file.getName());
savedFiles = fquery.getResultList();
if(savedFiles.size()>0)
{
em.remove(savedFiles.get(0));
}
}
nquery.setParameter("name", data.name.id);
List<PkgName> savedName = nquery.getResultList();
@@ -134,4 +141,76 @@ public class PacmanServiceEJB implements PacmanService
logger.log(Level.INFO, String.format("Persisting package %s", file.getName()));
}
@Override
public void deletePackage(String filename) throws Exception
{
ut.begin();
TypedQuery<PkgData> fquery = em.createQuery("SELECT p FROM PkgData p WHERE fileName = :fileName", PkgData.class);
fquery.setParameter("fileName", filename);
List<PkgData> savedFiles = fquery.getResultList();
if(savedFiles.size()==0)
{
ut.rollback();
throw new RuntimeException(String.format("Package with name %s not found", filename));
}
PkgData pkg = fquery.getResultList().get(0);
try
{
Files.delete(ctx.getFile(pkg).toPath());
}
catch (IOException e)
{
ut.rollback();
throw new RuntimeException(e);
}
em.remove(pkg);
ut.commit();
}
private class SyncWorker implements Runnable
{
Deque<File> ls;
public SyncWorker(Deque<File> ls)
{
this.ls = ls;
}
@Override
public void run()
{
while(ls.size() > 0)
{
File file;
synchronized (ls)
{
file = ls.pop();
}
try
{
PkgData p = knownPkg.get(file.getName());
if (p == null || file.lastModified() > p.updTimestamp.getTime())
{
ut.begin();
parseFile(file);
ut.commit();
}
}
catch (Exception e)
{
logger.log(Level.INFO, String.format("Error parsing %s", file.getAbsolutePath()));
try
{
ut.rollback();
}
catch (SystemException e1)
{
throw new RuntimeException(e1);
}
throw new RuntimeException(e);
}
}
}
}
}

View File

@@ -120,8 +120,8 @@ public class PacmanWebService
FileInputStream input = new FileInputStream(ctx.getFile(pkg));
try
{
int bytes;
while ((bytes = input.read()) != -1)
byte[] bytes = new byte[128000];
while ((input.read(bytes)) != -1)
{
output.write(bytes);
}
@@ -161,15 +161,21 @@ public class PacmanWebService
throw new BadRequestException();
File file = new File(ctx.getRepoFolder(), filename);
FileOutputStream fos = new FileOutputStream(file);
IOUtils.copy(input, fos);
fos.close();
FileInputStream fis = new FileInputStream(file);
String hash = Hasher.computeMD5(fis);
TypedQuery<PkgData> hquery = em.createQuery(hashQuery, PkgData.class);
hquery.setParameter("md5sum", hash);
try
{
FileOutputStream fos = new FileOutputStream(file);
IOUtils.copy(input, fos);
fos.close();
}
catch (Exception e)
{
Files.delete(file.toPath());
throw e;
}
List<PkgData> savedFiles = hquery.getResultList();
TypedQuery<PkgData> fquery = em.createQuery(fileNameQuery, PkgData.class);
fquery.setParameter("fileName", filename);
List<PkgData> savedFiles = fquery.getResultList();
if (savedFiles.size() > 0)
{
@@ -179,6 +185,11 @@ public class PacmanWebService
else
{
PkgData pkg = Parser.parseFile(file);
TypedQuery<PkgData> hquery = em.createQuery(hashQuery, PkgData.class);
hquery.setParameter("md5sum", pkg.md5sum);
savedFiles = hquery.getResultList();
TypedQuery<PkgName> nquery = em.createQuery(nameQuery, PkgName.class);
nquery.setParameter("name", pkg.name.id);
List<PkgName> savedName = nquery.getResultList();
@@ -186,16 +197,6 @@ public class PacmanWebService
{
pkg.name = savedName.get(0);
}
File destinationFile = new File(ctx.getSystemProperties().getProperty("RepoFolder"),filename);
try
{
Files.move(file.toPath(), destinationFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception e)
{
file.delete();
throw e;
}
em.persist(pkg);
log.log(Level.INFO, String.format("Persisiting package %s", pkg.fileName));

View File

@@ -7,8 +7,8 @@
<persistence-unit name="jpacrepo_pu" transaction-type="JTA">
<jta-data-source>java:/ejb/postgres/wildfly</jta-data-source>
<properties>
<!--<property name="javax.persistence.schema-generation.database.action" value="drop-and-create"/>-->
<property name="javax.persistence.schema-generation.database.action" value="none"/>
<property name="javax.persistence.schema-generation.database.action" value="drop-and-create"/>
<!--<property name="javax.persistence.schema-generation.database.action" value="none"/>-->
<!--<property name="javax.persistence.schema-generation.database.action" value="create"/>-->
<property name="eclipselink.logging.level" value="INFO"/>
<property name="hibernate.default_schema" value="jpacrepo"/>

View File

@@ -35,7 +35,7 @@ public class ParseTest
}
@Test
public void invokeStatelessBean() throws NamingException, IOException
public void invokeStatelessBean() throws Exception
{
// Let's lookup the remote stateless calculator
@@ -45,7 +45,8 @@ public class ParseTest
prop.put(Context.URL_PKG_PREFIXES, "org.jboss.ejb.client.naming");
prop.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
prop.put(Context.PROVIDER_URL, "http-remoting://odroid-u3:8080");
prop.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
// prop.put(Context.PROVIDER_URL, "http-remoting://odroid-u3:8080");
// prop.put(Context.PROVIDER_URL, "remote://odroid-u3:4447");
prop.put(Context.SECURITY_PRINCIPAL, "jpacrepo");
prop.put(Context.SECURITY_CREDENTIALS, "password01.");
@@ -54,8 +55,8 @@ public class ParseTest
Context ctx = new InitialContext(prop);
traverseJndiNode("/", context);
// final PacmanService stateService = (PacmanService) ctx.lookup("/jpacrepo-1.0/remote/PacmanServiceEJB!service.PacmanService");
final PacmanService stateService = (PacmanService) ctx.lookup("/jpacrepo-1.0/PacmanServiceEJB!service.PacmanService");
stateService.syncDB();
final PacmanService stateService = (PacmanService) ctx.lookup("/jpacrepo-1.0/PacmanServiceEJB!org.jpacrepo.service.PacmanService");
stateService.deletePackage("linux-3.19.3-3-x86_64.pkg.tar.xz");
}
private static void traverseJndiNode(String nodeName, Context context) {