change reader

This commit is contained in:
YouROK
2020-12-08 15:49:36 +03:00
parent 915b17058c
commit ab16167a12
8 changed files with 337 additions and 296 deletions

View File

@@ -1,112 +0,0 @@
package torr
import (
"fmt"
"io"
"github.com/anacrolix/torrent"
"server/log"
)
type Reader struct {
torrent.Reader
offset int64
readahead int64
file *torrent.File
torr *Torrent
isClosed bool
///Preload
isPreload bool
endOffsetPreload int64
currOffsetPreload int64
}
func NewReader(torr *Torrent, file *torrent.File, readahead int64) *Reader {
r := new(Reader)
r.file = file
r.Reader = file.NewReader()
if readahead <= 0 {
readahead = torr.Torrent.Info().PieceLength
}
r.SetReadahead(readahead)
torr.GetCache().AddReader(r)
r.torr = torr
return r
}
func (r *Reader) Seek(offset int64, whence int) (n int64, err error) {
switch whence {
case io.SeekStart:
r.offset = offset
case io.SeekCurrent:
r.offset += offset
case io.SeekEnd:
r.offset = r.file.Length() - offset
}
n, err = r.Reader.Seek(offset, whence)
r.offset = n
return
}
func (r *Reader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.offset += int64(n)
r.preload()
return
}
func (r *Reader) SetReadahead(length int64) {
r.Reader.SetReadahead(length)
r.readahead = length
}
func (r *Reader) Offset() int64 {
return r.offset
}
func (r *Reader) Readahead() int64 {
return r.readahead
}
func (r *Reader) Close() error {
r.isClosed = true
return r.Reader.Close()
}
func (r *Reader) preload() {
r.currOffsetPreload = r.offset
capacity := r.torr.cache.GetCapacity()
plength := r.torr.Info().PieceLength
r.endOffsetPreload = r.offset + capacity - r.readahead - plength*3
if r.endOffsetPreload > r.file.Length() {
r.endOffsetPreload = r.file.Length()
}
if r.endOffsetPreload < r.readahead || r.isPreload {
return
}
r.isPreload = true
//TODO remove logs
fmt.Println("Start buffering...")
go func() {
buffReader := r.file.NewReader()
defer func() {
r.isPreload = false
buffReader.Close()
fmt.Println("End buffering...")
}()
buffReader.SetReadahead(0)
buffReader.Seek(r.currOffsetPreload, io.SeekStart)
buff5mb := make([]byte, 1024)
for r.currOffsetPreload < r.endOffsetPreload && !r.isClosed {
off, err := buffReader.Read(buff5mb)
if err != nil {
log.TLogln("Error read e head buffer", err)
return
}
r.currOffsetPreload += int64(off)
}
}()
}

View File

@@ -1,18 +1,23 @@
package state package state
import (
"server/torr/state"
)
type CacheState struct { type CacheState struct {
Hash string Hash string
Capacity int64 Capacity int64
Filled int64 Filled int64
PiecesLength int64 PiecesLength int64
PiecesCount int PiecesCount int
DownloadSpeed float64 Torrent *state.TorrentStatus
Pieces map[int]ItemState Pieces map[int]ItemState
} }
type ItemState struct { type ItemState struct {
Id int Id int
Length int64 Length int64
Size int64 Size int64
Completed bool Completed bool
ReaderType int
} }

View File

@@ -20,7 +20,7 @@ type BufferPool struct {
func NewBufferPool(bufferLength int64, capacity int64) *BufferPool { func NewBufferPool(bufferLength int64, capacity int64) *BufferPool {
bp := new(BufferPool) bp := new(BufferPool)
buffsSize := int(capacity/bufferLength) + 3 buffsSize := int(capacity/bufferLength) + 4
bp.frees = buffsSize bp.frees = buffsSize
bp.size = bufferLength bp.size = bufferLength
return bp return bp
@@ -74,21 +74,6 @@ func (b *BufferPool) ReleaseBuffer(index int) {
} }
} }
func (b *BufferPool) Used() map[int]struct{} {
if len(b.buffs) == 0 {
b.mu.Lock()
b.mkBuffs()
b.mu.Unlock()
}
used := make(map[int]struct{})
for _, b := range b.buffs {
if b.used {
used[b.pieceId] = struct{}{}
}
}
return used
}
func (b *BufferPool) Len() int { func (b *BufferPool) Len() int {
return b.frees return b.frees
} }

View File

