From fc7f5e411e22c64de044895e2519c97d5135a7e4 Mon Sep 17 00:00:00 2001 From: YouROK <8yourok8@mail.ru> Date: Thu, 17 Dec 2020 10:46:12 +0300 Subject: [PATCH] remove loader reader and make pieces priority control --- src/server/torr/storage/torrstor/cache.go | 23 +-- src/server/torr/storage/torrstor/reader.go | 7 +- .../torr/storage/torrstor/readerloader.go | 152 +++++------------- 3 files changed, 51 insertions(+), 131 deletions(-) diff --git a/src/server/torr/storage/torrstor/cache.go b/src/server/torr/storage/torrstor/cache.go index cd87400..5dfb9db 100644 --- a/src/server/torr/storage/torrstor/cache.go +++ b/src/server/torr/storage/torrstor/cache.go @@ -133,7 +133,7 @@ func (c *Cache) GetState() *state.CacheState { c.muReaders.Lock() for r, _ := range c.readers { - start, end := r.getUsedPiecesRange() + start, end := r.getPiecesRange() if p, ok := c.pieces[start]; ok { stats[start] = state.ItemState{ Id: p.Id, @@ -169,7 +169,7 @@ func (c *Cache) GetState() *state.CacheState { } } - reader, loader := r.getReaderPieces() + reader := r.getReaderPiece() if p, ok := c.pieces[reader]; ok { stats[reader] = state.ItemState{ Id: p.Id, @@ -187,23 +187,6 @@ func (c *Cache) GetState() *state.CacheState { ReaderType: 3, } } - if p, ok := c.pieces[loader]; ok { - stats[loader] = state.ItemState{ - Id: p.Id, - Size: p.Size, - Length: p.Length, - Completed: p.complete, - ReaderType: 3, - } - } else { - stats[loader] = state.ItemState{ - Id: loader, - Size: 0, - Length: c.pieceLength, - Completed: false, - ReaderType: 3, - } - } } c.muReaders.Unlock() @@ -248,7 +231,7 @@ func (c *Cache) getRemPieces() []*Piece { fill += p.Size c.muReaders.Lock() for r, _ := range c.readers { - start, end := r.getUsedPiecesRange() + start, end := r.getPiecesRange() if id < start || id > end { piecesRemove = append(piecesRemove, p) } diff --git a/src/server/torr/storage/torrstor/reader.go b/src/server/torr/storage/torrstor/reader.go index 08146b5..abbf8b9 100644 --- a/src/server/torr/storage/torrstor/reader.go +++ b/src/server/torr/storage/torrstor/reader.go @@ -18,9 +18,10 @@ type Reader struct { mu sync.Mutex ///Preload - isPreload bool - loaderOffset int64 - muPreload sync.Mutex + lastRangeBegin int // num piece + lastRangeEnd int // num piece + isPreload bool + muPreload sync.Mutex } func newReader(file *torrent.File, cache *Cache) *Reader { diff --git a/src/server/torr/storage/torrstor/readerloader.go b/src/server/torr/storage/torrstor/readerloader.go index 1c94066..b8f5fc4 100644 --- a/src/server/torr/storage/torrstor/readerloader.go +++ b/src/server/torr/storage/torrstor/readerloader.go @@ -1,122 +1,58 @@ package torrstor import ( - "io" - - "github.com/dustin/go-humanize" - "server/log" + "github.com/anacrolix/torrent" + "server/settings" ) -func (r *Reader) getUsedPiecesRange() (int, int) { - startOff := r.getStartLoaderOffset() - endOff := r.getEndLoaderOffset() +func (r *Reader) getPiecesRange() (int, int) { + startOff, endOff := r.getReaderRange() return r.getPieceNum(startOff), r.getPieceNum(endOff) } -func (r *Reader) getReaderPieces() (int, int) { +func (r *Reader) getReaderPiece() int { readerOff := r.offset - loaderOff := r.loaderOffset - return r.getPieceNum(readerOff), r.getPieceNum(loaderOff) -} - -////////////////////////////////////////////////////////////// -/// Прелоадер начинает загрузку от старта плеера+RAH и имеет свой RAH -/// Прелоадер грузит до конца-RAH -func (r *Reader) preload() { - r.muPreload.Lock() - // загрузка уже идет или конец меньше RAH, тогда старается основной ридер - if r.isPreload || r.getEndLoaderOffset()-r.loaderOffset < r.readahead { - r.muPreload.Unlock() - return - } - r.isPreload = true - r.muPreload.Unlock() - - log.TLogln("Start buffering from", humanize.IBytes(uint64(r.loaderOffset))) - go func() { - // получаем ридер - buffReader := r.file.NewReader() - defer func() { - r.isPreload = false - buffReader.Close() - log.TLogln("End buffering") - }() - // ищем не прочитанный кусок - r.loaderOffset = r.findPreloadedStart() - // выходим если ничего подгружать не нужно - if r.loaderOffset >= r.getEndLoaderOffset() { - return - } - // двигаем лоадер - buffReader.Seek(r.loaderOffset, io.SeekStart) - buff := make([]byte, 1024) - // isReadahead чтобы меньше переключать RAH - isReadahead := false - buffReader.SetReadahead(0) - // читаем пока позиция лоадера меньше конца и не закрыт ридер - for r.loaderOffset < r.getEndLoaderOffset()-1024 && !r.isClosed { - off, err := buffReader.Read(buff) - if err != nil { - log.TLogln("Error read e head buffer", err) - return - } - r.loaderOffset += int64(off) - // если лоадер не успевает загрузить данные и вошел на границу загрузки основного ридера, двигаем его - if r.loaderOffset < r.offset+r.readahead { - // подвигаем за границу основного ридера+1 кусок - r.loaderOffset = r.offset + r.readahead + r.cache.pieceLength - buffReader.Seek(r.loaderOffset, io.SeekStart) - } - // если ридер подобрался к концу-RAH - if r.loaderOffset > r.getEndLoaderOffset()-r.readahead-1024 && isReadahead { - // читаем конец без RAH - log.TLogln("disable buffering RAH") - buffReader.SetReadahead(0) - isReadahead = false - } else if r.loaderOffset < r.getEndLoaderOffset()-r.readahead-1024 && !isReadahead { - // Конец удалился и можно включить RAH - log.TLogln("enable buffering RAH") - buffReader.SetReadahead(r.readahead) - isReadahead = true - } - //log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.loaderOffset)), humanize.IBytes(uint64(r.endLoaderOffset))) - } - }() -} - -func (r *Reader) getStartLoaderOffset() int64 { - off := r.offset - r.cache.capacity/2 - if off < 0 { - off = 0 - } - return off -} - -func (r *Reader) getEndLoaderOffset() int64 { - off := r.offset + r.cache.capacity/2 - if off > r.file.Length() { - off = r.file.Length() - } - return off -} - -func (r *Reader) findPreloadedStart() int64 { - found := false - pstart := r.getPieceNum(r.offset + r.readahead) - pend := r.getPieceNum(r.getEndLoaderOffset()) - for i := pstart; i < pend; i++ { - if r.cache.pieces[i].Size < r.cache.pieces[i].Length { - pstart = i - found = true - break - } - } - if !found { - return r.getEndLoaderOffset() - } - return int64(pstart) * r.cache.pieceLength + return r.getPieceNum(readerOff) } func (r *Reader) getPieceNum(offset int64) int { return int((offset + r.file.Offset()) / r.cache.pieceLength) } + +func (r *Reader) getReaderRange() (int64, int64) { + prc := int64(settings.BTsets.ReaderPreload) + beginOffset := r.offset - r.cache.capacity*(100-prc)/100 + endOffset := r.offset + r.cache.capacity*prc/100 + + if beginOffset < 0 { + beginOffset = 0 + } + + if endOffset > r.file.Length() { + endOffset = r.file.Length() + } + return beginOffset, endOffset +} + +func (r *Reader) preload() { + torr := r.file.Torrent() + begin, end := r.getPiecesRange() + rahPiece := int(r.readahead / torr.Info().PieceLength) + readerPiece := r.getReaderPiece() + + for i := r.lastRangeBegin; i < r.lastRangeEnd; i++ { + if i >= readerPiece && i <= readerPiece+rahPiece { // reader pieces + continue + } + piece := torr.Piece(i) + piece.SetPriority(torrent.PiecePriorityNone) + } + + for i := begin; i < end; i++ { + if i >= readerPiece && i <= readerPiece+rahPiece { // reader pieces + continue + } + torr.Piece(i).SetPriority(torrent.PiecePriorityNormal) + } + r.lastRangeBegin, r.lastRangeEnd = begin, end +}