Звезда — на сегодняшний день самая распространенная топология компьютерных сетей. Такая структура обладает рядом преимуществ: легкостью масштабирования, надежностью (выход из строя одной машины не сказывается на других) и простота администрирования. Конечно это решение из физического уровня давно реализовано и на программном уровне. Тем не менее, представляю на суд читателей свою версию инструментария .Net для построения распределенных систем с топологией звезда.

Системы, построенные на основе такой топологии могут быть структурно организованы, например как на изображении ниже.



В этой статье я опишу создание минимального инструментария, без многих полезных фич, которые некогда использовал в своих разработках на базе представленной архитектуры. Однако этого вполне достаточно для построения реально полезных систем.

Methanum





Проект получил кодовое название Methanum исключительно из-за структурной схожести топологии с молекулой метана :). Центральный узел выполняющий роль комуникатора назван «Core». К ядру подключаются остальные приложения сети и подписываются на события. Так же каждое приложение сети может испускать события. Таким образом, через события осуществляется обмен данными в сети. События — это сериализуемый класс Event, который может содержать произвольные данные. Event минимально содержит 2 поля — строковое поле Destination, классифицирующее событие, и поле Data, содержащее словарь key value. Key — это строка, имя аргумента, Value — имеет тип object и может содержать примитивы (int, double, bool…). Для структур приходится несколько помочь системе сериализовать их.

Для начала создадим проект “methanum” библиотеки классов на C# и по ходу текста будем добавлять в него файлы.

Event





Как уже было сказано данные передаются посредством событий. Событие — это класс, включающий поле с данными Data и поле для идентификации события Destination. Также я оставил еще два поля: Id — уникальный идентификатор и DataTime содержащее время создания события. Эти дополнительные поля нужны исключительно для удобства, например, для разбора логов. Класс события так же содержит некоторое количество методов, призванных упростить жизнь программиста, их назначение думаю будет понятно из кода и в дополнительных пояснениях не нуждается.

Event.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Web.Script.Serialization;

namespace methanum
{
    [DataContract]
    [KnownType(typeof(List<DateTime>))]
    public class Event {
        /// <summary>
        /// A unique id of the event
        /// </summary>
        [DataMember]
        public Guid Id { set; get; }

        /// <summary>
        /// DateTime of event creation
        /// </summary>
        [DataMember]
        public DateTime DataTime { get; set; }

        /// <summary>
        /// Target
        /// </summary>
        [DataMember]
        public string Destination { get; set; }


        /// <summary>
        /// Data container
        /// </summary>
        [DataMember]
        public Dictionary<string, object> Data { get; set; }

        public Event() {
            Init();
        }

        public Event(string destination) {
            Init();
            Destination = destination;
        }

        private void Init() {
            Data = new Dictionary<string, object>();
            Id = Guid.NewGuid();
            DataTime = DateTime.Now;
        }


        public override string ToString() {

            var properties = GetType().GetProperties();

            var sb = new StringBuilder();
            sb.AppendFormat("[{0}]", GetType().Name);

            foreach (var property in properties) {
                if (property.Name == "Data") {
                    sb.Append("\nData = ");
                    string s = string.Format(" {0}", '{');
                    s = Data.Keys.Aggregate(s,
                        (current, key) => current + String.Format("\n  {0}\t:{1}", key, Data[key]));
                    sb.AppendFormat("{0}\n{1}", s, '}');

                }
                else sb.AppendFormat("\n{0} = {1};", property.Name, property.GetValue(this, null));
            }

            return sb.ToString();
        }

        public void SetData(string key, object obj) {
            Data[key] = obj;
        }

        public object GetObj(string key) {
            return !Data.ContainsKey(key) ? null : Data[key];
        }

        public double GetDbl(string key) {
            return !Data.ContainsKey(key) ? Double.NaN : Convert.ToDouble(Data[key]);
        }

        public int GetInt(string key) {
            return !Data.ContainsKey(key) ? Int32.MinValue : Convert.ToInt32(Data[key]);
        }

