change cache

This commit is contained in:
yourok
2019-09-23 09:25:32 +03:00
parent 8933a860a0
commit 2b286c15c1
14 changed files with 194 additions and 678 deletions

View File

@@ -7,21 +7,21 @@ import (
"sync" "sync"
"server/settings" "server/settings"
"server/torr/storage" "server/torr/storage/memcache"
"server/torr/storage/memcacheV2"
"server/torr/storage/state" "server/torr/storage/state"
"server/utils" "server/utils"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"log"
) )
type BTServer struct { type BTServer struct {
config *torrent.ClientConfig config *torrent.ClientConfig
client *torrent.Client client *torrent.Client
storage storage.Storage storage *memcache.Storage
torrents map[metainfo.Hash]*Torrent torrents map[metainfo.Hash]*Torrent
@@ -53,7 +53,7 @@ func (bt *BTServer) Disconnect() {
if bt.client != nil { if bt.client != nil {
bt.client.Close() bt.client.Close()
bt.client = nil bt.client = nil
utils.FreeOSMemGC() utils.FreeOSMemGC(0)
} }
} }
@@ -63,12 +63,13 @@ func (bt *BTServer) Reconnect() error {
} }
func (bt *BTServer) configure() { func (bt *BTServer) configure() {
bt.storage = memcacheV2.NewStorage(settings.Get().CacheSize) bt.storage = memcache.NewStorage(settings.Get().CacheSize)
blocklist, _ := iplist.MMapPackedFile(filepath.Join(settings.Path, "blocklist")) blocklist, _ := iplist.MMapPackedFile(filepath.Join(settings.Path, "blocklist"))
userAgent := "uTorrent/3.4.9" userAgent := "uTorrent/3.5.5"
peerID := "-UT3490-" peerID := "-UT3550-"
cliVers := "µTorrent 3.5.5"
bt.config = torrent.NewDefaultClientConfig() bt.config = torrent.NewDefaultClientConfig()
@@ -87,14 +88,12 @@ func (bt *BTServer) configure() {
bt.config.Bep20 = peerID bt.config.Bep20 = peerID
bt.config.PeerID = utils.PeerIDRandom(peerID) bt.config.PeerID = utils.PeerIDRandom(peerID)
bt.config.HTTPUserAgent = userAgent bt.config.HTTPUserAgent = userAgent
bt.config.ExtendedHandshakeClientVersion = cliVers
bt.config.EstablishedConnsPerTorrent = settings.Get().ConnectionsLimit bt.config.EstablishedConnsPerTorrent = settings.Get().ConnectionsLimit
if settings.Get().DhtConnectionLimit > 0 { if settings.Get().DhtConnectionLimit > 0 {
bt.config.ConnTracker.SetMaxEntries(settings.Get().DhtConnectionLimit) bt.config.ConnTracker.SetMaxEntries(settings.Get().DhtConnectionLimit)
} }
bt.config.TorrentPeersHighWater = 3000
bt.config.HalfOpenConnsPerTorrent = 50
if settings.Get().DownloadRateLimit > 0 { if settings.Get().DownloadRateLimit > 0 {
bt.config.DownloadRateLimiter = utils.Limit(settings.Get().DownloadRateLimit * 1024) bt.config.DownloadRateLimiter = utils.Limit(settings.Get().DownloadRateLimit * 1024)
} }
@@ -105,9 +104,7 @@ func (bt *BTServer) configure() {
bt.config.ListenPort = settings.Get().PeersListenPort bt.config.ListenPort = settings.Get().PeersListenPort
} }
//bt.config.Debug = true log.Println("Configure client:", settings.Get())
fmt.Println("Configure client:", settings.Get())
} }
func (bt *BTServer) AddTorrent(magnet metainfo.Magnet, onAdd func(*Torrent)) (*Torrent, error) { func (bt *BTServer) AddTorrent(magnet metainfo.Magnet, onAdd func(*Torrent)) (*Torrent, error) {
@@ -183,16 +180,6 @@ func (bt *BTServer) CacheState(hash metainfo.Hash) *state.CacheState {
return cacheState return cacheState
} }
func (bt *BTServer) GetCache(hash metainfo.Hash) *memcacheV2.Cache {
st := bt.GetTorrent(hash)
if st == nil {
return nil
}
cacheState := bt.storage.GetCache(hash)
return cacheState.(*memcacheV2.Cache)
}
func (bt *BTServer) WriteState(w io.Writer) { func (bt *BTServer) WriteState(w io.Writer) {
bt.client.WriteStatus(w) bt.client.WriteStatus(w)
} }

View File

@@ -11,20 +11,20 @@ import (
"github.com/anacrolix/missinggo/httptoo" "github.com/anacrolix/missinggo/httptoo"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/labstack/echo" "github.com/labstack/echo"
"log"
) )
func (bt *BTServer) View(torr *Torrent, file *torrent.File, c echo.Context) error { func (bt *BTServer) View(torr *Torrent, file *torrent.File, c echo.Context) error {
go settings.SetViewed(torr.Hash().HexString(), file.Path()) go settings.SetViewed(torr.Hash().HexString(), file.Path())
reader := NewReader(file, bt.GetCache(torr.hash)) reader := torr.NewReader(file, 0)
//reader := torr.NewReader(file, 0)
fmt.Println("Connect reader:", len(torr.readers)) log.Println("Connect client")
c.Response().Header().Set("Connection", "close") c.Response().Header().Set("Connection", "close")
c.Response().Header().Set("ETag", httptoo.EncodeQuotedString(fmt.Sprintf("%s/%s", torr.Hash().HexString(), file.Path()))) c.Response().Header().Set("ETag", httptoo.EncodeQuotedString(fmt.Sprintf("%s/%s", torr.Hash().HexString(), file.Path())))
http.ServeContent(c.Response(), c.Request(), file.Path(), time.Time{}, reader) http.ServeContent(c.Response(), c.Request(), file.Path(), time.Time{}, reader)
fmt.Println("Disconnect reader:", len(torr.readers)) log.Println("Disconnect client")
torr.CloseReader(reader) torr.CloseReader(reader)
return c.NoContent(http.StatusOK) return c.NoContent(http.StatusOK)
} }

