-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnetflow_botnet_test.py
More file actions
98 lines (79 loc) · 3.13 KB
/
netflow_botnet_test.py
File metadata and controls
98 lines (79 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
"""
Netflow Botnet Test - Giả lập gửi traffic botnet vào Kafka để test
Đọc một mẫu botnet từ dataset và gửi vào Kafka topic 'netflow'
"""
import json
import pandas as pd
from confluent_kafka import Producer
import time
import sys
# ==================== Botnet IPs ====================
infected_ips = ['147.32.84.165', '147.32.84.191', '147.32.84.192']
# ==================== Kafka Producer Setup ====================
producer_config = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'botnet-test-producer'
}
producer = Producer(producer_config)
def delivery_report(err, msg):
"""Callback được gọi khi message được gửi"""
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
# ==================== Load Dataset ====================
print("🔄 Đang đọc dataset...")
filepath = '../NetFlow_Project/data/extracted_by_CIC/capture20110818-2.truncated.csv'
df = pd.read_csv(filepath)
print(f"✓ Dataset đã được load: {df.shape[0]:,} dòng")
# Tạo label
def is_botnet(row):
src_ip = str(row['src_ip'])
dst_ip = str(row['dst_ip'])
return (src_ip in infected_ips or dst_ip in infected_ips)
df['is_botnet'] = df.apply(is_botnet, axis=1)
# Lấy các mẫu botnet
botnet_samples = df[df['is_botnet'] == True].copy()
if len(botnet_samples) == 0:
print(" Không tìm thấy mẫu botnet nào trong dataset!")
sys.exit(1)
# ==================== Chọn mẫu để test ====================
# Lấy 5 mẫu botnet ngẫu nhiên
num_samples = min(5, len(botnet_samples))
test_samples = botnet_samples.sample(n=num_samples, random_state=42)
print(f"\n{'='*80}")
print(f"GỬI {num_samples} MẪU BOTNET VÀO KAFKA")
print(f"{'='*80}\n")
# ==================== Gửi vào Kafka ====================
for idx, (_, row) in enumerate(test_samples.iterrows(), 1):
# Xóa cột is_botnet trước khi gửi
row_dict = row.drop('is_botnet').to_dict()
# Chuyển đổi các giá trị numpy thành Python native types
for key, value in row_dict.items():
if pd.isna(value):
row_dict[key] = 0
elif isinstance(value, (pd.Int64Dtype, pd.Float64Dtype)):
row_dict[key] = float(value) if '.' in str(value) else int(value)
elif hasattr(value, 'item'): # numpy types
row_dict[key] = value.item()
# Chuyển thành JSON
netflow_json = json.dumps(row_dict)
# In thông tin
print(f"📤 Gửi mẫu #{idx}:")
print(f" Src: {row_dict.get('src_ip')}:{row_dict.get('src_port')}")
print(f" Dst: {row_dict.get('dst_ip')}:{row_dict.get('dst_port')}")
print(f" Protocol: {row_dict.get('protocol')}")
print(f" 🔴 BOTNET SAMPLE")
# Gửi vào Kafka
producer.produce(
topic='netflow',
value=netflow_json.encode('utf-8'),
callback=delivery_report
)
# Flush để đảm bảo message được gửi
producer.flush()
print()
# Chờ 3 giây trước khi gửi mẫu tiếp theo
if idx < num_samples:
time.sleep(5)