        public bool GetBool(string key) {
            return Data.ContainsKey(key) && Convert.ToBoolean(Data[key]);
        }


        public string GetStr(string key) {
            return !Data.ContainsKey(key) ? null : Convert.ToString(Data[key]);
        }

        public void SetCustomData(string key, object value) {
            var serializer = new JavaScriptSerializer();
            var str = serializer.Serialize(value);
            SetData(key, str);
        }

        public object GetCustom(string key, Type valueType) {
            if (!Data.ContainsKey(key))
                return null;

            if (Data[key].GetType() != typeof(string))
                return null;

            var serializer = new JavaScriptSerializer();
            var str = (string) Data[key];
            var obj = serializer.Deserialize(str, valueType);

            return obj;
        }
    }
}



Gate





Суть ядра заключается в реализации интерфейса, назовем его «интерфейс ворот». Основная цель ворот — предоставление функционала для регистрации клиентов и асинхронной посылки событий в обоих направлениях (от приложения к ядру и обратно).

IGate.cs
using System.ServiceModel;

namespace methanum {
    [ServiceContract(CallbackContract = typeof(IListener))]
    public interface IGate {
        [OperationContract]
        void Subscribe();

        [OperationContract]
        void KillConnection();

        [OperationContract]
        void Fire(Event evt);
    }
}



Контракт данных у нас дуплексный, в прямом направлении — от приложения к ядру — стреляем события через IGate вызовом метода void Fire(Event evt). Обратный вызов — от ядра к приложению — происходит через IListener интерфейс, о котором будет позже.
Ворота работают по следующему принципу. Когда стартует ядро, создается объект класса Gate, унаследованного от интерфейса IGate. В Gate имеется статическое поле _subscribers, в котором хранятся все активные подключения к ядру. При вызове метода Subscribe(), добавляем текущее подключение, если оно еще не было добавлено. Метод KillConnection() служит для удаления текущего подключения. Самым интересным является метод Fire(Event evt), но и в нем нет ни чего сложного. Половину метода докапываемся до Ip адреса и порта, только чтобы вывести информацию в консоль. Я оставил эту часть кода исключительно для того, чтобы продемонстрировать, как получить доступ к адресу соединения, например, чтобы фильтровать или логировать события по разрешенным адресам. Основная работа этого метода заключается в обходе всех существующих подключений и асинхронного вызова метода Receive у их слушателей IListener. Если обнаруживаем закрытое подключение, то его незамедлительно удаляем из списка активных подключений.

Gate.cs
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;

namespace methanum {
    public class Gate : IGate {
        private static List<OperationContext> _subscribers;

        public Gate() {
            if (_subscribers == null)
                _subscribers = new List<OperationContext>();
        }

        public void Subscribe() {
            var oc = OperationContext.Current;

            if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) {
                _subscribers.Add(oc);
                Console.WriteLine("(subscribe \"{0}\")", oc.SessionId);
            }
        }

        public void KillConnection() {
            var oc = OperationContext.Current;
            _subscribers.RemoveAll(c => c.SessionId == oc.SessionId);

            Console.WriteLine("(kill \"{0}\")", oc.SessionId);
        }

        public void Fire(Event evt) {
            var currentOperationContext = OperationContext.Current;
            var remoteEndpointMessageProperty =
                currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name] as
                    RemoteEndpointMessageProperty;
            var ip = "";
            var port = 0;

            if (remoteEndpointMessageProperty != null) {
                ip = remoteEndpointMessageProperty.Address;
                port = remoteEndpointMessageProperty.Port;
            }

            Console.WriteLine("(Fire (event . \"{0}\") (from . \"{1}:{2}\") (subscribers . {3}))", evt.Id, ip, port, _subscribers.Count);

            for (var i = _subscribers.Count - 1; i >= 0; i--) {
                var oc = _subscribers[i];

                if (oc.Channel.State == CommunicationState.Opened) {
                    var channel = oc.GetCallbackChannel<IListener>();


                    try {
                        ((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null);
                    }
                    catch (Exception e) {
                        Console.WriteLine(e.Message);
                    }
                }
                else {
                    _subscribers.RemoveAt(i);
                    Console.WriteLine("(dead . \"{0}\")", oc.SessionId);
                }
            }
        }
    }
}



