Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auth interceptor #10

Merged
merged 4 commits into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions frontend/src/components/Channels.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

const selectChannel = (e: any, channelName: string) => {
e.preventDefault();
console.log('selectedChannel: ' + channelName);
channel.set(channelName);
};

Expand All @@ -26,7 +25,6 @@
SendMessage({
channelId: 'system',
text: `channel_add ${name}`,
jwt: pb.authStore.token,
userId: pb.authStore.model?.name || ''
});
newChannelActive = false;
Expand Down
16 changes: 5 additions & 11 deletions frontend/src/lib/grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const transport = new GrpcWebFetchTransport({

let controller = new AbortController();

export const Connect = async (serverId: string, userId: string, timestamp: string, jwt: string) => {
export const Connect = async (serverId: string, userId: string, timestamp: string) => {
// While the connection is attempting to open, let the UI show a pending state
status.pending();

Expand All @@ -37,7 +37,7 @@ export const Connect = async (serverId: string, userId: string, timestamp: strin
}

// The abort controller is used to signal the server to close the stream
const opts = transport.mergeOptions({ abort: controller.signal });
const opts = transport.mergeOptions({ abort: controller.signal, meta: { jwt: pb.authStore.token } });

// Get the last timestamp from the cache
const lastTs = get(chat_cache).lastTs
Expand All @@ -47,7 +47,6 @@ export const Connect = async (serverId: string, userId: string, timestamp: strin
serverId: serverId,
userId: userId,
lastTs: timestamp,
jwt: jwt,
}, opts);


Expand Down Expand Up @@ -98,18 +97,16 @@ export const Disconnect = async () => {

export const SendMessage = (msg: OutgoingMessage) => {
const client = new ChatServiceClient(transport);

const opts = transport.mergeOptions({ meta: { jwt: pb.authStore.token } })
const request: ChatMessage = {
channelId: msg.channelId,
userId: msg.userId,
text: msg.text,
ts: "0", // The server will set the timestamp
jwt: msg.jwt,
};

client.send(request).then((response) => {

console.log(response.status.code);
client.send(request, opts).then((_) => {
// nothing
}).catch((e) => {
console.log(e);
});
Expand Down Expand Up @@ -157,18 +154,15 @@ const filter_system_messages = (msg: ChatMessage): boolean => {
// Tell UI to show new channel when another user adds one
if (msg.text.startsWith("channel_add") && msg.userId !== pb.authStore.model?.name) {
const channel_name = msg.text.split(" ")[1]
console.log(channel_name)
channels.add(channel_name);
}

if (msg.text.startsWith("connected")) {
console.log("connected", msg.userId)
const user: User = { name: msg.userId, presence: true }
users.upd(user);
}

if (msg.text.startsWith("disconnected")) {
console.log("disconnected", msg.userId)
const user: User = { name: msg.userId, presence: false }
users.upd(user);
}
Expand Down
30 changes: 4 additions & 26 deletions frontend/src/lib/proto/chat/v1/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ export interface ConnectRequest {
* @generated from protobuf field: string last_ts = 3;
*/
lastTs: string; // last timestamp received by the client
/**
* @generated from protobuf field: string jwt = 4;
*/
jwt: string;
}
/**
* Chat server stream after subscribing to a channel
Expand All @@ -57,10 +53,6 @@ export interface ChatMessage {
* @generated from protobuf field: string ts = 4;
*/
ts: string; // timestamp
/**
* @generated from protobuf field: string jwt = 5;
*/
jwt: string;
}
/**
* The response payload after sending a notification
Expand All @@ -83,12 +75,11 @@ class ConnectRequest$Type extends MessageType<ConnectRequest> {
super("proto.chat.v1.ConnectRequest", [
{ no: 1, name: "server_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "user_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "last_ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "jwt", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
{ no: 3, name: "last_ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<ConnectRequest>): ConnectRequest {
const message = { serverId: "", userId: "", lastTs: "", jwt: "" };
const message = { serverId: "", userId: "", lastTs: "" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<ConnectRequest>(this, message, value);
Expand All @@ -108,9 +99,6 @@ class ConnectRequest$Type extends MessageType<ConnectRequest> {
case /* string last_ts */ 3:
message.lastTs = reader.string();
break;
case /* string jwt */ 4:
message.jwt = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
Expand All @@ -132,9 +120,6 @@ class ConnectRequest$Type extends MessageType<ConnectRequest> {
/* string last_ts = 3; */
if (message.lastTs !== "")
writer.tag(3, WireType.LengthDelimited).string(message.lastTs);
/* string jwt = 4; */
if (message.jwt !== "")
writer.tag(4, WireType.LengthDelimited).string(message.jwt);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
Expand All @@ -152,12 +137,11 @@ class ChatMessage$Type extends MessageType<ChatMessage> {
{ no: 1, name: "channel_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "user_id", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "text", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 5, name: "jwt", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
{ no: 4, name: "ts", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<ChatMessage>): ChatMessage {
const message = { channelId: "", userId: "", text: "", ts: "", jwt: "" };
const message = { channelId: "", userId: "", text: "", ts: "" };
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<ChatMessage>(this, message, value);
Expand All @@ -180,9 +164,6 @@ class ChatMessage$Type extends MessageType<ChatMessage> {
case /* string ts */ 4:
message.ts = reader.string();
break;
case /* string jwt */ 5:
message.jwt = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
Expand All @@ -207,9 +188,6 @@ class ChatMessage$Type extends MessageType<ChatMessage> {
/* string ts = 4; */
if (message.ts !== "")
writer.tag(4, WireType.LengthDelimited).string(message.ts);
/* string jwt = 5; */
if (message.jwt !== "")
writer.tag(5, WireType.LengthDelimited).string(message.jwt);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
Expand Down
1 change: 0 additions & 1 deletion frontend/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ export interface OutgoingMessage {
channelId: string;
userId: string;
text: string;
jwt: string;
}
2 changes: 0 additions & 2 deletions proto/chat/v1/chat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ message ConnectRequest {
string server_id = 1;
string user_id = 2;
string last_ts = 3; // last timestamp received by the client
string jwt = 4;
}

// Chat server stream after subscribing to a channel
Expand All @@ -23,7 +22,6 @@ message ChatMessage {
string user_id = 2;
string text = 3;
string ts = 4; // timestamp
string jwt = 5;
}

// The response payload after sending a notification
Expand Down
80 changes: 31 additions & 49 deletions relay/internal/proto/chat/v1/chat.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 18 additions & 15 deletions relay/internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"fmt"
"log"
"net"
"time"

"github.com/inveracity/svelte-grpc-stream/internal/auth"
"github.com/inveracity/svelte-grpc-stream/internal/cache"
pb "github.com/inveracity/svelte-grpc-stream/internal/proto/chat/v1"
"github.com/inveracity/svelte-grpc-stream/internal/server"

"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type Relay struct {
server *server.Server
port int
server *server.Server
port int
pbURL string
pbAdmin string
pbPass string
}

func NewRelay(port int, natsURL, redisURL, pbURL, pbAdmin, pbPass string) *Relay {
Expand All @@ -29,10 +31,13 @@ func NewRelay(port int, natsURL, redisURL, pbURL, pbAdmin, pbPass string) *Relay

cache := cache.NewCache(redisClient)

grpcServer := server.NewServer(natsURL, pbURL, pbAdmin, pbPass, cache)
grpcServer := server.NewServer(natsURL, cache)
return &Relay{
port: port,
server: grpcServer,
port: port,
server: grpcServer,
pbURL: pbURL,
pbAdmin: pbAdmin,
pbPass: pbPass,
}
}

Expand All @@ -42,16 +47,14 @@ func (r *Relay) Run() error {
log.Fatalf("failed to listen: %v", err)
}

authMgr := auth.New(r.pbURL, r.pbAdmin, r.pbPass)
interceptor := server.NewAuthInterceptor(authMgr)

s := grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Duration(1 * time.Second),
PermitWithoutStream: true, // Allow pings even when there are no active streams
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: time.Duration(2 * time.Hour),
Timeout: time.Duration(20 * time.Second),
}),
grpc.UnaryInterceptor(interceptor.Unary()),
grpc.StreamInterceptor(interceptor.Stream()),
)

pb.RegisterChatServiceServer(s, r.server)

log.Printf("GRPC: server listening at %v", lis.Addr())
Expand Down
Loading
Loading