Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.akto.dto.graph.SvcToSvcGraphNode;
import com.akto.dto.metrics.MetricData;
import com.akto.dto.monitoring.ModuleInfo;
import com.akto.dto.agentic_sessions.AgentQueryData;
import com.akto.dto.agentic_sessions.SessionDocument;
import com.akto.dto.notifications.SlackWebhook;
import com.akto.dto.runtime_filters.RuntimeFilter;
Expand Down Expand Up @@ -190,6 +191,9 @@ public class DbAction extends ActionSupport {
@Getter @Setter
private List<SessionDocument> sessionDocuments;

@Getter @Setter
private AgentQueryData agentQueryData;

private static final LoggerMaker loggerMaker = new LoggerMaker(DbAction.class, LogDb.DB_ABS);

public List<BulkUpdates> getWritesForTestingRunIssues() {
Expand Down Expand Up @@ -3635,6 +3639,16 @@ public String storeSpans() {
return Action.SUCCESS.toUpperCase();
}

public String storeAgentQueryData() {
try {
DbLayer.storeAgentQueryData(agentQueryData);
} catch (Exception e) {
loggerMaker.errorAndAddToDb(e, "Error in storeAgentQueryData " + e.toString());
return Action.ERROR.toUpperCase();
}
return Action.SUCCESS.toUpperCase();
}

private TestingRunWebhook testingRunWebhook;

public TestingRunWebhook getTestingRunWebhook() {
Expand Down
10 changes: 10 additions & 0 deletions apps/database-abstractor/src/main/resources/struts.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2378,6 +2378,16 @@
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>
<action name="api/storeAgentQueryData" class="com.akto.action.DbAction" method="storeAgentQueryData">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

</package>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.akto.dao.agentic_sessions;

import com.akto.dao.AccountsContextDao;
import com.akto.dao.MCollection;
import com.akto.dao.context.Context;
import com.akto.dto.HttpResponseParams;
import com.akto.dto.agentic_sessions.AgentQueryData;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.conversions.Bson;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class AgentQueryDataDao extends AccountsContextDao<AgentQueryData> {

public static final AgentQueryDataDao instance = new AgentQueryDataDao();

private static final long TTL_DAYS = 14;

public void createIndicesIfAbsent() {
// TTL index: auto-delete records older than 14 days
Bson ttlIndex = Indexes.ascending(AgentQueryData.TIME_STAMP);
IndexOptions ttlOptions = new IndexOptions()
.name("timeStamp_ttl")
.expireAfter(TTL_DAYS * 24 * 60 * 60, TimeUnit.SECONDS);
MCollection.createIndexIfAbsent(getDBName(), getCollName(), ttlIndex, ttlOptions);

// Compound index for cron queries: fetch by user within a time window
String[] compoundFields = new String[]{
AgentQueryData.SERVICE_ID,
AgentQueryData.UNIQUE_USER_ID,
AgentQueryData.TIME_STAMP
};
MCollection.createIndexIfAbsent(getDBName(), getCollName(), compoundFields, false);
}

public AgentQueryData createAgentQueryDataFromHttpResponseParams(HttpResponseParams httpResponseParam) {
AgentQueryData agentQueryData = new AgentQueryData();

Map<String, List<String>> requestHeaders = httpResponseParam.getRequestParams().getHeaders();

agentQueryData.setServiceId(getFirstHeader(requestHeaders, "host"));
agentQueryData.setUserName(getFirstHeader(requestHeaders, (AgentQueryData.HEADER_PREFIX + AgentQueryData.HEADER_USER_EMAIL)));
agentQueryData.setSessionIdentifier(getFirstHeader(requestHeaders, (AgentQueryData.HEADER_PREFIX + AgentQueryData.HEADER_SESSION_ID)));
agentQueryData.setConversationId(getFirstHeader(requestHeaders, (AgentQueryData.HEADER_PREFIX + AgentQueryData.HEADER_CONVERSATION_ID)));

agentQueryData.setQueryPayload(httpResponseParam.getRequestParams().getPayload());
agentQueryData.setResponsePayload(httpResponseParam.getPayload());
agentQueryData.setTimeStamp(Context.now());

return agentQueryData;
}

private static String getFirstHeader(Map<String, List<String>> headers, String name) {
if (headers == null) return null;
List<String> values = headers.get(name);
return (values != null && !values.isEmpty()) ? values.get(0) : null;
}

@Override
public String getCollName() {
return "agent_query_data";
}

@Override
public Class<AgentQueryData> getClassT() {
return AgentQueryData.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.akto.dto.agentic_sessions;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter

// use this in future for learning the user type, build the flow graph of the agent
public class AgentQueryData {

// DB field name constants
public static final String SERVICE_ID = "serviceId";
public static final String UNIQUE_USER_ID = "uniqueUserId";
public static final String SESSION_IDENTIFIER = "sessionIdentifier";
public static final String QUERY_PAYLOAD = "queryPayload";
public static final String TIME_STAMP = "timeStamp";
public static final String INPUT_TOKENS = "inputTokens";
public static final String OUTPUT_TOKENS = "outputTokens";

// Request header name constants
public static final String HEADER_PREFIX = "x-akto-installer-";
public static final String HEADER_USER_EMAIL = "user_email";
public static final String HEADER_SESSION_ID = "session_id";
public static final String HEADER_CONVERSATION_ID = "conversation_id";
public static final String HEADER_GENERATION_ID = "generation_id";

private String serviceId;
private String deviceId;
private String userName;
private String sessionIdentifier;
private String conversationId;
private String queryPayload;
private String responsePayload;
private long timeStamp;
private int inputTokens;
private int outputTokens;
}
15 changes: 15 additions & 0 deletions libs/utils/src/main/java/com/akto/data_actor/ClientActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.akto.database_abstractor_authenticator.JwtAuthenticator;
import com.akto.dto.*;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.agentic_sessions.AgentQueryData;
import com.akto.dto.billing.Organization;
import com.akto.dto.billing.Tokens;
import com.akto.dto.bulk_updates.BulkUpdates;
Expand Down Expand Up @@ -4460,4 +4461,18 @@ public void storeTestingRunWebhook(TestingRunWebhook testingRunWebhook) {
return;
}
}

@Override
public void storeAgentQueryData(AgentQueryData agentQueryData) {
Map<String, List<String>> headers = buildHeaders();
BasicDBObject obj = new BasicDBObject();
obj.put("agentQueryData", agentQueryData);
OriginalHttpRequest request = new OriginalHttpRequest(url + "/storeAgentQueryData", "", "POST", obj.toString(), headers, "");
try {
OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
} catch (Exception e) {
loggerMaker.errorAndAddToDb("error in storeAgentQueryData" + e, LoggerMaker.LogDb.RUNTIME);
return;
}
}
}
3 changes: 3 additions & 0 deletions libs/utils/src/main/java/com/akto/data_actor/DataActor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.akto.data_actor;

import com.akto.dto.*;
import com.akto.dto.agentic_sessions.AgentQueryData;
import com.akto.dto.billing.Organization;
import com.akto.dto.billing.Tokens;
import com.akto.dto.dependency_flow.Node;
Expand Down Expand Up @@ -402,4 +403,6 @@ public void updateNewNodesInBatches(List<SvcToSvcGraphNode> updateNodes) {
public abstract void storeSpans(List<Span> spans);

public abstract void storeTestingRunWebhook(TestingRunWebhook testingRunWebhook);

public abstract void storeAgentQueryData(AgentQueryData agentQueryData);
}
6 changes: 6 additions & 0 deletions libs/utils/src/main/java/com/akto/data_actor/DbActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.akto.dto.*;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.agentic_sessions.AgentQueryData;
import com.akto.dto.billing.Organization;
import com.akto.dto.billing.Tokens;
import com.akto.dto.dependency_flow.Node;
Expand Down Expand Up @@ -744,4 +745,9 @@ public void storeSpans(List<Span> spans) {
public void storeTestingRunWebhook(TestingRunWebhook testingRunWebhook) {
DbLayer.storeTestingRunWebhook(testingRunWebhook);
}

@Override
public void storeAgentQueryData(AgentQueryData agentQueryData) {
DbLayer.storeAgentQueryData(agentQueryData);
}
}
8 changes: 7 additions & 1 deletion libs/utils/src/main/java/com/akto/data_actor/DbLayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.akto.bulk_update_util.ApiInfoBulkUpdate;
import com.akto.dao.*;
import com.akto.dao.agentic_sessions.AgentQueryDataDao;
import com.akto.dao.filter.MergedUrlsDao;
import com.akto.dao.graph.SvcToSvcGraphEdgesDao;
import com.akto.dao.graph.SvcToSvcGraphNodesDao;
Expand Down Expand Up @@ -76,6 +77,7 @@
import com.akto.dao.traffic_metrics.TrafficMetricsDao;
import com.akto.dao.upload.FileUploadsDao;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.agentic_sessions.AgentQueryData;
import com.akto.dto.billing.Organization;
import com.akto.dto.billing.Tokens;
import com.akto.dto.dependency_flow.Node;
Expand Down Expand Up @@ -2762,4 +2764,8 @@ public static void bulkUpsertAgenticSessionContext(List<SessionDocument> session
SessionDocumentDao.instance.getMCollection().bulkWrite(bulkUpdates);
}
}
}

public static void storeAgentQueryData(AgentQueryData agentQueryData) {
AgentQueryDataDao.instance.insertOne(agentQueryData);
}
}
Loading