Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update 075 #120

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ func buildDir(workspace string) (string, string, string, string, string, string,
return "", "", "", "", "", "", err
}

fadebackDir := filepath.Join(workspace, configs.Fadeback)
if err := os.MkdirAll(fadebackDir, pattern.DirMode); err != nil {
feedbackDir := filepath.Join(workspace, configs.Feedback)
if err := os.MkdirAll(feedbackDir, pattern.DirMode); err != nil {
return "", "", "", "", "", "", err
}

Expand All @@ -398,7 +398,7 @@ func buildDir(workspace string) (string, string, string, string, string, string,
if err := os.MkdirAll(dfileDir, pattern.DirMode); err != nil {
return "", "", "", "", "", "", err
}
return logDir, cacheDir, trackDir, fadebackDir, ufileDir, dfileDir, nil
return logDir, cacheDir, trackDir, feedbackDir, ufileDir, dfileDir, nil
}

func buildCache(cacheDir string) (db.Cache, error) {
Expand Down
2 changes: 1 addition & 1 deletion configs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
// tracked files
Track = "track"
//
Fadeback = "fadeback"
Feedback = "feedback"
//
Ufile = "ufile"
//
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/AstaFrode/go-libp2p v0.26.4-0.20231113143058-912296254d44
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231114061951-fc77d8141ff0
github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde
github.com/CESSProject/p2p-go v0.2.6-0.20231117060004-cbb4a2e1792e
github.com/CESSProject/p2p-go v0.2.6-0.20231122035207-4ef352825668
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cbergoon/merkletree v0.2.0
github.com/centrifuge/go-substrate-rpc-client/v4 v4.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/CESSProject/cess-go-sdk v0.3.21-0.20231114061951-fc77d8141ff0 h1:WelC
github.com/CESSProject/cess-go-sdk v0.3.21-0.20231114061951-fc77d8141ff0/go.mod h1:KWlHxDKfinyfEI3w7BqMqgo+oi7grsUVayDw/2NYSks=
github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde h1:5MDRjjtg6PEhqyVjupwaapN96cOZiddOGAYwKQeaTu0=
github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde/go.mod h1:RUXBd3ROP98MYepEEa0Y0l/T0vQlIKqFJxI/ocdnRLM=
github.com/CESSProject/p2p-go v0.2.6-0.20231117060004-cbb4a2e1792e h1:r+SJZzcxb3so7W/6/98HGtT0BoB3YcK5Kf2uTp0q9Uc=
github.com/CESSProject/p2p-go v0.2.6-0.20231117060004-cbb4a2e1792e/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI=
github.com/CESSProject/p2p-go v0.2.6-0.20231122035207-4ef352825668 h1:Enqu6i1fEnm8kORGlYopu94klrbPmMr70DpRYaLnslE=
github.com/CESSProject/p2p-go v0.2.6-0.20231122035207-4ef352825668/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI=
github.com/ChainSafe/go-schnorrkel v1.0.0 h1:3aDA67lAykLaG1y3AOjs88dMxC88PgUuHRrLeDnvGIM=
github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z7vcchBSbMBEhCw4=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
Expand Down
13 changes: 8 additions & 5 deletions node/discoverMgt.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ func (n *Node) findPeers(ch chan<- bool) {
}
err := n.Connect(n.GetCtxQueryFromCtxCancel(), foundPeer)
if err != nil {
n.Peerstore().RemovePeer(foundPeer.ID)
// n.Peerstore().RemovePeer(foundPeer.ID)
n.GetDht().RoutingTable().RemovePeer(foundPeer.ID)
} else {
for _, addr := range foundPeer.Addrs {
n.Peerstore().AddAddr(foundPeer.ID, addr, peerstore.AddressTTL)
}
// for _, addr := range foundPeer.Addrs {
// n.Peerstore().AddAddr(foundPeer.ID, addr, peerstore.AddressTTL)
// }
n.GetDht().RoutingTable().TryAddPeer(foundPeer.ID, true, true)
n.SavePeer(foundPeer.ID.Pretty(), peer.AddrInfo{
ID: foundPeer.ID,
Addrs: foundPeer.Addrs,
Expand Down Expand Up @@ -96,7 +98,8 @@ func (n *Node) recvPeers(ch chan<- bool) {
for _, v := range foundPeer.Responses {
if v != nil {
if len(v.Addrs) > 0 {
n.Peerstore().AddAddrs(v.ID, v.Addrs, peerstore.AddressTTL)
// n.Peerstore().AddAddrs(v.ID, v.Addrs, peerstore.AddressTTL)
n.GetDht().RoutingTable().TryAddPeer(foundPeer.ID, true, true)
n.SavePeer(v.ID.Pretty(), peer.AddrInfo{
ID: v.ID,
Addrs: v.Addrs,
Expand Down
263 changes: 45 additions & 218 deletions node/syncFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,259 +107,86 @@ func (n *Node) syncFiles(ch chan<- bool) {
}
}

err = n.ReadDataAction(addr.ID, wantfiles[i].File, wantfiles[i].File, filepath.Join(n.GetDirs().FileDir, wantfiles[i].File), size)
if err != nil {
n.Log("err", err.Error())
}

//
for _, fragment := range fragmentlist {
fid, err := n.FidToCid(string(fragment[:]))
if err != nil {
n.Log("err", err.Error())
n.Block("err", fmt.Sprintf("[FidToCid] [%v] %v", string(fragment[:]), err))
continue
}
_, err = n.getBlockData(fid)
buf, err := n.getBlockData(fid)
if err != nil {
n.Log("err", err.Error())
} else {
n.Log("info", fmt.Sprintf("get block data suc and fid is:%s", fid))
n.Block("err", fmt.Sprintf("[getBlockData] [%v] %v", fid, err))
continue
}
n.Block("info", fmt.Sprintf("[%v] get block data suc", fid))
n.SaveAndNotifyDataBlock(buf)
}
}
}
}

func (n *Node) getBlocks(ch chan<- bool) {
defer func() {
ch <- true
}()

time.Sleep(time.Second * 30)

var wantList = []string{
// 256k
// "QmXeE7NjTLviHDaf7ZvWhdTW2P43QY41DgxpeogQeBXnoZ",
// 1M
"QmRdTXKPV8VPCuPaawjJZZaACsDYRfVtZtNZTDLXrAQPx3",
// 2M
// "QmU9TPr52YZPngKq3AG21FfFztgfsW3LQQKnUWvF73fZjT",
// 3M
// "QmayoE5xG6tvgzTp2bZNErk7TSHW8ou4FKv2mamtGLNpMi",
// 3.5M
// "QmTZciSbB99gqWZknzcZ1HFFtbpnnEz4LTpxQe7zxBmpTF",
// 4M
"Qmay44Rd1U2Gcg3WB2BgTCezP9LNqrTzeDUZt2LKb7HSFh",
// 6M
// "QmadWN69GhfNWtuobno9v1XhXWjSfLkRXEdL32W4gaYunb",
// 8M
// "QmRQyZGdTjS2bYy1BisteG41NtWFSWTjE3smueqQpNZady",
// 10M
"QmdoP3JcdjeWXFDUrsvL3rqKejZJrRaBEEUEHyKwW6JV96",
// 12M
// "QmcKLqao5TcawyJdtDRFbHvMc1ktfepjpqXuu62o6ve1bx",
// 6Bytes
// "QmXsMQRrggM1pD2axFoscGSrANph6f1nsQm6oFm3AHzECy",
// 16M
// "QmcfxLnapUJbDXN68zTYQZX5eaCZ3nSqeG5PFwDCThRwnP",
// "QmauZXKtvXaz7j3Kqq3wA2NURgmJTQL5o8T4N5YProYzt8",
// "QmVLsR4VH3AwEMf9mvBhBGgcQcKLyjQdekJPKHXHWgVF4c",
// "Qma9hBSf9FobnK8N3cLxUuGHtMZfd4b2a66jCVQtYe5CHr",
// "QmdoJangkxFUQsMrvAsmN4vZVHUHqpREJfmbrSHk6XAsDm",
// "QmQUnEaKsHNLteYcQJDNVaHHV243sq8uWU6pVpQUbrSq1P",
// "QmWo6MkkYmKzToPzkXQu2aTS3vQj21fHQiFc3G3jNTzzWT",
// "QmUoJN78mHiCETZv3oRLLYSPAKfgWWgGvw18kCfF5TFH3d",
// "QmXPKrreCAhPMixUFQmCerAvNaMe2rDMqgzjaVrQTqocgZ",
// "QmYgYQSPeQuGxuQutsAoikYWXA7wmakQuUcqcLNuMGV6Y7",
// "QmfDcKHD6PuWfhZJPxaSBVCbmBzxnKzzNroUJoyncqL6Eq",
// "QmT3Pe3wiWpGpcYu4ANuSizzB3463svxSTz1VaqT276vrs",
// "QmV5TvFzDBfVmo8vyweVBvaNeHESdnUss81QEuKm5EeMdQ",
// "QmX3yLqBH78MHNmYjSR27vwGidFH8XYZ96zAYAZLdSkZxj",
// "QmZotvwGTFSaLaSueeLdWuvHgzpYY4d9g2L6ZdX2BpkE9V",
// "QmaqYqCLiEWdXdsV2HJdsTqfEJGUgk6Tbv8FDmhPcKsjm9",
// "QmP5ya9sCaMJDvMJKGXNAxfiS73DKm477nuvG6LwHBbQfz",
// "QmR9GRtE6JypMbQ8BD4bjAfETGuursZ5Q4GiTLppx5u7kc",
}
for i := 0; i < len(wantList); i++ {
_, err := n.getBlockData(wantList[i])
if err != nil {
fmt.Println(err)
err = n.ReadDataAction(addr.ID, wantfiles[i].File, wantfiles[i].File, filepath.Join(n.GetDirs().FileDir, wantfiles[i].File), size)
if err != nil {
n.Block("err", fmt.Sprintf("[ReadDataAction] [%v] %v", wantfiles[i].File, err))
}
}
}
}

func (n *Node) noticyBlocks(ch chan<- bool) {
func (n *Node) noticeBlocks(ch chan<- bool) {
defer func() {
time.Sleep(time.Minute * 2)
ch <- true
if err := recover(); err != nil {
n.Pnc(utils.RecoverError(err))
}
}()

//n.Discover("info", ">>>>> start discoverMgt <<<<<")
fmt.Println("Start syncBlocks")
var ok bool
var blockMap = make(map[string]*blocks.BasicBlock, 0)
n.Block("info", ">>>>> start noticyBlocks <<<<<")

// 1M
data_1M := make([]byte, 1024*1024)
var blockData1M = blocks.NewBlock(data_1M)
err := n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData1M)
blockdirs, err := utils.DirDirs(filepath.Join(n.Workspace(), core.FileBlockDir), 0)
if err != nil {
fmt.Println("[Put] err: ", err)
n.Block("err", fmt.Sprintf("[DirDirs] [%v] %v", filepath.Join(n.Workspace(), core.FileBlockDir), err))
time.Sleep(time.Minute)
return
}
fmt.Println(">>> gen a new 1M data cid: ", blockData1M.Cid().String())
blockMap[blockData1M.Cid().String()] = blockData1M

// // 2M
// data_2M := make([]byte, 2*1024*1024)
// var blockData2M = blocks.NewBlock(data_2M)
// err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData2M)
// if err != nil {
// fmt.Println("[Put] err: ", err)
// return
// }
// fmt.Println(">>> gen a new 2M data cid: ", blockData2M.Cid().String())
// blockMap[blockData2M.Cid().String()] = blockData2M

// // 3M
// data_3M := make([]byte, 3*1024*1024)
// var blockData3M = blocks.NewBlock(data_3M)
// err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData3M)
// if err != nil {
// fmt.Println("[Put] err: ", err)
// return
// }
// fmt.Println(">>> gen a new 3M data cid: ", blockData3M.Cid().String())
// blockMap[blockData3M.Cid().String()] = blockData3M

// // 3.5M
// data_3_5M := make([]byte, 7*1024*1024/2)
// var blockData3_5M = blocks.NewBlock(data_3_5M)
// err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData3_5M)
// if err != nil {
// fmt.Println("[Put] err: ", err)
// return
// }
// fmt.Println(">>> gen a new 3.5M data cid: ", blockData3_5M.Cid().String())
// blockMap[blockData3_5M.Cid().String()] = blockData3_5M

// 4M
data_4M := make([]byte, 4*1024*1024)
var blockData4M = blocks.NewBlock(data_4M)
err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData4M)
if err != nil {
fmt.Println("[Put] err: ", err)
return
}
fmt.Println(">>> gen a new 4M data cid: ", blockData4M.Cid().String())
blockMap[blockData4M.Cid().String()] = blockData4M

// // 6M
// data_6M := make([]byte, 6*1024*1024)
// var blockData6M = blocks.NewBlock(data_6M)
// err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData6M)
// if err != nil {
// fmt.Println("[Put] err: ", err)
// return
// }
// fmt.Println(">>> gen a new 6M data cid: ", blockData6M.Cid().String())
// blockMap[blockData6M.Cid().String()] = blockData6M

// // 8M
// data_8M := make([]byte, 8*1024*1024)
// var blockData8M = blocks.NewBlock(data_8M)
// err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData8M)
// if err != nil {
// fmt.Println("[Put] err: ", err)
// return
// }
// fmt.Println(">>> gen a new 8M data cid: ", blockData8M.Cid().String())
// blockMap[blockData8M.Cid().String()] = blockData8M

// 10M
data_10M := make([]byte, 10*1024*1024)
var blockData10M = blocks.NewBlock(data_10M)
err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData10M)
if err != nil {
fmt.Println("[Put] err: ", err)
if len(blockdirs) == 0 {
time.Sleep(time.Minute)
return
}
fmt.Println(">>> gen a new 10M data cid: ", blockData10M.Cid().String())
blockMap[blockData10M.Cid().String()] = blockData10M

// // 12M
// data_12M := make([]byte, 12*1024*1024)
// var blockData12M = blocks.NewBlock(data_12M)
// err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData12M)
// if err != nil {
// fmt.Println("[Put] err: ", err)
// return
// }
// fmt.Println(">>> gen a new 12M data cid: ", blockData12M.Cid().String())
// blockMap[blockData12M.Cid().String()] = blockData12M

for {
blockdirs, err := utils.DirDirs(filepath.Join(n.Workspace(), core.FileBlockDir), 0)
for i := 0; i < len(blockdirs); i++ {
datadir := filepath.Join(blockdirs[i], ".data")
hash, err := sutils.CalcPathSHA256(datadir)
if err != nil {
fmt.Println("[noticyBlocks.DirDirs] err: ", err)
return
n.Block("err", fmt.Sprintf("[CalcPathSHA256] [%v] %v", datadir, err))
continue
}
mycid, err := n.FidToCid(hash)
if err != nil {
n.Block("err", fmt.Sprintf("[FidToCid] [%v] %v", hash, err))
continue
}

for i := 0; i < len(blockdirs); i++ {
datadir := filepath.Join(blockdirs[i], ".data")
fmt.Println("[datadir]: ", datadir)
hash, err := sutils.CalcPathSHA256(datadir)
if err != nil {
fmt.Println("[CalcPathSHA256] err: ", err)
continue
}
mycid, err := n.FidToCid(hash)
if err != nil {
fmt.Println("[FidToCid] err: ", err)
continue
}

fmt.Println("Local cid: ", mycid)

buf, err := n.GetLocalDataFromBlock(mycid)
if err != nil {
fmt.Println("[GetDataFromBlock] err: ", err)
continue
}
var blockData = blocks.NewBlock(buf)

err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData)
if err != nil {
fmt.Println("[Put] err: ", err)
continue
}

_, ok = blockMap[mycid]
if !ok {
blockMap[mycid] = blockData
}
buf, err := n.GetLocalDataFromBlock(mycid)
if err != nil {
n.Block("err", fmt.Sprintf("[GetLocalDataFromBlock] [%v] %v", mycid, err))
continue
}

if len(blockdirs) == 0 {
fmt.Println("----Local block data is empty------")
time.Sleep(time.Minute)
return
var blockData = blocks.NewBlock(buf)
err = n.GetBlockstore().Put(n.GetCtxQueryFromCtxCancel(), blockData)
if err != nil {
n.Block("err", fmt.Sprintf("[Blockstore.Put] %v", err))
continue
}

count := 0
for count <= 10 {
for k, v := range blockMap {
err = n.GetBitSwap().NotifyNewBlocks(n.GetCtxQueryFromCtxCancel(), v)
if err != nil {
fmt.Println("[NotifyNewBlocks] ", k, " err: ", err)
} else {
fmt.Println("[NotifyNewBlocks] ", k, " suc")
}
count++
}
time.Sleep(time.Second * 10)
err = n.GetBitSwap().NotifyNewBlocks(n.GetCtxQueryFromCtxCancel(), blockData)
if err != nil {
n.Block("err", fmt.Sprintf("[NotifyNewBlocks] [%v] %v", mycid, err))
continue
}

n.Block("info", fmt.Sprintf("[NotifyNewBlocks] [%v] ", mycid))
}
}

Expand All @@ -370,7 +197,7 @@ func (n *Node) getBlockData(wantcid string) ([]byte, error) {
fmt.Println("[cid.Decode] err: ", err)
return nil, err
}
ctxTout, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
ctxTout, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
defer cancelFunc()
buf, err := n.GetBitSwap().GetBlock(ctxTout, acid)
if err != nil {
Expand Down
Loading