pika data-loss issue #993

Closed
leisurelyrcxf opened this issue Dec 3, 2020 · 16 comments

@leisurelyrcxf

Hi, we use codis+pika in a cloud environment, and this led to the loss of more than 100 GB of a customer's data. After investigating, we found the cause was a bug in pika's state machine, and we already have a fix. Could you tell us what the process is for this?

@leisurelyrcxf
Author

The trigger condition is repeatedly issuing slaveof against the same master.

@leisurelyrcxf
Author

A single slot holds roughly 50 GB of data; we are on version 3.2.9.

@leisurelyrcxf
Author

Our build is based on version 3.2.9.

@kernelai
Collaborator

kernelai commented Dec 4, 2020

1. What exactly was the observed failure? Was it the slave failing to sync?
2. Did the sync failure cause data loss on the master?
3. Do you still have the master and slave logs from the time of the failure?

@leisurelyrcxf
Author

The slave lost data, but our check reported that the sync had completed.

@leisurelyrcxf
Author

leisurelyrcxf commented Dec 4, 2020

We are doing slot migration in a codis+pika environment. The program logic is roughly as follows.

@leisurelyrcxf
Author

leisurelyrcxf commented Dec 4, 2020

func TestReslaveOf(t *testing.T) {
    clientMaster, err := NewClient("127.0.0.1:56382", "", time.Second)
    if err != nil {
        t.Error(err.Error())
        return
    }
    clientSlave, err := NewClient("127.0.0.1:56381", "", time.Second)
    if err != nil {
        t.Error(err)
        return
    }
    _ = clientSlave.ReconnectIfNeeded()
    if err := clientSlave.SlaveOf("no:one", 1, false, false); err != nil {
        t.Error(err)
    }
    _ = clientSlave.ReconnectIfNeeded()
    if err := clientSlave.DeleteSlot(1); err != nil {
        t.Error(err)
    }
    _ = clientSlave.ReconnectIfNeeded()
    if err := clientSlave.AddSlot(1); err != nil {
        t.Error(err)
    }

// Keep re-issuing slaveof against the same master until the master reports
// the sync as complete (see slaveOfDone below).
LoopFor:
    for {
        _ = clientSlave.ReconnectIfNeeded()
        err := clientSlave.SlaveOf(clientMaster.Addr, 1, false, false)
        if err != nil {
            t.Error(err)
            return
        }

        timeout := time.After(2 * time.Second)
        for {
            if slaveOfDone(clientMaster, clientSlave) {
                t.Logf("slave of done")
                return
            }

            select {
            case <-timeout:
                //time.Sleep(2 * time.Second)
                continue LoopFor
            default:
                time.Sleep(200 * time.Millisecond)
            }
        }
    }
}

// slaveOfDone reports whether the master sees the slave in BinlogSync state
// with zero replication lag.
func slaveOfDone(clientMaster, clientSlave *Client) bool {
    _ = clientMaster.ReconnectIfNeeded()
    masterSlotInfo, err := clientMaster.SlotInfo(1)
    if err != nil {
        return false
    }
    slaveReplInfo, err := masterSlotInfo.FindSlaveReplInfo(clientSlave.Addr)
    if err != nil {
        return false
    }
    return slaveReplInfo.Status == pika.SlaveStatusBinlogSync && slaveReplInfo.Lag == 0
}

@leisurelyrcxf
Author

We repeatedly issue slaveof and treat the migration as complete once the lag reaches 0.

@leisurelyrcxf
Author

But after the migration "completed", we found the slave's data was incomplete.

@leisurelyrcxf
Author

I made a fix on this branch:
https://github.com/leisurelyrcxf/pika/commits/3.2.9-fix-repl

@leisurelyrcxf
Author

It is easy to reproduce.

@LIBA-S
Contributor

LIBA-S commented Dec 4, 2020

Thanks for the feedback; we will look into this issue.

@leisurelyrcxf
Author

Thank you. The open-source project won't be making a fix against 3.2.9, right? My fix is based on 3.2.9, and cherry-picking it onto master produces too many conflicts.

@leisurelyrcxf
Author

Here is an analysis I did; it may speed up the investigation a little.

@leisurelyrcxf
Author

leisurelyrcxf commented Dec 5, 2020

pika slaveof is neither idempotent nor thread-safe.

One erroneous execution. Slave addr: 127.0.0.1:56381, master addr: 127.0.0.1:56380

1. The slave runs slaveof master, falls into state kTryConnect, and sends a SendPartitionTrySyncRequest to the master.

2. Upon receiving the SendPartitionTrySyncRequest, if the slave is too stale to sync incrementally via binlog, the master sends back a response indicating that a dbSync is required.

3. The slave receives the response and falls into state kTryDbSync, then prepares its local directory, which defaults to 'pika_home/dbsync'. Here is the code:

void Partition::PrepareRsync() {
  // Note: the return values of these calls are ignored.
  slash::DeleteDirIfExist(dbsync_path_);
  slash::CreatePath(dbsync_path_ + "strings");
  slash::CreatePath(dbsync_path_ + "hashes");
  slash::CreatePath(dbsync_path_ + "lists");

  ...

}

Apparently this function is not robust: it does not check the return values of these calls, so it cannot know whether it actually succeeded (in fact, DeleteDirIfExist will not succeed while another master is simultaneously sending files via rsync). A hedged sketch of a return-value-checking variant is given at the end of this comment.

The slave then sends a SendPartitionDBSyncRequest to the master and falls into state kWaitDBSync.

4. The master receives the SendPartitionDBSyncRequest, creates a snapshot in its local dump dir, then sends the files to the slave; this transfer can take a long time if the snapshot is large.

5. The slave waits...

6. After all files have been sent successfully, the master sends an info file named info to the slave.

7. The slave keeps polling its dbsync folder in a while loop. When it sees the info file appear, it assumes the transfer succeeded and performs some checks; if they pass, it falls into state kTryConnect and sends another SendPartitionTrySyncRequest.

8. The master receives the SendPartitionTrySyncRequest and checks the binlog. Normally it starts incremental replication and sends a binlogSync response; otherwise it falls back to sending another dbSync response.

9. Upon receiving the binlogSync response, the slave falls into state kConnected.

10. The master replicates the binlogs written after the snapshot checkpoint to the slave.

But the slave may execute another slaveof master command later. Unfortunately pika lacks a mechanism for detecting a repeated replication job, so it starts another whole round from step 1 to step 7 (see above). This is where the problem arises.

Steps 3, 4, 6, and 7 conflict with each other: there exists an execution order that results in a corrupted data dir together with an info file that indicates a successful dbsync.
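
As a follow-up to the robustness remark in step 3, here is a minimal sketch of what a return-value-checking PrepareRsync could look like. This is not pika's actual code: it assumes slash::DeleteDirIfExist and slash::CreatePath report failure through a non-zero return value (the exact 3.2.9 signatures should be double-checked) and that callers are changed to handle a bool result; only the subdirectories quoted above are listed.

// Sketch only, not pika's real implementation. Assumes slash::DeleteDirIfExist
// and slash::CreatePath return 0 on success; verify against the 3.2.9 sources.
bool Partition::PrepareRsync() {
  if (slash::DeleteDirIfExist(dbsync_path_) != 0) {
    // A previous slaveof round may still be rsync-ing into this directory,
    // which is exactly the case where the delete fails; report it instead of
    // silently continuing.
    return false;
  }
  // Only the subdirectories quoted above are shown here.
  for (const std::string& sub : {"strings", "hashes", "lists"}) {
    if (slash::CreatePath(dbsync_path_ + sub) != 0) {
      return false;
    }
  }
  return true;
}

The caller could then abort the dbSync round when PrepareRsync returns false instead of proceeding with a possibly half-deleted directory.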

@leisurelyrcxf
Author

The idea behind my fix is to use CAS operations on the state machine instead of setting the state directly, plus a master version number on the slave side to avoid the ABA problem.
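
For illustration only, here is a minimal, self-contained sketch of that idea; the type and member names (SlaveStateMachine, StateWord, BeginNewRound, TryTransit) are hypothetical and not pika's actual API. The replication state and a master version number are packed into one atomic word, every transition goes through compare-and-swap, and the version is bumped on each new slaveof, so a worker left over from an earlier round cannot overwrite the state of a newer one even if the state value itself looks the same (the ABA case).

#include <atomic>
#include <cstdint>

// Hypothetical sketch of the fix idea, not pika's real types.
enum class ReplState : uint32_t { kNoConnect, kTryConnect, kTryDbSync, kWaitDBSync, kConnected };

struct StateWord {
  ReplState state;
  uint32_t master_version;  // bumped on every slaveof; defeats ABA
};

class SlaveStateMachine {
 public:
  // Start a new slaveof round: bump the version and reset the state via CAS.
  StateWord BeginNewRound() {
    StateWord cur = word_.load(std::memory_order_acquire);
    StateWord next;
    do {
      next = StateWord{ReplState::kTryConnect, cur.master_version + 1};
    } while (!word_.compare_exchange_weak(cur, next, std::memory_order_acq_rel));
    return next;
  }

  // A worker that observed `expected` may only advance the state if both the
  // state and the master version still match; otherwise its round is obsolete.
  bool TryTransit(StateWord expected, ReplState to) {
    StateWord next{to, expected.master_version};
    return word_.compare_exchange_strong(expected, next, std::memory_order_acq_rel);
  }

 private:
  std::atomic<StateWord> word_{StateWord{ReplState::kNoConnect, 0}};
};

Each slaveof handler would capture the StateWord returned by BeginNewRound and pass it to every subsequent TryTransit, so any transition attempted by a stale round fails cleanly.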

luky116 closed this as completed May 19, 2023