room.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package hub
  16. import (
  17. "context"
  18. "encoding/json"
  19. "github.com/libp2p/go-libp2p/core/peer"
  20. pubsub "github.com/libp2p/go-libp2p-pubsub"
  21. )
  22. // Room represents a subscription to a single PubSub topic. Messages
  23. // can be published to the topic with Room.Publish, and received
  24. // messages are pushed to the Messages channel.
  25. type room struct {
  26. ctx context.Context
  27. ps *pubsub.PubSub
  28. Topic *pubsub.Topic
  29. sub *pubsub.Subscription
  30. roomName string
  31. self peer.ID
  32. }
  33. // connect tries to subscribe to the PubSub topic for the room name, returning
  34. // a Room on success.
  35. func connect(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, roomName string, messageChan chan *Message) (*room, error) {
  36. // join the pubsub topic
  37. topic, err := ps.Join(roomName)
  38. if err != nil {
  39. return nil, err
  40. }
  41. // and subscribe to it
  42. sub, err := topic.Subscribe()
  43. if err != nil {
  44. return nil, err
  45. }
  46. cr := &room{
  47. ctx: ctx,
  48. ps: ps,
  49. Topic: topic,
  50. sub: sub,
  51. self: selfID,
  52. roomName: roomName,
  53. }
  54. // start reading messages from the subscription in a loop
  55. go cr.readLoop(messageChan)
  56. return cr, nil
  57. }
  58. // publishMessage sends a message to the pubsub topic.
  59. func (cr *room) publishMessage(m *Message) error {
  60. msgBytes, err := json.Marshal(m)
  61. if err != nil {
  62. return err
  63. }
  64. return cr.Topic.Publish(cr.ctx, msgBytes)
  65. }
  66. // readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
  67. func (cr *room) readLoop(messageChan chan *Message) {
  68. for {
  69. msg, err := cr.sub.Next(cr.ctx)
  70. if err != nil {
  71. return
  72. }
  73. // only forward messages delivered by others
  74. if msg.ReceivedFrom == cr.self {
  75. continue
  76. }
  77. cm := new(Message)
  78. err = json.Unmarshal(msg.Data, cm)
  79. if err != nil {
  80. continue
  81. }
  82. cm.SenderID = msg.ReceivedFrom.String()
  83. // send valid messages onto the Messages channel
  84. messageChan <- cm
  85. }
  86. }