# How to Create a New Slips Module
## What is SLIPS and why are modules useful
Slips is a machine learning-based intrusion prevention system for Linux and macOS, developed at the Stratosphere Laboratories of the Czech Technical University in Prague. Slips reads network traffic flows from several sources, applies multiple detections (including machine learning detections) and detects infected computers and attackers in the network. It is easy to extend the functionality of Slips by writing a new module. This blog shows how to create a new module for Slips from scratch.
## Goal of this Blog
This blog builds an example module that detects when a private IP address communicates with another private IP address. For example, we want to know if the IP 192.168.4.2 is communicating with the IP 192.168.4.87. This simple but still useful idea is the purpose of our module. The module will also generate an alert so that Slips takes this situation into account. Our module will be called ```local_connection_detector```.
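
Before touching Slips itself, the core check can be sketched with Python's standard ```ipaddress``` module alone. The helper name ```is_local_connection``` is only an illustration and is not part of Slips. Note that ```is_private``` covers more than the usual home and office ranges (loopback addresses also count as private), so a real module may want a stricter rule.

```python
import ipaddress


def is_local_connection(saddr: str, daddr: str) -> bool:
    """Return True when both endpoints are private IP addresses."""
    src = ipaddress.ip_address(saddr)
    dst = ipaddress.ip_address(daddr)
    return src.is_private and dst.is_private


# The example from the text: two hosts inside 192.168.4.0/24
print(is_local_connection("192.168.4.2", "192.168.4.87"))
# A private host talking to a public address is not a local connection
print(is_local_connection("192.168.4.2", "8.8.8.8"))
```
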
### High-level View of how a Module Works
All modules implement the ```IModule``` interface located at ```slips_files/common/abstracts/module.py```.
The abstract methods in this interface must be implemented in every new Slips module; the rest are optional.
Below is a detailed description of each abstract method.
The module consists of the ```init()``` function for initializations, like subscribing to channels, reading API files, etc.
The main function of each module is ```main()```.
This function runs in a loop for as long as
Slips is running, so that the module doesn't terminate.
In case of errors in the module, the ```main()``` function should return 1, which causes
the module to terminate immediately.
Any initializations that should run only once should be placed in the ```init()``` function
or in ```pre_main()```. The ```pre_main()``` function acts as a hook for the main function: it runs only
once, and then ```main()``` starts running in a loop.
Use ```pre_main()``` for initialization logic that cannot be done in ```init()```, for example
dropping the root privileges from a module. We'll discuss this in detail later.
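
The lifecycle described above can be modeled with a small toy. This is a simplified sketch, not the real ```IModule``` code: ```ToyModule```, ```run()``` and the ```max_iterations``` cap are hypothetical names invented for the illustration, and the real loop runs for as long as Slips is up rather than for a fixed number of iterations.

```python
class ToyModule:
    """Hypothetical stand-in for a Slips module; not the real IModule."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.events = []
        self.init()

    def init(self):
        # one-time setup, e.g. subscribing to channels
        self.events.append("init")

    def pre_main(self):
        # one-time hook that runs right before the main loop
        self.events.append("pre_main")

    def main(self):
        # one iteration of the loop: handle at most one message
        if not self.messages:
            return None
        msg = self.messages.pop(0)
        if msg == "bad":
            self.events.append("error")
            return 1  # non-zero return asks the runner to stop
        self.events.append(f"main:{msg}")
        return None


def run(module, max_iterations=10):
    """Simplified runner: pre_main() once, then main() in a loop."""
    module.pre_main()
    for _ in range(max_iterations):
        if module.main():
            break


module = ToyModule(["flow1", "flow2", "bad", "flow3"])
run(module)
print(module.events)
```

In this toy run, the message after ```"bad"``` is never processed because the non-zero return value ends the loop.
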
Printing in all modules is handled by a common ```print()``` method, the one implemented in the ```IModule``` interface.
This common ```print()``` simply acts as a proxy between ```output.py```, the module responsible for printing, and all Slips modules.
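
The proxy idea can be pictured roughly as follows. Everything in this sketch is hypothetical (```ToyOutput```, ```ToyBaseModule``` and their signatures are invented for illustration); the real ```print()``` in ```IModule``` and the real ```output.py``` have their own interfaces.

```python
class ToyOutput:
    """Hypothetical stand-in for the component that does the actual printing."""

    def __init__(self):
        self.lines = []

    def write(self, sender, text):
        # a real output component would log and print; here we only collect
        self.lines.append(f"[{sender}] {text}")


class ToyBaseModule:
    """Hypothetical base class holding the shared print() proxy."""

    name = "base"

    def __init__(self, output):
        self.output = output

    def print(self, text):
        # modules never write to the terminal directly, they forward the text
        self.output.write(self.name, text)


class ToyDetector(ToyBaseModule):
    name = "local_connection_detector"


output = ToyOutput()
detector = ToyDetector(output)
detector.print("Done setting evidence!!!")
print(output.lines)
```
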
### Conclusion
Due to the high modularity of Slips, adding a new Slips module is as easy as modifying a few lines in our
template module, and Slips handles running
your module and integrating it for you.
This is the [list of the modules](https://stratospherelinuxips.readthedocs.io/en/develop/detection_modules.html#detection-modules)
Slips currently has. You can enhance them, add detections, or suggest new ideas using
[our Discord](https://discord.com/invite/zu5HwMFy5C) or by opening
a PR.
For more info about the threat levels, [check the docs](https://stratospherelinuxips.readthedocs.io/en/develop/architecture.html#threat-levels)
Detailed explanation of [IDEA categories here](https://idea.cesnet.cz/en/classifications)
Detailed explanation of [Slips profiles and timewindows here](https://idea.cesnet.cz/en/classifications)
[Contributing guidelines](https://stratospherelinuxips.readthedocs.io/en/develop/contributing.html)
## Complete Code
Here is the whole local_connection_detector.py code for copy/paste.
```python
import ipaddress
import json

from slips_files.common.flow_classifier import FlowClassifier
from slips_files.core.structures.evidence import (
    Evidence,
    ProfileID,
    TimeWindow,
    Victim,
    Attacker,
    ThreatLevel,
    EvidenceType,
    IoCType,
    Direction,
)
from slips_files.common.parsers.config_parser import ConfigParser
from slips_files.common.slips_utils import utils
from slips_files.common.abstracts.imodule import IModule


class LocalConnectionDetector(IModule):
    # Name: short name of the module. Do not use spaces
    name = 'local_connection_detector'
    description = 'detects connections to other devices in your local network'
    authors = ['Template Author']

    def init(self):
        # To which channels do you want to subscribe? When a message
        # arrives on the channel the module will receive a msg
        # You can find the full list of channels at
        # slips_files/core/database/redis_db/database.py
        self.c1 = self.db.subscribe('new_flow')
        self.channels = {
            'new_flow': self.c1,
        }
        # to be able to convert flows from dict format to objects
        self.classifier = FlowClassifier()

    def pre_main(self):
        """
        Initializations that run only once before the main() function runs in a loop
        """
        utils.drop_root_privs_permanently()

    def main(self):
        """Main loop function"""
        if msg := self.get_msg('new_flow'):
            msg = json.loads(msg['data'])
            # convert the given dict flow to a flow object
            flow = self.classifier.convert_to_flow_obj(msg["flow"])
            saddr = flow.saddr
            daddr = flow.daddr
            timestamp = flow.starttime
            srcip_obj = ipaddress.ip_address(saddr)
            dstip_obj = ipaddress.ip_address(daddr)
            if srcip_obj.is_private and dstip_obj.is_private:
                # on a scale of 0 to 1, how confident you are of this evidence
                confidence = 0.8
                # how dangerous is this evidence? info, low, medium, high, critical?
                threat_level = ThreatLevel.HIGH
                # the name of your evidence, you can put any descriptive string here
                # this is the type we just created
                evidence_type = EvidenceType.CONNECTION_TO_LOCAL_DEVICE
                # which ip is the attacker here?
                attacker = Attacker(
                    # who's the attacker, the src or the dst?
                    direction=Direction.SRC,
                    # is it an IP? is it a domain? etc.
                    attacker_type=IoCType.IP,
                    # the actual ip/domain/url of the attacker,
                    # in our case, this is the IP
                    value=saddr,
                )
                # the victim is the destination of the flow
                victim = Victim(
                    direction=Direction.DST,
                    ioc_type=IoCType.IP,
                    value=daddr,
                )
                # describe the evidence
                description = f'A connection to a local device {daddr}'
                # the current profile is the source ip,
                # this comes in the msg received in the channel
                # the profile this evidence should be in is the profile of
                # the attacker, because this is evidence that this profile
                # is attacking others
                profile = ProfileID(ip=saddr)
                # Profiles are split into timewindows, each timewindow is 1h,
                # the ID of the timewindow comes in the msg received in the channel
                twid_number = int(msg['twid'].replace("timewindow", ''))
                timewindow = TimeWindow(number=twid_number)
                # list of uids of the flows that are part of this evidence
                uid_list = [flow.uid]
                # now use the above info to create the evidence obj
                evidence = Evidence(
                    evidence_type=evidence_type,
                    attacker=attacker,
                    threat_level=threat_level,
                    description=description,
                    victim=victim,
                    profile=profile,
                    timewindow=timewindow,
                    uid=uid_list,
                    # when did this evidence happen? use the
                    # flow's ts detected by zeek
                    # this comes in the msg received in the channel
                    timestamp=timestamp,
                    confidence=confidence,
                )
                self.db.set_evidence(evidence)
                self.print("Done setting evidence!!!")
```
All good, you can find your evidence now in alerts.json and alerts.log of the output directory.
## Line by Line Explanation of the Module
This section gives a more detailed explanation of what each line of the module does.
To print in your module, simply use ```self.print("some text", 1, 0)```,
and the text will be sent to the output process for logging and printing to the terminal.
---
Now here's the ```pre_main()``` function. All one-time initializations, like dropping root privileges or checking for API keys,
should be done here.
```python
utils.drop_root_privs_permanently()
```
The above line is responsible for dropping root privileges:
if Slips starts with sudo and the module doesn't need root permissions, we drop them.
---
Now here's the ```main()``` function. This is the main function of each module,
the one that gets executed in a loop when the module starts.
All the code in this function runs in a loop for as long as the module is up.
In case of an error, the module's ```main()``` should return a non-zero value, and
the module will finish execution and terminate.
If there are no errors, the module keeps looping until it runs out of messages in the Redis channels,
then calls ```shutdown_gracefully()``` and terminates.
```python
if msg := self.get_msg('new_flow'):
```
The above line listens on the channel called ```new_flow``` that we subscribed to earlier.
The messages received in this channel are the flows that Slips reads through the input process.
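
To make the message handling concrete, here is a minimal sketch of how such a message can be decoded, following the same steps as the module code: load the JSON string in ```data```, then read the ```flow``` and ```twid``` keys. The sample values and the ```parse_new_flow``` helper are made up for illustration; real flows carry more fields.

```python
import json

# Hypothetical message shaped like the one the module code reads:
# a dict whose 'data' field is a JSON string with 'flow' and 'twid' keys.
sample_msg = {
    "data": json.dumps(
        {
            "flow": {
                "saddr": "192.168.4.2",
                "daddr": "192.168.4.87",
                "uid": "C1example",
                "starttime": "1700000000.0",
            },
            "twid": "timewindow3",
        }
    )
}


def parse_new_flow(msg):
    """Return the flow dict and the timewindow number from a channel message."""
    payload = json.loads(msg["data"])
    flow = payload["flow"]
    # "timewindow3" -> 3
    twid_number = int(payload["twid"].replace("timewindow", ""))
    return flow, twid_number


flow, twid_number = parse_new_flow(sample_msg)
print(flow["saddr"], flow["daddr"], twid_number)
```
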
## Reading Input flows from an external module (Advanced)
Slips relies on the input process for reading flows, either from an interface, a pcap, Zeek files, etc.
If you want to add your own module that reads flows from somewhere else,
for example from a simulation framework like the CYST module,
you can easily do that using the ```--input-module``` parameter.
Reading flows should be handled by that module, and the flows are then sent to the input process for processing
using the ```new_module_flow``` channel.
For now, this feature only supports reading flows in Zeek JSON format, but feel free to extend it.
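
As a rough illustration of "Zeek JSON format", the sketch below serializes one connection record using field names from Zeek's ```conn.log``` (```ts```, ```uid```, ```id.orig_h``` and so on). The helper ```build_zeek_conn_line``` is hypothetical, and the exact wrapper that must surround the line when publishing to ```new_module_flow``` is not covered here, so check an existing input module before relying on this.

```python
import json


def build_zeek_conn_line(ts, uid, saddr, sport, daddr, dport, proto):
    """Serialize one flow as a Zeek conn.log-style JSON line (subset of fields)."""
    record = {
        "ts": ts,
        "uid": uid,
        "id.orig_h": saddr,
        "id.orig_p": sport,
        "id.resp_h": daddr,
        "id.resp_p": dport,
        "proto": proto,
    }
    return json.dumps(record)


line = build_zeek_conn_line(
    1700000000.0, "C1example", "192.168.4.2", 51000, "192.168.4.87", 445, "tcp"
)
print(line)
```
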
### How to shutdown_gracefully()
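The ```shutdown_gracefully()``` function is called right before the module terminates, so it is the place for cleanup logic.
For example, if you're training an ML model in your module
and you want to save it before the module stops,
you should call your ```save_model()``` function from ```shutdown_gracefully()```.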
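
A minimal sketch of this pattern, using a toy class rather than a real Slips module: ```ToyTrainingModule```, the dictionary used as a "model" and the temporary file path are all assumptions made for the example.

```python
import os
import pickle
import tempfile


class ToyTrainingModule:
    """Hypothetical module that keeps a model in memory while running."""

    def __init__(self, model_path):
        self.model_path = model_path
        # stand-in for a trained model
        self.model = {"weights": [0.1, 0.2]}

    def save_model(self):
        with open(self.model_path, "wb") as f:
            pickle.dump(self.model, f)

    def shutdown_gracefully(self):
        # cleanup that must happen before the module stops
        self.save_model()


model_path = os.path.join(tempfile.mkdtemp(), "model.pkl")
module = ToyTrainingModule(model_path)
module.shutdown_gracefully()

# the saved model can be loaded again on the next run
with open(model_path, "rb") as f:
    restored = pickle.load(f)
print(restored)
```
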
### Troubleshooting
- If the module does not start at all, make sure it is not disabled in the ```config/slips.yaml``` file.
- Check that the \_\_init\_\_.py file is present in the module directory.
- Read the output files (errors.log and slips.log). If there were any errors
(e.g. import errors), they would prevent the module from starting.
- If the module started, but did not receive any messages from
the channel, make sure that:
    - The channel is properly subscribed to in the module
    - Messages are being sent in this channel
    - Other modules subscribed to the channel get the message
    - The channel name is present in the supported_channels list in ```slips_files/core/database/redis_db/database.py```
### Testing
Slips has 2 kinds of tests: unit tests and integration tests.
Integration tests are in ```tests/integration/```. There we test all files in our ```dataset/``` dir.
Before pushing, run the unit tests and integration tests as follows:
1- Make sure you're in the Slips main dir
2- Run all tests with ```./tests/run_all_tests.sh```
Slips supports the -P flag to run Redis on your port of choice. This flag is
used so that Slips can keep track of the ports it opened while testing and close them later.
### Adding your own unit tests
Slips uses ```pytest``` as the main testing framework. You can add your own unit tests as follows:
1- Create a file called ```test_module_name.py``` in the ```tests/``` dir
2- Create a method for initializing your module in ```tests/module_factory.py```
3- Make sure the name of every test function starts with ```test_```
4- Go to the main Slips dir and run ```./tests/run_all_tests.sh```, and every test file in the ```tests/``` dir will run
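
As an illustration of the naming rule in step 3, the sketch below shows two pytest-style test functions for the private-address check. It is deliberately standalone: ```both_private``` is a hypothetical helper, and a real Slips test would instead build the module through ```tests/module_factory.py```.

```python
import ipaddress


def both_private(saddr, daddr):
    """Hypothetical helper mirroring the condition used in the module's main()."""
    return (
        ipaddress.ip_address(saddr).is_private
        and ipaddress.ip_address(daddr).is_private
    )


# pytest collects functions whose names start with test_
def test_both_private_for_local_pair():
    assert both_private("192.168.4.2", "192.168.4.87")


def test_not_private_when_destination_is_public():
    assert not both_private("192.168.4.2", "8.8.8.8")
```
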
### Getting in touch
Feel free to join our [Discord server](https://discord.gg/zu5HwMFy5C) and ask questions, suggest new features or give us feedback.
PRs and Issues are welcomed in our repo.
### Conclusion
Adding a new feature to Slips is an easy task. The template is ready for everyone to use, and there is not much to learn about Slips to be able to write a module.
If you wish to add a new module to the Slips repository, issue a pull request and wait for a review.