View File

@@ -1,8 +1,8 @@
package torr package torr
import ( import (
"fmt"
"io" "io"
"log"
"sort" "sort"
"sync" "sync"
"time" "time"
@@ -13,6 +13,8 @@ import (
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/labstack/gommon/bytes" "github.com/labstack/gommon/bytes"
"server/torr/reader"
"server/torr/storage/memcache"
) )
type TorrentStatus int type TorrentStatus int
@@ -47,12 +49,11 @@ type Torrent struct {
status TorrentStatus status TorrentStatus
readers map[torrent.Reader]struct{}
muTorrent sync.Mutex muTorrent sync.Mutex
muReader sync.Mutex muReader sync.Mutex
bt *BTServer bt *BTServer
cache *memcache.Cache
lastTimeSpeed time.Time lastTimeSpeed time.Time
DownloadSpeed float64 DownloadSpeed float64
@@ -102,7 +103,6 @@ func NewTorrent(magnet metainfo.Magnet, bt *BTServer) (*Torrent, error) {
torr.status = TorrentAdded torr.status = TorrentAdded
torr.lastTimeSpeed = time.Now() torr.lastTimeSpeed = time.Now()
torr.bt = bt torr.bt = bt
torr.readers = make(map[torrent.Reader]struct{})
torr.hash = magnet.InfoHash torr.hash = magnet.InfoHash
torr.closed = goTorrent.Closed() torr.closed = goTorrent.Closed()
@@ -116,11 +116,17 @@ func (t *Torrent) WaitInfo() bool {
if t.Torrent == nil { if t.Torrent == nil {
return false return false
} }
tm := time.NewTimer(time.Minute * 10)
select { select {
case <-t.Torrent.GotInfo(): case <-t.Torrent.GotInfo():
t.cache = t.bt.storage.GetCache(t.hash)
return true return true
case <-t.closed: case <-t.closed:
return false return false
case <-tm.C:
return false
} }
} }
@@ -181,7 +187,7 @@ func (t *Torrent) progressEvent() {
} }
func (t *Torrent) expired() bool { func (t *Torrent) expired() bool {
return len(t.readers) == 0 && t.expiredTime.Before(time.Now()) && (t.status == TorrentWorking || t.status == TorrentClosed) return t.cache.ReadersLen() == 0 && t.expiredTime.Before(time.Now()) && (t.status == TorrentWorking || t.status == TorrentClosed)
} }
func (t *Torrent) Files() []*torrent.File { func (t *Torrent) Files() []*torrent.File {
@@ -207,7 +213,7 @@ func (t *Torrent) Length() int64 {
return t.Torrent.Length() return t.Torrent.Length()
} }
func (t *Torrent) NewReader(file *torrent.File, readahead int64) torrent.Reader { func (t *Torrent) NewReader(file *torrent.File, readahead int64) *reader.Reader {
t.muReader.Lock() t.muReader.Lock()
if t.status == TorrentClosed { if t.status == TorrentClosed {
@@ -215,23 +221,27 @@ func (t *Torrent) NewReader(file *torrent.File, readahead int64) torrent.Reader
} }
defer t.muReader.Unlock() defer t.muReader.Unlock()
reader := file.NewReader() reader := reader.NewReader(file)
if readahead <= 0 { if readahead <= 0 {
readahead = utils.GetReadahead() readahead = utils.GetReadahead()
} }
reader.SetReadahead(readahead) reader.SetReadahead(readahead)
t.readers[reader] = struct{}{} t.cache.AddReader(reader)
return reader return reader
} }
func (t *Torrent) CloseReader(reader torrent.Reader) { func (t *Torrent) CloseReader(reader *reader.Reader) {
t.muReader.Lock() t.muReader.Lock()
reader.Close() reader.Close()
delete(t.readers, reader) t.cache.RemReader(reader)
t.expiredTime = time.Now().Add(time.Second * 5) t.expiredTime = time.Now().Add(time.Minute)
t.muReader.Unlock() t.muReader.Unlock()
} }
func (t *Torrent) GetCache() *memcache.Cache {
return t.cache
}
func (t *Torrent) Preload(file *torrent.File, size int64) { func (t *Torrent) Preload(file *torrent.File, size int64) {
if size < 0 { if size < 0 {
return return
@@ -276,7 +286,7 @@ func (t *Torrent) Preload(file *torrent.File, size int64) {
} }
defer func() { defer func() {
t.CloseReader(readerPre) t.CloseReader(readerPre)
t.expiredTime = time.Now().Add(time.Minute * 1) t.expiredTime = time.Now().Add(time.Minute * 5)
}() }()
if endPreloadOffset > 0 { if endPreloadOffset > 0 {
@@ -288,7 +298,7 @@ func (t *Torrent) Preload(file *torrent.File, size int64) {
readerPost.SetReadahead(buff5mb) readerPost.SetReadahead(buff5mb)
defer func() { defer func() {
t.CloseReader(readerPost) t.CloseReader(readerPost)
t.expiredTime = time.Now().Add(time.Minute * 1) t.expiredTime = time.Now().Add(time.Minute * 5)
}() }()
} }
@@ -300,9 +310,9 @@ func (t *Torrent) Preload(file *torrent.File, size int64) {
var lastSize int64 = 0 var lastSize int64 = 0
errCount := 0 errCount := 0
for t.status == TorrentPreload { for t.status == TorrentPreload {
t.expiredTime = time.Now().Add(time.Minute * 1) t.expiredTime = time.Now().Add(time.Minute * 5)
t.PreloadedBytes = t.Torrent.BytesCompleted() t.PreloadedBytes = t.Torrent.BytesCompleted()
fmt.Println("Preload:", file.Torrent().InfoHash().HexString(), bytes.Format(t.PreloadedBytes), "/", bytes.Format(t.PreloadSize), "Speed:", utils.Format(t.DownloadSpeed), "Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers) log.Println("Preload:", file.Torrent().InfoHash().HexString(), bytes.Format(t.PreloadedBytes), "/", bytes.Format(t.PreloadSize), "Speed:", utils.Format(t.DownloadSpeed), "Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers)
if t.PreloadedBytes >= t.PreloadSize { if t.PreloadedBytes >= t.PreloadSize {
return return
} }
@@ -337,10 +347,6 @@ func (t *Torrent) Close() {
t.muReader.Lock() t.muReader.Lock()
defer t.muReader.Unlock() defer t.muReader.Unlock()
for r := range t.readers {
r.Close()
}
if _, ok := t.bt.torrents[t.hash]; ok { if _, ok := t.bt.torrents[t.hash]; ok {
delete(t.bt.torrents, t.hash) delete(t.bt.torrents, t.hash)
} }

View File

@@ -1,27 +1,21 @@
package torr package reader
import ( import (
"io"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"server/torr/storage/memcacheV2" "io"
) )
type Reader struct { type Reader struct {
torrent.Reader torrent.Reader
offset int64
offset int64 readahead int64
file *torrent.File
file *torrent.File
cache *memcacheV2.Cache
} }
func NewReader(file *torrent.File, cache *memcacheV2.Cache) *Reader { func NewReader(file *torrent.File) *Reader {
r := new(Reader) r := new(Reader)
r.file = file r.file = file
r.Reader = file.NewReader() r.Reader = file.NewReader()
r.Reader.SetReadahead(0)
r.cache = cache
return r return r
} }
@@ -36,13 +30,24 @@ func (r *Reader) Seek(offset int64, whence int) (n int64, err error) {
} }
n, err = r.Reader.Seek(offset, whence) n, err = r.Reader.Seek(offset, whence)
r.offset = n r.offset = n
r.cache.SetPos(int(n))
return return
} }
func (r *Reader) Read(p []byte) (n int, err error) { func (r *Reader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p) n, err = r.Reader.Read(p)
r.offset += int64(n) r.offset += int64(n)
r.cache.SetPos(int(n))
return return
} }
func (r *Reader) SetReadahead(length int64) {
r.Reader.SetReadahead(length)
r.readahead = length
}
func (r *Reader) Offset() int64 {
return r.offset
}
func (r *Reader) Readahead() int64 {
return r.readahead
}

View File

@@ -11,6 +11,5 @@ type Storage interface {
storage.ClientImpl storage.ClientImpl
GetStats(hash metainfo.Hash) *state.CacheState GetStats(hash metainfo.Hash) *state.CacheState
GetCache(hash metainfo.Hash) interface{}
CloseHash(hash metainfo.Hash) CloseHash(hash metainfo.Hash)
} }

View File

@@ -1,9 +1,9 @@
package memcache package memcache
import ( import (
"fmt"
"sync" "sync"
"log"
"server/utils" "server/utils"
) )
@@ -14,42 +14,62 @@ type buffer struct {
} }
type BufferPool struct { type BufferPool struct {
buffs map[int]*buffer buffs map[int]*buffer
bufferLength int64 frees int
bufferCount int size int64
mu sync.Mutex mu sync.Mutex
} }
func NewBufferPool(bufferLength int64) *BufferPool { func NewBufferPool(bufferLength int64, capacity int64) *BufferPool {
bp := new(BufferPool) bp := new(BufferPool)
bp.bufferLength = bufferLength buffsSize := int(capacity/bufferLength) + 3
bp.buffs = make(map[int]*buffer) bp.frees = buffsSize
bp.size = bufferLength
return bp return bp
} }
func (b *BufferPool) mkBuffs() {
if b.buffs != nil {
return
}
b.buffs = make(map[int]*buffer, b.frees)
log.Println("Create", b.frees, "buffers")
for i := 0; i < b.frees; i++ {
buf := buffer{
-1,
make([]byte, b.size, b.size),
false,
}
b.buffs[i] = &buf
}
}
func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) { func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
b.mkBuffs()
for id, buf := range b.buffs { for id, buf := range b.buffs {
if !buf.used { if !buf.used {
fmt.Println("Get buffer:", id)
buf.used = true buf.used = true
buf.pieceId = p.Id buf.pieceId = p.Id
buff = buf.buf buff = buf.buf
index = id index = id
b.frees--
//fmt.Printf("Get buffer: %v %v %v %p\n", id, p.Id, b.frees, buff)
return return
} }
} }
log.Println("Create slow buffer")
fmt.Println("Create buffer:", b.bufferCount) buf := buffer{
buf := new(buffer) p.Id,
buf.buf = make([]byte, b.bufferLength) make([]byte, b.size, b.size),
buf.used = true true,
buf.pieceId = p.Id }
b.buffs[b.bufferCount] = buf b.frees++
index = b.bufferCount b.buffs[b.frees] = &buf
buff = buf.buf buff = buf.buf
b.bufferCount++ index = b.frees
return return
} }
@@ -60,10 +80,11 @@ func (b *BufferPool) ReleaseBuffer(index int) {
} }
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
b.mkBuffs()
if buff, ok := b.buffs[index]; ok { if buff, ok := b.buffs[index]; ok {
fmt.Println("Release buffer:", index)
buff.used = false buff.used = false
buff.pieceId = -1 buff.pieceId = -1
b.frees++
} else { } else {
utils.FreeOSMem() utils.FreeOSMem()
} }
@@ -72,6 +93,9 @@ func (b *BufferPool) ReleaseBuffer(index int) {
func (b *BufferPool) Used() map[int]struct{} { func (b *BufferPool) Used() map[int]struct{} {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if len(b.buffs) == 0 {
b.mkBuffs()
}
used := make(map[int]struct{}) used := make(map[int]struct{})
for _, b := range b.buffs { for _, b := range b.buffs {
if b.used { if b.used {
@@ -82,13 +106,5 @@ func (b *BufferPool) Used() map[int]struct{} {
} }
func (b *BufferPool) Len() int { func (b *BufferPool) Len() int {
b.mu.Lock() return b.frees
defer b.mu.Unlock()
count := 0
for _, b := range b.buffs {
if b.used {
count++
}
}
return count
} }

View File

@@ -1,8 +1,6 @@
package memcache package memcache
import ( import (
"fmt"
"sort"
"sync" "sync"
"server/torr/storage/state" "server/torr/storage/state"
@@ -10,11 +8,15 @@ import (
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"log"
"server/torr/reader"
) )
type Cache struct { type Cache struct {
storage.TorrentImpl storage.TorrentImpl
s *Storage
capacity int64 capacity int64
filled int64 filled int64
hash metainfo.Hash hash metainfo.Hash
@@ -25,37 +27,39 @@ type Cache struct {
muPiece sync.Mutex muPiece sync.Mutex
muRemove sync.Mutex muRemove sync.Mutex
muReader sync.Mutex
isRemove bool isRemove bool
pieces map[int]*Piece pieces map[int]*Piece
bufferPull *BufferPool bufferPull *BufferPool
prcLoaded int readers map[*reader.Reader]struct{}
position int
} }
func NewCache(capacity int64) *Cache { func NewCache(capacity int64, storage *Storage) *Cache {
ret := &Cache{ ret := &Cache{
capacity: capacity, capacity: capacity,
filled: 0, filled: 0,
pieces: make(map[int]*Piece), pieces: make(map[int]*Piece),
readers: make(map[*reader.Reader]struct{}),
s: storage,
} }
return ret return ret
} }
func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
fmt.Println("Create cache for:", info.Name) log.Println("Create cache for:", info.Name)
//Min capacity of 2 pieces length //Min capacity of 2 pieces length
cap := info.PieceLength * 2 caps := info.PieceLength * 2
if c.capacity < cap { if c.capacity < caps {
c.capacity = cap c.capacity = caps
} }
c.pieceLength = info.PieceLength c.pieceLength = info.PieceLength
c.pieceCount = info.NumPieces() c.pieceCount = info.NumPieces()
c.piecesBuff = int(c.capacity / c.pieceLength) c.piecesBuff = int(c.capacity / c.pieceLength)
c.hash = hash c.hash = hash
c.bufferPull = NewBufferPool(c.pieceLength) c.bufferPull = NewBufferPool(c.pieceLength, c.capacity)
for i := 0; i < c.pieceCount; i++ { for i := 0; i < c.pieceCount; i++ {
c.pieces[i] = &Piece{ c.pieces[i] = &Piece{
@@ -69,7 +73,10 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
c.muPiece.Lock() c.muPiece.Lock()
defer c.muPiece.Unlock() defer func() {
c.muPiece.Unlock()
go utils.FreeOSMemGC(c.capacity)
}()
if val, ok := c.pieces[m.Index()]; ok { if val, ok := c.pieces[m.Index()]; ok {
return val return val
} }
@@ -78,10 +85,14 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
func (c *Cache) Close() error { func (c *Cache) Close() error {
c.isRemove = false c.isRemove = false
fmt.Println("Close cache for:", c.hash) log.Println("Close cache for:", c.hash)
if _, ok := c.s.caches[c.hash]; ok {
delete(c.s.caches, c.hash)
}
c.pieces = nil c.pieces = nil
c.bufferPull = nil c.bufferPull = nil
utils.FreeOSMemGC() c.readers = nil
utils.FreeOSMemGC(0)
return nil return nil
} }
@@ -109,11 +120,6 @@ func (c *Cache) GetState() state.CacheState {
return cState return cState
} }
func (c *Cache) setPos(pos int) {
c.position = (c.position + pos) / 2
//fmt.Println("Read:", c.position)
}
func (c *Cache) cleanPieces() { func (c *Cache) cleanPieces() {
if c.isRemove { if c.isRemove {
return return
@@ -127,70 +133,74 @@ func (c *Cache) cleanPieces() {
defer func() { c.isRemove = false }() defer func() { c.isRemove = false }()
c.muRemove.Unlock() c.muRemove.Unlock()
remPieces := c.getRemPieces() bufPieces := c.getBufferedPieces()
if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) {
remCount := int((c.filled - c.capacity) / c.pieceLength)
if remCount < 1 {
remCount = 1
}
if remCount > len(remPieces) {
remCount = len(remPieces)
}
remPieces = remPieces[:remCount] if len(bufPieces) > 0 && c.filled >= c.capacity {
c.muReader.Lock()
for _, p := range remPieces { for reader := range c.readers {
c.removePiece(p) beg, end := c.getReaderPieces(reader)
for id := range bufPieces {
if id >= beg && id <= end {
delete(bufPieces, id)
}
}
}
c.muReader.Unlock()
if len(bufPieces) > 0 {
for _, p := range bufPieces {
p.Release()
}
bufPieces = nil
go utils.FreeOSMemGC(c.capacity)
} }
} }
} }
func (c *Cache) getRemPieces() []*Piece { func (c *Cache) getBufferedPieces() map[int]*Piece {
pieces := make([]*Piece, 0) pieces := make(map[int]*Piece)
fill := int64(0) fill := int64(0)
loading := 0
used := c.bufferPull.Used() used := c.bufferPull.Used()
fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength)
low := c.position - fpices + 1
high := c.position + c.piecesBuff - fpices + 3
for u := range used { for u := range used {
v := c.pieces[u] piece := c.pieces[u]
if v.Size > 0 { if piece.Size > 0 {
if v.Id > 0 && (v.Id < low || v.Id > high) { if piece.Id > 0 {
pieces = append(pieces, v) pieces[piece.Id] = piece
} //pieces = append(pieces, piece)
fill += v.Size
if !v.complete {
loading++
} }
fill += piece.Size
} }
} }
c.filled = fill c.filled = fill
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].accessed < pieces[j].accessed
})
c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff)
return pieces return pieces
} }
func (c *Cache) removePiece(piece *Piece) { func (c *Cache) removePiece(piece *Piece) {
c.muPiece.Lock()
defer c.muPiece.Unlock()
piece.Release() piece.Release()
return
}
//st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash) func (c *Cache) AddReader(r *reader.Reader) {
if c.prcLoaded >= 95 { c.muReader.Lock()
//fmt.Println("Clean memory GC:", st) defer c.muReader.Unlock()
utils.FreeOSMemGC() c.readers[r] = struct{}{}
} else { }
//fmt.Println("Clean memory:", st)
utils.FreeOSMem() func (c *Cache) RemReader(r *reader.Reader) {
c.muReader.Lock()
defer c.muReader.Unlock()
delete(c.readers, r)
}
func (c *Cache) ReadersLen() int {
if c == nil || c.readers == nil {
return 0
} }
return len(c.readers)
} }
func prc(val, of int) int { func (c *Cache) getReaderPieces(reader *reader.Reader) (begin, end int) {
return int(float64(val) * 100.0 / float64(of)) end = int((reader.Offset() + reader.Readahead()) / c.pieceLength)
begin = int((reader.Offset() - c.capacity + reader.Readahead()) / c.pieceLength)
return
} }

View File

@@ -42,13 +42,14 @@ func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
} }
n = copy(p.buffer[off:], b[:]) n = copy(p.buffer[off:], b[:])
p.Size += int64(n) p.Size += int64(n)
p.accessed = time.Now().Unix() + 2000 p.accessed = time.Now().Unix()
return return
} }
func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock() p.mu.RLock()
defer p.mu.RUnlock() defer p.mu.RUnlock()
size := len(b) size := len(b)
if size+int(off) > len(p.buffer) { if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off) size = len(p.buffer) - int(off)
@@ -60,19 +61,13 @@ func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
return 0, io.ErrUnexpectedEOF return 0, io.ErrUnexpectedEOF
} }
n = copy(b, p.buffer[int(off) : int(off)+size][:]) n = copy(b, p.buffer[int(off) : int(off)+size][:])
p.accessed = time.Now().Unix()
if int(off)+size >= len(p.buffer) { if int(off)+size >= len(p.buffer) {
p.readed = true p.readed = true
} }
if int64(len(b))+off >= p.Size { if int64(len(b))+off >= p.Size {
go p.cache.cleanPieces() go p.cache.cleanPieces()
} }
if p.complete {
p.accessed = time.Now().Unix()
p.cache.setPos(p.Id)
}
return n, nil return n, nil
} }
@@ -86,13 +81,12 @@ func (p *Piece) MarkComplete() error {
func (p *Piece) MarkNotComplete() error { func (p *Piece) MarkNotComplete() error {
p.complete = false p.complete = false
p.accessed = 0
return nil return nil
} }
func (p *Piece) Completion() storage.Completion { func (p *Piece) Completion() storage.Completion {
return storage.Completion{ return storage.Completion{
Complete: p.complete, Complete: p.complete && len(p.buffer) > 0,
Ok: true, Ok: true,
} }
} }

View File

@@ -3,32 +3,31 @@ package memcache
import ( import (
"sync" "sync"
"server/torr/storage"
"server/torr/storage/state" "server/torr/storage/state"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
storage2 "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
type Storage struct { type Storage struct {
storage.Storage storage.TorrentImpl
caches map[metainfo.Hash]*Cache caches map[metainfo.Hash]*Cache
capacity int64 capacity int64
mu sync.Mutex mu sync.Mutex
} }
func NewStorage(capacity int64) storage.Storage { func NewStorage(capacity int64) *Storage {
stor := new(Storage) stor := new(Storage)
stor.capacity = capacity stor.capacity = capacity
stor.caches = make(map[metainfo.Hash]*Cache) stor.caches = make(map[metainfo.Hash]*Cache)
return stor return stor
} }
func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) { func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
ch := NewCache(s.capacity) ch := NewCache(s.capacity, s)
ch.Init(info, infoHash) ch.Init(info, infoHash)
s.caches[infoHash] = ch s.caches[infoHash] = ch
return ch, nil return ch, nil
@@ -64,3 +63,14 @@ func (s *Storage) Close() error {
} }
return nil return nil
} }
func (s *Storage) GetCache(hash metainfo.Hash) *Cache {
s.mu.Lock()
defer s.mu.Unlock()
for _, c := range s.caches {
if c.hash == hash {
return c
}
}
return nil
}

