p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store

Kademlia 网络协议

  • Kademlia 是一种分布式哈希表协议和算法,用于构建去中心化的对等网络,核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里,Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。

  • 以下这些函数或方法是根据 Kademlia 网络协议设计的,它们实现了基本的网络操作,包括获取数据记录、获取数据提供者、存储数据记录和开始提供数据等功能(这里只展示了项目中用到的函数,常用函数可以看libp2p Kademlia DHT 规范,更多函数可见如下图中的源码部分)。

在这里插入图片描述

1. get_record

kademlia.get_record(key, Quorum::One);
  • 作用: 从 Kademlia 网络中获取与指定 key 相关的记录。
  • 参数:
    • key: 要获取记录的键。
    • Quorum::One: 获取记录时所需的一致性要求,这里是指只需要从一个节点获取记录即可。
  • 实现逻辑:
    • 根据 Kademlia 协议,节点首先根据 key 计算出其对应的 K-bucket 或者具体的节点 ID,然后向网络中查找负责该 key 的节点。
    • 节点通过网络查询和消息传递机制,从负责节点处获取存储的记录。
    • 返回获取到的记录或者执行相应的处理逻辑。

2. get_providers

kademlia.get_providers(key);
  • 作用: 获取能够提供与指定 key 相关数据的节点信息(即数据的提供者)。
  • 参数:
    • key: 要获取提供者信息的数据的键。
  • 实现逻辑:
    • 类似于 get_record,节点根据 key 计算出其对应的 K-bucket 或者节点 ID。
    • 节点向网络发送查询请求,询问哪些节点能够提供与 key 相关的数据。
    • 返回能够提供数据的节点列表或者执行相应的处理逻辑。

3. put_record

let record = Record {
    key,
    value,
    publisher: None,
    expires: None,
};
kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
  • 作用: 将指定的记录存储到 Kademlia 网络中。
  • 参数:
    • record: 包含要存储的数据信息的记录对象,包括 key(键)、value(值)、publisher(发布者,可能为空)、expires(过期时间,可能为空)等字段。
    • Quorum::One: 存储记录时的一致性要求,这里是指只需要将记录存储在一个节点即可。
  • 实现逻辑:
    • 节点根据 key 计算出对应的 K-bucket 或节点 ID。
    • 节点将 record 发送给负责存储该 key 的节点,并根据指定的一致性要求存储副本。
    • 返回存储成功或失败的结果,或者执行相应的处理逻辑。

4. start_providing

kademlia.start_providing(key).expect("Failed to start providing key");
  • 作用: 在 Kademlia 网络中开始提供指定 key 的数据。
  • 参数:
    • key: 要开始提供的数据的键。
  • 实现逻辑:
    • 节点将 key 注册为它可以提供的数据标识。
    • 当其他节点查询或需要该 key 的数据时,该节点将响应并提供相应的数据。
    • 返回启动提供成功或失败的结果,或者执行相应的处理逻辑。

kv数据库主体代码及注释

