From f07f3182a3cc3a61e1adbbfa5700f99768cf3410 Mon Sep 17 00:00:00 2001 From: YouROK <8yourok8@mail.ru> Date: Sat, 8 May 2021 21:41:04 +0300 Subject: [PATCH] refactor and add disk cache --- server/torr/storage/torrstor/diskpiece.go | 63 +++++++++++++++ server/torr/storage/torrstor/mempiece.go | 71 ++++++++++++++++ server/torr/storage/torrstor/piece.go | 98 +++++++++-------------- 3 files changed, 173 insertions(+), 59 deletions(-) create mode 100644 server/torr/storage/torrstor/diskpiece.go create mode 100644 server/torr/storage/torrstor/mempiece.go diff --git a/server/torr/storage/torrstor/diskpiece.go b/server/torr/storage/torrstor/diskpiece.go new file mode 100644 index 0000000..2a4cf44 --- /dev/null +++ b/server/torr/storage/torrstor/diskpiece.go @@ -0,0 +1,63 @@ +package torrstor + +import ( + "io" + "sync" + "time" +) + +type DiskPiece struct { + piece *Piece + + mu sync.RWMutex +} + +func NewDiskPiece(p *Piece) *DiskPiece { + return &DiskPiece{piece: p} +} + +func (p *DiskPiece) WriteAt(b []byte, off int64) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + + id := int64(p.piece.Id) + pl := p.piece.cache.pieceLength + poff := id * pl + off += poff + + n = 0 + off, err = p.piece.cache.file.Seek(off, io.SeekStart) + if err == nil { + n, err = p.piece.cache.file.Write(b) + } + + go p.piece.cache.loadPieces() + p.piece.cache.saveInfo() + + p.piece.Size += int64(n) + p.piece.Accessed = time.Now().Unix() + return +} + +func (p *DiskPiece) ReadAt(b []byte, off int64) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + + id := int64(p.piece.Id) + pl := p.piece.cache.pieceLength + poff := id * pl + off += poff + + n = 0 + off, err = p.piece.cache.file.Seek(off, io.SeekStart) + if err == nil { + n, err = p.piece.cache.file.Read(b) + } + + p.piece.Accessed = time.Now().Unix() + return n, nil +} + +func (p *DiskPiece) Release() { + +} diff --git a/server/torr/storage/torrstor/mempiece.go b/server/torr/storage/torrstor/mempiece.go new file mode 100644 index 0000000..3f7b48c --- /dev/null +++ b/server/torr/storage/torrstor/mempiece.go @@ -0,0 +1,71 @@ +package torrstor + +import ( + "io" + "sync" + "time" + + "github.com/anacrolix/torrent" +) + +type MemPiece struct { + piece *Piece + + buffer []byte + mu sync.RWMutex +} + +func NewMemPiece(p *Piece) *MemPiece { + return &MemPiece{piece: p} +} + +func (p *MemPiece) WriteAt(b []byte, off int64) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.buffer == nil { + go p.piece.cache.cleanPieces() + p.buffer = make([]byte, p.piece.cache.pieceLength, p.piece.cache.pieceLength) + } + n = copy(p.buffer[off:], b[:]) + p.piece.Size += int64(n) + p.piece.Accessed = time.Now().Unix() + return +} + +func (p *MemPiece) ReadAt(b []byte, off int64) (n int, err error) { + p.mu.RLock() + defer p.mu.RUnlock() + + size := len(b) + if size+int(off) > len(p.buffer) { + size = len(p.buffer) - int(off) + if size < 0 { + size = 0 + } + } + if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size { + return 0, io.EOF + } + n = copy(b, p.buffer[int(off) : int(off)+size][:]) + p.piece.Accessed = time.Now().Unix() + if int64(len(b))+off >= p.piece.Size { + go p.piece.cache.cleanPieces() + } + if n == 0 { + return 0, io.EOF + } + return n, nil +} + +func (p *MemPiece) Release() { + p.mu.Lock() + defer p.mu.Unlock() + if p.buffer != nil { + p.buffer = nil + } + p.piece.Size = 0 + p.piece.Complete = false + + p.piece.cache.torrent.Piece(p.piece.Id).SetPriority(torrent.PiecePriorityNone) +} diff --git a/server/torr/storage/torrstor/piece.go b/server/torr/storage/torrstor/piece.go index f5ce3d7..ac2fd58 100644 --- a/server/torr/storage/torrstor/piece.go +++ b/server/torr/storage/torrstor/piece.go @@ -1,96 +1,76 @@ package torrstor import ( - "errors" - "io" - "sync" - "time" - - "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/storage" + "server/settings" ) type Piece struct { - storage.PieceImpl + storage.PieceImpl `json:"-"` - Id int - Size int64 + Id int `json:"-"` + Size int64 `json:"size"` - complete bool - accessed int64 - buffer []byte + Complete bool `json:"complete"` + Accessed int64 `json:"accessed"` - mu sync.RWMutex - cache *Cache + mPiece *MemPiece `json:"-"` + dPiece *DiskPiece `json:"-"` + + cache *Cache `json:"-"` +} + +func NewPiece(id int, cache *Cache) *Piece { + p := &Piece{ + Id: id, + cache: cache, + } + + if !settings.BTsets.UseDisk { + p.mPiece = NewMemPiece(p) + } else { + p.dPiece = NewDiskPiece(p) + } + return p } func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { - p.mu.Lock() - defer p.mu.Unlock() - - if p.buffer == nil { - go p.cache.cleanPieces() - p.buffer = make([]byte, p.cache.pieceLength, p.cache.pieceLength) + if !settings.BTsets.UseDisk { + return p.mPiece.WriteAt(b, off) + } else { + return p.dPiece.WriteAt(b, off) } - n = copy(p.buffer[off:], b[:]) - p.Size += int64(n) - p.accessed = time.Now().Unix() - return } func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { - p.mu.RLock() - defer p.mu.RUnlock() - - size := len(b) - if size+int(off) > len(p.buffer) { - size = len(p.buffer) - int(off) - if size < 0 { - size = 0 - } + if !settings.BTsets.UseDisk { + return p.mPiece.ReadAt(b, off) + } else { + return p.dPiece.ReadAt(b, off) } - if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size { - return 0, io.EOF - } - n = copy(b, p.buffer[int(off) : int(off)+size][:]) - p.accessed = time.Now().Unix() - if int64(len(b))+off >= p.Size { - go p.cache.cleanPieces() - } - if n == 0 { - return 0, io.EOF - } - return n, nil } func (p *Piece) MarkComplete() error { - if len(p.buffer) == 0 { - return errors.New("piece is not complete") - } - p.complete = true + p.Complete = true return nil } func (p *Piece) MarkNotComplete() error { - p.complete = false + p.Complete = false return nil } func (p *Piece) Completion() storage.Completion { return storage.Completion{ - Complete: p.complete && len(p.buffer) > 0, + Complete: p.Complete, Ok: true, } } func (p *Piece) Release() { - p.mu.Lock() - defer p.mu.Unlock() - if p.buffer != nil { - p.buffer = nil + if !settings.BTsets.UseDisk { + p.mPiece.Release() + } else { + p.dPiece.Release() } - p.Size = 0 - p.complete = false - - p.cache.torrent.Piece(p.Id).SetPriority(torrent.PiecePriorityNone) }