View File

@@ -1,94 +0,0 @@
package memcacheV2
import (
"fmt"
"sync"
"server/utils"
)
type buffer struct {
pieceId int
buf []byte
used bool
}
type BufferPool struct {
buffs map[int]*buffer
bufferLength int64
bufferCount int
mu sync.Mutex
}
func NewBufferPool(bufferLength int64) *BufferPool {
bp := new(BufferPool)
bp.bufferLength = bufferLength
bp.buffs = make(map[int]*buffer)
return bp
}
func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) {
b.mu.Lock()
defer b.mu.Unlock()
for id, buf := range b.buffs {
if !buf.used {
fmt.Println("Get buffer:", id)
buf.used = true
buf.pieceId = p.Id
buff = buf.buf
index = id
return
}
}
fmt.Println("Create buffer:", b.bufferCount)
buf := new(buffer)
buf.buf = make([]byte, b.bufferLength)
buf.used = true
buf.pieceId = p.Id
b.buffs[b.bufferCount] = buf
index = b.bufferCount
buff = buf.buf
b.bufferCount++
return
}
func (b *BufferPool) ReleaseBuffer(index int) {
if index == -1 {
utils.FreeOSMem()
return
}
b.mu.Lock()
defer b.mu.Unlock()
if buff, ok := b.buffs[index]; ok {
fmt.Println("Release buffer:", index)
buff.used = false
buff.pieceId = -1
} else {
utils.FreeOSMem()
}
}
func (b *BufferPool) Used() map[int]struct{} {
b.mu.Lock()
defer b.mu.Unlock()
used := make(map[int]struct{})
for _, b := range b.buffs {
if b.used {
used[b.pieceId] = struct{}{}
}
}
return used
}
func (b *BufferPool) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
count := 0
for _, b := range b.buffs {
if b.used {
count++
}
}
return count
}

