This commit is contained in:
YouROK
2021-05-17 14:55:57 +03:00
parent 29f13fd482
commit e578628886
40 changed files with 1319 additions and 656 deletions

View File

@@ -1,7 +1,11 @@
package torrstor
import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
@@ -37,6 +41,8 @@ type Cache struct {
torrent *torrent.Torrent
}
const FileRangeNotDelete = 5 * 1024 * 1024
func NewCache(capacity int64, storage *Storage) *Cache {
ret := &Cache{
capacity: capacity,
@@ -59,10 +65,30 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
c.pieceCount = info.NumPieces()
c.hash = hash
if settings.BTsets.UseDisk {
name := filepath.Join(settings.BTsets.TorrentsSavePath, hash.HexString())
err := os.MkdirAll(name, 0777)
if err != nil {
log.TLogln("Error create dir:", err)
}
}
for i := 0; i < c.pieceCount; i++ {
c.pieces[i] = &Piece{
Id: i,
cache: c,
c.pieces[i] = NewPiece(i, c)
}
if settings.BTsets.UseDisk {
name := filepath.Join(settings.BTsets.TorrentsSavePath, hash.HexString())
fs, err := ioutil.ReadDir(name)
if err == nil {
for _, f := range fs {
id, err := strconv.Atoi(f.Name())
if err == nil {
c.pieces[id].Size = f.Size()
c.pieces[id].Complete = f.Size() == c.pieceLength
c.pieces[id].Accessed = f.ModTime().Unix()
}
}
}
}
}
@@ -80,9 +106,7 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
func (c *Cache) Close() error {
log.TLogln("Close cache for:", c.hash)
if _, ok := c.storage.caches[c.hash]; ok {
delete(c.storage.caches, c.hash)
}
delete(c.storage.caches, c.hash)
c.pieces = nil
c.muReaders.Lock()
@@ -120,7 +144,7 @@ func (c *Cache) GetState() *state.CacheState {
Id: p.Id,
Size: p.Size,
Length: c.pieceLength,
Completed: p.complete,
Completed: p.Complete,
}
}
}
@@ -204,12 +228,15 @@ func (c *Cache) getRemPieces() []*Piece {
}
for r, _ := range c.readers {
if c.isIdInFileBE(ranges, r.getReaderPiece()) {
continue
}
pc := r.getReaderPiece()
end := r.getPiecesRange().End
limit := 5
for pc <= end && limit > 0 {
if !c.pieces[pc].complete {
if !c.pieces[pc].Complete {
if c.torrent.PieceState(pc).Priority == torrent.PiecePriorityNone {
c.torrent.Piece(pc).SetPriority(torrent.PiecePriorityNormal)
}
@@ -220,7 +247,7 @@ func (c *Cache) getRemPieces() []*Piece {
}
sort.Slice(piecesRemove, func(i, j int) bool {
return piecesRemove[i].accessed < piecesRemove[j].accessed
return piecesRemove[i].Accessed < piecesRemove[j].Accessed
})
c.filled = fill
@@ -230,9 +257,9 @@ func (c *Cache) getRemPieces() []*Piece {
func (c *Cache) isIdInFileBE(ranges []Range, id int) bool {
for _, rng := range ranges {
ss := int(rng.File.Offset() / c.pieceLength)
se := int((5*1024*1024 + rng.File.Offset()) / c.pieceLength)
se := int((FileRangeNotDelete + rng.File.Offset()) / c.pieceLength)
es := int((rng.File.Offset() + rng.File.Length() - 5*1024*1024) / c.pieceLength)
es := int((rng.File.Offset() + rng.File.Length() - FileRangeNotDelete) / c.pieceLength)
ee := int((rng.File.Offset() + rng.File.Length()) / c.pieceLength)
if id >= ss && id <= se || id >= es && id <= ee {
@@ -242,6 +269,68 @@ func (c *Cache) isIdInFileBE(ranges []Range, id int) bool {
return false
}
// run only in cache on disk
func (c *Cache) LoadPiecesOnDisk() {
if c.torrent == nil {
return
}
if c.isRemove {
return
}
c.muRemove.Lock()
if c.isRemove {
c.muRemove.Unlock()
return
}
c.isRemove = true
defer func() { c.isRemove = false }()
c.muRemove.Unlock()
ranges := make([]Range, 0)
c.muReaders.Lock()
for r, _ := range c.readers {
ranges = append(ranges, r.getPiecesRange())
}
c.muReaders.Unlock()
ranges = mergeRange(ranges)
for r, _ := range c.readers {
pc := r.getReaderPiece()
limit := 5
for limit > 0 {
if !c.pieces[pc].Complete {
if c.torrent.PieceState(pc).Priority == torrent.PiecePriorityNone {
c.torrent.Piece(pc).SetPriority(torrent.PiecePriorityNormal)
}
limit--
}
pc++
}
}
if len(c.readers) == 0 {
limit := 5
pc := 0
end := c.pieceCount
for pc <= end {
if !c.pieces[pc].Complete {
break
}
pc++
}
for pc <= end && limit > 0 {
if !c.pieces[pc].Complete {
if c.torrent.PieceState(pc).Priority == torrent.PiecePriorityNone {
c.torrent.Piece(pc).SetPriority(torrent.PiecePriorityNormal)
}
limit--
}
pc++
}
}
}
//////////////////
// Reader section
////////

View File

@@ -0,0 +1,57 @@
package torrstor
import (
"os"
"path/filepath"
"strconv"
"sync"
"time"
"server/log"
"server/settings"
)
type DiskPiece struct {
piece *Piece
file *os.File
mu sync.RWMutex
}
func NewDiskPiece(p *Piece) *DiskPiece {
name := filepath.Join(settings.BTsets.TorrentsSavePath, p.cache.hash.HexString(), strconv.Itoa(p.Id))
ff, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
log.TLogln("Error open file:", err)
return nil
}
return &DiskPiece{piece: p, file: ff}
}
func (p *DiskPiece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
n, err = p.file.WriteAt(b, off)
go p.piece.cache.LoadPiecesOnDisk()
p.piece.Size += int64(n)
p.piece.Accessed = time.Now().Unix()
return
}
func (p *DiskPiece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
n, err = p.file.ReadAt(b, off)
p.piece.Accessed = time.Now().Unix()
return n, nil
}
func (p *DiskPiece) Release() {
p.file.Close()
}

View File

@@ -0,0 +1,71 @@
package torrstor
import (
"io"
"sync"
"time"
"github.com/anacrolix/torrent"
)
type MemPiece struct {
piece *Piece
buffer []byte
mu sync.RWMutex
}
func NewMemPiece(p *Piece) *MemPiece {
return &MemPiece{piece: p}
}
func (p *MemPiece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer == nil {
go p.piece.cache.cleanPieces()
p.buffer = make([]byte, p.piece.cache.pieceLength, p.piece.cache.pieceLength)
}
n = copy(p.buffer[off:], b[:])
p.piece.Size += int64(n)
p.piece.Accessed = time.Now().Unix()
return
}
func (p *MemPiece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
size := len(b)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
}
}
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.EOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
p.piece.Accessed = time.Now().Unix()
if int64(len(b))+off >= p.piece.Size {
go p.piece.cache.cleanPieces()
}
if n == 0 {
return 0, io.EOF
}
return n, nil
}
func (p *MemPiece) Release() {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer != nil {
p.buffer = nil
}
p.piece.Size = 0
p.piece.Complete = false
p.piece.cache.torrent.Piece(p.piece.Id).SetPriority(torrent.PiecePriorityNone)
}

View File

@@ -1,86 +1,76 @@
package torrstor
import (
"errors"
"io"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/storage"
"server/settings"
)
type Piece struct {
storage.PieceImpl
storage.PieceImpl `json:"-"`
Id int
Size int64
Id int `json:"-"`
Size int64 `json:"size"`
complete bool
accessed int64
buffer []byte
Complete bool `json:"complete"`
Accessed int64 `json:"accessed"`
cache *Cache
mPiece *MemPiece `json:"-"`
dPiece *DiskPiece `json:"-"`
cache *Cache `json:"-"`
}
func NewPiece(id int, cache *Cache) *Piece {
p := &Piece{
Id: id,
cache: cache,
}
if !settings.BTsets.UseDisk {
p.mPiece = NewMemPiece(p)
} else {
p.dPiece = NewDiskPiece(p)
}
return p
}
func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
if p.buffer == nil {
go p.cache.cleanPieces()
p.buffer = make([]byte, p.cache.pieceLength, p.cache.pieceLength)
if !settings.BTsets.UseDisk {
return p.mPiece.WriteAt(b, off)
} else {
return p.dPiece.WriteAt(b, off)
}
n = copy(p.buffer[off:], b[:])
p.Size += int64(n)
p.accessed = time.Now().Unix()
return
}
func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
size := len(b)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
}
if !settings.BTsets.UseDisk {
return p.mPiece.ReadAt(b, off)
} else {
return p.dPiece.ReadAt(b, off)
}
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.EOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
p.accessed = time.Now().Unix()
if int64(len(b))+off >= p.Size {
go p.cache.cleanPieces()
}
if n == 0 {
return 0, io.EOF
}
return n, nil
}
func (p *Piece) MarkComplete() error {
if len(p.buffer) == 0 {
return errors.New("piece is not complete")
}
p.complete = true
p.Complete = true
return nil
}
func (p *Piece) MarkNotComplete() error {
p.complete = false
p.Complete = false
return nil
}
func (p *Piece) Completion() storage.Completion {
return storage.Completion{
Complete: p.complete && len(p.buffer) > 0,
Complete: p.Complete,
Ok: true,
}
}
func (p *Piece) Release() {
if p.buffer != nil {
p.buffer = nil
if !settings.BTsets.UseDisk {
p.mPiece.Release()
} else {
p.dPiece.Release()
}
p.Size = 0
p.complete = false
p.cache.torrent.Piece(p.Id).SetPriority(torrent.PiecePriorityNone)
}

View File

@@ -4,6 +4,7 @@ import (
"io"
"strings"
"sync"
"time"
"github.com/anacrolix/torrent"
@@ -21,8 +22,9 @@ type Reader struct {
isClosed bool
///Preload
muPreload sync.Mutex
ranges Range
lastAccess int64
muPreload sync.Mutex
ranges Range
}
func newReader(file *torrent.File, cache *Cache) *Reader {
@@ -53,6 +55,7 @@ func (r *Reader) Seek(offset int64, whence int) (n int64, err error) {
}
n, err = r.Reader.Seek(offset, whence)
r.offset = n
r.lastAccess = time.Now().Unix()
return
}
@@ -83,6 +86,7 @@ func (r *Reader) Read(p []byte) (n int, err error) {
}
r.offset += int64(n)
r.lastAccess = time.Now().Unix()
} else {
log.TLogln("Torrent closed and readed")
}
@@ -130,6 +134,11 @@ func (r *Reader) getPieceNum(offset int64) int {
}
func (r *Reader) getOffsetRange() (int64, int64) {
if time.Now().Unix() > r.lastAccess+60 {
return r.file.Offset(), r.file.Offset()
}
prc := int64(settings.BTsets.ReaderReadAHead)
readers := int64(len(r.cache.readers))
if readers == 0 {