use async_std::io;
use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{
    record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,
    Quorum, Record,
};
use libp2p::{
    development_transport, identity,
    mdns::{Mdns, MdnsConfig, MdnsEvent},
    swarm::SwarmEvent,
    NetworkBehaviour, PeerId, Swarm,
};
use std::error::Error;

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::init();

    //  创建本地密钥,本地peer id和传输控制组件
    let local_key = identity::Keypair::generate_ed25519();
    let local_peer_id = PeerId::from(local_key.public());
    let transport = development_transport(local_key).await?;

    // 事件行为控制
    // We create a custom network behaviour that combines Kademlia and mDNS.
    #[derive(NetworkBehaviour)]// https://docs.rs/libp2p/latest/libp2p/swarm/trait.NetworkBehaviour.html
    #[behaviour(out_event = "MyBehaviourEvent")]//这个 "MyBehaviourEvent" 定义在下边的代码中
    // NetworkBehaviour这个trait将对所描述的结构体中的每个成员依次进行操作,例如 NetworkBehavior::poll它将首先轮询第一个结构成员,直到返回poll::Pending,然后再转到后面的成员。
    // 关于 #[behaviour(out_event = "MyBehaviourEvent")]中的out_event :The final out event. If we find a `#[behaviour(out_event = "Foo")]` attribute on the struct, we set `Foo` as the out event. Otherwise we use `()`.
    struct MyBehaviour {
        kademlia: Kademlia<MemoryStore>,
        mdns: Mdns,
    }


    #[allow(clippy::large_enum_variant)] //  #[allow()为Lint语法属性检查控制,https://doc.rust-lang.org/reference/attributes/diagnostics.html#lint-check-attributes    //关于large_enum_variant 详见https://rust-lang.github.io/rust-clippy/master/index.html#/large_enum_variant
    enum MyBehaviourEvent {
        Kademlia(KademliaEvent),
        Mdns(MdnsEvent),
    }
    // 实现(impl)块,用于为类型KademliaEvent实现了From trait,使其能够被转换为类型MyBehaviourEvent。
    impl From<KademliaEvent> for MyBehaviourEvent {
        fn from(event: KademliaEvent) -> Self {
            MyBehaviourEvent::Kademlia(event)
        }
    }
    // 实现(impl)块,用于为类型  MdnsEvent   实现了From trait,使其能够被转换为类型MyBehaviourEvent。
    impl From<MdnsEvent> for MyBehaviourEvent {
        fn from(event: MdnsEvent) -> Self {
            MyBehaviourEvent::Mdns(event)
        }
    }

    // Create a swarm to manage peers and events.
    let mut swarm = {
        // Create a Kademlia behaviour.
        let store = MemoryStore::new(local_peer_id);
        let kademlia = Kademlia::new(local_peer_id, store);
        let mdns = Mdns::new(MdnsConfig::default()).await?;
        let behaviour = MyBehaviour { kademlia, mdns };
        Swarm::new(transport, behaviour, local_peer_id)
    };

    // 从命令行读取指令并赋值给可变变量"stdin"
    let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

    // Listen on all interfaces and whatever port the OS assigns.
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

    // Kick it off.
    loop {
        select! {
            line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
            event = swarm.select_next_some() => match event { // swarm.select_next_some() 是一个方法,用于从一个事件流中获取下一个事件,后续送到match进行匹配
                SwarmEvent::NewListenAddr { address, .. } => {//当发生新的监听地址事件时
                    println!("Listening in {:?}", address);
                },
                SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {// 发生mDNS服务发现事件时
                    for (peer_id, multiaddr) in list {
                        swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
                    }
                }
                SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {// 当发出的 Kademlia 查询完成时
                    handle_query_result(&result);
                }
                _ => {} // 通配符模式,执行一个空的代码块
            }
        }
    }
}

// 下面是两个辅助函数,一个根据不同的查询结果类型执行不同的逻辑,另一个处理从命令行输入的命令
fn handle_query_result(result: &QueryResult) {
    match result {
     ...
    }
}

fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
    let mut args = line.split(' ');

    match args.next() {
    ...
    }
}

两个辅助函数