View File

@@ -1,197 +0,0 @@
package memcacheV2
import (
"fmt"
"sort"
"sync"
"server/torr/storage/state"
"server/utils"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type Cache struct {
storage.TorrentImpl
capacity int64
filled int64
hash metainfo.Hash
pieceLength int64
pieceCount int
piecesBuff int
muPiece sync.Mutex
muRemove sync.Mutex
isRemove bool
pieces map[int]*Piece
bufferPull *BufferPool
prcLoaded int
position int
}
func NewCache(capacity int64) *Cache {
ret := &Cache{
capacity: capacity,
filled: 0,
pieces: make(map[int]*Piece),
}
return ret
}
func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
fmt.Println("Create cache for:", info.Name)
//Min capacity of 2 pieces length
cap := info.PieceLength * 2
if c.capacity < cap {
c.capacity = cap
}
c.pieceLength = info.PieceLength
c.pieceCount = info.NumPieces()
c.piecesBuff = int(c.capacity / c.pieceLength)
c.hash = hash
c.bufferPull = NewBufferPool(c.pieceLength)
for i := 0; i < c.pieceCount; i++ {
c.pieces[i] = &Piece{
Id: i,
Length: info.Piece(i).Length(),
Hash: info.Piece(i).Hash().HexString(),
cache: c,
}
}
}
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
c.muPiece.Lock()
defer c.muPiece.Unlock()
if val, ok := c.pieces[m.Index()]; ok {
return val
}
return nil
}
func (c *Cache) Close() error {
c.isRemove = false
fmt.Println("Close cache for:", c.hash)
c.pieces = nil
c.bufferPull = nil
utils.FreeOSMemGC()
return nil
}
func (c *Cache) GetState() state.CacheState {
cState := state.CacheState{}
cState.Capacity = c.capacity
cState.PiecesLength = c.pieceLength
cState.PiecesCount = c.pieceCount
cState.Hash = c.hash.HexString()
stats := make(map[int]state.ItemState, 0)
c.muPiece.Lock()
var fill int64 = 0
for _, value := range c.pieces {
stat := value.Stat()
if stat.BufferSize > 0 {
fill += stat.BufferSize
stats[stat.Id] = stat
}
}
c.filled = fill
c.muPiece.Unlock()
cState.Filled = c.filled
cState.Pieces = stats
return cState
}
func (c *Cache) SetPos(pos int) {
//c.position = (c.position + pos) / 2
c.position = pos
//fmt.Println("Read:", c.position)
}
func (c *Cache) cleanPieces() {
if c.isRemove {
return
}
c.muRemove.Lock()
if c.isRemove {
c.muRemove.Unlock()
return
}
c.isRemove = true
defer func() { c.isRemove = false }()
c.muRemove.Unlock()
remPieces := c.getRemPieces()
if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) {
remCount := int((c.filled - c.capacity) / c.pieceLength)
if remCount < 1 {
remCount = 1
}
if remCount > len(remPieces) {
remCount = len(remPieces)
}
remPieces = remPieces[:remCount]
for _, p := range remPieces {
c.removePiece(p)
}
}
}
func (c *Cache) getRemPieces() []*Piece {
pieces := make([]*Piece, 0)
fill := int64(0)
loading := 0
used := c.bufferPull.Used()
fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength)
low := c.position - fpices + 1
high := c.position + c.piecesBuff - fpices + 3
for u := range used {
v := c.pieces[u]
if v.Size > 0 {
if v.Id > 0 && (v.Id < low || v.Id > high) {
pieces = append(pieces, v)
}
fill += v.Size
if !v.complete {
loading++
}
}
}
c.filled = fill
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].accessed < pieces[j].accessed
})
c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff)
return pieces
}
func (c *Cache) removePiece(piece *Piece) {
c.muPiece.Lock()
defer c.muPiece.Unlock()
piece.Release()
//st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash)
if c.prcLoaded >= 95 {
//fmt.Println("Clean memory GC:", st)
utils.FreeOSMemGC()
} else {
//fmt.Println("Clean memory:", st)
utils.FreeOSMem()
}
}
func prc(val, of int) int {
return int(float64(val) * 100.0 / float64(of))
}