@@ -9,16 +9,13 @@ import (
"server/torr/storage/state" "server/torr/storage/state"
"server/torr/utils" "server/torr/utils"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
type Cache struct { type Cache struct {
storage.TorrentImpl storage.TorrentImpl
storage *Storage
s *Storage
capacity int64 capacity int64
filled int64 filled int64
@@ -26,19 +23,14 @@ type Cache struct {
pieceLength int64 pieceLength int64
pieceCount int pieceCount int
piecesBuff int
muPiece sync.Mutex
muRemove sync.Mutex
muReader sync.Mutex
isRemove bool
pieces map[int]*Piece pieces map[int]*Piece
bufferPull *BufferPool bufferPull *BufferPool
prcLoaded int readers map[*Reader]struct{}
readers map[torrent.Reader]struct{} isRemove bool
muRemove sync.Mutex
} }
func NewCache(capacity int64, storage *Storage) *Cache { func NewCache(capacity int64, storage *Storage) *Cache {
@@ -46,8 +38,8 @@ func NewCache(capacity int64, storage *Storage) *Cache {
capacity: capacity, capacity: capacity,
filled: 0, filled: 0,
pieces: make(map[int]*Piece), pieces: make(map[int]*Piece),
s: storage, storage: storage,
readers: make(map[torrent.Reader]struct{}), readers: make(map[*Reader]struct{}),
} }
return ret return ret
@@ -56,17 +48,11 @@ func NewCache(capacity int64, storage *Storage) *Cache {
func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
log.TLogln("Create cache for:", info.Name, hash.HexString()) log.TLogln("Create cache for:", info.Name, hash.HexString())
if c.capacity == 0 { if c.capacity == 0 {
c.capacity = info.PieceLength * 6 c.capacity = info.PieceLength * 4
} }
//Min capacity of 2 pieces length
cap := info.PieceLength * 2
if c.capacity < cap {
c.capacity = cap
}
c.pieceLength = info.PieceLength c.pieceLength = info.PieceLength
c.pieceCount = info.NumPieces() c.pieceCount = info.NumPieces()
c.piecesBuff = int(c.capacity / c.pieceLength)
c.hash = hash c.hash = hash
c.bufferPull = NewBufferPool(c.pieceLength, c.capacity) c.bufferPull = NewBufferPool(c.pieceLength, c.capacity)
@@ -81,8 +67,6 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
} }
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
c.muPiece.Lock()
defer c.muPiece.Unlock()
if val, ok := c.pieces[m.Index()]; ok { if val, ok := c.pieces[m.Index()]; ok {
return val return val
} }
@@ -90,10 +74,9 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
} }
func (c *Cache) Close() error { func (c *Cache) Close() error {
c.isRemove = false
log.TLogln("Close cache for:", c.hash) log.TLogln("Close cache for:", c.hash)
if _, ok := c.s.caches[c.hash]; ok { if _, ok := c.storage.caches[c.hash]; ok {
delete(c.s.caches, c.hash) delete(c.storage.caches, c.hash)
} }
c.pieces = nil c.pieces = nil
c.bufferPull = nil c.bufferPull = nil
@@ -102,102 +85,12 @@ func (c *Cache) Close() error {
return nil return nil
} }
func (c *Cache) cleanPieces() {
if c.isRemove {
return
}
c.muRemove.Lock()
if c.isRemove {
c.muRemove.Unlock()
return
}
c.isRemove = true
defer func() { c.isRemove = false }()
c.muRemove.Unlock()
remPieces := c.getRemPieces()
if len(remPieces) > 0 && (c.filled > c.capacity || c.bufferPull.Len() <= 1) {
remCount := int((c.filled - c.capacity) / c.pieceLength)
if remCount < 1 {
remCount = 1
}
if remCount > len(remPieces) {
remCount = len(remPieces)
}
remPieces = remPieces[:remCount]
for _, p := range remPieces {
c.removePiece(p)
}
}
}
func (c *Cache) getRemPieces() []*Piece {
pieces := make([]*Piece, 0)
fill := int64(0)
loading := 0
used := c.bufferPull.Used()
for u := range used {
if v, ok := c.pieces[u]; ok {
if v.Size > 0 {
if v.Id > 0 {
pieces = append(pieces, v)
}
fill += v.Size
if !v.complete {
loading++
}
}
}
}
c.filled = fill
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].accessed < pieces[j].accessed
})
c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff)
return pieces
}
func (c *Cache) removePiece(piece *Piece) { func (c *Cache) removePiece(piece *Piece) {
c.muPiece.Lock()
defer c.muPiece.Unlock()
piece.Release() piece.Release()
utils.FreeOSMemGC()
if c.prcLoaded >= 75 {
utils.FreeOSMemGC()
} else {
utils.FreeOSMem()
}
}
func prc(val, of int) int {
return int(float64(val) * 100.0 / float64(of))
}
func (c *Cache) AddReader(r torrent.Reader) {
c.muReader.Lock()
defer c.muReader.Unlock()
c.readers[r] = struct{}{}
}
func (c *Cache) RemReader(r torrent.Reader) {
c.muReader.Lock()
defer c.muReader.Unlock()
delete(c.readers, r)
}
func (c *Cache) ReadersLen() int {
if c == nil || c.readers == nil {
return 0
}
return len(c.readers)
} }
func (c *Cache) AdjustRA(readahead int64) { func (c *Cache) AdjustRA(readahead int64) {
c.muReader.Lock()
defer c.muReader.Unlock()
if settings.BTsets.CacheSize == 0 { if settings.BTsets.CacheSize == 0 {
c.capacity = readahead * 3 c.capacity = readahead * 3
} }
@@ -214,26 +107,111 @@ func (c *Cache) GetState() *state.CacheState {
cState.Hash = c.hash.HexString() cState.Hash = c.hash.HexString()
stats := make(map[int]state.ItemState, 0) stats := make(map[int]state.ItemState, 0)
c.muPiece.Lock()
var fill int64 = 0 var fill int64 = 0
for _, p := range c.pieces { for _, p := range c.pieces {
if p.Size > 0 { if p.Size > 0 {
fill += p.Length fill += p.Length
stats[p.Id] = state.ItemState{ stats[p.Id] = state.ItemState{
Id: p.Id, Id: p.Id,
Size: p.Size, Size: p.Size,
Length: p.Length, Length: p.Length,
Completed: p.complete, Completed: p.complete,
ReaderType: 0,
} }
} }
} }
for r, _ := range c.readers {
start, end := r.getUsedPieces()
if p, ok := c.pieces[start]; ok {
stats[start] = state.ItemState{
Id: p.Id,
Size: p.Size,
Length: p.Length,
Completed: p.complete,
ReaderType: 1,
}
} else {
stats[start] = state.ItemState{
Id: start,
Size: 0,
Length: c.pieceLength,
Completed: false,
ReaderType: 1,
}
}
if p, ok := c.pieces[end]; ok {
stats[end] = state.ItemState{
Id: p.Id,
Size: p.Size,
Length: p.Length,
Completed: p.complete,
ReaderType: 2,
}
} else {
stats[end] = state.ItemState{
Id: end,
Size: 0,
Length: c.pieceLength,
Completed: false,
ReaderType: 2,
}
}
}
c.filled = fill c.filled = fill
c.muPiece.Unlock()
cState.Filled = c.filled cState.Filled = c.filled
cState.Pieces = stats cState.Pieces = stats
return cState return cState
} }
func (c *Cache) GetCapacity() int64 { func (c *Cache) cleanPieces() {
return c.capacity if c.isRemove {
return
}
c.muRemove.Lock()
if c.isRemove {
c.muRemove.Unlock()
return
}
c.isRemove = true
defer func() { c.isRemove = false }()
c.muRemove.Unlock()
remPieces := c.getRemPieces()
if c.filled > c.capacity {
rems := (c.filled - c.capacity) / c.pieceLength
for _, p := range remPieces {
c.removePiece(p)
rems--
if rems <= 0 {
break
}
}
}
}
func (c *Cache) getRemPieces() []*Piece {
piecesRemove := make([]*Piece, 0)
fill := int64(0)
for id, p := range c.pieces {
if p.Size > 0 {
fill += p.Size
for r, _ := range c.readers {
start, end := r.getUsedPieces()
if id < start || id > end {
piecesRemove = append(piecesRemove, p)
}
}
}
}
sort.Slice(piecesRemove, func(i, j int) bool {
return piecesRemove[i].accessed < piecesRemove[j].accessed
})
c.filled = fill
return piecesRemove
} }

View File

@@ -0,0 +1,90 @@
package torrstor
import (
"io"
"github.com/anacrolix/torrent"
)
type Reader struct {
torrent.Reader
offset int64
readahead int64
file *torrent.File
cache *Cache
isClosed bool
///Preload
isPreload bool
endOffsetPreload int64
currOffsetPreload int64
}
func NewReader(file *torrent.File, cache *Cache) *Reader {
r := new(Reader)
r.file = file
r.Reader = file.NewReader()
r.SetReadAHead(0)
r.cache = cache
r.cache.readers[r] = struct{}{}
return r
}
func (r *Reader) Seek(offset int64, whence int) (n int64, err error) {
if r.isClosed {
return 0, io.EOF
}
switch whence {
case io.SeekStart:
r.offset = offset
case io.SeekCurrent:
r.offset += offset
case io.SeekEnd:
r.offset = r.file.Length() - offset
}
n, err = r.Reader.Seek(offset, whence)
r.offset = n
return
}
func (r *Reader) Read(p []byte) (n int, err error) {
if r.isClosed {
return 0, io.EOF
}
n, err = r.Reader.Read(p)
r.offset += int64(n)
go r.preload()
return
}
func (r *Reader) SetReadAHead(length int64) {
r.Reader.SetReadahead(length)
r.readahead = length
}
func (r *Reader) Offset() int64 {
return r.offset
}
func (r *Reader) ReadAHead() int64 {
return r.readahead
}
func (r *Reader) Close() error {
r.isClosed = true
delete(r.cache.readers, r)
return r.Reader.Close()
}
func (c *Cache) NewReader(file *torrent.File) *Reader {
return NewReader(file, c)
}
func (c *Cache) Readers() int {
if c == nil || c.readers == nil {
return 0
}
return len(c.readers)
}

View File

@@ -0,0 +1,64 @@
package torrstor
import (
"fmt"
"io"
"github.com/dustin/go-humanize"
"server/log"
)
func (r *Reader) getUsedPieces() (int, int) {
startOff, endOff := r.offset, r.endOffsetPreload
if startOff < endOff {
endOff = startOff + r.readahead
}
return r.getRangePieces(r.offset, r.endOffsetPreload)
}
func (r *Reader) preload() {
r.currOffsetPreload = r.offset
r.endOffsetPreload = r.offset + r.cache.capacity
if r.endOffsetPreload > r.file.Length() {
r.endOffsetPreload = r.file.Length()
}
if r.isPreload || r.endOffsetPreload < r.readahead {
return
}
r.isPreload = true
//TODO remove logs
fmt.Println("Start buffering...", humanize.Bytes(uint64(r.offset)), humanize.Bytes(uint64(r.endOffsetPreload)))
go func() {
buffReader := r.file.NewReader()
defer func() {
r.isPreload = false
buffReader.Close()
fmt.Println("End buffering...")
}()
buffReader.SetReadahead(1)
buffReader.Seek(r.currOffsetPreload, io.SeekStart)
buff := make([]byte, 1024)
for r.currOffsetPreload < r.endOffsetPreload && !r.isClosed {
off, err := buffReader.Read(buff)
if err != nil {
log.TLogln("Error read e head buffer", err)
return
}
r.currOffsetPreload += int64(off)
}
}()
}
func (r *Reader) getRangePieces(offCurr, offEnd int64) (int, int) {
currPiece := r.getPieceNum(offCurr)
endPiece := r.getPieceNum(offEnd)
return currPiece, endPiece
}
func (r *Reader) getPieceNum(offset int64) int {
return int((offset + r.file.Offset()) / r.cache.pieceLength)
}

View File

@@ -18,7 +18,7 @@ func (t *Torrent) Stream(fileIndex int, req *http.Request, resp http.ResponseWri
return errors.New("file index out of range") return errors.New("file index out of range")
} }
file := files[fileIndex-1] file := files[fileIndex-1]
reader := t.NewReader(file, 0) reader := t.NewReader(file)
log.Println("Connect client") log.Println("Connect client")

