This commit is contained in:
YouROK
2020-12-11 16:29:26 +03:00
parent 27675c89f2
commit d90a3aee43
3 changed files with 83 additions and 63 deletions

View File

@@ -133,7 +133,7 @@ func (c *Cache) GetState() *state.CacheState {
c.muReaders.Lock() c.muReaders.Lock()
for r, _ := range c.readers { for r, _ := range c.readers {
start, prereader, end := r.getUsedPieces() start, end := r.getUsedPiecesRange()
if p, ok := c.pieces[start]; ok { if p, ok := c.pieces[start]; ok {
stats[start] = state.ItemState{ stats[start] = state.ItemState{
Id: p.Id, Id: p.Id,
@@ -151,25 +151,6 @@ func (c *Cache) GetState() *state.CacheState {
ReaderType: 1, ReaderType: 1,
} }
} }
if p, ok := c.pieces[prereader]; ok {
stats[prereader] = state.ItemState{
Id: p.Id,
Size: p.Size,
Length: p.Length,
Completed: p.complete,
ReaderType: 3,
}
} else {
stats[prereader] = state.ItemState{
Id: prereader,
Size: 0,
Length: c.pieceLength,
Completed: false,
ReaderType: 3,
}
}
if p, ok := c.pieces[end]; ok { if p, ok := c.pieces[end]; ok {
stats[end] = state.ItemState{ stats[end] = state.ItemState{
Id: p.Id, Id: p.Id,
@@ -187,6 +168,42 @@ func (c *Cache) GetState() *state.CacheState {
ReaderType: 2, ReaderType: 2,
} }
} }
reader, loader := r.getReaderPieces()
if p, ok := c.pieces[reader]; ok {
stats[reader] = state.ItemState{
Id: p.Id,
Size: p.Size,
Length: p.Length,
Completed: p.complete,
ReaderType: 3,
}
} else {
stats[reader] = state.ItemState{
Id: reader,
Size: 0,
Length: c.pieceLength,
Completed: false,
ReaderType: 3,
}
}
if p, ok := c.pieces[loader]; ok {
stats[loader] = state.ItemState{
Id: p.Id,
Size: p.Size,
Length: p.Length,
Completed: p.complete,
ReaderType: 3,
}
} else {
stats[loader] = state.ItemState{
Id: loader,
Size: 0,
Length: c.pieceLength,
Completed: false,
ReaderType: 3,
}
}
} }
c.muReaders.Unlock() c.muReaders.Unlock()
@@ -231,7 +248,7 @@ func (c *Cache) getRemPieces() []*Piece {
fill += p.Size fill += p.Size
c.muReaders.Lock() c.muReaders.Lock()
for r, _ := range c.readers { for r, _ := range c.readers {
start, _, end := r.getUsedPieces() start, end := r.getUsedPiecesRange()
if id < start || id > end { if id < start || id > end {
piecesRemove = append(piecesRemove, p) piecesRemove = append(piecesRemove, p)
} }

View File

@@ -18,10 +18,9 @@ type Reader struct {
mu sync.Mutex mu sync.Mutex
///Preload ///Preload
isPreload bool isPreload bool
endOffsetPreload int64 loaderOffset int64
currOffsetPreload int64 muPreload sync.Mutex
muPreload sync.Mutex
} }
func NewReader(file *torrent.File, cache *Cache) *Reader { func NewReader(file *torrent.File, cache *Cache) *Reader {

View File

@@ -7,92 +7,103 @@ import (
"server/log" "server/log"
) )
func (r *Reader) getUsedPieces() (int, int, int) { func (r *Reader) getUsedPiecesRange() (int, int) {
startOff, endOff := r.offset, r.endOffsetPreload startOff := r.getStartLoaderOffset()
if startOff < endOff { endOff := r.getEndLoaderOffset()
endOff = startOff + r.readahead return r.getPieceNum(startOff), r.getPieceNum(endOff)
} }
return r.getRangePieces(r.offset, r.currOffsetPreload, r.endOffsetPreload)
func (r *Reader) getReaderPieces() (int, int) {
readerOff := r.offset
loaderOff := r.loaderOffset
return r.getPieceNum(readerOff), r.getPieceNum(loaderOff)
} }
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
/// Прелоадер начинает загрузку от старта плеера+RAH и имеет свой RAH /// Прелоадер начинает загрузку от старта плеера+RAH и имеет свой RAH
/// Прелоадер грузит до конца-RAH /// Прелоадер грузит до конца-RAH
func (r *Reader) preload() { func (r *Reader) preload() {
// определяем конец загрузки
r.endOffsetPreload = r.offset + r.cache.capacity - 1024
// конец за пределами конца файла, тримим
if r.endOffsetPreload > r.file.Length() {
r.endOffsetPreload = r.file.Length()
}
r.muPreload.Lock() r.muPreload.Lock()
// загрузка уже идет или конец меньше RAH, тогда старается основной ридер // загрузка уже идет или конец меньше RAH, тогда старается основной ридер
if r.isPreload || r.endOffsetPreload < r.readahead { if r.isPreload || r.getEndLoaderOffset()-r.loaderOffset < r.readahead {
r.muPreload.Unlock() r.muPreload.Unlock()
return return
} }
r.isPreload = true r.isPreload = true
r.muPreload.Unlock() r.muPreload.Unlock()
log.TLogln("Start buffering from", humanize.IBytes(uint64(r.currOffsetPreload))) log.TLogln("Start buffering from", humanize.IBytes(uint64(r.loaderOffset)))
go func() { go func() {
// получаем ридер // получаем ридер
buffReader := r.file.NewReader() buffReader := r.file.NewReader()
defer func() { defer func() {
r.isPreload = false r.isPreload = false
buffReader.Close() buffReader.Close()
log.TLogln("End buffering")
}() }()
// ищем не прочитанный кусок // ищем не прочитанный кусок
r.currOffsetPreload = r.findPreloadedStart() r.loaderOffset = r.findPreloadedStart()
// выходим если ничего подгружать не нужно // выходим если ничего подгружать не нужно
if r.currOffsetPreload >= r.endOffsetPreload { if r.loaderOffset >= r.getEndLoaderOffset() {
return return
} }
// двигаем лоадер // двигаем лоадер
buffReader.Seek(r.currOffsetPreload, io.SeekStart) buffReader.Seek(r.loaderOffset, io.SeekStart)
buff := make([]byte, 1024) buff := make([]byte, 1024)
// isReadahead чтобы меньше переключать RAH // isReadahead чтобы меньше переключать RAH
isReadahead := false isReadahead := false
buffReader.SetReadahead(0) buffReader.SetReadahead(0)
// читаем пока позиция лоадера меньше конца и не закрыт ридер // читаем пока позиция лоадера меньше конца и не закрыт ридер
for r.currOffsetPreload < r.endOffsetPreload-1024 && !r.isClosed { for r.loaderOffset < r.getEndLoaderOffset()-1024 && !r.isClosed {
off, err := buffReader.Read(buff) off, err := buffReader.Read(buff)
if err != nil { if err != nil {
log.TLogln("Error read e head buffer", err) log.TLogln("Error read e head buffer", err)
return return
} }
r.currOffsetPreload += int64(off) r.loaderOffset += int64(off)
// пересчитываем конец загрузки
r.endOffsetPreload = r.offset + r.cache.capacity
// если лоадер не успевает загрузить данные и вошел на границу загрузки основного ридера, двигаем его // если лоадер не успевает загрузить данные и вошел на границу загрузки основного ридера, двигаем его
if r.currOffsetPreload < r.offset+r.readahead { if r.loaderOffset < r.offset+r.readahead {
// подвигаем за границу основного ридера+1 кусок // подвигаем за границу основного ридера+1 кусок
r.currOffsetPreload = r.offset + r.readahead + r.cache.pieceLength r.loaderOffset = r.offset + r.readahead + r.cache.pieceLength
buffReader.Seek(r.currOffsetPreload, io.SeekStart) buffReader.Seek(r.loaderOffset, io.SeekStart)
} }
// если ридер подобрался к концу-RAH // если ридер подобрался к концу-RAH
if r.currOffsetPreload > r.endOffsetPreload-r.readahead-1024 && isReadahead { if r.loaderOffset > r.getEndLoaderOffset()-r.readahead-1024 && isReadahead {
// читаем конец без RAH // читаем конец без RAH
log.TLogln("disable buffering RAH") log.TLogln("disable buffering RAH")
buffReader.SetReadahead(0) buffReader.SetReadahead(0)
isReadahead = false isReadahead = false
} else if r.currOffsetPreload < r.endOffsetPreload-r.readahead-1024 && !isReadahead { } else if r.loaderOffset < r.getEndLoaderOffset()-r.readahead-1024 && !isReadahead {
// Конец удалился и можно включить RAH // Конец удалился и можно включить RAH
log.TLogln("enable buffering RAH") log.TLogln("enable buffering RAH")
buffReader.SetReadahead(r.readahead) buffReader.SetReadahead(r.readahead)
isReadahead = true isReadahead = true
} }
//log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.currOffsetPreload)), humanize.IBytes(uint64(r.endOffsetPreload))) //log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.loaderOffset)), humanize.IBytes(uint64(r.endLoaderOffset)))
} }
log.TLogln("End buffering")
}() }()
} }
func (r *Reader) getStartLoaderOffset() int64 {
off := r.offset - r.cache.capacity/2
if off < 0 {
off = 0
}
return off
}
func (r *Reader) getEndLoaderOffset() int64 {
off := r.offset + r.cache.capacity/2
if off > r.file.Length() {
off = r.file.Length()
}
return off
}
func (r *Reader) findPreloadedStart() int64 { func (r *Reader) findPreloadedStart() int64 {
found := false found := false
pstart := r.getPieceNum(r.offset + r.readahead) pstart := r.getPieceNum(r.offset + r.readahead)
pend := r.getPieceNum(r.endOffsetPreload) pend := r.getPieceNum(r.getEndLoaderOffset())
for i := pstart; i < pend; i++ { for i := pstart; i < pend; i++ {
if r.cache.pieces[i].Size < r.cache.pieces[i].Length { if r.cache.pieces[i].Size < r.cache.pieces[i].Length {
pstart = i pstart = i
@@ -101,18 +112,11 @@ func (r *Reader) findPreloadedStart() int64 {
} }
} }
if !found { if !found {
return r.endOffsetPreload return r.getEndLoaderOffset()
} }
return int64(pstart) * r.cache.pieceLength return int64(pstart) * r.cache.pieceLength
} }
func (r *Reader) getRangePieces(offCurr, offReader, offEnd int64) (int, int, int) {
currPiece := r.getPieceNum(offCurr)
readerPiece := r.getPieceNum(offReader)
endPiece := r.getPieceNum(offEnd)
return currPiece, readerPiece, endPiece
}
func (r *Reader) getPieceNum(offset int64) int { func (r *Reader) getPieceNum(offset int64) int {
return int((offset + r.file.Offset()) / r.cache.pieceLength) return int((offset + r.file.Offset()) / r.cache.pieceLength)
} }