Подсчет файлов Apache Camel
Есть вот такой маршрут. Необходимо при обработке каждого 100 файла выводить лог. Не могу понять как организовать подсчет файлов. Файлы с разными расширениями и нужно выводить сколько какого обработалось + еще время обработки этой сотни файлов...
public void configure() {
from("file:data?noop=true")
.choice()
.when().simple("${file:name} endsWith 'xml'")
.log("Xml файл обрабатывается процессором")
.process(xmlFileProcessor)
.log("Xml файл отправляется в очередь")
.to("activemq:test")
.when().simple("${file:name} endsWith 'txt'")
.log("Txt файл обрабатывается процессором")
.process(txtFileProcessor)
.log("Txt файл отправляется в очередь")
.to("activemq:test")
.endChoice()
.otherwise()
.log("Файл с каким-то расширением обрабатывается процессором")
.process(otherwiseFileProcessor)
.log("Файл с каким-то расширением отправляется в очередь invalide")
.to("activemq:invalid-queue")
.log("Выбрасывается исключение")
.throwException(new BadExpansion("Неверное расширение"))
.when(PredicateBuilder.constant((countXml + countTxt + countOther) % 100 == 0))
.log("Количество файлов 100")
.log("Количество файлов txt " + countTxt + "\n" +
"Количество файлов xml " + countXml + "\n" +
"Количество остальных файлов " + countOther + "\n" +
"Время обработки сообщений " )
.end();
Со временем вообще не разобрался как сделать можно, а вот с подсчетом файлов была такая идея, реализовать классы процессы для каждого расширения типа вот этого
public class XmlFileProcessor implements Processor {
public static long countXmlFile = 0;
@Override
public void process(Exchange exchange) {
++countXmlFile;
}
}
Но он инкрементируется только в пределах класса.
Ответы (1 шт):
Автор решения: anton_63
→ Ссылка
Оказалось все куда проще. Нужно было сделать один класс процессор (а не плодить для каждого расширения свой) и в нем уже проводить все действия со всеми расширениями. Примерно так
public class ExtensionProcessor implements Processor {
private static Logger logger = LoggerFactory.getLogger(Main.class);
private static final long BATCH_FILES = 100;
private static final String EXT_TXT = "txt";
private static final String EXT_XML = "xml";
private static final String EXT_ANOTHER = "ext_another";
private static Map<String, Long> countByTypeFiles;
private static long startTimeHandlingBatchFiles = 0;
private static long countFiles = 0;
private static boolean isStartHandlingBatchFiles = true;
static {
countByTypeFiles = new HashMap<>();
countByTypeFiles.put(EXT_TXT, 0L);
countByTypeFiles.put(EXT_XML, 0L);
countByTypeFiles.put(EXT_ANOTHER, 0L);
}
@Override
public void process(Exchange exchange) throws Exception {
if(isStartHandlingBatchFiles) {
startTimeHandlingBatchFiles = System.currentTimeMillis();
isStartHandlingBatchFiles = false;
}
handlingMessage(exchange);
countFiles++;
if (countFiles % BATCH_FILES == 0){
logger.info("------------------------------------------------");
logger.info("Количество файлов txt " + countByTypeFiles.get(EXT_TXT));
logger.info("Количество файлов xml " + countByTypeFiles.get(EXT_XML));
logger.info("Количество остальных файлов " + countByTypeFiles.get(EXT_ANOTHER));
logger.info("Время обработки сообщений " + (System.currentTimeMillis() - startTimeHandlingBatchFiles));
logger.info("------------------------------------------------");
isStartHandlingBatchFiles = true;
}
}
private String getFileName(Exchange exchange) {
return exchange.getIn().getHeader(Exchange.FILE_NAME).toString();
}
private String getExtensionFile(String fileName) {
return FilenameUtils.getExtension(fileName);
}
private String getBody(Exchange exchange) {
return exchange.getIn().getBody(String.class);
}
private void handlingMessage(Exchange exchange) {
String fileName = getFileName(exchange);
String extensionFile = getExtensionFile(fileName);
if (extensionFile.equals(EXT_TXT) || extensionFile.equals(EXT_XML)) {
incrementCountFiles(extensionFile);
} else {
incrementCountFiles(EXT_ANOTHER);
}
if (extensionFile.equals(EXT_TXT)) {
String body = getBody(exchange);
saveBodyMessageDB(body);
}
}
private void incrementCountFiles(String extensionFile) {
long count = countByTypeFiles.get(extensionFile);
countByTypeFiles.put(extensionFile, ++count);
}
private void saveBodyMessageDB(String body) {
PropertyLoader propertyLoader = new PropertyLoader(RESOURCES);
try (Connection connection = DriverManager.getConnection(
propertyLoader.getConnectionURL(),
propertyLoader.getUser(),
propertyLoader.getPassword())) {
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(SQL)) {
preparedStatement.setString(1, body);
preparedStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
connection.rollback();
e.printStackTrace();
}
} catch (SQLException | IOException e) {
e.printStackTrace();
}
}
}