refactor and add disk cache

This commit is contained in:
YouROK
2021-05-08 21:41:04 +03:00
parent 018c98f0fc
commit f07f3182a3
3 changed files with 173 additions and 59 deletions

View File

@@ -0,0 +1,63 @@
package torrstor
import (
"io"
"sync"
"time"
)
type DiskPiece struct {
piece *Piece
mu sync.RWMutex
}
func NewDiskPiece(p *Piece) *DiskPiece {
return &DiskPiece{piece: p}
}
func (p *DiskPiece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
id := int64(p.piece.Id)
pl := p.piece.cache.pieceLength
poff := id * pl
off += poff
n = 0
off, err = p.piece.cache.file.Seek(off, io.SeekStart)
if err == nil {
n, err = p.piece.cache.file.Write(b)
}
go p.piece.cache.loadPieces()
p.piece.cache.saveInfo()
p.piece.Size += int64(n)
p.piece.Accessed = time.Now().Unix()
return
}
func (p *DiskPiece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
id := int64(p.piece.Id)
pl := p.piece.cache.pieceLength
poff := id * pl
off += poff
n = 0
off, err = p.piece.cache.file.Seek(off, io.SeekStart)
if err == nil {
n, err = p.piece.cache.file.Read(b)
}
p.piece.Accessed = time.Now().Unix()
return n, nil
}
func (p *DiskPiece) Release() {
}

View File

@@ -0,0 +1,71 @@
package torrstor
import (
"io"
"sync"
"time"
"github.com/anacrolix/torrent"
)
type MemPiece struct {
piece *Piece
buffer []byte
mu sync.RWMutex
}
func NewMemPiece(p *Piece) *MemPiece {
return &MemPiece{piece: p}
}
func (p *MemPiece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer == nil {
go p.piece.cache.cleanPieces()
p.buffer = make([]byte, p.piece.cache.pieceLength, p.piece.cache.pieceLength)
}
n = copy(p.buffer[off:], b[:])
p.piece.Size += int64(n)
p.piece.Accessed = time.Now().Unix()
return
}
func (p *MemPiece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
size := len(b)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
}
}
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.EOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
p.piece.Accessed = time.Now().Unix()
if int64(len(b))+off >= p.piece.Size {
go p.piece.cache.cleanPieces()
}
if n == 0 {
return 0, io.EOF
}
return n, nil
}
func (p *MemPiece) Release() {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer != nil {
p.buffer = nil
}
p.piece.Size = 0
p.piece.Complete = false
p.piece.cache.torrent.Piece(p.piece.Id).SetPriority(torrent.PiecePriorityNone)
}

View File

@@ -1,96 +1,76 @@
package torrstor package torrstor
import ( import (
"errors"
"io"
"sync"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"server/settings"
) )
type Piece struct { type Piece struct {
storage.PieceImpl storage.PieceImpl `json:"-"`
Id int Id int `json:"-"`
Size int64 Size int64 `json:"size"`
complete bool Complete bool `json:"complete"`
accessed int64 Accessed int64 `json:"accessed"`
buffer []byte
mu sync.RWMutex mPiece *MemPiece `json:"-"`
cache *Cache dPiece *DiskPiece `json:"-"`
cache *Cache `json:"-"`
}
func NewPiece(id int, cache *Cache) *Piece {
p := &Piece{
Id: id,
cache: cache,
}
if !settings.BTsets.UseDisk {
p.mPiece = NewMemPiece(p)
} else {
p.dPiece = NewDiskPiece(p)
}
return p
} }
func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock() if !settings.BTsets.UseDisk {
defer p.mu.Unlock() return p.mPiece.WriteAt(b, off)
} else {
if p.buffer == nil { return p.dPiece.WriteAt(b, off)
go p.cache.cleanPieces()
p.buffer = make([]byte, p.cache.pieceLength, p.cache.pieceLength)
} }
n = copy(p.buffer[off:], b[:])
p.Size += int64(n)
p.accessed = time.Now().Unix()
return
} }
func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock() if !settings.BTsets.UseDisk {
defer p.mu.RUnlock() return p.mPiece.ReadAt(b, off)
} else {
size := len(b) return p.dPiece.ReadAt(b, off)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
} }
} }
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.EOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
p.accessed = time.Now().Unix()
if int64(len(b))+off >= p.Size {
go p.cache.cleanPieces()
}
if n == 0 {
return 0, io.EOF
}
return n, nil
}
func (p *Piece) MarkComplete() error { func (p *Piece) MarkComplete() error {
if len(p.buffer) == 0 { p.Complete = true
return errors.New("piece is not complete")
}
p.complete = true
return nil return nil
} }
func (p *Piece) MarkNotComplete() error { func (p *Piece) MarkNotComplete() error {
p.complete = false p.Complete = false
return nil return nil
} }
func (p *Piece) Completion() storage.Completion { func (p *Piece) Completion() storage.Completion {
return storage.Completion{ return storage.Completion{
Complete: p.complete && len(p.buffer) > 0, Complete: p.Complete,
Ok: true, Ok: true,
} }
} }
func (p *Piece) Release() { func (p *Piece) Release() {
p.mu.Lock() if !settings.BTsets.UseDisk {
defer p.mu.Unlock() p.mPiece.Release()
if p.buffer != nil { } else {
p.buffer = nil p.dPiece.Release()
} }
p.Size = 0
p.complete = false
p.cache.torrent.Piece(p.Id).SetPriority(torrent.PiecePriorityNone)
} }