Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: create the tmpdir if the directory is removed by mistake. #18970

Merged
merged 16 commits into from
Aug 18, 2020
10 changes: 6 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var (
// checkBeforeDropLDFlag is a go build flag.
checkBeforeDropLDFlag = "None"
// tempStorageDirName is the default temporary storage dir name by base64 encoding a string `port/statusPort`
tempStorageDirName = encodeDefTempStorageDir(DefHost, DefStatusHost, DefPort, DefStatusPort)
tempStorageDirName = encodeDefTempStorageDir(os.TempDir(), DefHost, DefStatusHost, DefPort, DefStatusPort)
)

// Config contains configuration options.
Expand Down Expand Up @@ -154,11 +154,13 @@ type Config struct {
// and the `tmp-storage-path` was not specified in the conf.toml or was specified the same as the default value.
func (c *Config) UpdateTempStoragePath() {
if c.TempStoragePath == tempStorageDirName {
c.TempStoragePath = encodeDefTempStorageDir(c.Host, c.Status.StatusHost, c.Port, c.Status.StatusPort)
c.TempStoragePath = encodeDefTempStorageDir(os.TempDir(), c.Host, c.Status.StatusHost, c.Port, c.Status.StatusPort)
} else {
c.TempStoragePath = encodeDefTempStorageDir(c.TempStoragePath, c.Host, c.Status.StatusHost, c.Port, c.Status.StatusPort)
}
}

func encodeDefTempStorageDir(host, statusHost string, port, statusPort uint) string {
func encodeDefTempStorageDir(tempDir string, host, statusHost string, port, statusPort uint) string {
dirName := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%v:%v/%v:%v", host, port, statusHost, statusPort)))
var osUID string
currentUser, err := user.Current()
Expand All @@ -167,7 +169,7 @@ func encodeDefTempStorageDir(host, statusHost string, port, statusPort uint) str
} else {
osUID = currentUser.Uid
}
return filepath.Join(os.TempDir(), osUID+"_tidb", dirName, "tmp-storage")
return filepath.Join(tempDir, osUID+"_tidb", dirName, "tmp-storage")
}

// nullableBool defaults unset bool options to unset instead of false, which enables us to know if the user has set 2
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (s *testConfigSuite) TestEncodeDefTempStorageDir(c *C) {

dirPrefix := filepath.Join(os.TempDir(), osUID+"_tidb")
for _, test := range tests {
tempStorageDir := encodeDefTempStorageDir(test.host, test.statusHost, test.port, test.statusPort)
tempStorageDir := encodeDefTempStorageDir(os.TempDir(), test.host, test.statusHost, test.port, test.statusPort)
c.Assert(tempStorageDir, Equals, filepath.Join(dirPrefix, test.expect, "tmp-storage"))
}
}
Expand Down
16 changes: 16 additions & 0 deletions executor/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor_test

import (
"fmt"
"os"
"strings"

. "github.com/pingcap/check"
Expand All @@ -25,6 +26,11 @@ import (
)

func (s *testSerialSuite1) TestSortInDisk(c *C) {
s.testSortInDisk(c, false)
s.testSortInDisk(c, true)
}

func (s *testSerialSuite1) testSortInDisk(c *C, removeDir bool) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
Expand All @@ -43,6 +49,16 @@ func (s *testSerialSuite1) TestSortInDisk(c *C) {
tk.Se.SetSessionManager(sm)
s.domain.ExpensiveQueryHandle().SetSessionManager(sm)

if removeDir {
c.Assert(os.RemoveAll(config.GetGlobalConfig().TempStoragePath), IsNil)
defer func() {
_, err := os.Stat(config.GetGlobalConfig().TempStoragePath)
if err != nil {
c.Assert(os.IsExist(err), IsTrue)
}
}()
}

tk.MustExec("set @@tidb_mem_quota_query=1;")
tk.MustExec("set @@tidb_max_chunk_size=32;")
tk.MustExec("drop table if exists t")
Expand Down
56 changes: 8 additions & 48 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/danjacques/gofslock/fslock"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -54,6 +52,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/gcworker"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -151,11 +150,10 @@ var (
)

var (
storage kv.Storage
dom *domain.Domain
svr *server.Server
tempDirLock fslock.Handle
graceful bool
storage kv.Storage
dom *domain.Domain
svr *server.Server
graceful bool
)

func main() {
Expand All @@ -169,7 +167,8 @@ func main() {
config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig)
if config.GetGlobalConfig().OOMUseTmpStorage {
config.GetGlobalConfig().UpdateTempStoragePath()
initializeTempDir()
err := disk.InitializeTempDir()
terror.MustNil(err)
checkTempStorageQuota()
}
setGlobalVars()
Expand Down Expand Up @@ -200,42 +199,6 @@ func syncLog() {
}
}

func initializeTempDir() {
tempDir := config.GetGlobalConfig().TempStoragePath
lockFile := "_dir.lock"
_, err := os.Stat(tempDir)
if err != nil && !os.IsExist(err) {
err = os.MkdirAll(tempDir, 0755)
terror.MustNil(err)
}
tempDirLock, err = fslock.Lock(filepath.Join(tempDir, lockFile))
if err != nil {
switch err {
case fslock.ErrLockHeld:
log.Error("The current temporary storage dir has been occupied by another instance, "+
"check tmp-storage-path config and make sure they are different.", zap.String("TempStoragePath", tempDir), zap.Error(err))
default:
log.Error("Failed to acquire exclusive lock on the temporary storage dir.", zap.String("TempStoragePath", tempDir), zap.Error(err))
}
os.Exit(1)
}

subDirs, err := ioutil.ReadDir(tempDir)
terror.MustNil(err)

for _, subDir := range subDirs {
// Do not remove the lock file.
if subDir.Name() == lockFile {
continue
}
err = os.RemoveAll(filepath.Join(tempDir, subDir.Name()))
if err != nil {
log.Warn("Remove temporary file error",
zap.String("tempStorageSubDir", filepath.Join(tempDir, subDir.Name())), zap.Error(err))
}
}
}

func checkTempStorageQuota() {
// check capacity and the quota when OOMUseTmpStorage is enabled
c := config.GetGlobalConfig()
Expand Down Expand Up @@ -730,10 +693,7 @@ func cleanup() {
}
plugin.Shutdown(context.Background())
closeDomainAndStorage()
if tempDirLock != nil {
err := tempDirLock.Unlock()
terror.Log(errors.Trace(err))
}
disk.CleanUp()
}

func stringToList(repairString string) []string {
Expand Down
6 changes: 5 additions & 1 deletion util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
}

func (l *ListInDisk) initDiskFile() (err error) {
err = disk.CheckAndInitTempDir()
if err != nil {
return
}
l.disk, err = ioutil.TempFile(config.GetGlobalConfig().TempStoragePath, l.diskTracker.Label().String())
if err != nil {
return
Expand Down Expand Up @@ -176,7 +180,7 @@ func (l *ListInDisk) Close() error {
if l.disk != nil {
l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
terror.Call(l.disk.Close)
return os.Remove(l.disk.Name())
terror.Log(os.Remove(l.disk.Name()))
}
return nil
}
Expand Down
110 changes: 110 additions & 0 deletions util/disk/tempDir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package disk

import (
"io/ioutil"
"os"
"path/filepath"

"github.com/danjacques/gofslock/fslock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)

var (
tempDirLock fslock.Handle
sf singleflight.Group
)

// CheckAndInitTempDir check whether the temp directory is existed.
// If not, initializes the temp directory.
func CheckAndInitTempDir() (err error) {
_, err, _ = sf.Do("tempDir", func() (value interface{}, err error) {
if !checkTempDirExist() {
log.Info("Tmp-storage-path not found. Try to initialize TempDir.")
err = InitializeTempDir()
}
return
})
return
}

func checkTempDirExist() bool {
tempDir := config.GetGlobalConfig().TempStoragePath
_, err := os.Stat(tempDir)
if err != nil && !os.IsExist(err) {
return false
}
return true
}

// InitializeTempDir initializes the temp directory.
func InitializeTempDir() error {
tempDir := config.GetGlobalConfig().TempStoragePath
_, err := os.Stat(tempDir)
if err != nil && !os.IsExist(err) {
err = os.MkdirAll(tempDir, 0755)
if err != nil {
return err
}
}
lockFile := "_dir.lock"
tempDirLock, err = fslock.Lock(filepath.Join(tempDir, lockFile))
if err != nil {
switch err {
case fslock.ErrLockHeld:
log.Error("The current temporary storage dir has been occupied by another instance, "+
"check tmp-storage-path config and make sure they are different.", zap.String("TempStoragePath", tempDir), zap.Error(err))
default:
log.Error("Failed to acquire exclusive lock on the temporary storage dir.", zap.String("TempStoragePath", tempDir), zap.Error(err))
}
return err
}

subDirs, err := ioutil.ReadDir(tempDir)
if err != nil {
return err
}

// If it exists others files except lock file, creates another goroutine to clean them.
if len(subDirs) > 1 {
go func() {
for _, subDir := range subDirs {
// Do not remove the lock file.
if subDir.Name() == lockFile {
continue
}
err := os.RemoveAll(filepath.Join(tempDir, subDir.Name()))
if err != nil {
log.Warn("Remove temporary file error",
zap.String("tempStorageSubDir", filepath.Join(tempDir, subDir.Name())), zap.Error(err))
}
}
}()
}
return nil
}

// CleanUp releases the directory lock when exiting TiDB.
func CleanUp() {
if tempDirLock != nil {
err := tempDirLock.Unlock()
terror.Log(errors.Trace(err))
}
}
53 changes: 53 additions & 0 deletions util/disk/tempDir_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package disk

import (
"os"
"sync"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tidb/config"
)

func TestT(t *testing.T) {
check.TestingT(t)
}

var _ = check.SerialSuites(&testDiskSerialSuite{})

type testDiskSerialSuite struct {
}

func (s *testDiskSerialSuite) TestRemoveDir(c *check.C) {
err := InitializeTempDir()
c.Assert(err, check.IsNil)
c.Assert(checkTempDirExist(), check.Equals, true)
c.Assert(os.RemoveAll(config.GetGlobalConfig().TempStoragePath), check.IsNil)
c.Assert(checkTempDirExist(), check.Equals, false)
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(c *check.C) {
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
err := CheckAndInitTempDir()
c.Assert(err, check.IsNil)
wg.Done()
}(c)
}
wg.Wait()
err = CheckAndInitTempDir()
c.Assert(err, check.IsNil)
c.Assert(checkTempDirExist(), check.Equals, true)
}