处理从命令行输入的命令

  • 这段 Rust 代码定义了一个函数 handle_input_line,用于处理从命令行读取的输入 line,并根据命令执行相应的操作。函数通过分割输入行来解析命令和参数,处理缺少参数的错误情况,并根据命令调用传入的 Kademlia 网络实例 (kademlia) 的相应方法。
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
    // 将输入行按空格分割为多个参数
    let mut args = line.split(' ');

    // 匹配第一个参数(命令)
    match args.next() {
        Some("GET") => {
            // 如果命令是 "GET"
            let key = {
                // 尝试获取下一个参数作为键
                match args.next() {
                    Some(key) => Key::new(&key), // 从字符串创建 Key 对象
                    None => {
                        // 如果未提供键,则打印错误并从函数返回
                        eprintln!("缺少键");
                        return;
                    }
                }
            };
            // 调用 Kademlia 网络的 get_record 方法,传入指定的键和 Quorum::One
            kademlia.get_record(key, Quorum::One);
        }
        Some("GET_PROVIDERS") => {
            // 如果命令是 "GET_PROVIDERS"
            let key = {
                // 尝试获取下一个参数作为键
                match args.next() {
                    Some(key) => Key::new(&key), // 从字符串创建 Key 对象
                    None => {
                        // 如果未提供键,则打印错误并从函数返回
                        eprintln!("缺少键");
                        return;
                    }
                }
            };
            // 调用 Kademlia 网络的 get_providers 方法,传入指定的键
            kademlia.get_providers(key);
        }
        Some("PUT") => {
            // 如果命令是 "PUT"
            let key = {
                // 尝试获取下一个参数作为键
                match args.next() {
                    Some(key) => Key::new(&key), // 从字符串创建 Key 对象
                    None => {
                        // 如果未提供键,则打印错误并从函数返回
                        eprintln!("缺少键");
                        return;
                    }
                }
            };
            let value = {
                // 尝试获取下一个参数作为值
                match args.next() {
                    Some(value) => value.as_bytes().to_vec(), // 将值转换为字节向量
                    None => {
                        // 如果未提供值,则打印错误并从函数返回
                        eprintln!("缺少值");
                        return;
                    }
                }
            };
            // 创建一个包含指定键、值及可选字段的 Record 对象
            let record = Record {
                key,
                value,
                publisher: None,
                expires: None,
            };
            // 在 Kademlia 网络中以 Quorum::One 一致性存储记录
            kademlia
                .put_record(record, Quorum::One)
                .expect("本地存储记录失败。");
        }
        Some("PUT_PROVIDER") => {
            // 如果命令是 "PUT_PROVIDER"
            let key = {
                // 尝试获取下一个参数作为键
                match args.next() {
                    Some(key) => Key::new(&key), // 从字符串创建 Key 对象
                    None => {
                        // 如果未提供键,则打印错误并从函数返回
                        eprintln!("缺少键");
                        return;
                    }
                }
            };
            // 在 Kademlia 网络中开始提供指定的键
            kademlia
                .start_providing(key)
                .expect("启动提供键失败");
        }
        _ => {
            // 如果命令不匹配预期的任何命令
            eprintln!("期望命令为 GET、GET_PROVIDERS、PUT 或 PUT_PROVIDER");
        }
    }

根据不同的查询结果类型执行不同的逻辑

fn handle_query_result(result: &QueryResult) {
    match result {
        QueryResult::GetProviders(Ok(ok)) => {
            for peer in &ok.providers {
                println!(
                    "Peer {:?} provides key {:?}",
                    peer,
                    std::str::from_utf8(ok.key.as_ref()).unwrap()
                );
            }
        }
        QueryResult::GetProviders(Err(err)) => {
            eprintln!("Failed to get providers: {:?}", err);
        }
        QueryResult::GetRecord(Ok(ok)) => {
            for PeerRecord {
                record: Record { key, value, .. },
                ..
            } in &ok.records
            {
                println!(
                    "Got record {:?} {:?}",
                    std::str::from_utf8(key.as_ref()).unwrap(),
                    std::str::from_utf8(&value).unwrap(),
                );
            }
        }
        QueryResult::GetRecord(Err(err)) => {
            eprintln!("Failed to get record: {:?}", err);
        }
        QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
            println!(
                "Successfully put record {:?}",
                std::str::from_utf8(key.as_ref()).unwrap()
            );
        }
        QueryResult::PutRecord(Err(err)) => {
            eprintln!("Failed to put record: {:?}", err);
        }
        QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
            println!(
                "Successfully put provider record {:?}",
                std::str::from_utf8(key.as_ref()).unwrap()
            );
        }
        QueryResult::StartProviding(Err(err)) => {
            eprintln!("Failed to put provider record: {:?}", err);
        }
        _ => {}
    }
}

fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
    let mut args = line.split(' ');

    match args.next() {
        Some("GET") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            kademlia.get_record(key, Quorum::One);
        }
        Some("GET_PROVIDERS") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            kademlia.get_providers(key);
        }
        Some("PUT") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            let value = {
                match args.next() {
                    Some(value) => value.as_bytes().to_vec(),
                    None => {
                        eprintln!("Expected value");
                        return;
                    }
                }
            };
            let record = Record {
                key,
                value,
                publisher: None,
                expires: None,
            };
            kademlia
                .put_record(record, Quorum::One)
                .expect("Failed to store record locally.");
        }
        Some("PUT_PROVIDER") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };

            kademlia
                .start_providing(key)
                .expect("Failed to start providing key");
        }
        _ => {
            eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
        }
    }
}

运行示例