Listener





Чтобы передать сообщение от ядра к клиенту достаточно одного метода Receive, который определен в интерфейсе IListener.

IListener.cs
using System.ServiceModel;

namespace methanum {
    public delegate void DelegateReceive(Event evt);
    interface IListener {
        [OperationContract(IsOneWay = true)]
        void Receive(Event evt);
    }
}



От интерфейса IListener наследуется класс Connector, который реализует всю логику взаимодействия клиентского приложения и ядра. При создании экземпляра класса создается подключение к ядру, через которое передаются и принимаются сообщения. Отправка и принятие сообщений происходит в отдельных потоках, чтобы исключить блокировку приложений и ядра. Чтобы различать события, в них есть поле Destination. Фильтровать события при помощи if-then-else или switch-case конструкций неудобно, поэтому был реализован механизм, позволяющий сопоставить каждому интересующему событию в соответствие обработчик. Такое сопоставление хранится в словаре Dictionary<string, CbHandler> _handlers;. Когда событие принято, происходит поиск в словаре и, если ключ найден, вызывается соответствующий обработчик.

Connector.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Threading;

namespace methanum {
    public delegate void CbHandler(Event evt);

    public class Connector : IListener {
        private Dictionary<string, CbHandler> _handlers;
        private NetTcpBinding _binding;
        private EndpointAddress _endpointToAddress;
        private InstanceContext _instance;
        private DuplexChannelFactory<IGate> _channelFactory;
        private IGate _channel;
        private Thread _fireThread;
        private List<Event> _eventQueue;

        public event CbHandler ReceiveEvent;

        private bool _isSubscribed;

        private object _channelSync = new object();

        protected virtual void OnReceive(Event evt) {
            CbHandler handler = ReceiveEvent;
            if (handler != null) handler.BeginInvoke(evt, null, null);
        }

        //localhost:2255
        public Connector(string ipAddress) {
            init(ipAddress);
        }

        private void init(string ipAddress) {
            _handlers = new Dictionary<string, CbHandler>();
            _binding = new NetTcpBinding();
            _endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress));
            _instance = new InstanceContext(this);

            Conect();

            _eventQueue = new List<Event>();
            
            _fireThread = new Thread(FireProc);
            _fireThread.IsBackground = true;
            _fireThread.Start();
        }

        private void Conect() {
            _isSubscribed = false;

            while (!_isSubscribed) {
                try {
                    _channelFactory = new DuplexChannelFactory<IGate>(_instance, _binding, _endpointToAddress);
                    
                    _channel = _channelFactory.CreateChannel();

                    _channel.Subscribe();
                    _isSubscribed = true;
                }
                catch (Exception e) {
                    if (!(e is EndpointNotFoundException)) throw e;

                    Thread.Sleep(1000);
                }
            }
        }

        private void ReConect() {
            lock (_channelSync) {
                try {
                    _channel.KillConnection();
                }
                catch (Exception e) {
                    Console.WriteLine("(ReConect-exception  \"{0}\"", e.Message);
                }
                Conect();
            }
        }

        public void Fire(Event evt) {
            lock (_eventQueue) {
                _eventQueue.Add(evt);
            }
        }

        private void FireProc() {
            while (true) {
                var isHasEventsToFire = false;

                lock (_eventQueue) {
                    isHasEventsToFire = _eventQueue.Any();
                }

                if (_isSubscribed && isHasEventsToFire) {
                    Event evt;

                    lock (_eventQueue) {
                        evt = _eventQueue.First();
                    }

                    try {
                        lock (_eventQueue) {
                            _eventQueue.Remove(evt);
                        }

                        _channel.Fire(evt); 
                    }
                    catch (Exception) {
                        if (_isSubscribed)
                            _isSubscribed = false;
                        ReConect();
                    }
                } else Thread.Sleep(10);
            }
        }

        public void SetHandler(string destination, CbHandler handler) {
            _handlers[destination] = handler;
        }

        public void DeleteHandler(string destination) {
            if(_handlers.ContainsKey(destination)) _handlers.Remove(destination);
        }

        public void Receive(Event evt) {
            if (_handlers.ContainsKey(evt.Destination)) {
                _handlers[evt.Destination].BeginInvoke(evt, null, null);
            }

            OnReceive(evt);
        }

        static public void HoldProcess() {
            var processName = Process.GetCurrentProcess().ProcessName;
            var defColor = Console.ForegroundColor;

            Console.ForegroundColor = ConsoleColor.Green;

            Console.WriteLine("The {0} is ready", processName);
            Console.WriteLine("Press <Enter> to terminate {0}", processName);

            Console.ForegroundColor = defColor;

            Console.ReadLine();
        }
    }
}



