*
* Example 1 (scenario loaded from the file):
*
* Scenario:
*
* {@code
*
* test
*
* select * from some_table
*
* ........
*
* test
*
* ........
*
* }
*
* Example 2 (scenario created programmatically):
*
* {@code
* Scenario scenario = new Scenario();
*
* Source source1 = new Source();
* source1.setName("test");
* source1.setSql("select * from some_table");
* source1.setParallel(true);
* scenario.addSource(source1);
*
* Destination destination1 = new Destination();
* destination1.setName(source1.getName());
* destination1.setSource(source1);
* destination1.setParallel(true);
* destination1.setConnectionName(EtlConfig.DEST_CONNECTION_NAME);
* scenario.addDestination(destination1);
* }
*
* The ETL scenario in this example extracts data from 3 tables in the Java DB database and uses it to create JSON files.
* It can create files in any supported format without any modification to the scenario.
*
* This scenario demonstrates how to load data for each destination in a parallel thread. Because the data are streamed,
* there is no need to extract in parallel.
*
* There are no input files. Output files are: APP_HOME/data/EMPLOYEE.json, APP_HOME/data/CONFIG_PROPERTY.json and * APP_HOME/data/CONFIG_VALUE.json *
* The logger is manually set to the INFO mode which increases verbosity of the etl engine. * */ public class ParallelExtractAndLoad { /** * Configures ETL engine and executes ETL scenario. * * @param args the command line arguments */ public static void main(String[] args) { ParallelExtractAndLoad engine = new ParallelExtractAndLoad(); try { // instantiates ETL configuration EtlConfig etlConfig = new EtlConfig(); // creates embedded ETL process EtlProcess etlProcess = new EtlProcess(EtlProcess.EtlMode.EMBEDDED); // print out framework version System.out.println(SystemConfig.instance().getTitle( EtlConfig.DEFAULT_TITLE) + " " + SystemConfig.instance().getSystemProperty( SystemConfig.VERSION)); // creates configuration which contains source and destination // connections, creates and executes ETL scenario. EtlResponse response = engine .createScenarioAndConfigurationExecuteScenario(etlConfig, etlProcess); // print out formatted output from the ETL response System.out.println(engine.getMessage(response)); } catch (Exception ex) { System.out.println(Utils.getStackTraceAsString(ex)); } System.exit(0); } /** * Programmatically creates connection aliases for the ETL process, programmatically creates ETL scenario. * Executes ETl scenario. 
* * @param config the ETL config * @param etlProcess the ETl process * @return ETL response * @throws Exception in case of any error */ private EtlResponse createScenarioAndConfigurationExecuteScenario( EtlConfig config, EtlProcess etlProcess) throws Exception { // initializes ETl config config.init(); // creates source alias Alias source = new Alias(); source.setName("Java DB"); source.setUrl("jdbc:derby:{app.root.data}/demo/javadb"); source.setJdbcDriverClass("org.apache.derby.jdbc.EmbeddedDriver"); // creates destination alias Alias destination = new Alias(); destination.setName("JSON files"); destination .setConnectorClassName("com.toolsverse.etl.connector.json.JsonConnector"); destination.setUrl("{app.root.data}/*.json"); // adds aliases. ETL process will create connections from these aliases config.addAliasToMap(EtlConfig.SOURCE_CONNECTION_NAME, source); config.addAliasToMap(EtlConfig.DEST_CONNECTION_NAME, destination); // programmatically create ETL scenario Scenario scenario = new Scenario(); scenario.setName("Extract data from database and load into files in any supported format"); scenario.setDriver(GenericFileDriver.class.getName()); // sources Source source1 = new Source(); source1.setName("EMPLOYEE"); source1.setSql("select * from employee"); source1.setConnectionName(EtlConfig.SOURCE_CONNECTION_NAME); scenario.addSource(source1); Source source2 = new Source(); source2.setName("CONFIG_PROPERTY"); source2.setSql("select * from config_property"); source2.setConnectionName(EtlConfig.SOURCE_CONNECTION_NAME); scenario.addSource(source2); Source source3 = new Source(); source3.setName("CONFIG_VALUE"); source3.setSql("select * from config_value"); source3.setConnectionName(EtlConfig.SOURCE_CONNECTION_NAME); scenario.addSource(source3); // destinations Destination destination1 = new Destination(); destination1.setName(source1.getName()); destination1.setSource(source1); destination1.setStream(true); destination1.setParallel(true); 
destination1.setConnectionName(EtlConfig.DEST_CONNECTION_NAME); scenario.addDestination(destination1); Destination destination2 = new Destination(); destination2.setName(source2.getName()); destination2.setSource(source2); destination2.setStream(true); destination2.setParallel(true); destination2.setConnectionName(EtlConfig.DEST_CONNECTION_NAME); scenario.addDestination(destination2); Destination destination3 = new Destination(); destination3.setName(source3.getName()); destination3.setSource(source3); destination3.setStream(true); destination3.setParallel(true); destination3.setConnectionName(EtlConfig.DEST_CONNECTION_NAME); scenario.addDestination(destination3); // this is important! scenario.setReady(true); scenario.setAction(EtlConfig.EXTRACT_LOAD); // creates ETl request using given config, scenario and log level EtlRequest request = new EtlRequest(config, scenario, Logger.INFO); // executes ETL process return etlProcess.execute(request); } /** * Creates the formatted message from the ETL response * * @param response the ETl response * * @return formatted message from the ETL response */ private String getMessage(EtlResponse response) { String msg = ""; String start = response != null ? response.getStartTime().toString() : new Date().toString(); String end = response != null && response.getEndTime() != null ? response .getEndTime().toString() : new Date().toString(); String diff = response != null ? 
String .valueOf(Utils.getDateDiff(response.getEndTime(), response.getStartTime(), Calendar.SECOND)) : "0"; if (response.getRetCode() == EtlConfig.RETURN_OK) { msg = Utils .format("\n" + EtlResource.FINISH_SUCCESS_MSG.getValue() + "\n" + "Started at %2, finished at %3, total execution time %4 seconds.", start, end, diff); if (!Utils.isNothing(response.getWarnings())) { msg = msg + "\n" + EtlResource.WARING_NOTHING_WAS_EXECUTED.getValue(); } } else if (response.getRetCode() == EtlConfig.RETURN_NO_DRIVERS) { msg = "\n" + EtlResource.NO_DRIVERS_MSG.getValue(); } else { msg = Utils .format("\n" + EtlResource.FINISH_ERROR_MSG.getValue() + "\n" + "Started at %2, finished at %3, total execution time %4 seconds.", start, end, diff) + "\n" + Utils.getStackTraceAsString(response.getException()); } return msg; } }