Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Commits on Source (2)
...@@ -9,5 +9,5 @@ edition = "2021" ...@@ -9,5 +9,5 @@ edition = "2021"
serde_json ="1.0" serde_json ="1.0"
ollama-rs = "0.1.7" ollama-rs = "0.1.7"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.196", features = ["derive"] } serde = "1.0.196"
chrono = "0.4.19" chrono = "0.4.19"
\ No newline at end of file
...@@ -2,20 +2,31 @@ ...@@ -2,20 +2,31 @@
# To build a new Modelfile based on this one, replace the FROM line with: # To build a new Modelfile based on this one, replace the FROM line with:
# FROM mistral:latest # FROM mistral:latest
FROM /usr/share/ollama/.ollama/models/blobs/sha256-e8a35b5937a5e6d5c35d1f2a15f161e07eefe5e5bb0a3cdd42998ee79b057730 FROM mistral:latest
TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]""" TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]"""
PARAMETER stop "[INST]" PARAMETER stop "[INST]"
PARAMETER stop "[/INST]" PARAMETER stop "[/INST]"
# temperature, higher is more creative, lower is mor coherent # temperature, higher is more creative, lower is mor coherent
PARAMETER temperature 0 PARAMETER temperature 0.2
# context window size in token # context window size in token
PARAMETER num_ctx 4096 PARAMETER num_ctx 4096
# system message to specify the behavior of the llm # system message to specify the behavior of the llm
SYSTEM """ SYSTEM """
You are a router and your objective is to relay data to it's destination. You will only communicate bye formating your answers as follow: You are a router and your objective is to relay data to it's destination. You will only communicate in JSON by formating your answers as follow:
'{"origin":<num port>, "control": <your field to discuss with other router>, "data": <data to relay>}' {"destination"="<num port>", "origin":"<num port>", "control":"<your field to discuss with other router>", "data":"<data to relay>", "identity":"<Your identity>", "source"="<identity of sender>"}
Do not respond anithing exept the formated responsse as depicted before. Only reply with the message template.
Do not command the incomming message or your answers.
Do not alter "origin" and "data".
You can use but not change the "origin" field as a clue for destination in later communication.
You can use the "destination" field to signale the next destination of the message, if the message need to be deliver to you, use 1 as a port.
You can use "control" to communicate the routing intention to the next router
You can read but not change the identity field to learn your id.
You can read the source field to learn the next hop to reach someone.
You may change the source field if the origin number is "1".
If the target is you, send to "1".
Mark is at port "8080".
In case of error, relay the message to the user.
""" """
\ No newline at end of file
# Modelfile generated by "ollama show"
# To build a new Modelfile based on this one, replace the FROM line with:
# FROM mistral:latest
FROM phi:latest
TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]"""
PARAMETER stop "[INST]"
PARAMETER stop "[/INST]"
# temperature, higher is more creative, lower is mor coherent
PARAMETER temperature 0.2
# context window size in token
PARAMETER num_ctx 4096
# system message to specify the behavior of the llm
SYSTEM """
You are a router and your objective is to relay data to it's destination. You will only communicate in JSON by formating your answers as follow:
{"destination"="<num port>", "origin":"<num port>", "control":"<your field to discuss with other router>", "data":"<data to relay>", "identity":"<Your identity>", "source"="<identity of sender>"}
Only reply with the message template.
Do not command the incomming message or your answers.
Do not alter "origin" and "data".
You can use but not change the "origin" field as a clue for destination in later communication.
You can use the "destination" field to signale the next destination of the message, if the message need to be deliver to you, use 1 as a port.
You can use "control" to communicate the routing intention to the next router
You can read but not change the identity field to learn your id.
You can read the source field to learn the next hop to reach someone.
You may change the source field if the origin number is "1".
If the target is you, send to "1".
Mark is at port "8080".
In case of error, relay the message to the user.
"""
\ No newline at end of file
# Modelfile generated by "ollama show"
# To build a new Modelfile based on this one, replace the FROM line with:
# FROM mistral:latest
FROM solar:latest
TEMPLATE """[INST] {{ .System }} {{ .Prompt }} [/INST]"""
PARAMETER stop "[INST]"
PARAMETER stop "[/INST]"
# temperature, higher is more creative, lower is mor coherent
PARAMETER temperature 0.2
# context window size in token
PARAMETER num_ctx 4096
# system message to specify the behavior of the llm
SYSTEM """
You are a router and your objective is to relay data to it's destination. You will only communicate in JSON by formating your answers as follow:
{"destination"="<num port>", "origin":"<num port>", "control":"<your field to discuss with other router>", "data":"<data to relay>", "identity":"<Your identity>", "source"="<identity of sender>"}
Only reply with the message template.
Do not command the incomming message or your answers.
Do not alter "origin" and "data".
You can use but not change the "origin" field as a clue for destination in later communication.
You can use the "destination" field to signale the next destination of the message, if the message need to be deliver to you, use 1 as a port.
You can use "control" to communicate the routing intention to the next router
You can read but not change the identity field to learn your id.
You can read the source field to learn the next hop to reach someone.
You may change the source field if the origin number is "1".
If the target is you, send to "1".
Mark is at port "8080".
In case of error, relay the message to the user.
"""
\ No newline at end of file
use serde_json::Value; use serde_json::Value;
use serde::{ // use serde::{
Serialize, // Serialize,
Deserialize // Deserialize,
}; // };
pub const REQUEST_BODY: &str = "{\"origin\": \"-1\", \"control\": \"{1}\", \"data\": \"{2}\"}"; pub const REQUEST_BODY: &str = "{\"destination\":\"\", \"origin\": \"1\", \"control\": \"{1}\", \"data\": \"{2}\", \"identity\":\"{3}\"}, \"sender_identity\":\"{4}\"";
/* /*
* Node struct * Node struct
...@@ -14,7 +14,13 @@ pub const REQUEST_BODY: &str = "{\"origin\": \"-1\", \"control\": \"{1}\", \"dat ...@@ -14,7 +14,13 @@ pub const REQUEST_BODY: &str = "{\"origin\": \"-1\", \"control\": \"{1}\", \"dat
*/ */
pub struct Node { pub struct Node {
pub id: String, pub id: String,
pub chan: std::sync::mpsc::Sender<Value> pub chan_s: std::sync::mpsc::Sender<Value>,
pub chan_r: std::sync::mpsc::Receiver<Value>
}
pub struct Com {
pub chan_s: std::sync::mpsc::Sender<Value>,
pub chan_r: std::sync::mpsc::Receiver<Value>
} }
/* /*
...@@ -23,11 +29,11 @@ pub struct Node { ...@@ -23,11 +29,11 @@ pub struct Node {
* @param id: String message id * @param id: String message id
* @param message: String message * @param message: String message
*/ */
#[derive(Serialize, Deserialize, Debug, Clone)] // #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message { // pub struct Message {
pub id: String, // // pub id: String,
pub message: String // pub message: String
} // }
#[derive(Clone)] #[derive(Clone)]
pub struct Flags { pub struct Flags {
......
...@@ -60,7 +60,7 @@ fn print_help() { ...@@ -60,7 +60,7 @@ fn print_help() {
println!("exit => exit program"); println!("exit => exit program");
println!("list => list avaible nodes"); println!("list => list avaible nodes");
println!("<node> quit => quit node"); println!("<node> quit => quit node");
println!("command:\n send <message> => send message to llm\n quit => quit node"); println!("command:\n send <message> => send message to llm\n quit => quit node \n sendjson <json> => send json to node");
println!("help => display this message"); println!("help => display this message");
println!("----------------------------------------"); println!("----------------------------------------");
} }
...@@ -118,7 +118,7 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl ...@@ -118,7 +118,7 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl
"exit" => { "exit" => {
println!("exiting the program"); println!("exiting the program");
for node in nodes.iter() { for node in nodes.iter() {
node.chan.send(Value::String("quit".to_string())).unwrap(); node.chan_s.send(Value::String("quit".to_string())).unwrap();
} }
return Ok(()); return Ok(());
} }
...@@ -139,8 +139,13 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl ...@@ -139,8 +139,13 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl
if flags.debug == true { if flags.debug == true {
println!("control: {}, data: {}", control, data); println!("control: {}, data: {}", control, data);
} }
if control == "sendjson" {
node.chan_s.send(serde_json::Value::String(make_request(data, command))).unwrap();
} else {
node.chan_s.send(serde_json::Value::String(command.to_string())).unwrap();
}
did_something = true; did_something = true;
node.chan.send(serde_json::Value::String(make_request(control, data))).unwrap();
// check if the command is quit, need to remove the node if true // check if the command is quit, need to remove the node if true
if command == "quit" { if command == "quit" {
index_to_remove = Some(index); index_to_remove = Some(index);
...@@ -162,6 +167,15 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl ...@@ -162,6 +167,15 @@ fn main_loop(barrier: Arc<Barrier>, nodes: &mut Vec<data::Node>, flags: data::Fl
} }
Err(_) => print_help(), Err(_) => print_help(),
} }
for node in nodes.iter() {
match node.chan_r.try_recv() {
Ok(value) => {
println!("{}: {}", node.id, value.to_string());
}
Err(_) => (),
}
}
} }
} }
...@@ -225,7 +239,7 @@ fn get_args(args: std::env::Args, arg: &mut String, flags: &mut data::Flags) -> ...@@ -225,7 +239,7 @@ fn get_args(args: std::env::Args, arg: &mut String, flags: &mut data::Flags) ->
Ok(()) Ok(())
} }
fn make_request(control: &str, data: &str) -> String { pub fn make_request(control: &str, data: &str) -> String {
return data::REQUEST_BODY.replace("{1}", control).replace("{2}", data); return data::REQUEST_BODY.replace("{1}", control).replace("{2}", data);
} }
...@@ -270,13 +284,19 @@ fn main() -> std::io::Result<()> { ...@@ -270,13 +284,19 @@ fn main() -> std::io::Result<()> {
if let Value::Object(map) = &parsed { if let Value::Object(map) = &parsed {
for (key, value) in map { for (key, value) in map {
let (tx, rx) = channel(); let (tx, rx) = channel();
let (tx2, rx2) = channel();
let new_node: data::Node = data::Node { let new_node: data::Node = data::Node {
id: key.to_string(), id: key.to_string(),
chan: tx.clone(), chan_s: tx,
chan_r: rx2,
};
let new_com: data::Com = data::Com {
chan_s: tx2,
chan_r: rx,
}; };
if let Value::Array(_array) = value { if let Value::Array(_array) = value {
node::make_node(Arc::clone(&barrier), key.to_string() , value.clone(), rx, flags.clone()); node::make_node(Arc::clone(&barrier), key.to_string() , value.clone(), new_com, flags.clone());
} }
nodes.push(new_node); nodes.push(new_node);
} }
...@@ -289,4 +309,3 @@ fn main() -> std::io::Result<()> { ...@@ -289,4 +309,3 @@ fn main() -> std::io::Result<()> {
Ok(()) Ok(())
} }
//https://github.com/pepperoni21/ollama-rs
...@@ -17,7 +17,9 @@ use std::sync::{ ...@@ -17,7 +17,9 @@ use std::sync::{
Arc Arc
}; };
use crate::data; use crate::data::{
self
};
const BUFF_SIZE: usize = 4096; const BUFF_SIZE: usize = 4096;
const MODEL: &str = "mymist:latest"; const MODEL: &str = "mymist:latest";
...@@ -29,12 +31,12 @@ const LOCALHOST: &str = "http://localhost"; ...@@ -29,12 +31,12 @@ const LOCALHOST: &str = "http://localhost";
* @param barrier: Arc<Barrier> barrier to sync the threads * @param barrier: Arc<Barrier> barrier to sync the threads
* @param node_id: String node id * @param node_id: String node id
* @param array: Value array of the node * @param array: Value array of the node
* @param rx: std::sync::mpsc::Receiver<Value> receiver to comunicate from the master thread * @param chanel: data::Com channel to comunicate with the node
* @param flags: data::Flags flags * @param flags: data::Flags flags
* *
* @return * @return
*/ */
pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std::sync::mpsc::Receiver<Value>, flags: data::Flags) { pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, chanel: data::Com, flags: data::Flags) {
if flags.debug == true { if flags.debug == true {
println!("|----------------make_node: {}----------------|", node_id); println!("|----------------make_node: {}----------------|", node_id);
...@@ -91,8 +93,8 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std: ...@@ -91,8 +93,8 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
.expect("failed to make model available to the container"); .expect("failed to make model available to the container");
if flags.bypass == false { if flags.bypass == false {
println!("{} -> arbitrary wait for the container to init model (30s)", node_id); println!("{} -> arbitrary wait for the container to init model", node_id);
thread::sleep(time::Duration::from_secs(30)); thread::sleep(time::Duration::from_secs(5));
} }
Command::new("sh") Command::new("sh")
.arg("-c") .arg("-c")
...@@ -107,6 +109,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std: ...@@ -107,6 +109,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
} }
let node_socket = UdpSocket::bind(array[0].get("node").unwrap().as_str().unwrap()).expect("Could not bind node address"); let node_socket = UdpSocket::bind(array[0].get("node").unwrap().as_str().unwrap()).expect("Could not bind node address");
let node_port = node_socket.local_addr().unwrap().port();
let mut comm_vec: Vec<SocketAddr> = Vec::new(); let mut comm_vec: Vec<SocketAddr> = Vec::new();
let llm = Ollama::new(LOCALHOST.to_string(), llm_string.split(":").collect::<Vec<&str>>()[1].parse::<u16>().unwrap()); let llm = Ollama::new(LOCALHOST.to_string(), llm_string.split(":").collect::<Vec<&str>>()[1].parse::<u16>().unwrap());
...@@ -137,7 +140,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std: ...@@ -137,7 +140,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
barrier.wait(); barrier.wait();
// node_loop(barrier, node_id, rx, node_socket, llm, array[2].get("neighbours"), debug).unwrap(); // node_loop(barrier, node_id, rx, node_socket, llm, array[2].get("neighbours"), debug).unwrap();
node_loop(barrier, node_id, rx, node_socket, llm, comm_vec, flags.debug).unwrap(); node_loop(barrier, node_id, chanel, node_socket, llm, comm_vec, flags.debug).unwrap();
}); });
} }
...@@ -147,7 +150,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std: ...@@ -147,7 +150,7 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
* *
* @param barrier: Arc<Barrier> barrier to sync the threads * @param barrier: Arc<Barrier> barrier to sync the threads
* @param node_id: String node id * @param node_id: String node id
* @param rx: std::sync::mpsc::Receiver<Value> receiver to comunicate from the master thread * @param chanel: data::Com channel to comunicate with the node
* @param node_socket: std::net::UdpSocket node socket * @param node_socket: std::net::UdpSocket node socket
* @param llm: Ollama Ollama instance * @param llm: Ollama Ollama instance
* @param comm_vec: Vec<SocketAddr> vector of neighbours * @param comm_vec: Vec<SocketAddr> vector of neighbours
...@@ -155,18 +158,21 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std: ...@@ -155,18 +158,21 @@ pub fn make_node(barrier: Arc<Barrier>, node_id: String , array: Value, rx: std:
* *
* @return std::io::Result<()> * @return std::io::Result<()>
*/ */
fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Receiver<Value>, node_socket: std::net::UdpSocket, llm: Ollama,comm_vec: Vec<SocketAddr>, debug: bool) -> std::io::Result<()> { fn node_loop( barrier: Arc<Barrier>, node_id: String, chanel: data::Com, node_socket: std::net::UdpSocket, llm: Ollama,comm_vec: Vec<SocketAddr>, debug: bool) -> std::io::Result<()> {
let mut msg_received = false; let mut msg_received = false;
let mut msg_master = false;
let mut buf = [0; BUFF_SIZE]; let mut buf = [0; BUFF_SIZE];
let mut data: data::Message = data::Message { id: "init".to_string(), message: "".to_string() }; // let mut data: data::Message = data::Message { message: "".to_string() };
let mut response: data::Message = data::Message { id: "".to_string(), message: "".to_string() }; // let mut response: data::Message = data::Message { message: "".to_string() };
let mut data: String = "".to_string();
let mut response: String = "".to_string();
let start_time = Utc::now().timestamp(); let start_time = Utc::now().timestamp();
let rt = Runtime::new().unwrap(); let rt = Runtime::new().unwrap();
loop { loop {
// listen to master // listen to master
let elapsed = Utc::now().timestamp() - start_time; let elapsed = Utc::now().timestamp() - start_time;
match rx.try_recv() { match chanel.chan_r.try_recv() {
Ok(Value::String(msg)) => { Ok(Value::String(msg)) => {
if debug == true { if debug == true {
println!("{} -> {}", node_id, msg); println!("{} -> {}", node_id, msg);
...@@ -182,15 +188,16 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei ...@@ -182,15 +188,16 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
} }
match first.trim().replace(" ", "").as_str() { match first.trim().replace(" ", "").as_str() {
// "send" => { "send" => {
// msg_received = true; msg_received = true;
// data.message = rest.to_string(); msg_master = true;
data = format!("{}{}{}",rest.to_string(), ". You are ", node_id);
// if debug == true { if debug == true {
// println!("{} : send received -> {}", node_id, data.message); println!("{} : send received -> {}", node_id, data);
// } }
// }, },
"quit" => { "quit" => {
drop(node_socket); drop(node_socket);
if cfg!(target_os = "windows") { if cfg!(target_os = "windows") {
...@@ -218,7 +225,7 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei ...@@ -218,7 +225,7 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
}, },
_ => { _ => {
// println!("{} : Invalid command: {}", node_id, first); // println!("{} : Invalid command: {}", node_id, first);
data.message = msg.to_string(); data= msg.to_string();
msg_received = true; msg_received = true;
} }
...@@ -249,10 +256,8 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei ...@@ -249,10 +256,8 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
let json_end = buf.iter().position(|&b| b == b'}'); let json_end = buf.iter().position(|&b| b == b'}');
if let Some(end) = json_end { if let Some(end) = json_end {
let json_slice = &buf[..=end]; let json_slice = &buf[..=end];
data = serde_json::from_slice(json_slice).unwrap(); data = std::str::from_utf8(json_slice).unwrap().to_string();
if debug == true {
println!("{:?}:{} received -> {}", elapsed, node_id, data.message);
}
msg_received = true; msg_received = true;
} else { } else {
println!("Invalid JSON data"); println!("Invalid JSON data");
...@@ -269,32 +274,67 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei ...@@ -269,32 +274,67 @@ fn node_loop( barrier: Arc<Barrier>, node_id: String, rx: std::sync::mpsc::Recei
//listen to neighbours //listen to neighbours
if msg_received == true { if msg_received == true {
if debug == true { if debug == true {
println!("{:?}:{} request -> {}", elapsed, node_id, data.message); println!("{:?}:{} request -> {}", elapsed, node_id, data);
} }
response.message = llm.generate(GenerationRequest::new(MODEL.to_string(), data.message.clone())).await.clone().unwrap().response;
println!("{:?}: {} response -> {}", elapsed, node_id, response.message); response = llm.generate(GenerationRequest::new(MODEL.to_string(), data.clone())).await.clone().unwrap().response;
response.id = node_id.clone();
if debug == true { if debug == true {
println!("{:?}:{} response -> {}", elapsed, node_id, response.message); println!("{:?}:{} response -> {}", elapsed, node_id, response);
} }
let send = serde_json::to_string(&response).unwrap(); let mut value: serde_json::Value = serde_json::Value::Null;
for comm in comm_vec.iter() { let mut port_d: u32 = 1;
let comm_str = comm.to_string();
let addr = comm_str.trim_matches(|c| c == '[' || c == ']' || c == '\"'); match serde_json::from_str(&response) {
println!("{:?}:{} sending to -> {}", elapsed, node_id, addr); Ok(v) => {
match node_socket.send_to(send.as_bytes(), addr){ value = v;
Ok(_) => { port_d = value["destination"].as_str().unwrap().parse::<u32>().unwrap();
if debug == true { if debug == true {
println!("{:?}:{} sent -> {}", elapsed, node_id, send); println!("{:?}:{} destination -> {}", elapsed, node_id, port_d);
}
},
Err(e) => {
println!("{} : Error: {}", node_id, e);
} }
},
Err(_) => {
println!("{} : Error: parsing response from LLM {}", node_id, response);
}
}
if port_d == 1 {
println!("{:?}:{} Sending to master", elapsed, node_id);
chanel.chan_s.send(serde_json::Value::String(response.to_string())).unwrap();
} else {
for comm in comm_vec.iter() {
let comm_str = comm.to_string();
let addr = comm_str.trim_matches(|c| c == '[' || c == ']' || c == '\"');
let port = addr.split(":").collect::<Vec<&str>>()[1].parse::<u32>().unwrap();
if port == port_d {
println!("{:?}:{} sending to -> {}", elapsed, node_id, addr);
value["origin"] = serde_json::Value::String(node_socket.local_addr().unwrap().to_string().split(":").collect::<Vec<&str>>()[1].to_string());
if msg_master == true {
value["identity_master"] = serde_json::Value::String(node_id.to_string());
}
value["identity"] = serde_json::Value::String(node_id.clone());
response = value.to_string();
let send = serde_json::to_string(&response).unwrap();
match node_socket.send_to(send.as_bytes(), addr){
Ok(_) => {
if debug == true {
println!("{:?}:{} sent -> {}", elapsed, node_id, send);
}
},
Err(e) => {
println!("{} : Error: {}", node_id, e);
}
}
}
} }
} }
} }
msg_received = false; msg_received = false;
}); });
} }
......