Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlp_03</artifactId>
<version>1.7.6</version>
<version>7.0.2</version>
</dependency>
<!-- logging -->
<dependency>
Expand Down
106 changes: 59 additions & 47 deletions src/main/java/com/teragrep/rlp_08/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,39 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingTimeWindowMovingAverages;
import com.teragrep.rlp_03.FrameProcessor;
import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.SyslogFrameProcessor;
import com.teragrep.rlp_03.channel.socket.SocketFactory;
import com.teragrep.rlp_03.channel.socket.TLSFactory;
import com.teragrep.rlp_03.eventloop.EventLoop;
import com.teragrep.rlp_03.eventloop.EventLoopFactory;
import com.teragrep.rlp_03.frame.RelpFrame;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;
import com.teragrep.rlp_03.channel.socket.PlainFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.codahale.metrics.MetricRegistry.name;

class Main {
private static final Meter totalRecords = new Meter(new SlidingTimeWindowMovingAverages());
private static final Meter totalBytes = new Meter(new SlidingTimeWindowMovingAverages());
private static int threads;
private static Consumer<byte[]> byteConsumer;
public static void main(String[] args) throws IOException, InterruptedException, GeneralSecurityException {

public static void main(String[] args) throws Exception {
int port = Integer.parseInt(System.getProperty("port", "1601"));
boolean tls = Boolean.parseBoolean(System.getProperty("tls", "false"));
threads = Integer.parseInt(System.getProperty("threads", "1"));
int threads = Integer.parseInt(System.getProperty("threads", "1"));
System.out.println(
" _____ _ _ _ _ _ _ _ _ \n" +
"|_ _| |__ (_)___ __ _(_) | | | | ___ ___ ___ __ _| | |\n" +
Expand All @@ -42,14 +52,9 @@ public static void main(String[] args) throws IOException, InterruptedException,
" |___/ \n"
);
System.out.println("----");
System.out.println("Listening on port " + port + " using " + threads + " threads");

ExecutorService executorService = Executors.newFixedThreadPool(threads);
int metricsInterval = Integer.parseInt(System.getProperty("metricsInterval", "0"));
if(metricsInterval > 0) {
byteConsumer = bytes -> {
totalBytes.mark(bytes.length);
totalRecords.mark();
};
MetricRegistry metricRegistry = new MetricRegistry();
metricRegistry.register(name("total", "records"), totalRecords);
metricRegistry.register(name("total", "bytes"), totalBytes);
Expand All @@ -59,44 +64,51 @@ public static void main(String[] args) throws IOException, InterruptedException,
reporter.start(metricsInterval, TimeUnit.SECONDS);
System.out.println("Metrics are printed every " + metricsInterval + " seconds");
}
else {
byteConsumer = bytes -> {};
}

if (tls) {
System.out.println("Starting TLS server");
tlsServer(port);
} else {
System.out.println("Starting plain server");
plainServer(port);
}
Supplier<FrameDelegate> frameDelegateSupplier = () -> new DefaultFrameDelegate(createSyslogConsumer(metricsInterval));
EventLoopFactory eventLoopFactory = new EventLoopFactory();
EventLoop eventLoop = eventLoopFactory.create();
Thread eventLoopThread = new Thread(eventLoop);
eventLoopThread.start();
ServerFactory serverFactory = new ServerFactory(
eventLoop,
executorService,
createSocketFactory(tls),
frameDelegateSupplier
);
System.out.println("Starting " + (tls ? "tls" : "plain") + "server with <" + threads + "> thread(s) at port <" + port + ">");
Server server = serverFactory.create(port);
Thread.sleep(Long.MAX_VALUE);
eventLoop.stop();
eventLoopThread.join();
executorService.shutdown();
}

private static void plainServer(int port) throws IOException, InterruptedException {
FrameProcessor syslogFrameProcessor = new SyslogFrameProcessor(byteConsumer);
Server relpServer = new Server(port, syslogFrameProcessor);
relpServer.setNumberOfThreads(threads);
relpServer.start();
Thread.sleep(Long.MAX_VALUE);
private static Consumer<FrameContext> createSyslogConsumer(int metricsInterval) {
if(metricsInterval > 0) {
return frameContext -> {
totalBytes.mark(frameContext.relpFrame().payloadLength().size());
totalRecords.mark();
};
}
else {
return frameContext -> {
};
}
}

private static void tlsServer(int port) throws InterruptedException, GeneralSecurityException, IOException {
FrameProcessor syslogFrameProcessor = new SyslogFrameProcessor(byteConsumer);
InputStream keyStoreStream = Main.class.getClassLoader().getResourceAsStream("keystore-server.jks");
SSLContext sslContext;
try {
sslContext = SSLDemoContextFactory.authenticatedContext(keyStoreStream, "changeit", "TLSv1.3");
} catch (IOException e) {
throw new RuntimeException("SSL.demoContext Error: " + e);
private static SocketFactory createSocketFactory(boolean useTls) throws GeneralSecurityException, IOException {
if(useTls) {
InputStream keyStoreStream = Main.class.getClassLoader().getResourceAsStream("keystore-server.jks");
SSLContext sslContext = SSLDemoContextFactory.authenticatedContext(keyStoreStream, "changeit", "TLSv1.3");
Function<SSLContext, SSLEngine> sslEngineFunction = sslContext1 -> {
SSLEngine sslEngine = sslContext1.createSSLEngine();
sslEngine.setUseClientMode(false);
return sslEngine;
};
return new TLSFactory(sslContext, sslEngineFunction);
}
else {
return new PlainFactory();
}
Function<SSLContext, SSLEngine> sslEngineFunction = sslCtx -> {
SSLEngine sslEngine = sslCtx.createSSLEngine();
sslEngine.setUseClientMode(false);
return sslEngine;
};
Server relpServer = new Server(port, syslogFrameProcessor, sslContext, sslEngineFunction);
relpServer.setNumberOfThreads(threads);
relpServer.start();
Thread.sleep(Long.MAX_VALUE);
}
}