diff --git a/pom.xml b/pom.xml
index 983257f..804edf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.teragrep
rlp_03
- 1.7.6
+ 7.0.2
diff --git a/src/main/java/com/teragrep/rlp_08/Main.java b/src/main/java/com/teragrep/rlp_08/Main.java
index 205f543..8a7d93a 100644
--- a/src/main/java/com/teragrep/rlp_08/Main.java
+++ b/src/main/java/com/teragrep/rlp_08/Main.java
@@ -4,29 +4,39 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingTimeWindowMovingAverages;
-import com.teragrep.rlp_03.FrameProcessor;
-import com.teragrep.rlp_03.Server;
-import com.teragrep.rlp_03.SyslogFrameProcessor;
+import com.teragrep.rlp_03.channel.socket.SocketFactory;
+import com.teragrep.rlp_03.channel.socket.TLSFactory;
+import com.teragrep.rlp_03.eventloop.EventLoop;
+import com.teragrep.rlp_03.eventloop.EventLoopFactory;
+import com.teragrep.rlp_03.frame.RelpFrame;
+import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
+import com.teragrep.rlp_03.frame.delegate.FrameContext;
+import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
+import com.teragrep.rlp_03.server.Server;
+import com.teragrep.rlp_03.server.ServerFactory;
+import com.teragrep.rlp_03.channel.socket.PlainFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import static com.codahale.metrics.MetricRegistry.name;
class Main {
private static final Meter totalRecords = new Meter(new SlidingTimeWindowMovingAverages());
private static final Meter totalBytes = new Meter(new SlidingTimeWindowMovingAverages());
- private static int threads;
- private static Consumer byteConsumer;
- public static void main(String[] args) throws IOException, InterruptedException, GeneralSecurityException {
+
+ public static void main(String[] args) throws Exception {
int port = Integer.parseInt(System.getProperty("port", "1601"));
boolean tls = Boolean.parseBoolean(System.getProperty("tls", "false"));
- threads = Integer.parseInt(System.getProperty("threads", "1"));
+ int threads = Integer.parseInt(System.getProperty("threads", "1"));
System.out.println(
" _____ _ _ _ _ _ _ _ _ \n" +
"|_ _| |__ (_)___ __ _(_) | | | | ___ ___ ___ __ _| | |\n" +
@@ -42,14 +52,9 @@ public static void main(String[] args) throws IOException, InterruptedException,
" |___/ \n"
);
System.out.println("----");
- System.out.println("Listening on port " + port + " using " + threads + " threads");
-
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
int metricsInterval = Integer.parseInt(System.getProperty("metricsInterval", "0"));
if(metricsInterval > 0) {
- byteConsumer = bytes -> {
- totalBytes.mark(bytes.length);
- totalRecords.mark();
- };
MetricRegistry metricRegistry = new MetricRegistry();
metricRegistry.register(name("total", "records"), totalRecords);
metricRegistry.register(name("total", "bytes"), totalBytes);
@@ -59,44 +64,51 @@ public static void main(String[] args) throws IOException, InterruptedException,
reporter.start(metricsInterval, TimeUnit.SECONDS);
System.out.println("Metrics are printed every " + metricsInterval + " seconds");
}
- else {
- byteConsumer = bytes -> {};
- }
-
- if (tls) {
- System.out.println("Starting TLS server");
- tlsServer(port);
- } else {
- System.out.println("Starting plain server");
- plainServer(port);
- }
+ Supplier frameDelegateSupplier = () -> new DefaultFrameDelegate(createSyslogConsumer(metricsInterval));
+ EventLoopFactory eventLoopFactory = new EventLoopFactory();
+ EventLoop eventLoop = eventLoopFactory.create();
+ Thread eventLoopThread = new Thread(eventLoop);
+ eventLoopThread.start();
+ ServerFactory serverFactory = new ServerFactory(
+ eventLoop,
+ executorService,
+ createSocketFactory(tls),
+ frameDelegateSupplier
+ );
+ System.out.println("Starting " + (tls ? "tls" : "plain") + "server with <" + threads + "> thread(s) at port <" + port + ">");
+ Server server = serverFactory.create(port);
+ Thread.sleep(Long.MAX_VALUE);
+ eventLoop.stop();
+ eventLoopThread.join();
+ executorService.shutdown();
}
- private static void plainServer(int port) throws IOException, InterruptedException {
- FrameProcessor syslogFrameProcessor = new SyslogFrameProcessor(byteConsumer);
- Server relpServer = new Server(port, syslogFrameProcessor);
- relpServer.setNumberOfThreads(threads);
- relpServer.start();
- Thread.sleep(Long.MAX_VALUE);
+ private static Consumer createSyslogConsumer(int metricsInterval) {
+ if(metricsInterval > 0) {
+ return frameContext -> {
+ totalBytes.mark(frameContext.relpFrame().payloadLength().size());
+ totalRecords.mark();
+ };
+ }
+ else {
+ return frameContext -> {
+ };
+ }
}
- private static void tlsServer(int port) throws InterruptedException, GeneralSecurityException, IOException {
- FrameProcessor syslogFrameProcessor = new SyslogFrameProcessor(byteConsumer);
- InputStream keyStoreStream = Main.class.getClassLoader().getResourceAsStream("keystore-server.jks");
- SSLContext sslContext;
- try {
- sslContext = SSLDemoContextFactory.authenticatedContext(keyStoreStream, "changeit", "TLSv1.3");
- } catch (IOException e) {
- throw new RuntimeException("SSL.demoContext Error: " + e);
+ private static SocketFactory createSocketFactory(boolean useTls) throws GeneralSecurityException, IOException {
+ if(useTls) {
+ InputStream keyStoreStream = Main.class.getClassLoader().getResourceAsStream("keystore-server.jks");
+ SSLContext sslContext = SSLDemoContextFactory.authenticatedContext(keyStoreStream, "changeit", "TLSv1.3");
+ Function sslEngineFunction = sslContext1 -> {
+ SSLEngine sslEngine = sslContext1.createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ return sslEngine;
+ };
+ return new TLSFactory(sslContext, sslEngineFunction);
+ }
+ else {
+ return new PlainFactory();
}
- Function sslEngineFunction = sslCtx -> {
- SSLEngine sslEngine = sslCtx.createSSLEngine();
- sslEngine.setUseClientMode(false);
- return sslEngine;
- };
- Server relpServer = new Server(port, syslogFrameProcessor, sslContext, sslEngineFunction);
- relpServer.setNumberOfThreads(threads);
- relpServer.start();
- Thread.sleep(Long.MAX_VALUE);
}
}