Skip to content

Commit

Permalink
Auth interceptor (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
Inveracity authored Jul 22, 2023
1 parent df25b59 commit 8b95bb2
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 135 deletions.
2 changes: 0 additions & 2 deletions frontend/src/components/Channels.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
const selectChannel = (e: any, channelName: string) => {
e.preventDefault();
console.log('selectedChannel: ' + channelName);
channel.set(channelName);
};
Expand All @@ -26,7 +25,6 @@
SendMessage({
channelId: 'system',
text: `channel_add ${name}`,
jwt: pb.authStore.token,
userId: pb.authStore.model?.name || ''
});
newChannelActive = false;
Expand Down
16 changes: 5 additions & 11 deletions frontend/src/lib/grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const transport = new GrpcWebFetchTransport({

let controller = new AbortController();

export const Connect = async (serverId: string, userId: string, timestamp: string, jwt: string) => {
export const Connect = async (serverId: string, userId: string, timestamp: string) => {
// While the connection is attempting to open, let the UI show a pending state
status.pending();

Expand All @@ -37,7 +37,7 @@ export const Connect = async (serverId: string, userId: string, timestamp: strin
}

// The abort controller is used to signal the server to close the stream
const opts = transport.mergeOptions({ abort: controller.signal });
const opts = transport.mergeOptions({ abort: controller.signal, meta: { jwt: pb.authStore.token } });

// Get the last timestamp from the cache
const lastTs = get(chat_cache).lastTs
Expand All @@ -47,7 +47,6 @@ export const Connect = async (serverId: string, userId: string, timestamp: strin
serverId: serverId,
userId: userId,
lastTs: timestamp,
jwt: jwt,
}, opts);


Expand Down Expand Up @@ -98,18 +97,16 @@ export const Disconnect = async () => {

export const SendMessage = (msg: OutgoingMessage) => {
const client = new ChatServiceClient(transport);

const opts = transport.mergeOptions({ meta: { jwt: pb.authStore.token } })
const request: ChatMessage = {
channelId: msg.channelId,
userId: msg.userId,
text: msg.text,
ts: "0", // The server will set the timestamp
jwt: msg.jwt,
};

client.send(request).then((response) => {

console.log(response.status.code);
client.send(request, opts).then((_) => {
// nothing
}).catch((e) => {
console.log(e);
});
Expand Down Expand Up @@ -157,18 +154,15 @@ const filter_system_messages = (msg: ChatMessage): boolean => {
// Tell UI to show new channel when another user adds one
if (msg.text.startsWith("channel_add") && msg.userId !== pb.authStore.model?.name) {
const channel_name = msg.text.split(" ")[1]
console.log(channel_name)
channels.add(channel_name);
}

if (msg.text.startsWith("connected")) {
console.log("connected", msg.userId)
const user: User = { name: msg.userId, presence: true }
users.upd(user);
}

if (msg.text.startsWith("disconnected")) {
console.log("disconnected", msg.userId)
const user: User = { name: msg.userId, presence: false }
users.upd(user);
}
Expand Down
30 changes: 4 additions & 26 deletions frontend/src/lib/proto/chat/v1/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ export interface ConnectRequest {
* @generated from protobuf field: string last_ts = 3;
*/
lastTs: string; // last timestamp received by the client
/**
* @generated from protobuf field: string jwt = 4;
*/
jwt: string;
}
/**
* Chat server stream after subscribing to a channel
Expand All @@ -57,10 +53,6 @@ export interface ChatMessage {
* @generated from protobuf field: string ts = 4;
*/
ts: string; // timestamp
/**
* @generated from protobuf field: string jwt = 5;
*/
jwt: string;
}
/**
* The response payload after sending a notification
Expand All @@ -83,12 +75,11 @@ class ConnectRequest$Type extends MessageType<ConnectRequest> {
super("proto.chat.v1.ConnectRequest", [
{ no: 1, name: "server_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "user_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "last_ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "jwt", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
{ no: 3, name: "last_ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<ConnectRequest>): ConnectRequest {
const message = { serverId: "", userId: "", lastTs: "", jwt: "" };
const message = { serverId: "", userId: "", lastTs: "" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<ConnectRequest>(this, message, value);
Expand All @@ -108,9 +99,6 @@ class ConnectRequest$Type extends MessageType<ConnectRequest> {
case /* string last_ts */ 3:
message.lastTs = reader.string();
break;
case /* string jwt */ 4:
message.jwt = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
Expand All @@ -132,9 +120,6 @@ class ConnectRequest$Type extends MessageType<ConnectRequest> {
/* string last_ts = 3; */
if (message.lastTs !== "")
writer.tag(3, WireType.LengthDelimited).string(message.lastTs);
/* string jwt = 4; */
if (message.jwt !== "")
writer.tag(4, WireType.LengthDelimited).string(message.jwt);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
Expand All @@ -152,12 +137,11 @@ class ChatMessage$Type extends MessageType<ChatMessage> {
{ no: 1, name: "channel_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "user_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "text", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 5, name: "jwt", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
{ no: 4, name: "ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<ChatMessage>): ChatMessage {
const message = { channelId: "", userId: "", text: "", ts: "", jwt: "" };
const message = { channelId: "", userId: "", text: "", ts: "" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<ChatMessage>(this, message, value);
Expand All @@ -180,9 +164,6 @@ class ChatMessage$Type extends MessageType<ChatMessage> {
case /* string ts */ 4:
message.ts = reader.string();
break;
case /* string jwt */ 5:
message.jwt = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
Expand All @@ -207,9 +188,6 @@ class ChatMessage$Type extends MessageType<ChatMessage> {
/* string ts = 4; */
if (message.ts !== "")
writer.tag(4, WireType.LengthDelimited).string(message.ts);
/* string jwt = 5; */
if (message.jwt !== "")
writer.tag(5, WireType.LengthDelimited).string(message.jwt);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
Expand Down
1 change: 0 additions & 1 deletion frontend/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ export interface OutgoingMessage {
channelId: string;
userId: string;
text: string;
jwt: string;
}
2 changes: 0 additions & 2 deletions proto/chat/v1/chat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ message ConnectRequest {
string server_id = 1;
string user_id = 2;
string last_ts = 3; // last timestamp received by the client
string jwt = 4;
}

// Chat server stream after subscribing to a channel
Expand All @@ -23,7 +22,6 @@ message ChatMessage {
string user_id = 2;
string text = 3;
string ts = 4; // timestamp
string jwt = 5;
}

// The response payload after sending a notification
Expand Down
80 changes: 31 additions & 49 deletions relay/internal/proto/chat/v1/chat.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 18 additions & 15 deletions relay/internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"fmt"
"log"
"net"
"time"

"github.com/inveracity/svelte-grpc-stream/internal/auth"
"github.com/inveracity/svelte-grpc-stream/internal/cache"
pb "github.com/inveracity/svelte-grpc-stream/internal/proto/chat/v1"
"github.com/inveracity/svelte-grpc-stream/internal/server"

"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type Relay struct {
server *server.Server
port int
server *server.Server
port int
pbURL string
pbAdmin string
pbPass string
}

func NewRelay(port int, natsURL, redisURL, pbURL, pbAdmin, pbPass string) *Relay {
Expand All @@ -29,10 +31,13 @@ func NewRelay(port int, natsURL, redisURL, pbURL, pbAdmin, pbPass string) *Relay

cache := cache.NewCache(redisClient)

grpcServer := server.NewServer(natsURL, pbURL, pbAdmin, pbPass, cache)
grpcServer := server.NewServer(natsURL, cache)
return &Relay{
port: port,
server: grpcServer,
port: port,
server: grpcServer,
pbURL: pbURL,
pbAdmin: pbAdmin,
pbPass: pbPass,
}
}

Expand All @@ -42,16 +47,14 @@ func (r *Relay) Run() error {
log.Fatalf("failed to listen: %v", err)
}

authMgr := auth.New(r.pbURL, r.pbAdmin, r.pbPass)
interceptor := server.NewAuthInterceptor(authMgr)

s := grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Duration(1 * time.Second),
PermitWithoutStream: true, // Allow pings even when there are no active streams
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: time.Duration(2 * time.Hour),
Timeout: time.Duration(20 * time.Second),
}),
grpc.UnaryInterceptor(interceptor.Unary()),
grpc.StreamInterceptor(interceptor.Stream()),
)

pb.RegisterChatServiceServer(s, r.server)

log.Printf("GRPC: server listening at %v", lis.Addr())
Expand Down
Loading

0 comments on commit 8b95bb2

Please sign in to comment.