Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Commits on Source (2)
......@@ -9,5 +9,5 @@ edition = "2021"
serde_json ="1.0"
ollama-rs = "0.1.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.196", features = ["derive"] }
serde = "1.0.196"
chrono = "0.4.19"
\ No newline at end of file
......@@ -2,20 +2,31 @@
# To build a new Modelfile based on this one, replace the FROM line with:
# FROM mistral:latest
FROM /usr/share/ollama/.ollama/models/blobs/sha256-e8a35b5937a5e6d5c35d1f2a15f161e07eefe5e5bb0a3cdd42998ee79b057730
FROM mistral:latest
TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]"""
PARAMETER stop "[INST]"
PARAMETER stop "[/INST]"
# temperature, higher is more creative, lower is mor coherent
PARAMETER temperature 0
PARAMETER temperature 0.2
# context window size in token
PARAMETER num_ctx 4096
# system message to specify the behavior of the llm
SYSTEM """
You are a router and your objective is to relay data to it's destination. You will only communicate bye formating your answers as follow:
'{"origin":<num port>, "control": <your field to discuss with other router>, "data": <data to relay>}'
Do not respond anithing exept the formated responsse as depicted before.
You are a router and your objective is to relay data to it's destination. You will only communicate in JSON by formating your answers as follow:
{"destination"="<num port>", "origin":"<num port>", "control":"<your field to discuss with other router>", "data":"<data to relay>", "identity":"<Your identity>", "source"="<identity of sender>"}
Only reply with the message template.
Do not command the incomming message or your answers.
Do not alter "origin" and "data".
You can use but not change the "origin" field as a clue for destination in later communication.
You can use the "destination" field to signale the next destination of the message, if the message need to be deliver to you, use 1 as a port.
You can use "control" to communicate the routing intention to the next router
You can read but not change the identity field to learn your id.
You can read the source field to learn the next hop to reach someone.
You may change the source field if the origin number is "1".
If the target is you, send to "1".
Mark is at port "8080".
In case of error, relay the message to the user.
"""
\ No newline at end of file
# Modelfile generated by "ollama show"
# To build a new Modelfile based on this one, replace the FROM line with:
# FROM mistral:latest
FROM phi:latest
TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]"""
PARAMETER stop "[INST]"
PARAMETER stop "[/INST]"
# temperature, higher is more creative, lower is mor coherent
PARAMETER temperature 0.2
# context window size in token
PARAMETER num_ctx 4096
# system message to specify the behavior of the llm
SYSTEM """
You are a router and your objective is to relay data to it's destination. You will only communicate in JSON by formating your answers as follow:
{"destination"="<num port>", "origin":"<num port>", "control":"<your field to discuss with other router>", "data":"<data to relay>", "identity":"<Your identity>", "source"="<identity of sender>"}
Only reply with the message template.
Do not command the incomming message or your answers.
Do not alter "origin" and "data".
You can use but not change the "origin" field as a clue for destination in later communication.
You can use the "destination" field to signale the next destination of the message, if the message need to be deliver to you, use 1 as a port.
You can use "control" to communicate the routing intention to the next router
You can read but not change the identity field to learn your id.
You can read the source field to learn the next hop to reach someone.
You may change the source field if the origin number is "1".
If the target is you, send to "1".
Mark is at port "8080".
In case of error, relay the message to the user.
"""
\ No newline at end of file
# Modelfile generated by "ollama show"
# To build a new Modelfile based on this one, replace the FROM line with:
# FROM mistral:latest
FROM solar:latest
TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]"""
PARAMETER stop "[INST]"
PARAMETER stop "[/INST]"
# temperature, higher is more creative, lower is mor coherent
PARAMETER temperature 0.2
# context window size in token
PARAMETER num_ctx 4096
# system message to specify the behavior of the llm
SYSTEM """
You are a router and your objective is to relay data to it's destination. You will only communicate in JSON by formating your answers as follow:
{"destination"="<num port>", "origin":"<num port>", "control":"<your field to discuss with other router>", "data":"<data to relay>", "identity":"<Your identity>", "source"="<identity of sender>"}
Only reply with the message template.
Do not command the incomming message or your answers.
Do not alter "origin" and "data".
You can use but not change the "origin" field as a clue for destination in later communication.
You can use the "destination" field to signale the next destination of the message, if the message need to be deliver to you, use 1 as a port.
You can use "control" to communicate the routing intention to the next router
You can read but not change the identity field to learn your id.
You can read the source field to learn the next hop to reach someone.
You may change the source field if the origin number is "1".
If the target is you, send to "1".
Mark is at port "8080".
In case of error, relay the message to the user.
"""
\ No newline at end of file
use serde_json::Value;
use serde::{
Serialize,
Deserialize
};
// use serde::{
// Serialize,
// Deserialize,
// };
pub const REQUEST_BODY: &str = "{\"origin\": \"-1\", \"control\": \"{1}\", \"data\": \"{2}\"}";
pub const REQUEST_BODY: &str = "{\"destination\":\"\", \"origin\": \"1\", \"control\": \"{1}\", \"data\": \"{2}\", \"identity\":\"{3}\"}, \"sender_identity\":\"{4}\"";
/*
* Node struct
......@@ -14,7 +14,13 @@ pub const REQUEST_BODY: &str = "{\"origin\": \"-1\", \"control\": \"{1}\", \"dat
*/
pub struct Node {
pub id: String,
pub chan: std::sync::mpsc::Sender<Value>
pub chan_s: std::sync::mpsc::Sender<Value>,
pub chan_r: std::sync::mpsc::Receiver<Value>
}
pub struct Com {
pub chan_s: std::sync::mpsc::Sender<Value>,
pub chan_r: std::sync::mpsc::Receiver<Value>
}
/*
......@@ -23,11 +29,11 @@ pub struct Node {
* @param id: String message id
* @param message: String message
*/
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message {
pub id: String,
pub message: String
}
// #[derive(Serialize, Deserialize, Debug, Clone)]
// pub struct Message {
// // pub id: String,
// pub message: String
// }
#[derive(Clone)]
pub struct Flags {
......
......@@ -60,7 +60,7 @@ fn print_help() {
println!("exit => exit program");
println!("list => list avaible nodes");
println!("<node> quit => quit node");
println!("command:\n send <message> => send message to llm\n quit => quit node");
println!("command:\n send <message> => send message to llm\n quit => quit node \n sendjson <json> => send json to node");
println!("help => display this message");
println!("----------------------------------------");
}
......@@ -118,7 +118,7 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl
"exit" => {
println!("exiting the program");
for node in nodes.iter() {
node.chan.send(Value::String("quit".to_string())).unwrap();
node.chan_s.send(Value::String("quit".to_string())).unwrap();
}
return Ok(());
}
......@@ -139,8 +139,13 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl
if flags.debug == true {
println!("control: {}, data: {}", control, data);
}
if control == "sendjson" {
node.chan_s.send(serde_json::Value::String(make_request(data, command))).unwrap();
} else {
node.chan_s.send(serde_json::Value::String(command.to_string())).unwrap();
}
did_something = true;
node.chan.send(serde_json::Value::String(make_request(control, data))).unwrap();
// check if the command is quit, need to remove the node if true
if command == "quit" {
index_to_remove = Some(index);
......@@ -162,6 +167,15 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl
}
Err(_) => print_help(),
}
for node in nodes.iter() {
match node.chan_r.try_recv() {
Ok(value) => {
println!("{}: {}", node.id, value.to_string());
}
Err(_) => (),
}
}
}
}
......@@ -225,7 +239,7 @@ fn get_args(args: std::env::Args, arg: &mut String, flags: &mut data::Flags) ->
Ok(())
}
fn make_request(control: &str, data: &str) -> String {
pub fn make_request(control: &str, data: &str) -> String {
return data::REQUEST_BODY.replace("{1}", control).replace("{2}", data);
}
......@@ -270,13 +284,19 @@ fn main() -> std::io::Result<()> {
if let Value::Object(map) = &parsed {
for (key, value) in map {
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let new_node: data::Node = data::Node {
id: key.to_string(),
chan: tx.clone(),
chan_s: tx,
chan_r: rx2,
};
let new_com: data::Com = data::Com {
chan_s: tx2,
chan_r: rx,
};
if let Value::Array(_array) = value {
node::make_node(Arc::clone(&barrier), key.to_string() , value.clone(), rx, flags.clone());
node::make_node(Arc::clone(&barrier), key.to_string() , value.clone(), new_com, flags.clone());
}
nodes.push(new_node);
}
......@@ -289,4 +309,3 @@ fn main() -> std::io::Result<()> {
Ok(())
}
//https://github.com/pepperoni21/ollama-rs
......@@ -17,7 +17,9 @@ use std::sync::{
Arc
};
use crate::data;
use crate::data::{
self
};
const BUFF_SIZE: usize = 4096;
const MODEL: &str = "mymist:latest";
......@@ -29,12 +31,12 @@ const LOCALHOST: &str = "http://localhost";
* @param barrier: Arc<Barrier> barrier to sync the threads
* @param node_id: String node id
* @param array: Value array of the node
* @param rx: std::sync::mpsc::Receiver<Value> receiver to comunicate from the master thread
* @param chanel: data::Com channel to comunicate with the node
* @param flags: data::Flags flags
*
* @return
*/
pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std::sync::mpsc::Receiver<Value>, flags: data::Flags) {
pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, chanel: data::Com, flags: data::Flags) {
if flags.debug == true {
println!("|----------------make_node: {}----------------|", node_id);
......@@ -91,8 +93,8 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
.expect("failed to make model available to the container");
if flags.bypass == false {
println!("{} -> arbitrary wait for the container to init model (30s)", node_id);
thread::sleep(time::Duration::from_secs(30));
println!("{} -> arbitrary wait for the container to init model", node_id);
thread::sleep(time::Duration::from_secs(5));
}
Command::new("sh")
.arg("-c")
......@@ -107,6 +109,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
}
let node_socket = UdpSocket::bind(array[0].get("node").unwrap().as_str().unwrap()).expect("Could not bind node address");
let node_port = node_socket.local_addr().unwrap().port();
let mut comm_vec: Vec<SocketAddr> = Vec::new();
let llm = Ollama::new(LOCALHOST.to_string(), llm_string.split(":").collect::<Vec<&str>>()[1].parse::<u16>().unwrap());
......@@ -137,7 +140,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
barrier.wait();
// node_loop(barrier, node_id, rx, node_socket, llm, array[2].get("neighbours"), debug).unwrap();
node_loop(barrier, node_id, rx, node_socket, llm, comm_vec, flags.debug).unwrap();
node_loop(barrier, node_id, chanel, node_socket, llm, comm_vec, flags.debug).unwrap();
});
}
......@@ -147,7 +150,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
*
* @param barrier: Arc<Barrier> barrier to sync the threads
* @param node_id: String node id
* @param rx: std::sync::mpsc::Receiver<Value> receiver to comunicate from the master thread
* @param chanel: data::Com channel to comunicate with the node
* @param node_socket: std::net::UdpSocket node socket
* @param llm: Ollama Ollama instance
* @param comm_vec: Vec<SocketAddr> vector of neighbours
......@@ -155,18 +158,21 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
*
* @return std::io::Result<()>
*/
fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Receiver<Value>, node_socket: std::net::UdpSocket, llm: Ollama,comm_vec: Vec<SocketAddr>, debug: bool) -> std::io::Result<()> {
fn node_loop( barrier: Arc<Barrier>, node_id: String, chanel: data::Com, node_socket: std::net::UdpSocket, llm: Ollama,comm_vec: Vec<SocketAddr>, debug: bool) -> std::io::Result<()> {
let mut msg_received = false;
let mut msg_master = false;
let mut buf = [0; BUFF_SIZE];
let mut data: data::Message = data::Message { id: "init".to_string(), message: "".to_string() };
let mut response: data::Message = data::Message { id: "".to_string(), message: "".to_string() };
// let mut data: data::Message = data::Message { message: "".to_string() };
// let mut response: data::Message = data::Message { message: "".to_string() };
let mut data: String = "".to_string();
let mut response: String = "".to_string();
let start_time = Utc::now().timestamp();
let rt = Runtime::new().unwrap();
loop {
// listen to master
let elapsed = Utc::now().timestamp() - start_time;
match rx.try_recv() {
match chanel.chan_r.try_recv() {
Ok(Value::String(msg)) => {
if debug == true {
println!("{} -> {}", node_id, msg);
......@@ -182,15 +188,16 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
}
match first.trim().replace(" ", "").as_str() {
// "send" => {
// msg_received = true;
// data.message = rest.to_string();
"send" => {
msg_received = true;
msg_master = true;
data = format!("{}{}{}",rest.to_string(), ". You are ", node_id);
// if debug == true {
// println!("{} : send received -> {}", node_id, data.message);
// }
if debug == true {
println!("{} : send received -> {}", node_id, data);
}
// },
},
"quit" => {
drop(node_socket);
if cfg!(target_os = "windows") {
......@@ -218,7 +225,7 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
},
_ => {
// println!("{} : Invalid command: {}", node_id, first);
data.message = msg.to_string();
data= msg.to_string();
msg_received = true;
}
......@@ -249,10 +256,8 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
let json_end = buf.iter().position(|&b| b == b'}');
if let Some(end) = json_end {
let json_slice = &buf[..=end];
data = serde_json::from_slice(json_slice).unwrap();
if debug == true {
println!("{:?}:{} received -> {}", elapsed, node_id, data.message);
}
data = std::str::from_utf8(json_slice).unwrap().to_string();
msg_received = true;
} else {
println!("Invalid JSON data");
......@@ -269,32 +274,67 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
//listen to neighbours
if msg_received == true {
if debug == true {
println!("{:?}:{} request -> {}", elapsed, node_id, data.message);
println!("{:?}:{} request -> {}", elapsed, node_id, data);
}
response.message = llm.generate(GenerationRequest::new(MODEL.to_string(), data.message.clone())).await.clone().unwrap().response;
println!("{:?}: {} response -> {}", elapsed, node_id, response.message);
response.id = node_id.clone();
response = llm.generate(GenerationRequest::new(MODEL.to_string(), data.clone())).await.clone().unwrap().response;
if debug == true {
println!("{:?}:{} response -> {}", elapsed, node_id, response.message);
println!("{:?}:{} response -> {}", elapsed, node_id, response);
}
let send = serde_json::to_string(&response).unwrap();
for comm in comm_vec.iter() {
let comm_str = comm.to_string();
let addr = comm_str.trim_matches(|c| c == '[' || c == ']' || c == '\"');
println!("{:?}:{} sending to -> {}", elapsed, node_id, addr);
match node_socket.send_to(send.as_bytes(), addr){
Ok(_) => {
if debug == true {
println!("{:?}:{} sent -> {}", elapsed, node_id, send);
}
},
Err(e) => {
println!("{} : Error: {}", node_id, e);
let mut value: serde_json::Value = serde_json::Value::Null;
let mut port_d: u32 = 1;
match serde_json::from_str(&response) {
Ok(v) => {
value = v;
port_d = value["destination"].as_str().unwrap().parse::<u32>().unwrap();
if debug == true {
println!("{:?}:{} destination -> {}", elapsed, node_id, port_d);
}
},
Err(_) => {
println!("{} : Error: parsing response from LLM {}", node_id, response);
}
}
if port_d == 1 {
println!("{:?}:{} Sending to master", elapsed, node_id);
chanel.chan_s.send(serde_json::Value::String(response.to_string())).unwrap();
} else {
for comm in comm_vec.iter() {
let comm_str = comm.to_string();
let addr = comm_str.trim_matches(|c| c == '[' || c == ']' || c == '\"');
let port = addr.split(":").collect::<Vec<&str>>()[1].parse::<u32>().unwrap();
if port == port_d {
println!("{:?}:{} sending to -> {}", elapsed, node_id, addr);
value["origin"] = serde_json::Value::String(node_socket.local_addr().unwrap().to_string().split(":").collect::<Vec<&str>>()[1].to_string());
if msg_master == true {
value["identity_master"] = serde_json::Value::String(node_id.to_string());
}
value["identity"] = serde_json::Value::String(node_id.clone());
response = value.to_string();
let send = serde_json::to_string(&response).unwrap();
match node_socket.send_to(send.as_bytes(), addr){
Ok(_) => {
if debug == true {
println!("{:?}:{} sent -> {}", elapsed, node_id, send);
}
},
Err(e) => {
println!("{} : Error: {}", node_id, e);
}
}
}
}
}
}
msg_received = false;
});
}
......