ledger.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. // Copyright © 2021 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package blockchain
  16. import (
  17. "bytes"
  18. "compress/gzip"
  19. "context"
  20. "encoding/json"
  21. "io"
  22. "io/ioutil"
  23. "log"
  24. "sync"
  25. "time"
  26. "github.com/mudler/edgevpn/pkg/hub"
  27. "github.com/pkg/errors"
  28. )
  29. type Ledger struct {
  30. sync.Mutex
  31. blockchain Store
  32. channel io.Writer
  33. onDisk bool
  34. wantedBlock *Block
  35. }
  36. type Store interface {
  37. Add(Block)
  38. Len() int
  39. Last() Block
  40. }
  41. // New returns a new ledger which writes to the writer
  42. func New(w io.Writer, s Store) *Ledger {
  43. c := &Ledger{channel: w, blockchain: s}
  44. c.newGenesis()
  45. return c
  46. }
  47. func (l *Ledger) newGenesis() {
  48. t := time.Now()
  49. genesisBlock := Block{}
  50. genesisBlock = Block{0, t.String(), map[string]map[string]Data{}, genesisBlock.Checksum(), ""}
  51. l.blockchain.Add(genesisBlock)
  52. }
  53. // Syncronizer starts a goroutine which
  54. // writes the blockchain to the periodically
  55. func (l *Ledger) Syncronizer(ctx context.Context, t time.Duration) {
  56. go func() {
  57. t := time.NewTicker(t)
  58. defer t.Stop()
  59. for {
  60. select {
  61. case <-t.C:
  62. l.Lock()
  63. b := l.blockchain.Last()
  64. if l.wantedBlock != nil && l.wantedBlock.Index > b.Index {
  65. b = *l.wantedBlock
  66. }
  67. bytes, err := json.Marshal(b)
  68. if err != nil {
  69. log.Println(err)
  70. }
  71. l.channel.Write(compress(bytes).Bytes())
  72. l.Unlock()
  73. case <-ctx.Done():
  74. return
  75. }
  76. }
  77. }()
  78. }
  79. func compress(b []byte) *bytes.Buffer {
  80. var buf bytes.Buffer
  81. gz := gzip.NewWriter(&buf)
  82. gz.Write(b)
  83. gz.Close()
  84. return &buf
  85. }
  86. func deCompress(b []byte) (*bytes.Buffer, error) {
  87. r, err := gzip.NewReader(bytes.NewReader(b))
  88. if err != nil {
  89. return nil, err
  90. }
  91. result, err := ioutil.ReadAll(r)
  92. if err != nil {
  93. return nil, err
  94. }
  95. return bytes.NewBuffer(result), nil
  96. }
  97. // Update the blockchain from a message
  98. func (l *Ledger) Update(h *hub.Message) (err error) {
  99. //chain := make(Blockchain, 0)
  100. block := &Block{}
  101. b, err := deCompress([]byte(h.Message))
  102. if err != nil {
  103. err = errors.Wrap(err, "failed decompressing")
  104. return
  105. }
  106. err = json.Unmarshal(b.Bytes(), block)
  107. if err != nil {
  108. err = errors.Wrap(err, "failed unmarshalling blockchain data")
  109. return
  110. }
  111. l.Lock()
  112. if block.Index > l.blockchain.Len() {
  113. l.blockchain.Add(*block)
  114. }
  115. l.Unlock()
  116. return
  117. }
  118. // Announce keeps updating async data to the blockchain.
  119. // Sends a broadcast at the specified interval
  120. // by making sure the async retrieved value is written to the
  121. // blockchain
  122. func (l *Ledger) Announce(ctx context.Context, t time.Duration, async func()) {
  123. go func() {
  124. t := time.NewTicker(t)
  125. defer t.Stop()
  126. for {
  127. select {
  128. case <-t.C:
  129. async()
  130. case <-ctx.Done():
  131. return
  132. }
  133. }
  134. }()
  135. }
  136. // AnnounceDeleteBucket Announce a deletion of a bucket. It stops when the bucket is deleted
  137. // It takes an interval time and a max timeout.
  138. // It is best effort, and the timeout is necessary, or we might flood network with requests
  139. // if more writers are attempting to write to the same resource
  140. func (l *Ledger) AnnounceDeleteBucket(ctx context.Context, interval, timeout time.Duration, bucket string) {
  141. del, cancel := context.WithTimeout(ctx, timeout)
  142. l.Announce(del, interval, func() {
  143. _, exists := l.CurrentData()[bucket]
  144. if exists {
  145. l.DeleteBucket(bucket)
  146. } else {
  147. cancel()
  148. }
  149. })
  150. }
  151. // AnnounceDeleteBucketKey Announce a deletion of a key from a bucket. It stops when the key is deleted
  152. func (l *Ledger) AnnounceDeleteBucketKey(ctx context.Context, interval, timeout time.Duration, bucket, key string) {
  153. del, cancel := context.WithTimeout(ctx, timeout)
  154. l.Announce(del, interval, func() {
  155. _, exists := l.CurrentData()[bucket][key]
  156. if exists {
  157. l.Delete(bucket, key)
  158. } else {
  159. cancel()
  160. }
  161. })
  162. }
  163. // AnnounceUpdate Keeps announcing something into the blockchain if state is differing
  164. func (l *Ledger) AnnounceUpdate(ctx context.Context, interval time.Duration, bucket, key string, value interface{}) {
  165. l.Announce(ctx, interval, func() {
  166. v, exists := l.CurrentData()[bucket][key]
  167. realv, _ := json.Marshal(value)
  168. switch {
  169. case !exists || string(v) != string(realv):
  170. l.Add(bucket, map[string]interface{}{key: value})
  171. }
  172. })
  173. }
  174. // Persist Keeps announcing something into the blockchain until it is reconciled
  175. func (l *Ledger) Persist(ctx context.Context, interval, timeout time.Duration, bucket, key string, value interface{}) {
  176. put, cancel := context.WithTimeout(ctx, timeout)
  177. l.Announce(put, interval, func() {
  178. v, exists := l.CurrentData()[bucket][key]
  179. realv, _ := json.Marshal(value)
  180. switch {
  181. case !exists || string(v) != string(realv):
  182. l.Add(bucket, map[string]interface{}{key: value})
  183. case exists && string(v) == string(realv):
  184. cancel()
  185. }
  186. })
  187. }
  188. // GetKey retrieve the current key from the blockchain
  189. func (l *Ledger) GetKey(b, s string) (value Data, exists bool) {
  190. l.Lock()
  191. defer l.Unlock()
  192. if l.blockchain.Len() > 0 {
  193. last := l.blockchain.Last()
  194. if _, exists = last.Storage[b]; !exists {
  195. return
  196. }
  197. value, exists = last.Storage[b][s]
  198. if exists {
  199. return
  200. }
  201. }
  202. return
  203. }
  204. // Exists returns true if there is one element with a matching value
  205. func (l *Ledger) Exists(b string, f func(Data) bool) (exists bool) {
  206. l.Lock()
  207. defer l.Unlock()
  208. if l.blockchain.Len() > 0 {
  209. for _, bv := range l.blockchain.Last().Storage[b] {
  210. if f(bv) {
  211. exists = true
  212. return
  213. }
  214. }
  215. }
  216. return
  217. }
  218. // CurrentData returns the current ledger data (locking)
  219. func (l *Ledger) CurrentData() map[string]map[string]Data {
  220. l.Lock()
  221. defer l.Unlock()
  222. return buckets(l.blockchain.Last().Storage).copy()
  223. }
  224. // LastBlock returns the last block in the blockchain
  225. func (l *Ledger) LastBlock() Block {
  226. l.Lock()
  227. defer l.Unlock()
  228. return l.blockchain.Last()
  229. }
  230. type bucket map[string]Data
  231. func (b bucket) copy() map[string]Data {
  232. copy := map[string]Data{}
  233. for k, v := range b {
  234. copy[k] = v
  235. }
  236. return copy
  237. }
  238. type buckets map[string]map[string]Data
  239. func (b buckets) copy() map[string]map[string]Data {
  240. copy := map[string]map[string]Data{}
  241. for k, v := range b {
  242. copy[k] = bucket(v).copy()
  243. }
  244. return copy
  245. }
  246. // Add data to the blockchain
  247. func (l *Ledger) Add(b string, s map[string]interface{}) {
  248. l.Lock()
  249. current := buckets(l.blockchain.Last().Storage).copy()
  250. for s, k := range s {
  251. if _, exists := current[b]; !exists {
  252. current[b] = make(map[string]Data)
  253. }
  254. dat, _ := json.Marshal(k)
  255. current[b][s] = Data(string(dat))
  256. }
  257. l.Unlock()
  258. l.writeData(current)
  259. }
  260. // Delete data from the ledger (locking)
  261. func (l *Ledger) Delete(b string, k string) {
  262. l.Lock()
  263. new := make(map[string]map[string]Data)
  264. for bb, kk := range l.blockchain.Last().Storage {
  265. if _, exists := new[bb]; !exists {
  266. new[bb] = make(map[string]Data)
  267. }
  268. // Copy all keys/v except b/k
  269. for kkk, v := range kk {
  270. if !(bb == b && kkk == k) {
  271. new[bb][kkk] = v
  272. }
  273. }
  274. }
  275. l.Unlock()
  276. l.writeData(new)
  277. }
  278. // DeleteBucket deletes a bucket from the ledger (locking)
  279. func (l *Ledger) DeleteBucket(b string) {
  280. l.Lock()
  281. new := make(map[string]map[string]Data)
  282. for bb, kk := range l.blockchain.Last().Storage {
  283. // Copy all except the specified bucket
  284. if bb == b {
  285. continue
  286. }
  287. if _, exists := new[bb]; !exists {
  288. new[bb] = make(map[string]Data)
  289. }
  290. for kkk, v := range kk {
  291. new[bb][kkk] = v
  292. }
  293. }
  294. l.Unlock()
  295. l.writeData(new)
  296. }
  297. // String returns the blockchain as string
  298. func (l *Ledger) String() string {
  299. bytes, _ := json.MarshalIndent(l.blockchain, "", " ")
  300. return string(bytes)
  301. }
  302. // Index returns last known blockchain index
  303. func (l *Ledger) Index() int {
  304. return l.blockchain.Len()
  305. }
  306. func (l *Ledger) writeData(s map[string]map[string]Data) {
  307. newBlock := l.blockchain.Last().NewBlock(s)
  308. if newBlock.IsValid(l.blockchain.Last()) {
  309. l.Lock()
  310. l.wantedBlock = &newBlock
  311. //l.blockchain.Add(newBlock)
  312. l.Unlock()
  313. }
  314. bytes, err := json.Marshal(l.blockchain.Last())
  315. if err != nil {
  316. log.Println(err)
  317. }
  318. l.channel.Write(compress(bytes).Bytes())
  319. }