Для удобства создадим еще один небольшой класс, стартующий сервис.

SrvRunner.cs
using System;
using System.ServiceModel;

namespace methanum {
    public class SrvRunner {
        private ServiceHost _sHost;

        public void Start(int port) {
            var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) };
            
            _sHost = new ServiceHost(typeof (Gate), uris);

            _sHost.Open();

            foreach (var uri2 in _sHost.BaseAddresses) {
                Console.WriteLine("Start on: {0}", uri2.ToString());
            }
        }

        public void Stop() {
            _sHost.Close();
        }
    }
}



Core





Мы реализовали все классы, необходимые для комуникации наших приложений. Осталось создать ядро, к которому будут подключаться наши приложения. Для этого в решении создаем проект “Core” консольного приложения, к нему подключаем сборку methanum. Вообще, мы уже все написали, осталось только запустить.

CoreMain.cs
using System;
using System.Linq;
using methanum;

namespace Core {
    internal class CoreMain {
        private static void Main(string[] args) {
            int port = 0;
            if ((!args.Any()) || (!int.TryParse(args[0], out port))) {
                Console.WriteLine("Usage:");
                Console.WriteLine("Core.exe port");
                Environment.Exit(1);
            }

            try {
                var coreSrv = new SrvRunner();
                coreSrv.Start(port);

                Console.WriteLine("The Core is ready.");
                Console.WriteLine("Press <ENTER> to terminate Core.");
                Console.ReadLine();

                coreSrv.Stop();
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }
    }
}



Пример использования



Для демострации создадим примитивный месенджер: создаем еще одно консольное приложение, добавляем ссылку на сборку methanum и вставляем содержимое файла Program.cs.

Program.cs
using System;
using System.Linq;
using methanum;

namespace ClentExamle {
    class Program {
        static void Main(string[] args) {

            if ((!args.Any())) {
                Console.WriteLine("Usage:");
                Console.WriteLine("ClentExample.exe coreAddress:port");
                Environment.Exit(1);
            }

            var userName = "";

            while (String.IsNullOrWhiteSpace(userName)) {
                Console.WriteLine("Please write user name:");
                userName = Console.ReadLine();   
            }

            try {
                var maingate = new Connector(args[0]);

                maingate.SetHandler("message", MsgHandler);

                Console.WriteLine("Hello {0}, now you can send messages", userName);

                while (true) {
                    var msg = Console.ReadLine();
                    var evt = new Event("message");
                    evt.SetData("name", userName);
                    evt.SetData("text", msg);

                    maingate.Fire(evt);
                }
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }

        static private void MsgHandler(Event evt) {
            Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text"));
        }

    }
}



Теперь запускаем приложение Core.exe указав в командной строке порт, например “Core 2255”. Затем стартуем несколько экземпляров ClentExample.exe командой “ClentExample localhost:2255”. Приложения предлагают ввести имя пользователя, после чего подключаются к ядру. В результате, получается широковещательный примитивный чат: каждое новое сообщение посылается вызовом maingate.Fire(evt), принимается в обработчике MsgHandler(Event evt).



Полный исходник доступен на гихабе methanum.

Комментарии (5)


