wip
This commit is contained in:
parent
78ce899036
commit
097653f175
|
@ -364,6 +364,15 @@ version = "1.9.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
|
||||
|
||||
[[package]]
|
||||
name = "cbor4ii"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.6"
|
||||
|
@ -1945,6 +1954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"cbor4ii",
|
||||
"futures",
|
||||
"futures-bounded",
|
||||
"futures-timer",
|
||||
|
@ -1952,6 +1962,7 @@ dependencies = [
|
|||
"libp2p-identity",
|
||||
"libp2p-swarm",
|
||||
"rand",
|
||||
"serde",
|
||||
"smallvec",
|
||||
"tracing",
|
||||
"void",
|
||||
|
|
10
Cargo.toml
10
Cargo.toml
|
@ -8,13 +8,13 @@ publish = false
|
|||
[package.metadata.release]
|
||||
release = false
|
||||
|
||||
[[bin]] # Bin to run the server
|
||||
name = "server"
|
||||
path = "app/server.rs"
|
||||
#
|
||||
[[bin]] # Bin to run the server
|
||||
name = "cli"
|
||||
path = "app/cli.rs"
|
||||
#
|
||||
# [[bin]] # Bin to run the client
|
||||
# name = "client"
|
||||
# path = "app/client.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.95"
|
||||
|
@ -23,7 +23,7 @@ cryptoxide = "0.4.4"
|
|||
ed25519-dalek = { version = "2.1.1", features = ["digest"] }
|
||||
futures = "0.3.30"
|
||||
hex = "0.4.3"
|
||||
libp2p = { version = "0.54.1", features = ["tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic", "identify", "ping", "relay", "dcutr", "rendezvous", "kad", "request-response"] }
|
||||
libp2p = { version = "0.54.1", features = ["tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic", "identify", "ping", "relay", "dcutr", "rendezvous", "kad", "request-response", "cbor"] }
|
||||
libp2p-identity = { version = "0.2.9", features = ["ed25519", "peerid"] }
|
||||
owo-colors = "4.1.0"
|
||||
quick-protobuf = "0.8.1"
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
use chrono::prelude::DateTime;
|
||||
use cryptoxide::ed25519;
|
||||
use uuid::Uuid;
|
||||
|
||||
use mr_wolf_tonic::clock_client::ClockClient;
|
||||
use mr_wolf_tonic::{AddReq, TimeReq};
|
||||
|
||||
pub mod mr_wolf_tonic {
|
||||
tonic::include_proto!("mrwolf");
|
||||
}
|
||||
|
||||
use mr_wolf::types::{mk_message, Witness};
|
||||
|
||||
fn mk_witness(keypair: [u8; 64], iou: u64) -> Witness {
|
||||
let nonce = Uuid::new_v4().as_bytes().clone();
|
||||
let message = mk_message(iou, &nonce);
|
||||
let sig = ed25519::signature(&message, &keypair);
|
||||
Witness { iou, nonce, sig }
|
||||
}
|
||||
|
||||
fn mk_request(keypair: [u8; 64], id: &str, iou: u64) -> tonic::Request<TimeReq> {
|
||||
let witness = mk_witness(keypair, iou);
|
||||
tonic::Request::new(TimeReq {
|
||||
id: id.to_string(),
|
||||
iou: witness.iou,
|
||||
nonce: witness.nonce.into_iter().collect::<Vec<u8>>(),
|
||||
sig: witness.sig.into_iter().collect::<Vec<u8>>(),
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut client = ClockClient::connect("http://0.0.0.0:50051").await?;
|
||||
// Subscriber generates secret and declares pub_key
|
||||
let prv_key = [0u8; 32]; // private key only for example !
|
||||
let (keypair, pub_key_arr) = ed25519::keypair(&prv_key);
|
||||
let pub_key = pub_key_arr.into_iter().collect::<Vec<u8>>();
|
||||
|
||||
// Provider and subscriber agree terms.
|
||||
// Provider adds details to service
|
||||
// And hands back subscriber id
|
||||
let add_req = tonic::Request::new(AddReq {
|
||||
pot: 20000,
|
||||
pub_key,
|
||||
});
|
||||
|
||||
let add_res = client.add(add_req).await?;
|
||||
// Subscriber id
|
||||
let id = add_res.into_inner().id;
|
||||
|
||||
// Make a request
|
||||
let request = mk_request(keypair, &id, 1);
|
||||
let response = client.whats_the_time(request).await?;
|
||||
let start = response.into_inner().message;
|
||||
let start_time = DateTime::parse_from_str(&start, "New %+").unwrap();
|
||||
println!("ID={} UTC={:?}", id, start);
|
||||
for ii in 2..20000 {
|
||||
let request = mk_request(keypair, &id, ii);
|
||||
let response = client.whats_the_time(request).await?;
|
||||
let _utc = response.into_inner().message;
|
||||
}
|
||||
let request = mk_request(keypair, &id, 20000);
|
||||
let response = client.whats_the_time(request).await?;
|
||||
let end = response.into_inner().message;
|
||||
println!("ID={} UTC={:?}", id, end);
|
||||
let end_time = DateTime::parse_from_str(&end, "New %+").unwrap();
|
||||
println!(
|
||||
"ID={} DIFF={:?}",
|
||||
id,
|
||||
end_time.signed_duration_since(start_time)
|
||||
);
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
use std::{error::Error, time::Duration};
|
||||
|
||||
use cll2v0::messages::{MyRequest, MyResponse};
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
mdns, noise,
|
||||
request_response::{self, ProtocolSupport},
|
||||
swarm::{NetworkBehaviour, SwarmEvent},
|
||||
tcp, yamux, StreamProtocol, Swarm,
|
||||
};
|
||||
use libp2p_identity::ed25519::Keypair;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
// We create a custom network behaviour
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct MyBehaviour {
|
||||
mdns: mdns::tokio::Behaviour,
|
||||
req_res: request_response::cbor::Behaviour<MyRequest, MyResponse>,
|
||||
}
|
||||
|
||||
fn sign(kp: &Keypair, msg: &Vec<u8>) -> String {
|
||||
hex::encode(kp.sign(msg))
|
||||
}
|
||||
|
||||
fn mk_swarm(
|
||||
kp: &Keypair,
|
||||
protocol_support: ProtocolSupport,
|
||||
) -> Result<Swarm<MyBehaviour>, Box<dyn Error>> {
|
||||
let swarm = libp2p::SwarmBuilder::with_existing_identity(kp.clone().into())
|
||||
.with_tokio()
|
||||
.with_tcp(
|
||||
tcp::Config::default(),
|
||||
noise::Config::new,
|
||||
yamux::Config::default,
|
||||
)?
|
||||
.with_behaviour(|key| {
|
||||
let mdns =
|
||||
mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;
|
||||
let protocol = [(StreamProtocol::new("/sign-me/1"), protocol_support)];
|
||||
let config = request_response::Config::default();
|
||||
let req_res =
|
||||
request_response::cbor::Behaviour::<MyRequest, MyResponse>::new(protocol, config);
|
||||
Ok(MyBehaviour { req_res, mdns })
|
||||
})?
|
||||
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
|
||||
.build();
|
||||
Ok(swarm)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let body: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 0];
|
||||
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.try_init();
|
||||
|
||||
let keypair = libp2p_identity::ed25519::Keypair::generate();
|
||||
let key = keypair.public().to_bytes();
|
||||
|
||||
let is_client = match std::env::args().nth(1) {
|
||||
Some(arg) => {
|
||||
println!("Arg {}", arg);
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
let protocol_support = if is_client {
|
||||
ProtocolSupport::Outbound
|
||||
} else {
|
||||
ProtocolSupport::Inbound
|
||||
};
|
||||
let mut swarm = mk_swarm(&keypair, protocol_support)?;
|
||||
// Tell the swarm to listen on all interfaces and a random, OS-assigned
|
||||
// port.
|
||||
swarm.listen_on("/ip4/192.168.1.51/tcp/0".parse()?)?;
|
||||
println!("ORISIG : {}", sign(&keypair, &body));
|
||||
|
||||
loop {
|
||||
match swarm.select_next_some().await {
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
|
||||
for (peer_id, multiaddr) in list {
|
||||
println!("add peer {:?}, {:?}", peer_id, multiaddr.clone());
|
||||
if is_client {
|
||||
let _ = swarm.dial(multiaddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
|
||||
for (peer_id, _multiaddr) in list {
|
||||
println!("mDNS discover peer has expired: {peer_id}");
|
||||
// swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::ReqRes(request_response::Event::Message {
|
||||
peer: _peer,
|
||||
message:
|
||||
libp2p::request_response::Message::Request {
|
||||
request, channel, ..
|
||||
},
|
||||
})) => {
|
||||
// println!("Req : {:?} {:?} {:?}", peer, channel, hex::encode(request.body.clone()), );
|
||||
println!("REQSIG : {}", hex::encode(request.sig.clone()),);
|
||||
let _ = swarm.behaviour_mut().req_res.send_response(
|
||||
channel,
|
||||
MyResponse {
|
||||
sig: keypair.sign(&request.body),
|
||||
},
|
||||
);
|
||||
}
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::ReqRes(request_response::Event::Message {
|
||||
peer: _peer,
|
||||
message: libp2p::request_response::Message::Response { response, .. },
|
||||
})) => {
|
||||
// println!("response : {:?} {:?} {:?}", peer, request_id, "" );
|
||||
println!("RESSIG : {}", hex::encode(response.sig.clone()),);
|
||||
}
|
||||
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
|
||||
//connection_id, endpoint, num_established, concurrent_dial_errors, established_in } => {
|
||||
println!("ConnectionEstablisted {}", peer_id);
|
||||
if is_client {
|
||||
swarm.behaviour_mut().req_res.send_request(
|
||||
&peer_id,
|
||||
MyRequest {
|
||||
key: key.clone(),
|
||||
body: body.clone(),
|
||||
sig: keypair.sign(&body),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
e => {
|
||||
println!("OTHER {:?}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
pub mod db;
|
||||
pub mod hash;
|
||||
pub mod keys;
|
||||
pub mod messages;
|
||||
pub mod protobuf;
|
||||
pub mod server;
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct MyRequest {
|
||||
pub key: [u8; 32],
|
||||
pub body: Vec<u8>,
|
||||
pub sig: Vec<u8>, // FIXME :: fixed length array not supported by serde [u8; 64],
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct MyResponse {
|
||||
// sig: [u8; 64],
|
||||
pub sig: Vec<u8>, // FIXME :: fixed length array not supported by serde [u8; 64],
|
||||
}
|
|
@ -0,0 +1,301 @@
|
|||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Integration tests for the `Behaviour`.
|
||||
|
||||
use std::{io, iter};
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::identity::PeerId;
|
||||
use libp2p::request_response;
|
||||
use libp2p::request_response::ProtocolSupport;
|
||||
use libp2p::swarm::{StreamProtocol, Swarm, SwarmEvent};
|
||||
// use libp2p::swarm_test::SwarmExt;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
async fn is_response_outbound() {
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.try_init();
|
||||
let ping = Ping("ping".to_string().into_bytes());
|
||||
let offline_peer = PeerId::random();
|
||||
|
||||
let mut swarm1 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(
|
||||
[(
|
||||
StreamProtocol::new("/ping/1"),
|
||||
request_response::ProtocolSupport::Full,
|
||||
)],
|
||||
request_response::Config::default(),
|
||||
)
|
||||
});
|
||||
|
||||
let request_id1 = swarm1
|
||||
.behaviour_mut()
|
||||
.send_request(&offline_peer, ping.clone());
|
||||
|
||||
match swarm1
|
||||
.next_swarm_event()
|
||||
.await
|
||||
.try_into_behaviour_event()
|
||||
.unwrap()
|
||||
{
|
||||
request_response::Event::OutboundFailure {
|
||||
peer,
|
||||
request_id: req_id,
|
||||
error: _error,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(&offline_peer, &peer);
|
||||
assert_eq!(req_id, request_id1);
|
||||
}
|
||||
e => panic!("Peer: Unexpected event: {e:?}"),
|
||||
}
|
||||
|
||||
let request_id2 = swarm1.behaviour_mut().send_request(&offline_peer, ping);
|
||||
|
||||
assert!(!swarm1
|
||||
.behaviour()
|
||||
.is_pending_outbound(&offline_peer, &request_id1));
|
||||
assert!(swarm1
|
||||
.behaviour()
|
||||
.is_pending_outbound(&offline_peer, &request_id2));
|
||||
}
|
||||
|
||||
async fn ping_protocol() {
|
||||
let ping = Ping("ping".to_string().into_bytes());
|
||||
let pong = Pong("pong".to_string().into_bytes());
|
||||
|
||||
let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
|
||||
let cfg = request_response::Config::default();
|
||||
|
||||
let mut swarm1 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), cfg.clone())
|
||||
});
|
||||
let peer1_id = *swarm1.local_peer_id();
|
||||
let mut swarm2 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols, cfg)
|
||||
});
|
||||
let peer2_id = *swarm2.local_peer_id();
|
||||
|
||||
swarm1.listen().with_memory_addr_external().await;
|
||||
swarm2.connect(&mut swarm1).await;
|
||||
|
||||
let expected_ping = ping.clone();
|
||||
let expected_pong = pong.clone();
|
||||
|
||||
let peer1 = async move {
|
||||
loop {
|
||||
match swarm1.next_swarm_event().await.try_into_behaviour_event() {
|
||||
Ok(request_response::Event::Message {
|
||||
peer,
|
||||
message:
|
||||
request_response::Message::Request {
|
||||
request, channel, ..
|
||||
},
|
||||
..
|
||||
}) => {
|
||||
assert_eq!(&request, &expected_ping);
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
swarm1
|
||||
.behaviour_mut()
|
||||
.send_response(channel, pong.clone())
|
||||
.unwrap();
|
||||
}
|
||||
Ok(request_response::Event::ResponseSent { peer, .. }) => {
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
}
|
||||
Ok(e) => {
|
||||
panic!("Peer1: Unexpected event: {e:?}")
|
||||
}
|
||||
Err(..) => {}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let num_pings: u8 = rand::thread_rng().gen_range(1..100);
|
||||
|
||||
let peer2 = async {
|
||||
let mut count = 0;
|
||||
|
||||
let mut req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
|
||||
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));
|
||||
|
||||
loop {
|
||||
match swarm2
|
||||
.next_swarm_event()
|
||||
.await
|
||||
.try_into_behaviour_event()
|
||||
.unwrap()
|
||||
{
|
||||
request_response::Event::Message {
|
||||
peer,
|
||||
message:
|
||||
request_response::Message::Response {
|
||||
request_id,
|
||||
response,
|
||||
},
|
||||
..
|
||||
} => {
|
||||
count += 1;
|
||||
assert_eq!(&response, &expected_pong);
|
||||
assert_eq!(&peer, &peer1_id);
|
||||
assert_eq!(req_id, request_id);
|
||||
if count >= num_pings {
|
||||
return;
|
||||
} else {
|
||||
req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
|
||||
}
|
||||
}
|
||||
e => panic!("Peer2: Unexpected event: {e:?}"),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
async_std::task::spawn(Box::pin(peer1));
|
||||
peer2.await;
|
||||
}
|
||||
|
||||
async fn emits_inbound_connection_closed_failure() {
|
||||
let ping = Ping("ping".to_string().into_bytes());
|
||||
|
||||
let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
|
||||
let cfg = request_response::Config::default();
|
||||
|
||||
let mut swarm1 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), cfg.clone())
|
||||
});
|
||||
let peer1_id = *swarm1.local_peer_id();
|
||||
let mut swarm2 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols, cfg)
|
||||
});
|
||||
let peer2_id = *swarm2.local_peer_id();
|
||||
|
||||
swarm1.listen().with_memory_addr_external().await;
|
||||
swarm2.connect(&mut swarm1).await;
|
||||
|
||||
swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
|
||||
|
||||
// Wait for swarm 1 to receive request by swarm 2.
|
||||
let _channel = loop {
|
||||
futures::select!(
|
||||
event = swarm1.select_next_some() => match event {
|
||||
SwarmEvent::Behaviour(request_response::Event::Message {
|
||||
peer,
|
||||
message: request_response::Message::Request { request, channel, .. },
|
||||
..
|
||||
}) => {
|
||||
assert_eq!(&request, &ping);
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
break channel;
|
||||
},
|
||||
SwarmEvent::Behaviour(ev) => panic!("Peer1: Unexpected event: {ev:?}"),
|
||||
_ => {}
|
||||
},
|
||||
event = swarm2.select_next_some() => {
|
||||
if let SwarmEvent::Behaviour(ev) = event {
|
||||
panic!("Peer2: Unexpected event: {ev:?}");
|
||||
}
|
||||
}
|
||||
)
|
||||
};
|
||||
|
||||
// Drop swarm 2 in order for the connection between swarm 1 and 2 to close.
|
||||
drop(swarm2);
|
||||
|
||||
loop {
|
||||
match swarm1.select_next_some().await {
|
||||
SwarmEvent::Behaviour(request_response::Event::InboundFailure {
|
||||
error: request_response::InboundFailure::ConnectionClosed,
|
||||
..
|
||||
}) => break,
|
||||
SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {e:?}"),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We expect the substream to be properly closed when response channel is dropped.
|
||||
/// Since the ping protocol used here expects a response, the sender considers this
|
||||
/// early close as a protocol violation which results in the connection being closed.
|
||||
/// If the substream were not properly closed when dropped, the sender would instead
|
||||
/// run into a timeout waiting for the response.
|
||||
async fn emits_inbound_connection_closed_if_channel_is_dropped() {
|
||||
let ping = Ping("ping".to_string().into_bytes());
|
||||
|
||||
let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
|
||||
let cfg = request_response::Config::default();
|
||||
|
||||
let mut swarm1 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), cfg.clone())
|
||||
});
|
||||
let peer1_id = *swarm1.local_peer_id();
|
||||
let mut swarm2 = Swarm::new_ephemeral(|_| {
|
||||
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols, cfg)
|
||||
});
|
||||
let peer2_id = *swarm2.local_peer_id();
|
||||
|
||||
swarm1.listen().with_memory_addr_external().await;
|
||||
swarm2.connect(&mut swarm1).await;
|
||||
|
||||
swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
|
||||
|
||||
// Wait for swarm 1 to receive request by swarm 2.
|
||||
let event = loop {
|
||||
futures::select!(
|
||||
event = swarm1.select_next_some() => {
|
||||
if let SwarmEvent::Behaviour(request_response::Event::Message {
|
||||
peer,
|
||||
message: request_response::Message::Request { request, channel, .. },
|
||||
..
|
||||
}) = event {
|
||||
assert_eq!(&request, &ping);
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
|
||||
drop(channel);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
event = swarm2.select_next_some() => {
|
||||
if let SwarmEvent::Behaviour(ev) = event {
|
||||
break ev;
|
||||
}
|
||||
},
|
||||
)
|
||||
};
|
||||
|
||||
let error = match event {
|
||||
request_response::Event::OutboundFailure { error, .. } => error,
|
||||
e => panic!("unexpected event from peer 2: {e:?}"),
|
||||
};
|
||||
|
||||
assert!(matches!(
|
||||
error,
|
||||
request_response::OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof,
|
||||
));
|
||||
}
|
||||
|
||||
// Simple Ping-Pong Protocol
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct Ping(Vec<u8>);
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct Pong(Vec<u8>);
|
206
src/server.rs
206
src/server.rs
|
@ -1,206 +0,0 @@
|
|||
use std::{error::Error, time::Duration};
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
mdns, noise, ping,
|
||||
request_response::{self, Codec},
|
||||
swarm::SwarmEvent,
|
||||
tcp, yamux, Multiaddr,
|
||||
};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
// We create a custom network behaviour
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct MyBehaviour {
|
||||
req_res: request_response::Behaviour,
|
||||
mdns: mdns::tokio::Behaviour,
|
||||
}
|
||||
|
||||
struct MyRequest {
|
||||
key: [u8; 32],
|
||||
body: Vec<u8>,
|
||||
sig: [u8; 64],
|
||||
}
|
||||
|
||||
struct MyResponse {
|
||||
sig: [u8; 64],
|
||||
}
|
||||
|
||||
struct MyCodec;
|
||||
|
||||
impl Codec for MyCodec {
|
||||
#[doc = " The type of protocol(s) or protocol versions being negotiated."]
|
||||
type Protocol = String;
|
||||
|
||||
#[doc = " The type of inbound and outbound requests."]
|
||||
type Request = MyRequest;
|
||||
|
||||
#[doc = " The type of inbound and outbound responses."]
|
||||
type Response = MyResponse;
|
||||
|
||||
#[doc = " Reads a request from the given I/O stream according to the"]
|
||||
#[doc = " negotiated protocol."]
|
||||
#[must_use]
|
||||
#[allow(
|
||||
elided_named_lifetimes,
|
||||
clippy::type_complexity,
|
||||
clippy::type_repetition_in_bounds
|
||||
)]
|
||||
fn read_request<'life0, 'life1, 'life2, 'async_trait, T>(
|
||||
&'life0 mut self,
|
||||
_protocol: &'life1 Self::Protocol,
|
||||
io: &'life2 mut T,
|
||||
) -> ::core::pin::Pin<
|
||||
Box<
|
||||
dyn ::core::future::Future<Output = io::Result<Self::Request>>
|
||||
+ ::core::marker::Send
|
||||
+ 'async_trait,
|
||||
>,
|
||||
>
|
||||
where
|
||||
T: AsyncRead + Unpin + Send,
|
||||
T: 'async_trait,
|
||||
'life0: 'async_trait,
|
||||
'life1: 'async_trait,
|
||||
'life2: 'async_trait,
|
||||
Self: 'async_trait,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[doc = " Reads a response from the given I/O stream according to the"]
|
||||
#[doc = " negotiated protocol."]
|
||||
#[must_use]
|
||||
#[allow(
|
||||
elided_named_lifetimes,
|
||||
clippy::type_complexity,
|
||||
clippy::type_repetition_in_bounds
|
||||
)]
|
||||
fn read_response<'life0, 'life1, 'life2, 'async_trait, T>(
|
||||
&'life0 mut self,
|
||||
protocol: &'life1 Self::Protocol,
|
||||
io: &'life2 mut T,
|
||||
) -> ::core::pin::Pin<
|
||||
Box<
|
||||
dyn ::core::future::Future<Output = io::Result<Self::Response>>
|
||||
+ ::core::marker::Send
|
||||
+ 'async_trait,
|
||||
>,
|
||||
>
|
||||
where
|
||||
T: AsyncRead + Unpin + Send,
|
||||
T: 'async_trait,
|
||||
'life0: 'async_trait,
|
||||
'life1: 'async_trait,
|
||||
'life2: 'async_trait,
|
||||
Self: 'async_trait,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[doc = " Writes a request to the given I/O stream according to the"]
|
||||
#[doc = " negotiated protocol."]
|
||||
#[must_use]
|
||||
#[allow(
|
||||
elided_named_lifetimes,
|
||||
clippy::type_complexity,
|
||||
clippy::type_repetition_in_bounds
|
||||
)]
|
||||
fn write_request<'life0, 'life1, 'life2, 'async_trait, T>(
|
||||
&'life0 mut self,
|
||||
protocol: &'life1 Self::Protocol,
|
||||
io: &'life2 mut T,
|
||||
req: Self::Request,
|
||||
) -> ::core::pin::Pin<
|
||||
Box<
|
||||
dyn ::core::future::Future<Output = io::Result<()>>
|
||||
+ ::core::marker::Send
|
||||
+ 'async_trait,
|
||||
>,
|
||||
>
|
||||
where
|
||||
T: AsyncWrite + Unpin + Send,
|
||||
T: 'async_trait,
|
||||
'life0: 'async_trait,
|
||||
'life1: 'async_trait,
|
||||
'life2: 'async_trait,
|
||||
Self: 'async_trait,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[doc = " Writes a response to the given I/O stream according to the"]
|
||||
#[doc = " negotiated protocol."]
|
||||
#[must_use]
|
||||
#[allow(
|
||||
elided_named_lifetimes,
|
||||
clippy::type_complexity,
|
||||
clippy::type_repetition_in_bounds
|
||||
)]
|
||||
fn write_response<'life0, 'life1, 'life2, 'async_trait, T>(
|
||||
&'life0 mut self,
|
||||
protocol: &'life1 Self::Protocol,
|
||||
io: &'life2 mut T,
|
||||
res: Self::Response,
|
||||
) -> ::core::pin::Pin<
|
||||
Box<
|
||||
dyn ::core::future::Future<Output = io::Result<()>>
|
||||
+ ::core::marker::Send
|
||||
+ 'async_trait,
|
||||
>,
|
||||
>
|
||||
where
|
||||
T: AsyncWrite + Unpin + Send,
|
||||
T: 'async_trait,
|
||||
'life0: 'async_trait,
|
||||
'life1: 'async_trait,
|
||||
'life2: 'async_trait,
|
||||
Self: 'async_trait,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.try_init();
|
||||
|
||||
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
|
||||
.with_tokio()
|
||||
.with_tcp(
|
||||
tcp::Config::default(),
|
||||
noise::Config::new,
|
||||
yamux::Config::default,
|
||||
)?
|
||||
.with_behaviour(|key| {
|
||||
let mdns =
|
||||
mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;
|
||||
let req_res = request_response::Behaviour::with_codec(MyCodec, protocol, config);
|
||||
Ok(MyBehaviour { req_res, mdns })
|
||||
})?
|
||||
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
|
||||
.build();
|
||||
|
||||
// Tell the swarm to listen on all interfaces and a random, OS-assigned
|
||||
// port.
|
||||
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
|
||||
|
||||
// Dial the peer identified by the multi-address given as the second
|
||||
// command-line argument, if any.
|
||||
if let Some(addr) = std::env::args().nth(1) {
|
||||
let remote: Multiaddr = addr.parse()?;
|
||||
swarm.dial(remote)?;
|
||||
println!("Dialed {addr}")
|
||||
}
|
||||
|
||||
loop {
|
||||
match swarm.select_next_some().await {
|
||||
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
|
||||
SwarmEvent::Behaviour(event) => println!("{event:?}"),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue