Skip to content

Commit

Permalink
update proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
wuyu8512 committed Jul 9, 2021
1 parent 9dc557e commit 7c90148
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 59 deletions.
4 changes: 2 additions & 2 deletions Telega.Rpc.Dto.Generator/Generator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.IO;
using System.Linq;
using System.Net;
Expand All @@ -21,7 +21,7 @@ public sealed class Generator : ISourceGenerator {
static readonly string[] SchemeUrls = { $"{RepoPath}/api.tl", $"{RepoPath}/mtproto.tl" };

static string[] DownloadLatestTgScheme() =>
SchemeUrls.AsParallel().Select(x => new WebClient().DownloadString(x)).ToArray();
SchemeUrls.AsParallel().Select(x => new WebClient() { Proxy = new WebProxy("127.0.0.1:8889")}.DownloadString(x)).ToArray();

// TODO: Tru to use caching interface when it become public
// https://github.com/dotnet/roslyn/blob/main/docs/features/source-generators.cookbook.md#participate-in-the-ide-experience
Expand Down
31 changes: 22 additions & 9 deletions Telega/Client/TelegramClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
using Telega.Connect;
using Telega.Rpc.Dto;

namespace Telega.Client {
public sealed class TelegramClient : IDisposable {
namespace Telega.Client
{
public sealed class TelegramClient : IDisposable
{
const string DefaultTelegramIp = "149.154.167.50";
const int DefaultTelegramPort = 443;
const string DefaultSessionName = "session.dat";
Expand All @@ -29,7 +31,8 @@ public sealed class TelegramClient : IDisposable {
ILogger logger,
TgBellhop bellhop,
ISessionStore sessionStore
) {
)
{
_bellhop = bellhop;
_storeSync = SessionStoreSync.Init(_bellhop.SessionVar, sessionStore);

Expand All @@ -41,7 +44,8 @@ ISessionStore sessionStore
Updates = new TelegramClientUpdates(_bellhop);
}

public void Dispose() {
public void Dispose()
{
_bellhop.ConnectionPool.Dispose();
_storeSync.Stop();
}
Expand All @@ -51,14 +55,17 @@ static async Task<TelegramClient> Connect(
ConnectInfo connectInfo,
ISessionStore store,
TgCallMiddlewareChain? callMiddlewareChain = null,
TgProxy? proxy = null,
TcpClientConnectionHandler? tcpClientConnectionHandler = null,
ILogger? logger = null
) {
)
{
logger ??= NullLogger.Instance;
var bellhop = await TgBellhop.Connect(
logger,
connectInfo,
callMiddlewareChain,
proxy,
tcpClientConnectionHandler
).ConfigureAwait(false);
return new TelegramClient(logger, bellhop, store);
Expand All @@ -69,28 +76,34 @@ public static async Task<TelegramClient> Connect(
ISessionStore? store = null,
IPEndPoint? endpoint = null,
TgCallMiddlewareChain? callMiddlewareChain = null,
TgProxy? proxy = null,
TcpClientConnectionHandler? tcpClientConnectionHandler = null
) {
)
{
store ??= new FileSessionStore(DefaultSessionName);
var ep = endpoint ?? DefaultEndpoint;
var session = await store.Load().ConfigureAwait(false);
var connectInfo = session != null
? ConnectInfo.FromSession(session)
: ConnectInfo.FromInfo(apiId, ep);

return await Connect(connectInfo, store, callMiddlewareChain, tcpClientConnectionHandler).ConfigureAwait(false);
return await Connect(connectInfo, store, callMiddlewareChain, proxy, tcpClientConnectionHandler)
.ConfigureAwait(false);
}

public static async Task<TelegramClient> Connect(
Session session,
ISessionStore? store = null,
TgCallMiddlewareChain? callMiddlewareChain = null,
TgProxy? proxy = null,
TcpClientConnectionHandler? tcpClientConnectionHandler = null
) {
)
{
store ??= new FileSessionStore(DefaultSessionName);
var connectInfo = ConnectInfo.FromSession(session);

return await Connect(connectInfo, store, callMiddlewareChain, tcpClientConnectionHandler).ConfigureAwait(false);
return await Connect(connectInfo, store, callMiddlewareChain, proxy, tcpClientConnectionHandler)
.ConfigureAwait(false);
}

public Task<T> Call<T>(ITgFunc<T> func) =>
Expand Down
42 changes: 28 additions & 14 deletions Telega/Connect/TgBellhop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
using Telega.Rpc.Dto.Types;
using Telega.Utils;

namespace Telega.Connect {
sealed class TgBellhop {
namespace Telega.Connect
{
sealed class TgBellhop
{
public TgConnectionPool ConnectionPool { get; }
public Var<TgConnection> CurrentConnection { get; }
public CustomObservable<UpdatesType> Updates { get; } = new();
Expand All @@ -22,23 +24,26 @@ sealed class TgBellhop {
public void SetSession(Func<Session, Session> func) =>
CurrentConnection.Get().Session.SetWith(func);

void MirrorUpdates(TgConnection conn) {
void MirrorUpdates(TgConnection conn)
{
conn.Transport.Transport.Updates.Subscribe(
onNext: Updates.OnNext,
onError: Updates.OnError,
onCompleted: Updates.OnCompleted
);
}

async Task<TgConnection> ChangeConn(Func<TgConnection, Task<TgConnection>> f) {
async Task<TgConnection> ChangeConn(Func<TgConnection, Task<TgConnection>> f)
{
var oldConn = CurrentConnection.Get();
var newConn = await f(oldConn).ConfigureAwait(false);
CurrentConnection.Set(newConn);
MirrorUpdates(newConn);
return newConn;
}

public TgBellhop(TgConnectionPool connectionPool, TgConnection currentConnection) {
public TgBellhop(TgConnectionPool connectionPool, TgConnection currentConnection)
{
ConnectionPool = connectionPool;
CurrentConnection = currentConnection.AsVar();
MirrorUpdates(currentConnection);
Expand All @@ -51,23 +56,29 @@ public static async Task<TgBellhop> Connect(
ILogger logger,
ConnectInfo connectInfo,
TgCallMiddlewareChain? callMiddlewareChain = null,
TgProxy? proxy = null,
TcpClientConnectionHandler? connHandler = null
) {
)
{
callMiddlewareChain ??= TgCallMiddlewareChain.Default;
var conn = await TaskWrapper.Wrap(() =>
TgConnectionEstablisher.EstablishConnection(logger, connectInfo, callMiddlewareChain, connHandler)
TgConnectionEstablisher.EstablishConnection(logger, connectInfo, callMiddlewareChain, proxy,
connHandler)
).ConfigureAwait(false);
var pool = new TgConnectionPool(logger, conn, callMiddlewareChain, connHandler);
var pool = new TgConnectionPool(logger, conn, callMiddlewareChain, proxy, connHandler);
return new TgBellhop(pool, conn);
}


async Task<T> CallWithReConnect<T>(ITgFunc<T> func) {
try {
async Task<T> CallWithReConnect<T>(ITgFunc<T> func)
{
try
{
var conn = CurrentConnection.Get();
return await conn.Transport.Call(func).ConfigureAwait(false);
}
catch (TgTransportException) {
catch (TgTransportException)
{
var oldConn = CurrentConnection.Get();
oldConn.Dispose();

Expand All @@ -76,11 +87,14 @@ async Task<T> CallWithReConnect<T>(ITgFunc<T> func) {
}
}

async Task<T> CallWithMigration<T>(ITgFunc<T> func) {
try {
async Task<T> CallWithMigration<T>(ITgFunc<T> func)
{
try
{
return await CallWithReConnect(func).ConfigureAwait(false);
}
catch (TgDataCenterMigrationException e) {
catch (TgDataCenterMigrationException e)
{
await ChangeConn(x => ConnectionPool.Connect(x, e.Dc)).ConfigureAwait(false);
return await CallWithReConnect(func).ConfigureAwait(false);
}
Expand Down
47 changes: 36 additions & 11 deletions Telega/Connect/TgConnectionEstablisher.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ProxyLib.Proxy;
using Telega.Auth;
using Telega.CallMiddleware;
using Telega.Rpc;
Expand All @@ -11,41 +13,64 @@
using Telega.Rpc.ServiceTransport;
using Telega.Utils;

namespace Telega.Connect {
static class TgConnectionEstablisher {
static async Task<System.Net.Sockets.TcpClient> CreateTcpClient(
namespace Telega.Connect
{
public record TgProxy(ProxyType ProxyType, string Address, int Port);

static class TgConnectionEstablisher
{
private static readonly ProxyClientFactory ProxyClientFactory = new();

static async Task<TcpClient> CreateTcpClient(
IPEndPoint endpoint,
TgProxy? proxy = null,
TcpClientConnectionHandler? connHandler = null
) {
if (connHandler != null) {
)
{
if (connHandler != null)
{
return await connHandler(endpoint).ConfigureAwait(false);
}

var res = new System.Net.Sockets.TcpClient(endpoint.AddressFamily);
await res.ConnectAsync(endpoint.Address, endpoint.Port).ConfigureAwait(false);
TcpClient res;
if (proxy != null)
{
var pc = ProxyClientFactory.CreateProxyClient(proxy.ProxyType, proxy.Address, proxy.Port);
res = pc.CreateConnection(endpoint.Address.ToString(), endpoint.Port);
}
else
{
res = new TcpClient(endpoint.AddressFamily);
await res.ConnectAsync(endpoint.Address, endpoint.Port).ConfigureAwait(false);
}

return res;
}

public static async Task<TgConnection> EstablishConnection(
ILogger logger,
ConnectInfo connectInfo,
TgCallMiddlewareChain callMiddlewareChain,
TgProxy? proxy = null,
TcpClientConnectionHandler? connHandler = null
) {
)
{
var endpoint = connectInfo.Endpoint;
Helpers.Assert(endpoint != null, "endpoint == null");
var tcpClient = await CreateTcpClient(endpoint!, connHandler).ConfigureAwait(false);
var tcpClient = await CreateTcpClient(endpoint!, proxy, connHandler).ConfigureAwait(false);
var tcpTransport = new TcpTransport(tcpClient);

if (connectInfo.NeedsInAuth) {
if (connectInfo.NeedsInAuth)
{
var mtPlainTransport = new MtProtoPlainTransport(tcpTransport);
var result = await Authenticator.DoAuthentication(mtPlainTransport).ConfigureAwait(false);
connectInfo.SetAuth(result);
}

var session = connectInfo.ToSession().AsVar();
var mtCipherTransport = new MtProtoCipherTransport(tcpTransport, session);
var transport = new TgCustomizedTransport(new TgTransport(logger, mtCipherTransport, session), callMiddlewareChain);
var transport = new TgCustomizedTransport(new TgTransport(logger, mtCipherTransport, session),
callMiddlewareChain);

// TODO: separate Config
var config = new GetConfig();
Expand Down
Loading

0 comments on commit 7c90148

Please sign in to comment.