View File

@@ -1,118 +0,0 @@
package memcacheV2
import (
"errors"
"io"
"sync"
"time"
"server/torr/storage/state"
"github.com/anacrolix/torrent/storage"
)
type Piece struct {
storage.PieceImpl
Id int
Hash string
Length int64
Size int64
complete bool
accessed int64
buffer []byte
bufIndex int
mu sync.RWMutex
cache *Cache
}
func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer == nil {
go p.cache.cleanPieces()
p.buffer, p.bufIndex = p.cache.bufferPull.GetBuffer(p)
if p.buffer == nil {
return 0, errors.New("Can't get buffer write")
}
}
n = copy(p.buffer[off:], b[:])
p.Size += int64(n)
p.accessed = time.Now().Unix() + 2000
return
}
func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
size := len(b)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
}
}
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.ErrUnexpectedEOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
if int64(len(b))+off >= p.Size {
go p.cache.cleanPieces()
time.Sleep(time.Millisecond * 2000)
}
if p.complete {
p.accessed = time.Now().Unix()
p.cache.SetPos(p.Id)
}
return n, nil
}
func (p *Piece) MarkComplete() error {
if len(p.buffer) == 0 {
return errors.New("piece is not complete")
}
p.complete = true
return nil
}
func (p *Piece) MarkNotComplete() error {
p.complete = false
p.accessed = 0
return nil
}
func (p *Piece) Completion() storage.Completion {
return storage.Completion{
Complete: p.complete,
Ok: true,
}
}
func (p *Piece) Release() {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer != nil {
p.buffer = nil
p.cache.bufferPull.ReleaseBuffer(p.bufIndex)
p.bufIndex = -1
}
p.Size = 0
p.complete = false
}
func (p *Piece) Stat() state.ItemState {
itm := state.ItemState{
Id: p.Id,
Hash: p.Hash,
Accessed: p.accessed,
Completed: p.complete,
BufferSize: p.Size,
}
return itm
}