PS C:\Users\kingchuxing\Documents\learning-libp2p-main\rust> cargo run --example 04-kv-store
Listening in "/ip4/172.23.118.182/tcp/65055"
Listening in "/ip4/192.168.0.104/tcp/65055"
Listening in "/ip4/127.0.0.1/tcp/65055"
GET 123
Failed to get record: NotFound { key: Key(b"123"), closest_peers: [] }
PUT 123
缺少值
PUT 123 123456789
Failed to put record: QuorumFailed { key: Key(b"123"), success: [], quorum: 1 }
GET 123     
Got record "123" "123456789"
PUT_PROVIDER 234 //输入提供者
Successfully put provider record "234"
GET_PROVIDERS 234 //获取提供者
Peer PeerId("12D3KooWB7CFnrmeH5gzRxA4CYR2YTg2K3NMvNHP5dWDPFwAHY38") provides key "234"
GET 234
Failed to get record: NotFound { key: Key(b"234"), closest_peers: [] }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/768377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI 会淘汰程序员吗?

前言 前些日子看过一篇文章&#xff0c;说国外一位拥有 19 年编码经验、会 100% 手写代码的程序员被企业解雇了&#xff0c;因为他的竞争对手&#xff0c;一位仅有 4 年经验、却善于使用 Copilot、GPT-4 的后辈&#xff0c;生产力比他更高&#xff0c;成本比他更低&#xff0c…

基于java+springboot+vue实现的家政服务平台(文末源码+Lw)299

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本家政服务平台就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&a…

2. Python+Playwright playwright的API

Playwright支持同步和异步两种API&#xff0c;使用异步API需要导入asyncio库&#xff0c;它是一个可以用来实现Python协程的库&#xff0c;更详细介绍可参考Python协程 。我们可以根据自己的偏好选择适合的模式。 同步与异步模式原理 同步操作方式&#xff1a;在代码执行时&am…

SpringBoot 整合 Minio 实现文件切片极速上传技术

Centos7安装Minio 创建目标文件夹 mkdir minio使用docker查看目标镜像状况 大家需要注意&#xff0c;此处我们首先需要安装docker&#xff0c;对于相关安装教程&#xff0c;大家可以查看我之前的文章&#xff0c;按部就班就可以&#xff0c;此处不再赘述&#xff01;&#x…

学习和发展人工智能:新兴趋势和成功秘诀

人工智能(AI)继续吸引组织&#xff0c;因为它似乎无穷无尽地提高生产力和业务成果。在本博客中&#xff0c;了解学习和发展(L&D)部门如何利用人工智能改进流程&#xff0c;简化工作流程&#xff1f; 学习与发展(L&D)部门领导开始探索如何提高和支持人工智能能力的劳动…

Linux Swap机制关键点分析

1. page被swap出去之后,再次缺页是怎么找到找个换出的页面? 正常内存的页面是通过pte映射找到page的,swap出去的page有其特殊的方式:swap的页面page->private字段保存的是:swap_entry_t通过swap_entry_t就能找到该页面的扇区号sector_t,拿到扇区号就可以从块设备中读…

充电宝哪个牌子比较好用?好用的充电宝推荐!

在如今这个电子设备不离手的时代&#xff0c;充电宝已经成为了我们生活中的必备好物。但面对市面上琳琅满目的充电宝品牌和产品&#xff0c;相信很多朋友都曾陷入过纠结&#xff1a;充电宝哪个牌子比较好用呢&#xff1f;为了解决大家的困惑&#xff0c;经过我精心的筛选和试用…

8.x86游戏实战-OD详解

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 上一个内容&#xff1a;7.x86游戏实战-C实现跨进程读写-跨进程写内存 工具下载&#xff1a;下载 OllyI…

【信即是功夫】人皆有良知在心中

良知就是做人、做事的准则&#xff0c;良知就是天理&#xff1b;实实在在地自信 每个人心中都有一个圣人&#xff0c;只因自己不能真的相信&#xff0c;把这个圣人埋没了 良知在每个人心中&#xff0c;无论你如何做&#xff0c;也无法泯灭它。即使身为盗贼的人&#xff0c;他…

【LeetCode的使用方法】

🎥博主:程序员不想YY啊 💫CSDN优质创作者,CSDN实力新星,CSDN博客专家 🤗点赞🎈收藏⭐再看💫养成习惯 ✨希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进步! 🔮LeetCode的使用方法 🔮LeetCode 是一个在线编程平台,广泛…