  1. GreyCat
    01.04.2016 19:52
    +1

    Звезда — на сегодняшний день самая распространенная топология компьютерных сетей.

    Зависит, конечно, от определений, но вообще нет. Если считать интернет самой большой и объемной компьютерной сетью, то в основе такой сети исторически дерево, дополненное до многоуровневого графа. Если считать топологией не только железки, но и гиперпространство поверх них, то всякие умные люди, пишущие статьи, придумали для этого термины Jellyfish (медуза), либо Bow Tie (галстук-бабочка).
    Такая структура обладает рядом преимуществ: [...] надежностью

    Я вас умоляю. По-моему, это в книжках только при сравнении с токенрингом или firewire пишут. Коммутатор — single point of failure, куда ж ненадежнее.
    А если по существу — то вы написали n+1-ую messaging bus. Поясните, что ли, чем она лучше всех существующих и стандартизированных решений (всех MQ, ESB / JMS, ORB, MQTT, да хоть того же dbus — в общем, в википедии есть не такой маленький список)? Насколько я понимаю, не то, что нет гарантированной доставки, а нет даже концепции очереди и какого-то ее хранения? Сообщения, судя по коду, пакуются в JSON — не кажется ли, что это как-то, несколько неоптимально для внутрипрограммной шины?


    1. icCE
      02.04.2016 13:05

      Ну застрял человек немного в прошлом, ну бывает :) Сам хотел написать про звезду и надежность.


    1. Beetle_ru
      04.04.2016 11:59

      Зависит, конечно, от определений, но вообще нет. Если считать интернет самой большой и объемной компьютерной сетью, то в основе такой сети исторически дерево, дополненное до многоуровневого графа. Если считать топологией не только железки, но и гиперпространство поверх них, то всякие умные люди, пишущие статьи, придумали для этого термины Jellyfish (медуза), либо Bow Tie (галстук-бабочка).

      У дерева a priori есть корень. Где же корень интернет?)
      Я уверен, что у большинства читателей как минимум дома стоит роутер, к которому цепляются остальные железки, а это и есть звезда.
      Вообще-то в тексте сразу поясняется, о какой надежности речь: «(выход из строя одной машины не сказывается на других)». Конечно, выход из строя концентратора приведет к выходу из строя всей сети. Именно поэтому программная реализация ядра должна быть предельно проста и надежна.
      А если по существу — то вы написали n+1-ую messaging bus
      – абсолютно верно.
      Поясните, что ли, чем она лучше всех существующих и стандартизированных решений
      – наверно минимализмом. Ну в самом деле, посмотрите в исходники существующих решений
      Насколько я понимаю, не то, что нет гарантированной доставки, а нет даже концепции очереди и какого-то ее хранения?
      – Если нужно, добавить очередь в methanum, не так-то и сложно.
      Сообщения, судя по коду, пакуются в JSON — не кажется ли, что это как-то, несколько неоптимально для внутрипрограммной шины?
      — Сами события сериализуются в бинарный формат, при этом по умолчанию они могут содержать словарь примитивов (int, double, bool…). Т.к. первоначально система создавалась для сбора и обработки данных с промышленных контроллеров, где примитивов было вполне достаточно. Если нужно передавать пользовательскую структуру, можно добавить ее в качестве известного типа, например «[KnownType(typeof(List))]». Или можно запаковать объект ну скажем в строку, в данном случае в JSON. И кстати нигде и речи не идет, что данное решение исключительно внутрипрограммное.

      Данная статья задумывалась, чтобы поделиться опытом, как сделать программную реализацию звезды своими руками, без каких либо зависимостей и сторонних проектов. Проект methanum абсолютно не претендует на звание нового и уникального инструмента. Однако, в некоторых случаях может быть очень полезен, его можно быстро заточить под свои нужды.


  1. Scratch
    02.04.2016 22:40
    +1

    EasyNetQ — и пусть весь мир подождет


  1. hensew
    03.04.2016 17:39

    Сравнения с MQTT, WAMP или ZeroMQ не хватает. Добавьте, если есть возможность.