View File

@@ -1,27 +0,0 @@
package memcacheV2
import "github.com/anacrolix/torrent"
type Reader struct {
torrent.Reader
pos int64
}
func NewReader(file torrent.File) *Reader {
r := new(Reader)
r.Reader = file.NewReader()
return r
}
func (r *Reader) Read(p []byte) (n int, err error) {
n, err = r.Read(p)
r.pos += int64(n)
return
}
func (r *Reader) Seek(offset int64, whence int) (ret int64, err error) {
ret, err = r.Reader.Seek(offset, whence)
r.pos = ret
return
}

View File

@@ -1,75 +0,0 @@
package memcacheV2
import (
"sync"
"server/torr/storage"
"server/torr/storage/state"
"github.com/anacrolix/torrent/metainfo"
storage2 "github.com/anacrolix/torrent/storage"
)
type Storage struct {
storage.Storage
caches map[metainfo.Hash]*Cache
capacity int64
mu sync.Mutex
}
func NewStorage(capacity int64) storage.Storage {
stor := new(Storage)
stor.capacity = capacity
stor.caches = make(map[metainfo.Hash]*Cache)
return stor
}
func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) {
s.mu.Lock()
defer s.mu.Unlock()
ch := NewCache(s.capacity)
ch.Init(info, infoHash)
s.caches[infoHash] = ch
return ch, nil
}
func (s *Storage) GetStats(hash metainfo.Hash) *state.CacheState {
s.mu.Lock()
defer s.mu.Unlock()
if c, ok := s.caches[hash]; ok {
st := c.GetState()
return &st
}
return nil
}
func (s *Storage) GetCache(hash metainfo.Hash) interface{} {
s.mu.Lock()
defer s.mu.Unlock()
if c, ok := s.caches[hash]; ok {
return c
}
return nil
}
func (s *Storage) CloseHash(hash metainfo.Hash) {
if s.caches == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if ch, ok := s.caches[hash]; ok {
ch.Close()
delete(s.caches, hash)
}
}
func (s *Storage) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.caches {
ch.Close()
}
return nil
}