Skip to content

Commit

Permalink
cherry pick pingcap#18970 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
wshwsh12 authored and ti-srebot committed Aug 18, 2020
1 parent d890499 commit ce71fc7
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 53 deletions.
10 changes: 6 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
// checkBeforeDropLDFlag is a go build flag.
checkBeforeDropLDFlag = "None"
// tempStorageDirName is the default temporary storage dir name by base64 encoding a string `port/statusPort`
tempStorageDirName = encodeDefTempStorageDir(DefHost, DefStatusHost, DefPort, DefStatusPort)
tempStorageDirName = encodeDefTempStorageDir(os.TempDir(), DefHost, DefStatusHost, DefPort, DefStatusPort)
)

// Config contains configuration options.
Expand Down Expand Up @@ -149,11 +149,13 @@ type Config struct {
// and the `tmp-storage-path` was not specified in the conf.toml or was specified the same as the default value.
func (c *Config) UpdateTempStoragePath() {
if c.TempStoragePath == tempStorageDirName {
c.TempStoragePath = encodeDefTempStorageDir(c.Host, c.Status.StatusHost, c.Port, c.Status.StatusPort)
c.TempStoragePath = encodeDefTempStorageDir(os.TempDir(), c.Host, c.Status.StatusHost, c.Port, c.Status.StatusPort)
} else {
c.TempStoragePath = encodeDefTempStorageDir(c.TempStoragePath, c.Host, c.Status.StatusHost, c.Port, c.Status.StatusPort)
}
}

func encodeDefTempStorageDir(host, statusHost string, port, statusPort uint) string {
func encodeDefTempStorageDir(tempDir string, host, statusHost string, port, statusPort uint) string {
dirName := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%v:%v/%v:%v", host, port, statusHost, statusPort)))
var osUID string
currentUser, err := user.Current()
Expand All @@ -162,7 +164,7 @@ func encodeDefTempStorageDir(host, statusHost string, port, statusPort uint) str
} else {
osUID = currentUser.Uid
}
return filepath.Join(os.TempDir(), osUID+"_tidb", dirName, "tmp-storage")
return filepath.Join(tempDir, osUID+"_tidb", dirName, "tmp-storage")
}

// nullableBool defaults unset bool options to unset instead of false, which enables us to know if the user has set 2
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (s *testConfigSuite) TestEncodeDefTempStorageDir(c *C) {

dirPrefix := filepath.Join(os.TempDir(), osUID+"_tidb")
for _, test := range tests {
tempStorageDir := encodeDefTempStorageDir(test.host, test.statusHost, test.port, test.statusPort)
tempStorageDir := encodeDefTempStorageDir(os.TempDir(), test.host, test.statusHost, test.port, test.statusPort)
c.Assert(tempStorageDir, Equals, filepath.Join(dirPrefix, test.expect, "tmp-storage"))
}
}
Expand Down
23 changes: 23 additions & 0 deletions executor/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor_test

import (
"fmt"
"os"
"strings"

. "github.com/pingcap/check"
Expand All @@ -25,11 +26,23 @@ import (
)

func (s *testSerialSuite1) TestSortInDisk(c *C) {
<<<<<<< HEAD
originCfg := config.GetGlobalConfig()
newConf := *originCfg
newConf.OOMUseTmpStorage = true
config.StoreGlobalConfig(&newConf)
defer config.StoreGlobalConfig(originCfg)
=======
s.testSortInDisk(c, false)
s.testSortInDisk(c, true)
}

func (s *testSerialSuite1) testSortInDisk(c *C, removeDir bool) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
})
>>>>>>> 92513a2... util: create the tmpdir if the directory is removed by mistake. (#18970)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"), IsNil)
Expand All @@ -44,6 +57,16 @@ func (s *testSerialSuite1) TestSortInDisk(c *C) {
tk.Se.SetSessionManager(sm)
s.domain.ExpensiveQueryHandle().SetSessionManager(sm)

if removeDir {
c.Assert(os.RemoveAll(config.GetGlobalConfig().TempStoragePath), IsNil)
defer func() {
_, err := os.Stat(config.GetGlobalConfig().TempStoragePath)
if err != nil {
c.Assert(os.IsExist(err), IsTrue)
}
}()
}

tk.MustExec("set @@tidb_mem_quota_query=1;")
tk.MustExec("set @@tidb_max_chunk_size=32;")
tk.MustExec("drop table if exists t")
Expand Down
56 changes: 8 additions & 48 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/danjacques/gofslock/fslock"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -54,6 +52,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/gcworker"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -149,11 +148,10 @@ var (
)

var (
storage kv.Storage
dom *domain.Domain
svr *server.Server
tempDirLock fslock.Handle
graceful bool
storage kv.Storage
dom *domain.Domain
svr *server.Server
graceful bool
)

func main() {
Expand All @@ -167,7 +165,8 @@ func main() {
config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig)
if config.GetGlobalConfig().OOMUseTmpStorage {
config.GetGlobalConfig().UpdateTempStoragePath()
initializeTempDir()
err := disk.InitializeTempDir()
terror.MustNil(err)
checkTempStorageQuota()
}
setGlobalVars()
Expand Down Expand Up @@ -197,42 +196,6 @@ func syncLog() {
}
}

func initializeTempDir() {
tempDir := config.GetGlobalConfig().TempStoragePath
lockFile := "_dir.lock"
_, err := os.Stat(tempDir)
if err != nil && !os.IsExist(err) {
err = os.MkdirAll(tempDir, 0755)
terror.MustNil(err)
}
tempDirLock, err = fslock.Lock(filepath.Join(tempDir, lockFile))
if err != nil {
switch err {
case fslock.ErrLockHeld:
log.Error("The current temporary storage dir has been occupied by another instance, "+
"check tmp-storage-path config and make sure they are different.", zap.String("TempStoragePath", tempDir), zap.Error(err))
default:
log.Error("Failed to acquire exclusive lock on the temporary storage dir.", zap.String("TempStoragePath", tempDir), zap.Error(err))
}
os.Exit(1)
}

subDirs, err := ioutil.ReadDir(tempDir)
terror.MustNil(err)

for _, subDir := range subDirs {
// Do not remove the lock file.
if subDir.Name() == lockFile {
continue
}
err = os.RemoveAll(filepath.Join(tempDir, subDir.Name()))
if err != nil {
log.Warn("Remove temporary file error",
zap.String("tempStorageSubDir", filepath.Join(tempDir, subDir.Name())), zap.Error(err))
}
}
}

func checkTempStorageQuota() {
// check capacity and the quota when OOMUseTmpStorage is enabled
c := config.GetGlobalConfig()
Expand Down Expand Up @@ -700,10 +663,7 @@ func cleanup() {
}
plugin.Shutdown(context.Background())
closeDomainAndStorage()
if tempDirLock != nil {
err := tempDirLock.Unlock()
terror.Log(errors.Trace(err))
}
disk.CleanUp()
}

func stringToList(repairString string) []string {
Expand Down
8 changes: 8 additions & 0 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
}

func (l *ListInDisk) initDiskFile() (err error) {
err = disk.CheckAndInitTempDir()
if err != nil {
return
}
l.disk, err = ioutil.TempFile(config.GetGlobalConfig().TempStoragePath, l.diskTracker.Label().String())
if err != nil {
return
Expand Down Expand Up @@ -184,8 +188,12 @@ func (l *ListInDisk) Close() error {
if l.disk != nil {
l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
terror.Call(l.disk.Close)
<<<<<<< HEAD
bufWriterPool.Put(l.bufWriter)
return os.Remove(l.disk.Name())
=======
terror.Log(os.Remove(l.disk.Name()))
>>>>>>> 92513a2... util: create the tmpdir if the directory is removed by mistake. (#18970)
}
return nil
}
Expand Down
110 changes: 110 additions & 0 deletions util/disk/tempDir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package disk

import (
"io/ioutil"
"os"
"path/filepath"

"github.com/danjacques/gofslock/fslock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)

var (
tempDirLock fslock.Handle
sf singleflight.Group
)

// CheckAndInitTempDir check whether the temp directory is existed.
// If not, initializes the temp directory.
func CheckAndInitTempDir() (err error) {
_, err, _ = sf.Do("tempDir", func() (value interface{}, err error) {
if !checkTempDirExist() {
log.Info("Tmp-storage-path not found. Try to initialize TempDir.")
err = InitializeTempDir()
}
return
})
return
}

func checkTempDirExist() bool {
tempDir := config.GetGlobalConfig().TempStoragePath
_, err := os.Stat(tempDir)
if err != nil && !os.IsExist(err) {
return false
}
return true
}

// InitializeTempDir initializes the temp directory.
func InitializeTempDir() error {
tempDir := config.GetGlobalConfig().TempStoragePath
_, err := os.Stat(tempDir)
if err != nil && !os.IsExist(err) {
err = os.MkdirAll(tempDir, 0755)
if err != nil {
return err
}
}
lockFile := "_dir.lock"
tempDirLock, err = fslock.Lock(filepath.Join(tempDir, lockFile))
if err != nil {
switch err {
case fslock.ErrLockHeld:
log.Error("The current temporary storage dir has been occupied by another instance, "+
"check tmp-storage-path config and make sure they are different.", zap.String("TempStoragePath", tempDir), zap.Error(err))
default:
log.Error("Failed to acquire exclusive lock on the temporary storage dir.", zap.String("TempStoragePath", tempDir), zap.Error(err))
}
return err
}

subDirs, err := ioutil.ReadDir(tempDir)
if err != nil {
return err
}

// If it exists others files except lock file, creates another goroutine to clean them.
if len(subDirs) > 1 {
go func() {
for _, subDir := range subDirs {
// Do not remove the lock file.
if subDir.Name() == lockFile {
continue
}
err := os.RemoveAll(filepath.Join(tempDir, subDir.Name()))
if err != nil {
log.Warn("Remove temporary file error",
zap.String("tempStorageSubDir", filepath.Join(tempDir, subDir.Name())), zap.Error(err))
}
}
}()
}
return nil
}

// CleanUp releases the directory lock when exiting TiDB.
func CleanUp() {
if tempDirLock != nil {
err := tempDirLock.Unlock()
terror.Log(errors.Trace(err))
}
}
53 changes: 53 additions & 0 deletions util/disk/tempDir_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package disk

import (
"os"
"sync"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tidb/config"
)

func TestT(t *testing.T) {
check.TestingT(t)
}

var _ = check.SerialSuites(&testDiskSerialSuite{})

type testDiskSerialSuite struct {
}

func (s *testDiskSerialSuite) TestRemoveDir(c *check.C) {
err := InitializeTempDir()
c.Assert(err, check.IsNil)
c.Assert(checkTempDirExist(), check.Equals, true)
c.Assert(os.RemoveAll(config.GetGlobalConfig().TempStoragePath), check.IsNil)
c.Assert(checkTempDirExist(), check.Equals, false)
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(c *check.C) {
err := CheckAndInitTempDir()
c.Assert(err, check.IsNil)
wg.Done()
}(c)
}
wg.Wait()
err = CheckAndInitTempDir()
c.Assert(err, check.IsNil)
c.Assert(checkTempDirExist(), check.Equals, true)
}

0 comments on commit ce71fc7

Please sign in to comment.