diff --git a/pom.xml b/pom.xml index 983257f..804edf4 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ com.teragrep rlp_03 - 1.7.6 + 7.0.2 diff --git a/src/main/java/com/teragrep/rlp_08/Main.java b/src/main/java/com/teragrep/rlp_08/Main.java index 205f543..8a7d93a 100644 --- a/src/main/java/com/teragrep/rlp_08/Main.java +++ b/src/main/java/com/teragrep/rlp_08/Main.java @@ -4,29 +4,39 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SlidingTimeWindowMovingAverages; -import com.teragrep.rlp_03.FrameProcessor; -import com.teragrep.rlp_03.Server; -import com.teragrep.rlp_03.SyslogFrameProcessor; +import com.teragrep.rlp_03.channel.socket.SocketFactory; +import com.teragrep.rlp_03.channel.socket.TLSFactory; +import com.teragrep.rlp_03.eventloop.EventLoop; +import com.teragrep.rlp_03.eventloop.EventLoopFactory; +import com.teragrep.rlp_03.frame.RelpFrame; +import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; +import com.teragrep.rlp_03.frame.delegate.FrameContext; +import com.teragrep.rlp_03.frame.delegate.FrameDelegate; +import com.teragrep.rlp_03.server.Server; +import com.teragrep.rlp_03.server.ServerFactory; +import com.teragrep.rlp_03.channel.socket.PlainFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.io.IOException; import java.io.InputStream; import java.security.GeneralSecurityException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import static com.codahale.metrics.MetricRegistry.name; class Main { private static final Meter totalRecords = new Meter(new SlidingTimeWindowMovingAverages()); private static final Meter totalBytes = new Meter(new SlidingTimeWindowMovingAverages()); - private static int threads; - private static Consumer byteConsumer; - public static void main(String[] args) throws IOException, InterruptedException, GeneralSecurityException { + + public static void main(String[] args) throws Exception { int port = Integer.parseInt(System.getProperty("port", "1601")); boolean tls = Boolean.parseBoolean(System.getProperty("tls", "false")); - threads = Integer.parseInt(System.getProperty("threads", "1")); + int threads = Integer.parseInt(System.getProperty("threads", "1")); System.out.println( " _____ _ _ _ _ _ _ _ _ \n" + "|_ _| |__ (_)___ __ _(_) | | | | ___ ___ ___ __ _| | |\n" + @@ -42,14 +52,9 @@ public static void main(String[] args) throws IOException, InterruptedException, " |___/ \n" ); System.out.println("----"); - System.out.println("Listening on port " + port + " using " + threads + " threads"); - + ExecutorService executorService = Executors.newFixedThreadPool(threads); int metricsInterval = Integer.parseInt(System.getProperty("metricsInterval", "0")); if(metricsInterval > 0) { - byteConsumer = bytes -> { - totalBytes.mark(bytes.length); - totalRecords.mark(); - }; MetricRegistry metricRegistry = new MetricRegistry(); metricRegistry.register(name("total", "records"), totalRecords); metricRegistry.register(name("total", "bytes"), totalBytes); @@ -59,44 +64,51 @@ public static void main(String[] args) throws IOException, InterruptedException, reporter.start(metricsInterval, TimeUnit.SECONDS); System.out.println("Metrics are printed every " + metricsInterval + " seconds"); } - else { - byteConsumer = bytes -> {}; - } - - if (tls) { - System.out.println("Starting TLS server"); - tlsServer(port); - } else { - System.out.println("Starting plain server"); - plainServer(port); - } + Supplier frameDelegateSupplier = () -> new DefaultFrameDelegate(createSyslogConsumer(metricsInterval)); + EventLoopFactory eventLoopFactory = new EventLoopFactory(); + EventLoop eventLoop = eventLoopFactory.create(); + Thread eventLoopThread = new Thread(eventLoop); + eventLoopThread.start(); + ServerFactory serverFactory = new ServerFactory( + eventLoop, + executorService, + createSocketFactory(tls), + frameDelegateSupplier + ); + System.out.println("Starting " + (tls ? "tls" : "plain") + "server with <" + threads + "> thread(s) at port <" + port + ">"); + Server server = serverFactory.create(port); + Thread.sleep(Long.MAX_VALUE); + eventLoop.stop(); + eventLoopThread.join(); + executorService.shutdown(); } - private static void plainServer(int port) throws IOException, InterruptedException { - FrameProcessor syslogFrameProcessor = new SyslogFrameProcessor(byteConsumer); - Server relpServer = new Server(port, syslogFrameProcessor); - relpServer.setNumberOfThreads(threads); - relpServer.start(); - Thread.sleep(Long.MAX_VALUE); + private static Consumer createSyslogConsumer(int metricsInterval) { + if(metricsInterval > 0) { + return frameContext -> { + totalBytes.mark(frameContext.relpFrame().payloadLength().size()); + totalRecords.mark(); + }; + } + else { + return frameContext -> { + }; + } } - private static void tlsServer(int port) throws InterruptedException, GeneralSecurityException, IOException { - FrameProcessor syslogFrameProcessor = new SyslogFrameProcessor(byteConsumer); - InputStream keyStoreStream = Main.class.getClassLoader().getResourceAsStream("keystore-server.jks"); - SSLContext sslContext; - try { - sslContext = SSLDemoContextFactory.authenticatedContext(keyStoreStream, "changeit", "TLSv1.3"); - } catch (IOException e) { - throw new RuntimeException("SSL.demoContext Error: " + e); + private static SocketFactory createSocketFactory(boolean useTls) throws GeneralSecurityException, IOException { + if(useTls) { + InputStream keyStoreStream = Main.class.getClassLoader().getResourceAsStream("keystore-server.jks"); + SSLContext sslContext = SSLDemoContextFactory.authenticatedContext(keyStoreStream, "changeit", "TLSv1.3"); + Function sslEngineFunction = sslContext1 -> { + SSLEngine sslEngine = sslContext1.createSSLEngine(); + sslEngine.setUseClientMode(false); + return sslEngine; + }; + return new TLSFactory(sslContext, sslEngineFunction); + } + else { + return new PlainFactory(); } - Function sslEngineFunction = sslCtx -> { - SSLEngine sslEngine = sslCtx.createSSLEngine(); - sslEngine.setUseClientMode(false); - return sslEngine; - }; - Server relpServer = new Server(port, syslogFrameProcessor, sslContext, sslEngineFunction); - relpServer.setNumberOfThreads(threads); - relpServer.start(); - Thread.sleep(Long.MAX_VALUE); } }