fix buffering cache

This commit is contained in:
YouROK
2020-12-08 22:03:48 +03:00
parent 442c1fc27a
commit 212a60a893
12 changed files with 83 additions and 31 deletions

View File

@@ -29,6 +29,7 @@ type BTSets struct {
ConnectionsLimit int ConnectionsLimit int
DhtConnectionLimit int // 0 - inf DhtConnectionLimit int // 0 - inf
PeersListenPort int PeersListenPort int
Strategy int // 0 - RequestStrategyDuplicateRequestTimeout, 1 - RequestStrategyFuzzing, 2 - RequestStrategyFastest
} }
func (v *BTSets) String() string { func (v *BTSets) String() string {

View File

@@ -1,6 +1,7 @@
package torr package torr
import ( import (
"io"
"os" "os"
"sort" "sort"
"time" "time"
@@ -121,3 +122,7 @@ func Shutdown() {
sets.CloseDB() sets.CloseDB()
os.Exit(0) os.Exit(0)
} }
func WriteStatus(w io.Writer) {
bts.client.WriteStatus(w)
}

View File

@@ -3,6 +3,7 @@ package torr
import ( import (
"log" "log"
"sync" "sync"
"time"
"server/settings" "server/settings"
"server/torr/storage/torrstor" "server/torr/storage/torrstor"
@@ -73,10 +74,6 @@ func (bt *BTServer) configure() {
bt.config.NoDefaultPortForwarding = settings.BTsets.DisableUPNP bt.config.NoDefaultPortForwarding = settings.BTsets.DisableUPNP
bt.config.NoDHT = settings.BTsets.DisableDHT bt.config.NoDHT = settings.BTsets.DisableDHT
bt.config.NoUpload = settings.BTsets.DisableUpload bt.config.NoUpload = settings.BTsets.DisableUpload
// bt.config.EncryptionPolicy = torrent.EncryptionPolicy{
// DisableEncryption: settings.BTsets.Encryption == 1,
// ForceEncryption: settings.BTsets.Encryption == 2,
// }
bt.config.IPBlocklist = blocklist bt.config.IPBlocklist = blocklist
bt.config.DefaultStorage = bt.storage bt.config.DefaultStorage = bt.storage
bt.config.Bep20 = peerID bt.config.Bep20 = peerID
@@ -85,7 +82,20 @@ func (bt *BTServer) configure() {
bt.config.ExtendedHandshakeClientVersion = cliVers bt.config.ExtendedHandshakeClientVersion = cliVers
bt.config.EstablishedConnsPerTorrent = settings.BTsets.ConnectionsLimit bt.config.EstablishedConnsPerTorrent = settings.BTsets.ConnectionsLimit
bt.config.DefaultRequestStrategy = torrent.RequestStrategyFastest() // Encryption/Obfuscation
bt.config.HeaderObfuscationPolicy = torrent.HeaderObfuscationPolicy{
RequirePreferred: false,
Preferred: true,
}
switch settings.BTsets.Strategy {
case 1: // RequestStrategyFuzzing
bt.config.DefaultRequestStrategy = torrent.RequestStrategyFuzzing()
case 2: // RequestStrategyFastest
bt.config.DefaultRequestStrategy = torrent.RequestStrategyFastest()
default: // RequestStrategyDuplicateRequestTimeout
bt.config.DefaultRequestStrategy = torrent.RequestStrategyDuplicateRequestTimeout(1 * time.Second)
}
if settings.BTsets.DhtConnectionLimit > 0 { if settings.BTsets.DhtConnectionLimit > 0 {
bt.config.ConnTracker.SetMaxEntries(settings.BTsets.DhtConnectionLimit) bt.config.ConnTracker.SetMaxEntries(settings.BTsets.DhtConnectionLimit)

View File

@@ -27,7 +27,8 @@ type Cache struct {
pieces map[int]*Piece pieces map[int]*Piece
bufferPull *BufferPool bufferPull *BufferPool
readers map[*Reader]struct{} readers map[*Reader]struct{}
muReaders sync.Mutex
isRemove bool isRemove bool
muRemove sync.Mutex muRemove sync.Mutex
@@ -80,7 +81,14 @@ func (c *Cache) Close() error {
} }
c.pieces = nil c.pieces = nil
c.bufferPull = nil c.bufferPull = nil
c.muReaders.Lock()
for reader, _ := range c.readers {
reader.Close()
}
c.readers = nil c.readers = nil
c.muReaders.Unlock()
utils.FreeOSMemGC() utils.FreeOSMemGC()
return nil return nil
} }
@@ -94,9 +102,11 @@ func (c *Cache) AdjustRA(readahead int64) {
if settings.BTsets.CacheSize == 0 { if settings.BTsets.CacheSize == 0 {
c.capacity = readahead * 3 c.capacity = readahead * 3
} }
c.muReaders.Lock()
for r, _ := range c.readers { for r, _ := range c.readers {
r.SetReadahead(readahead) r.SetReadahead(readahead)
} }
c.muReaders.Unlock()
} }
func (c *Cache) GetState() *state.CacheState { func (c *Cache) GetState() *state.CacheState {
@@ -121,6 +131,7 @@ func (c *Cache) GetState() *state.CacheState {
} }
} }
c.muReaders.Lock()
for r, _ := range c.readers { for r, _ := range c.readers {
start, end := r.getUsedPieces() start, end := r.getUsedPieces()
if p, ok := c.pieces[start]; ok { if p, ok := c.pieces[start]; ok {
@@ -159,6 +170,7 @@ func (c *Cache) GetState() *state.CacheState {
} }
} }
} }
c.muReaders.Unlock()
c.filled = fill c.filled = fill
cState.Filled = c.filled cState.Filled = c.filled
@@ -199,12 +211,14 @@ func (c *Cache) getRemPieces() []*Piece {
for id, p := range c.pieces { for id, p := range c.pieces {
if p.Size > 0 { if p.Size > 0 {
fill += p.Size fill += p.Size
c.muReaders.Lock()
for r, _ := range c.readers { for r, _ := range c.readers {
start, end := r.getUsedPieces() start, end := r.getUsedPieces()
if id < start || id > end { if id < start || id > end {
piecesRemove = append(piecesRemove, p) piecesRemove = append(piecesRemove, p)
} }
} }
c.muReaders.Unlock()
} }
} }

View File

@@ -28,9 +28,12 @@ func NewReader(file *torrent.File, cache *Cache) *Reader {
r.file = file r.file = file
r.Reader = file.NewReader() r.Reader = file.NewReader()
r.SetReadAHead(0) r.SetReadahead(0)
r.cache = cache r.cache = cache
r.cache.readers[r] = struct{}{}
cache.muReaders.Lock()
cache.readers[r] = struct{}{}
cache.muReaders.Unlock()
return r return r
} }
@@ -65,7 +68,7 @@ func (r *Reader) Read(p []byte) (n int, err error) {
return return
} }
func (r *Reader) SetReadAHead(length int64) { func (r *Reader) SetReadahead(length int64) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.Reader.SetReadahead(length) r.Reader.SetReadahead(length)
@@ -84,7 +87,11 @@ func (r *Reader) Close() error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.isClosed = true r.isClosed = true
r.cache.muReaders.Lock()
delete(r.cache.readers, r) delete(r.cache.readers, r)
r.cache.muReaders.Unlock()
return r.Reader.Close() return r.Reader.Close()
} }
@@ -93,6 +100,11 @@ func (c *Cache) NewReader(file *torrent.File) *Reader {
} }
func (c *Cache) Readers() int { func (c *Cache) Readers() int {
if c == nil {
return 0
}
c.muReaders.Lock()
defer c.muReaders.Unlock()
if c == nil || c.readers == nil { if c == nil || c.readers == nil {
return 0 return 0
} }

View File

@@ -3,6 +3,7 @@ package torrstor
import ( import (
"io" "io"
"github.com/dustin/go-humanize"
"server/log" "server/log"
) )
@@ -15,8 +16,7 @@ func (r *Reader) getUsedPieces() (int, int) {
} }
func (r *Reader) preload() { func (r *Reader) preload() {
r.currOffsetPreload = r.offset r.endOffsetPreload = r.offset + r.cache.capacity - 1024
r.endOffsetPreload = r.offset + r.cache.capacity
if r.endOffsetPreload > r.file.Length() { if r.endOffsetPreload > r.file.Length() {
r.endOffsetPreload = r.file.Length() r.endOffsetPreload = r.file.Length()
@@ -27,7 +27,7 @@ func (r *Reader) preload() {
} }
r.isPreload = true r.isPreload = true
log.TLogln("Start buffering from", humanize.IBytes(uint64(r.currOffsetPreload)))
go func() { go func() {
buffReader := r.file.NewReader() buffReader := r.file.NewReader()
defer func() { defer func() {
@@ -35,6 +35,7 @@ func (r *Reader) preload() {
buffReader.Close() buffReader.Close()
}() }()
buffReader.SetReadahead(0) buffReader.SetReadahead(0)
r.currOffsetPreload = r.offset + r.readahead
buffReader.Seek(r.currOffsetPreload, io.SeekStart) buffReader.Seek(r.currOffsetPreload, io.SeekStart)
buff := make([]byte, 1024) buff := make([]byte, 1024)
for r.currOffsetPreload < r.endOffsetPreload && !r.isClosed { for r.currOffsetPreload < r.endOffsetPreload && !r.isClosed {
@@ -44,7 +45,14 @@ func (r *Reader) preload() {
return return
} }
r.currOffsetPreload += int64(off) r.currOffsetPreload += int64(off)
r.endOffsetPreload = r.offset + r.cache.capacity - 1024
if r.currOffsetPreload < r.offset {
r.currOffsetPreload = r.offset + r.readahead
buffReader.Seek(r.currOffsetPreload, io.SeekStart)
}
//log.TLogln(humanize.IBytes(uint64(r.offset)), humanize.IBytes(uint64(r.currOffsetPreload)), humanize.IBytes(uint64(r.endOffsetPreload)))
} }
log.TLogln("End buffering")
}() }()
} }

View File

@@ -2,12 +2,10 @@ package torr
import ( import (
"errors" "errors"
"fmt"
"log" "log"
"net/http" "net/http"
"time" "time"
"github.com/anacrolix/missinggo/httptoo"
sets "server/settings" sets "server/settings"
) )
@@ -25,10 +23,10 @@ func (t *Torrent) Stream(fileIndex int, req *http.Request, resp http.ResponseWri
sets.SetViewed(&sets.Viewed{t.Hash().HexString(), fileIndex}) sets.SetViewed(&sets.Viewed{t.Hash().HexString(), fileIndex})
//TODO проверить почему плеер постоянно переподключается //TODO проверить почему плеер постоянно переподключается
resp.Header().Set("Connection", "keep-alive") resp.Header().Set("Connection", "close")
resp.Header().Set("ETag", httptoo.EncodeQuotedString(fmt.Sprintf("%s/%s", t.Hash().HexString(), file.Path()))) //resp.Header().Set("ETag", httptoo.EncodeQuotedString(fmt.Sprintf("%s/%s", t.Hash().HexString(), file.Path())))
http.ServeContent(resp, req, file.Path(), time.Time{}, reader) http.ServeContent(resp, req, file.Path(), time.Unix(t.Timestamp, 0), reader)
t.CloseReader(reader) t.CloseReader(reader)
log.Println("Disconnect client") log.Println("Disconnect client")

View File

@@ -166,6 +166,8 @@ func (t *Torrent) progressEvent() {
} }
func (t *Torrent) updateRA() { func (t *Torrent) updateRA() {
t.muTorrent.Lock()
defer t.muTorrent.Unlock()
if t.Torrent != nil && t.Torrent.Info() != nil { if t.Torrent != nil && t.Torrent.Info() != nil {
pieceLen := t.Torrent.Info().PieceLength pieceLen := t.Torrent.Info().PieceLength
adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.Readers()) adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.Readers())
@@ -175,7 +177,7 @@ func (t *Torrent) updateRA() {
case adj > pieceLen*4: case adj > pieceLen*4:
adj = pieceLen * 4 adj = pieceLen * 4
} }
t.cache.AdjustRA(adj) go t.cache.AdjustRA(adj)
} }
} }

View File

@@ -1,10 +0,0 @@
package pages
import (
"github.com/gin-gonic/gin"
"server/web/pages/template"
)
func mainPage(c *gin.Context) {
c.Data(200, "text/html; charset=utf-8", template.IndexHtml)
}

View File

@@ -2,8 +2,20 @@ package pages
import ( import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"server/torr"
"server/web/pages/template"
) )
func SetupRoute(route *gin.Engine) { func SetupRoute(route *gin.Engine) {
route.GET("/", mainPage) route.GET("/", mainPage)
route.GET("/stat", statPage)
}
func mainPage(c *gin.Context) {
c.Data(200, "text/html; charset=utf-8", template.IndexHtml)
}
func statPage(c *gin.Context) {
torr.WriteStatus(c.Writer)
c.Status(200)
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long