掌握Go语言邮件发送:net/smtp实用教程与最佳实践

掌握Go语言邮件发送&#xff1a;net/smtp实用教程与最佳实践 概述基本配置与初始化导入net/smtp包设置SMTP服务器基本信息创建SMTP客户端实例身份验证 发送简单文本邮件配置发件人信息构建邮件头部信息编写邮件正文使用SendMail方法发送邮件示例代码 发送带附件的邮件邮件多部分…

STM32之五:TIM定时器(2-通用定时器)

目录 通用定时器&#xff08;TIM2~5&#xff09;框图 1、 输入时钟源选择 2、 时基单元 3 、输入捕获&#xff1a;&#xff08;IC—Input Capture&#xff09; 3.1 输入捕获通道框图&#xff08;TI1为例&#xff09; 3.1.1 滤波器&#xff1a; 3.1.2 边沿检测器&#xf…

CesiumJS【Basic】- #058 绘制网格填充多边形(Entity方式)-使用shader

文章目录 绘制网格填充多边形(Entity方式)-使用shader1 目标2 代码2.1 main.ts绘制网格填充多边形(Entity方式)-使用shader 1 目标 使用Entity方式绘制绘制网格填充多边形 - 使用shader 2 代码 2.1 main.ts import * as Cesium from cesium;// 创建 Cesium Viewer 实例…

安装Gitlab+Jenkins

GItlab概述 GitLab概述&#xff1a; 是一个利用 Ruby on Rails 开发的开源应用程序&#xff0c;实现一个自托管的Git项目仓库&#xff0c;可通过Web界面进行访问公开的或者私人项目。 Ruby on Rails 是一个可以使你开发、部署、维护 web 应用程序变得简单的框架。 GitLab拥有与…

ESP32-VScode环境设置

目录 前言 一、安装VSCode 二、安装ESP32环境 1.安装ESP-IDF 2.ESP-IDF设置 3:开始配置环境 4.打开example进行验证 5.烧录 6.调整波特率 总结 前言 环境&#xff1a;Visual Studio Code 芯片&#xff1a;ESP32 说实话&#xff0c;这是我装的时间最长的一个环境&…

C++ 和C#的差别

首先把眼睛瞪大&#xff0c;然后憋住一口气&#xff0c;读下去&#xff1a; 1、CPP 就是C plus plus的缩写&#xff0c;中国大陆的程序员圈子中通常被读做"C加加"&#xff0c;而西方的程序员通常读做"C plus plus"&#xff0c;它是一种使用非常广泛的计算…

【分布式系统】监控平台Zabbix对接grafana

以前两篇博客为基础 【分布式系统】监控平台Zabbix介绍与部署&#xff08;命令截图版&#xff09;-CSDN博客 【分布式系统】监控平台Zabbix自定义模版配置-CSDN博客 一.安装grafana并启动 添加一台服务器192.168.80.104 初始化操作 systemctl disable --now firewalld set…

运维锅总浅析云原生DevOps工具

本文从Tekton与Kubevela、Jenkins、GitLab CI的区别与联系对常见的云原生DevOps工具进行对比分析&#xff0c;最后给出DevOps工具选型思路。希望对您有所帮助&#xff01; 一、DevOps简介 DevOps是一种结合了软件开发&#xff08;Development&#xff09;和IT运维&#xff08…

【代码随想录】【算法训练营】【第56天】 [卡码98]所有可达路径

前言 思路及算法思维&#xff0c;指路 代码随想录。 题目来自 卡码网。 day 56&#xff0c;周二&#xff0c;继续ding~ 题目详情 [卡码98] 所有可达路径 题目描述 卡码98 所有可达路径 解题思路 前提&#xff1a; 思路&#xff1a; 重点&#xff1a; 代码实现 C语言…

python - 列表 / 元组 / 字符串

一.列表 由于pyhon的变量没有数据类型&#xff0c;所以python是没有数组的&#xff08;因为数组只能存放一种类型&#xff0c;要么全部存放整型&#xff0c;要么全部存放浮点型&#xff09;&#xff0c;只有列表list&#xff0c;所以整数&#xff0c;浮点数&#xff0c;字符串…
最新文章