diff --git a/src/server/torr/storage/torrstor/cache.go b/src/server/torr/storage/torrstor/cache.go index 89da0e6..cd87400 100644 --- a/src/server/torr/storage/torrstor/cache.go +++ b/src/server/torr/storage/torrstor/cache.go @@ -133,7 +133,7 @@ func (c *Cache) GetState() *state.CacheState { c.muReaders.Lock() for r, _ := range c.readers { - start, prereader, end := r.getUsedPieces() + start, end := r.getUsedPiecesRange() if p, ok := c.pieces[start]; ok { stats[start] = state.ItemState{ Id: p.Id, @@ -151,25 +151,6 @@ func (c *Cache) GetState() *state.CacheState { ReaderType: 1, } } - - if p, ok := c.pieces[prereader]; ok { - stats[prereader] = state.ItemState{ - Id: p.Id, - Size: p.Size, - Length: p.Length, - Completed: p.complete, - ReaderType: 3, - } - } else { - stats[prereader] = state.ItemState{ - Id: prereader, - Size: 0, - Length: c.pieceLength, - Completed: false, - ReaderType: 3, - } - } - if p, ok := c.pieces[end]; ok { stats[end] = state.ItemState{ Id: p.Id, @@ -187,6 +168,42 @@ func (c *Cache) GetState() *state.CacheState { ReaderType: 2, } } + + reader, loader := r.getReaderPieces() + if p, ok := c.pieces[reader]; ok { + stats[reader] = state.ItemState{ + Id: p.Id, + Size: p.Size, + Length: p.Length, + Completed: p.complete, + ReaderType: 3, + } + } else { + stats[reader] = state.ItemState{ + Id: reader, + Size: 0, + Length: c.pieceLength, + Completed: false, + ReaderType: 3, + } + } + if p, ok := c.pieces[loader]; ok { + stats[loader] = state.ItemState{ + Id: p.Id, + Size: p.Size, + Length: p.Length, + Completed: p.complete, + ReaderType: 3, + } + } else { + stats[loader] = state.ItemState{ + Id: loader, + Size: 0, + Length: c.pieceLength, + Completed: false, + ReaderType: 3, + } + } } c.muReaders.Unlock() @@ -231,7 +248,7 @@ func (c *Cache) getRemPieces() []*Piece { fill += p.Size c.muReaders.Lock() for r, _ := range c.readers { - start, _, end := r.getUsedPieces() + start, end := r.getUsedPiecesRange() if id < start || id > end { piecesRemove = append(piecesRemove, p) } diff --git a/src/server/torr/storage/torrstor/reader.go b/src/server/torr/storage/torrstor/reader.go index 3c4c943..92ef994 100644 --- a/src/server/torr/storage/torrstor/reader.go +++ b/src/server/torr/storage/torrstor/reader.go @@ -18,10 +18,9 @@ type Reader struct { mu sync.Mutex ///Preload - isPreload bool - endOffsetPreload int64 - currOffsetPreload int64 - muPreload sync.Mutex + isPreload bool + loaderOffset int64 + muPreload sync.Mutex } func NewReader(file *torrent.File, cache *Cache) *Reader { diff --git a/src/server/torr/storage/torrstor/readerloader.go b/src/server/torr/storage/torrstor/readerloader.go index 32ae5df..1c94066 100644 --- a/src/server/torr/storage/torrstor/readerloader.go +++ b/src/server/torr/storage/torrstor/readerloader.go @@ -7,92 +7,103 @@ import ( "server/log" ) -func (r *Reader) getUsedPieces() (int, int, int) { - startOff, endOff := r.offset, r.endOffsetPreload - if startOff < endOff { - endOff = startOff + r.readahead - } - return r.getRangePieces(r.offset, r.currOffsetPreload, r.endOffsetPreload) +func (r *Reader) getUsedPiecesRange() (int, int) { + startOff := r.getStartLoaderOffset() + endOff := r.getEndLoaderOffset() + return r.getPieceNum(startOff), r.getPieceNum(endOff) +} + +func (r *Reader) getReaderPieces() (int, int) { + readerOff := r.offset + loaderOff := r.loaderOffset + return r.getPieceNum(readerOff), r.getPieceNum(loaderOff) } ////////////////////////////////////////////////////////////// /// Прелоадер начинает загрузку от старта плеера+RAH и имеет свой RAH /// Прелоадер грузит до конца-RAH func (r *Reader) preload() { - // определяем конец загрузки - r.endOffsetPreload = r.offset + r.cache.capacity - 1024 - - // конец за пределами конца файла, тримим - if r.endOffsetPreload > r.file.Length() { - r.endOffsetPreload = r.file.Length() - } r.muPreload.Lock() // загрузка уже идет или конец меньше RAH, тогда старается основной ридер - if r.isPreload || r.endOffsetPreload < r.readahead { + if r.isPreload || r.getEndLoaderOffset()-r.loaderOffset < r.readahead { r.muPreload.Unlock() return } r.isPreload = true r.muPreload.Unlock() - log.TLogln("Start buffering from", humanize.IBytes(uint64(r.currOffsetPreload))) + log.TLogln("Start buffering from", humanize.IBytes(uint64(r.loaderOffset))) go func() { // получаем ридер buffReader := r.file.NewReader() defer func() { r.isPreload = false buffReader.Close() + log.TLogln("End buffering") }() // ищем не прочитанный кусок - r.currOffsetPreload = r.findPreloadedStart() + r.loaderOffset = r.findPreloadedStart() // выходим если ничего подгружать не нужно - if r.currOffsetPreload >= r.endOffsetPreload { + if r.loaderOffset >= r.getEndLoaderOffset() { return } // двигаем лоадер - buffReader.Seek(r.currOffsetPreload, io.SeekStart) + buffReader.Seek(r.loaderOffset, io.SeekStart) buff := make([]byte, 1024) // isReadahead чтобы меньше переключать RAH isReadahead := false buffReader.SetReadahead(0) // читаем пока позиция лоадера меньше конца и не закрыт ридер - for r.currOffsetPreload < r.endOffsetPreload-1024 && !r.isClosed { + for r.loaderOffset < r.getEndLoaderOffset()-1024 && !r.isClosed { off, err := buffReader.Read(buff) if err != nil { log.TLogln("Error read e head buffer", err) return } - r.currOffsetPreload += int64(off) - // пересчитываем конец загрузки - r.endOffsetPreload = r.offset + r.cache.capacity + r.loaderOffset += int64(off) // если лоадер не успевает загрузить данные и вошел на границу загрузки основного ридера, двигаем его - if r.currOffsetPreload < r.offset+r.readahead { + if r.loaderOffset < r.offset+r.readahead { // подвигаем за границу основного ридера+1 кусок - r.currOffsetPreload = r.offset + r.readahead + r.cache.pieceLength - buffReader.Seek(r.currOffsetPreload, io.SeekStart) + r.loaderOffset = r.offset + r.readahead + r.cache.pieceLength + buffReader.Seek(r.loaderOffset, io.SeekStart) } // если ридер подобрался к концу-RAH - if r.currOffsetPreload > r.endOffsetPreload-r.readahead-1024 && isReadahead { + if r.loaderOffset > r.getEndLoaderOffset()-r.readahead-1024 && isReadahead { // читаем конец без RAH log.TLogln("disable buffering RAH") buffReader.SetReadahead(0) isReadahead = false - } else if r.currOffsetPreload < r.endOffsetPreload-r.readahead-1024 && !isReadahead { + } else if r.loaderOffset < r.getEndLoaderOffset()-r.readahead-1024 && !isReadahead { // Конец удалился и можно включить RAH log.TLogln("enable buffering RAH") buffReader.SetReadahead(r.readahead) isReadahead = true } - //log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.currOffsetPreload)), humanize.IBytes(uint64(r.endOffsetPreload))) + //log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.loaderOffset)), humanize.IBytes(uint64(r.endLoaderOffset))) } - log.TLogln("End buffering") }() } +func (r *Reader) getStartLoaderOffset() int64 { + off := r.offset - r.cache.capacity/2 + if off < 0 { + off = 0 + } + return off +} + +func (r *Reader) getEndLoaderOffset() int64 { + off := r.offset + r.cache.capacity/2 + if off > r.file.Length() { + off = r.file.Length() + } + return off +} + func (r *Reader) findPreloadedStart() int64 { found := false pstart := r.getPieceNum(r.offset + r.readahead) - pend := r.getPieceNum(r.endOffsetPreload) + pend := r.getPieceNum(r.getEndLoaderOffset()) for i := pstart; i < pend; i++ { if r.cache.pieces[i].Size < r.cache.pieces[i].Length { pstart = i @@ -101,18 +112,11 @@ func (r *Reader) findPreloadedStart() int64 { } } if !found { - return r.endOffsetPreload + return r.getEndLoaderOffset() } return int64(pstart) * r.cache.pieceLength } -func (r *Reader) getRangePieces(offCurr, offReader, offEnd int64) (int, int, int) { - currPiece := r.getPieceNum(offCurr) - readerPiece := r.getPieceNum(offReader) - endPiece := r.getPieceNum(offEnd) - return currPiece, readerPiece, endPiece -} - func (r *Reader) getPieceNum(offset int64) int { return int((offset + r.file.Offset()) / r.cache.pieceLength) }