View File

@@ -166,9 +166,9 @@ func (t *Torrent) progressEvent() {
} }
func (t *Torrent) updateRA() { func (t *Torrent) updateRA() {
if t.BytesReadUsefulData > settings.BTsets.PreloadBufferSize { if t.Torrent != nil && t.Torrent.Info() != nil {
pieceLen := t.Torrent.Info().PieceLength pieceLen := t.Torrent.Info().PieceLength
adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.ReadersLen()) adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.Readers())
switch { switch {
case adj < pieceLen: case adj < pieceLen:
adj = pieceLen adj = pieceLen
@@ -180,7 +180,7 @@ func (t *Torrent) updateRA() {
} }
func (t *Torrent) expired() bool { func (t *Torrent) expired() bool {
return t.cache.ReadersLen() == 0 && t.expiredTime.Before(time.Now()) && (t.Stat == state.TorrentWorking || t.Stat == state.TorrentClosed) return t.cache.Readers() == 0 && t.expiredTime.Before(time.Now()) && (t.Stat == state.TorrentWorking || t.Stat == state.TorrentClosed)
} }
func (t *Torrent) Files() []*torrent.File { func (t *Torrent) Files() []*torrent.File {
@@ -208,17 +208,16 @@ func (t *Torrent) Length() int64 {
return t.Torrent.Length() return t.Torrent.Length()
} }
func (t *Torrent) NewReader(file *torrent.File, readahead int64) *Reader { func (t *Torrent) NewReader(file *torrent.File) *torrstor.Reader {
if t.Stat == state.TorrentClosed { if t.Stat == state.TorrentClosed {
return nil return nil
} }
reader := NewReader(t, file, readahead) reader := t.cache.NewReader(file)
return reader return reader
} }
func (t *Torrent) CloseReader(reader *Reader) { func (t *Torrent) CloseReader(reader *torrstor.Reader) {
reader.Close() reader.Close()
t.cache.RemReader(reader)
t.expiredTime = time.Now().Add(time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout)) t.expiredTime = time.Now().Add(time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout))
} }
@@ -264,39 +263,71 @@ func (t *Torrent) Preload(index int, size int64) {
file = t.Files()[0] file = t.Files()[0]
} }
buff5mb := int64(5 * 1024 * 1024) readerStart := file.NewReader()
startPreloadLength := size if readerStart == nil {
endPreloadOffset := int64(0)
if startPreloadLength > buff5mb {
endPreloadOffset = file.Offset() + file.Length() - buff5mb
}
readerPre := t.NewReader(file, startPreloadLength)
if readerPre == nil {
return return
} }
defer func() { defer func() {
t.CloseReader(readerPre) readerStart.Close()
t.expiredTime = time.Now().Add(time.Minute * 5) t.expiredTime = time.Now().Add(time.Minute * 5)
}() }()
readerEnd := file.NewReader()
if endPreloadOffset > 0 { if readerEnd == nil {
readerPost := t.NewReader(file, 1) return
if readerPre == nil {
return
}
readerPost.Seek(endPreloadOffset, io.SeekStart)
readerPost.SetReadahead(buff5mb)
defer func() {
t.CloseReader(readerPost)
t.expiredTime = time.Now().Add(time.Minute * 5)
}()
} }
defer func() {
readerEnd.Close()
}()
readerStart.SetReadahead(0)
readerEnd.SetReadahead(0)
if size > file.Length() { if size > file.Length() {
size = file.Length() size = file.Length()
} }
/// preload from start
go func() {
defer func() {
t.Stat = state.TorrentWorking
}()
offset := int64(0)
end := size - (2 * t.Info().PieceLength)
if end < 0 {
end = size
}
buf := make([]byte, 1024)
readerStart.Seek(offset, io.SeekStart)
for offset < end {
off, err := readerStart.Read(buf)
if err != nil {
if err != io.EOF {
log.TLogln("Error preload:", err)
}
break
}
offset += int64(off)
}
}()
/// preload from end -2 pieces
go func() {
offset := file.Length() - (2 * t.Info().PieceLength)
end := file.Length() - 1024
if offset < 0 || end < 0 {
return
}
buf := make([]byte, 1024)
readerEnd.Seek(offset, io.SeekStart)
for offset < end {
off, err := readerEnd.Read(buf)
if err != nil {
if err != io.EOF {
log.TLogln("Error preload:", err)
}
break
}
offset += int64(off)
}
}()
t.PreloadSize = size t.PreloadSize = size
var lastSize int64 = 0 var lastSize int64 = 0
errCount := 0 errCount := 0
@@ -408,7 +439,7 @@ func (t *Torrent) Status() *state.TorrentStatus {
func (t *Torrent) CacheState() *cacheSt.CacheState { func (t *Torrent) CacheState() *cacheSt.CacheState {
if t.Torrent != nil && t.cache != nil { if t.Torrent != nil && t.cache != nil {
st := t.cache.GetState() st := t.cache.GetState()
st.DownloadSpeed = t.DownloadSpeed st.Torrent = t.Status()
return st return st
} }
return nil return nil