remove loader reader and make pieces priority control

This commit is contained in:
YouROK
2020-12-17 10:46:12 +03:00
parent a0d0a44945
commit fc7f5e411e
3 changed files with 51 additions and 131 deletions

View File

@@ -133,7 +133,7 @@ func (c *Cache) GetState() *state.CacheState {
c.muReaders.Lock()
for r, _ := range c.readers {
start, end := r.getUsedPiecesRange()
start, end := r.getPiecesRange()
if p, ok := c.pieces[start]; ok {
stats[start] = state.ItemState{
Id: p.Id,
@@ -169,7 +169,7 @@ func (c *Cache) GetState() *state.CacheState {
}
}
reader, loader := r.getReaderPieces()
reader := r.getReaderPiece()
if p, ok := c.pieces[reader]; ok {
stats[reader] = state.ItemState{
Id: p.Id,
@@ -187,23 +187,6 @@ func (c *Cache) GetState() *state.CacheState {
ReaderType: 3,
}
}
if p, ok := c.pieces[loader]; ok {
stats[loader] = state.ItemState{
Id: p.Id,
Size: p.Size,
Length: p.Length,
Completed: p.complete,
ReaderType: 3,
}
} else {
stats[loader] = state.ItemState{
Id: loader,
Size: 0,
Length: c.pieceLength,
Completed: false,
ReaderType: 3,
}
}
}
c.muReaders.Unlock()
@@ -248,7 +231,7 @@ func (c *Cache) getRemPieces() []*Piece {
fill += p.Size
c.muReaders.Lock()
for r, _ := range c.readers {
start, end := r.getUsedPiecesRange()
start, end := r.getPiecesRange()
if id < start || id > end {
piecesRemove = append(piecesRemove, p)
}

View File

@@ -18,8 +18,9 @@ type Reader struct {
mu sync.Mutex
///Preload
lastRangeBegin int // num piece
lastRangeEnd int // num piece
isPreload bool
loaderOffset int64
muPreload sync.Mutex
}

View File

@@ -1,122 +1,58 @@
package torrstor
import (
"io"
"github.com/dustin/go-humanize"
"server/log"
"github.com/anacrolix/torrent"
"server/settings"
)
func (r *Reader) getUsedPiecesRange() (int, int) {
startOff := r.getStartLoaderOffset()
endOff := r.getEndLoaderOffset()
func (r *Reader) getPiecesRange() (int, int) {
startOff, endOff := r.getReaderRange()
return r.getPieceNum(startOff), r.getPieceNum(endOff)
}
func (r *Reader) getReaderPieces() (int, int) {
func (r *Reader) getReaderPiece() int {
readerOff := r.offset
loaderOff := r.loaderOffset
return r.getPieceNum(readerOff), r.getPieceNum(loaderOff)
}
//////////////////////////////////////////////////////////////
/// Прелоадер начинает загрузку от старта плеера+RAH и имеет свой RAH
/// Прелоадер грузит до конца-RAH
func (r *Reader) preload() {
r.muPreload.Lock()
// загрузка уже идет или конец меньше RAH, тогда старается основной ридер
if r.isPreload || r.getEndLoaderOffset()-r.loaderOffset < r.readahead {
r.muPreload.Unlock()
return
}
r.isPreload = true
r.muPreload.Unlock()
log.TLogln("Start buffering from", humanize.IBytes(uint64(r.loaderOffset)))
go func() {
// получаем ридер
buffReader := r.file.NewReader()
defer func() {
r.isPreload = false
buffReader.Close()
log.TLogln("End buffering")
}()
// ищем не прочитанный кусок
r.loaderOffset = r.findPreloadedStart()
// выходим если ничего подгружать не нужно
if r.loaderOffset >= r.getEndLoaderOffset() {
return
}
// двигаем лоадер
buffReader.Seek(r.loaderOffset, io.SeekStart)
buff := make([]byte, 1024)
// isReadahead чтобы меньше переключать RAH
isReadahead := false
buffReader.SetReadahead(0)
// читаем пока позиция лоадера меньше конца и не закрыт ридер
for r.loaderOffset < r.getEndLoaderOffset()-1024 && !r.isClosed {
off, err := buffReader.Read(buff)
if err != nil {
log.TLogln("Error read e head buffer", err)
return
}
r.loaderOffset += int64(off)
// если лоадер не успевает загрузить данные и вошел на границу загрузки основного ридера, двигаем его
if r.loaderOffset < r.offset+r.readahead {
// подвигаем за границу основного ридера+1 кусок
r.loaderOffset = r.offset + r.readahead + r.cache.pieceLength
buffReader.Seek(r.loaderOffset, io.SeekStart)
}
// если ридер подобрался к концу-RAH
if r.loaderOffset > r.getEndLoaderOffset()-r.readahead-1024 && isReadahead {
// читаем конец без RAH
log.TLogln("disable buffering RAH")
buffReader.SetReadahead(0)
isReadahead = false
} else if r.loaderOffset < r.getEndLoaderOffset()-r.readahead-1024 && !isReadahead {
// Конец удалился и можно включить RAH
log.TLogln("enable buffering RAH")
buffReader.SetReadahead(r.readahead)
isReadahead = true
}
//log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.loaderOffset)), humanize.IBytes(uint64(r.endLoaderOffset)))
}
}()
}
func (r *Reader) getStartLoaderOffset() int64 {
off := r.offset - r.cache.capacity/2
if off < 0 {
off = 0
}
return off
}
func (r *Reader) getEndLoaderOffset() int64 {
off := r.offset + r.cache.capacity/2
if off > r.file.Length() {
off = r.file.Length()
}
return off
}
func (r *Reader) findPreloadedStart() int64 {
found := false
pstart := r.getPieceNum(r.offset + r.readahead)
pend := r.getPieceNum(r.getEndLoaderOffset())
for i := pstart; i < pend; i++ {
if r.cache.pieces[i].Size < r.cache.pieces[i].Length {
pstart = i
found = true
break
}
}
if !found {
return r.getEndLoaderOffset()
}
return int64(pstart) * r.cache.pieceLength
return r.getPieceNum(readerOff)
}
func (r *Reader) getPieceNum(offset int64) int {
return int((offset + r.file.Offset()) / r.cache.pieceLength)
}
func (r *Reader) getReaderRange() (int64, int64) {
prc := int64(settings.BTsets.ReaderPreload)
beginOffset := r.offset - r.cache.capacity*(100-prc)/100
endOffset := r.offset + r.cache.capacity*prc/100
if beginOffset < 0 {
beginOffset = 0
}
if endOffset > r.file.Length() {
endOffset = r.file.Length()
}
return beginOffset, endOffset
}
func (r *Reader) preload() {
torr := r.file.Torrent()
begin, end := r.getPiecesRange()
rahPiece := int(r.readahead / torr.Info().PieceLength)
readerPiece := r.getReaderPiece()
for i := r.lastRangeBegin; i < r.lastRangeEnd; i++ {
if i >= readerPiece && i <= readerPiece+rahPiece { // reader pieces
continue
}
piece := torr.Piece(i)
piece.SetPriority(torrent.PiecePriorityNone)
}
for i := begin; i < end; i++ {
if i >= readerPiece && i <= readerPiece+rahPiece { // reader pieces
continue
}
torr.Piece(i).SetPriority(torrent.PiecePriorityNormal)
}
r.lastRangeBegin, r.lastRangeEnd = begin, end
}