From 99467f229c10b3725b280f71609eff38d6d3e6bf Mon Sep 17 00:00:00 2001 From: Ayush Mukherjee Date: Fri, 12 Mar 2021 17:53:50 +0530 Subject: [PATCH] implemented per-stream data views --- src/index.js | 23 +++++++++++++++---- src/models/casters.js | 2 +- src/models/events.js | 4 ++-- src/models/hosts.js | 2 +- src/models/matches.js | 2 +- src/models/rosters.js | 5 ---- src/models/streams.js | 14 ++++++++---- src/ws/crud.js | 24 ++++++++++---------- src/ws/handler.js | 9 +++++--- src/ws/streams.js | 53 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 105 insertions(+), 33 deletions(-) create mode 100644 src/ws/streams.js diff --git a/src/index.js b/src/index.js index 9b7e7d7..3c37c27 100644 --- a/src/index.js +++ b/src/index.js @@ -4,6 +4,7 @@ const url = require('url') const app = require('./app') const { get } = require('./uuids') const handler = require('./ws/handler') +const { subscribe } = require('./ws/streams') mongoose.connect(process.env.MONGO_URI, { useNewUrlParser: true, @@ -15,15 +16,24 @@ db.once('open', () => console.log('Connected to mongodb instance!')) const port = process.env.PORT || 5000; -wss = new WebSocket.Server({ +wss1 = new WebSocket.Server({ + noServer: true, +}) + +wss2 = new WebSocket.Server({ noServer: true, }) -wss.on('connection', (ws, req) => { +wss1.on('connection', (ws, req) => { const id = req.id handler(ws, id) }) +wss2.on('connection', (ws, req) => { + const id = req.id + subscribe(ws, id) +}) + const server = app.listen(port, () => { console.log(`Express listening: http://localhost:${port}`) }) @@ -32,9 +42,14 @@ server.on('upgrade', (req, socket, head) => { const pathname = url.parse(req.url).pathname.split('/') const token = pathname[2] if (pathname[1] === 'ws' && get(token)) { - wss.handleUpgrade(req, socket, head, (socket) => { + wss1.handleUpgrade(req, socket, head, (socket) => { + req.id = token + wss1.emit('connection', socket, req); + }) + } else if (pathname[1] === 'stream') { + wss2.handleUpgrade(req, socket, head, (socket) => { req.id = token - wss.emit('connection', socket, req); + wss2.emit('connection', socket, req); }) } }) \ No newline at end of file diff --git a/src/models/casters.js b/src/models/casters.js index 112fa9d..42e659d 100644 --- a/src/models/casters.js +++ b/src/models/casters.js @@ -16,7 +16,7 @@ const casters = new Schema({ image: { type: String, required: true, - }, + } }); const Caster = model('Casters', casters) diff --git a/src/models/events.js b/src/models/events.js index 6a3ae5c..a51457c 100644 --- a/src/models/events.js +++ b/src/models/events.js @@ -5,9 +5,9 @@ const events = new mongoose.Schema({ type: String, required: true, unique: true, - }, + } }); -const Event = mongoose.model('Events', events) +const Event = mongoose.model('Event', events) module.exports = Event \ No newline at end of file diff --git a/src/models/hosts.js b/src/models/hosts.js index 72a05a0..83e1d7e 100644 --- a/src/models/hosts.js +++ b/src/models/hosts.js @@ -16,7 +16,7 @@ const hosts = new Schema({ image: { type: String, required: true, - }, + } }); const Host = model('Hosts', hosts) diff --git a/src/models/matches.js b/src/models/matches.js index 0fcb5f1..fc4b978 100644 --- a/src/models/matches.js +++ b/src/models/matches.js @@ -51,7 +51,7 @@ const matches = new Schema({ games: { type: [gameScores], required: true, - }, + } }) const Match = model('Matches', matches) diff --git a/src/models/rosters.js b/src/models/rosters.js index 63c7210..5289397 100644 --- a/src/models/rosters.js +++ b/src/models/rosters.js @@ -39,11 +39,6 @@ const players = new Schema({ }) const rosters = new Schema({ - event: { - type: Schema.Types.ObjectId, - ref: 'Events', - required: true, - }, name: { type: String, required: true, diff --git a/src/models/streams.js b/src/models/streams.js index 6bbd74d..5f7b87d 100644 --- a/src/models/streams.js +++ b/src/models/streams.js @@ -7,13 +7,19 @@ const streams = new Schema({ }, event: { type: Schema.Types.ObjectId, - refs: 'Events', - requried: true, + ref: 'Event', }, matches: { type: [Schema.Types.ObjectId], - refs: 'matches', - requried: true, + ref: 'Matches', + }, + casters: { + type: [Schema.Types.ObjectId], + ref: 'Casters', + }, + hosts: { + type: [Schema.Types.ObjectId], + refs: 'Hosts', }, }) diff --git a/src/ws/crud.js b/src/ws/crud.js index d8becd2..5acd3c7 100644 --- a/src/ws/crud.js +++ b/src/ws/crud.js @@ -7,10 +7,10 @@ const hosts = require('../models/hosts') const eventFns = { getAll: async () => { - return await events.find().exec() + return await events.find().populate('streams').exec() }, getById: async (id) => { - return await events.findById(id).exec() + return await events.findById(id).populate('streams').exec() }, update: async(id, data) => { return await events.findByIdAndUpdate(id, data).exec() @@ -33,10 +33,10 @@ const eventFns = { const rosterFns = { getAll: async () => { - return await rosters.find().exec() + return await rosters.find().populate('event').populate('match').exec() }, getById: async (id) => { - return await rosters.findById(id).exec() + return await rosters.findById(id).populate('event').populate('match').exec() }, update: async(id, data) => { return await rosters.findByIdAndUpdate(id, data).exec() @@ -59,10 +59,10 @@ const rosterFns = { const matchFns = { getAll: async () => { - return await matches.find().exec() + return await matches.find().populate('orange').populate('blue').populate('stream').exec() }, getById: async (id) => { - return await matches.findById(id).exec() + return await matches.findById(id).populate('orange').populate('blue').populate('stream').exec() }, update: async(id, data) => { return await matches.findByIdAndUpdate(id, data).exec() @@ -85,10 +85,10 @@ const matchFns = { const streamFns = { getAll: async () => { - return await streams.find().exec() + return await streams.find().populate('event').populate('matches').populate('casters').populate('hosts').exec() }, getById: async (id) => { - return await streams.findById(id).exec() + return await streams.findById(id).populate('event').populate('matches').populate('casters').populate('hosts').exec() }, update: async(id, data) => { return await streams.findByIdAndUpdate(id, data).exec() @@ -111,10 +111,10 @@ const streamFns = { const casterFns = { getAll: async () => { - return await casters.find().exec() + return await casters.find().populate('streams').exec() }, getById: async (id) => { - return await casters.findById(id).exec() + return await casters.findById(id).populate('streams').exec() }, update: async(id, data) => { return await casters.findByIdAndUpdate(id, data).exec() @@ -137,10 +137,10 @@ const casterFns = { const hostFns = { getAll: async () => { - return await hosts.find().exec() + return await hosts.find().populate('streams').exec() }, getById: async (id) => { - return await hosts.findById(id).exec() + return await hosts.findById(id).populate('streams').exec() }, update: async(id, data) => { return await hosts.findByIdAndUpdate(id, data).exec() diff --git a/src/ws/handler.js b/src/ws/handler.js index 339507b..d741e05 100644 --- a/src/ws/handler.js +++ b/src/ws/handler.js @@ -12,7 +12,7 @@ const handler = (ws, id) => { ws.send(JSON.stringify({ event: 'info', - data: 'Welcome to APL Nuke v1.0.0!' + data: 'Welcome to APL Nuke v1.1.0!' })) ws.on('message', (msg) => handleMsg(msg, id)) @@ -36,11 +36,14 @@ const handleMsg = async (msg, id) => { console.log(`received event for ${channel} with data %s`, d.data) switch (event) { case 'create': - await crud[channel].create(d.data) + const da = await crud[channel].create(d.data) + connections[id].connection.send(JSON.stringify(da)) + fanoutMsg(channel, await crud[channel].getAll()) fanoutMsg(channel, await crud[channel].getAll()) break; case 'update': - await crud[channel].update(d.data.id, d.data.data) + const du = await crud[channel].update(d.data.id, d.data.data) + connections[id].connection.send(JSON.stringify(du)) fanoutMsg(channel, await crud[channel].getAll()) break; case 'delete': diff --git a/src/ws/streams.js b/src/ws/streams.js new file mode 100644 index 0000000..a14b7a8 --- /dev/null +++ b/src/ws/streams.js @@ -0,0 +1,53 @@ +const { v4 } = require('uuid') +const crud = require('./crud') + +const connections = {} + +const subscribe = (ws, sid) => { + const id = v4() + connections[id] = { + connection: ws, + streamid: sid, + } + + ws.send(JSON.stringify({ + event: 'info', + data: 'Welcome to APL Nuke v1.1.0!', + })) + + sendInitial(id) + + ws.on('close', () => { + delete connections[id] + }) +} + +const cModel = { + 'events': 'event', + 'matches': 'matches', + 'casters': 'casters', + 'hosts': 'hosts', +} + +const recvUpdate = async (channel, data) => { + const streams = await crud['streams'].getAll() + const stream = streams.filter(x => x[cModel[channel]]._id === data._id)[0] + const c = connections.filter(x => x.streamid === stream._id) + Object.keys(c).forEach((k) => { + c[k].connection.send(JSON.stringify(stream)) + }) +} + +const sendInitial = async (id) => { + const c = connections[id] + const stream = await crud['streams'].getById(c.streamid) + c.connection.send(JSON.stringify({ + event: 'streams:read', + data: stream, + })) +} + +module.exports = { + subscribe, + recvUpdate, +} \ No newline at end of file